LangGraph 工作流编排:从 DAG 到循环 Agent

概述

LangGraph 是 LangChain 团队推出的 Agent 工作流编排框架。与传统的 DAG(有向无环图)工作流引擎不同,LangGraph 原生支持循环(Cycles),这使得它能够表达"Agent 反复推理直到满意"的模式。

核心设计理念:将 Agent 的行为建模为一个状态机(State Machine),每个节点是一个计算步骤,边是条件转移,状态在节点之间流转。

核心概念

LangGraph 核心概念
    |
    ├── State(状态)
    │   └── 在节点之间传递的数据容器
    |
    ├── Node(节点)
    │   └── 执行一个计算步骤(LLM 调用/工具执行/数据处理)
    |
    ├── Edge(边)
    │   ├── Normal Edge - 无条件转移
    │   └── Conditional Edge - 基于状态的条件路由
    |
    ├── Graph(图)
    │   └── 由节点和边组成的工作流
    |
    └── Checkpointer(检查点)
        └── 持久化状态,支持断点恢复和 Human-in-the-Loop

安装

pip install langgraph langchain-openai langchain-core

基础:构建第一个 Graph

定义状态

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# 状态定义:使用 TypedDict
class AgentState(TypedDict):
    """Shared state passed between graph nodes; each node returns a partial update."""
    # add_messages is a reducer: returned messages are APPENDED to this list
    # rather than replacing it.
    messages: Annotated[list, add_messages]
    # Plain fields: a node's returned value simply overwrites the old one.
    current_step: str
    iteration_count: int

定义节点

from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,  # needed by tool_executor_node and graph.update_state below
)
from langchain_openai import ChatOpenAI

# Shared chat model used by all nodes below; temperature=0 minimizes sampling
# randomness so routing decisions are as repeatable as possible.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

def chatbot_node(state: AgentState) -> dict:
    """Chat node: run the LLM over the conversation so far.

    Returns a partial state update: the new AI message (appended by the
    add_messages reducer) plus the name of the step that just ran.
    """
    reply = llm.invoke(state["messages"])
    return {"messages": [reply], "current_step": "chatbot"}

def tool_executor_node(state: AgentState) -> dict:
    """Tool node: execute every tool call requested by the latest AI message.

    Each result is wrapped in a ToolMessage tagged with the originating
    tool_call_id so the LLM can match results back to its requests.
    """
    pending_calls = state["messages"][-1].tool_calls
    outputs = [
        ToolMessage(
            content=str(execute_tool(call["name"], call["args"])),
            tool_call_id=call["id"],
        )
        for call in pending_calls
    ]
    return {"messages": outputs}

构建 Graph

from langgraph.graph import StateGraph, START, END

# Create the graph builder over the shared AgentState schema
builder = StateGraph(AgentState)

# Register nodes
builder.add_node("chatbot", chatbot_node)
builder.add_node("tools", tool_executor_node)

# Register edges
builder.add_edge(START, "chatbot")         # entry point -> chatbot

# Conditional edge: pick the next step from the LLM's latest output
def should_use_tools(state: AgentState) -> str:
    """Route to "tools" when the last message carries tool calls, else end."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"     # tool calls requested -> run the tools
    return END             # no tool calls -> finish

builder.add_conditional_edges("chatbot", should_use_tools)
builder.add_edge("tools", "chatbot")       # after tools -> back to chatbot (the cycle)

# Compile the graph into a runnable
graph = builder.compile()

# Visualize as a Mermaid diagram
print(graph.get_graph().draw_mermaid())

运行 Graph

# Synchronous execution
result = graph.invoke({
    "messages": [HumanMessage(content="北京今天天气怎么样?")],
    "current_step": "",
    "iteration_count": 0,
})

print(result["messages"][-1].content)

# Streaming execution: yields one event per node as each node finishes
for event in graph.stream({
    "messages": [HumanMessage(content="分析一下特斯拉的股价趋势")],
}):
    for node_name, output in event.items():
        print(f"[{node_name}]", output)

# Asynchronous execution
import asyncio

async def run():
    result = await graph.ainvoke({
        "messages": [HumanMessage(content="Hello")],
    })
    return result

asyncio.run(run())

进阶:带工具的 ReAct Agent

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# Tool definitions. NOTE: for @tool functions the docstring IS runtime
# behavior — it becomes the tool description sent to the LLM — so the
# Chinese docstrings below are deliberately left untranslated.
@tool
def search(query: str) -> str:
    """搜索网络获取最新信息"""
    # Stub implementation: returns a canned "search result" string.
    return f"搜索结果:关于 '{query}' 的最新信息..."

@tool
def calculator(expression: str) -> float:
    """计算数学表达式"""
    # SECURITY FIX: the original used eval(), which executes arbitrary
    # Python — never acceptable on LLM/user-controlled input. This version
    # walks the AST and permits only numeric literals and basic arithmetic
    # operators; anything else raises ValueError.
    import ast
    import operator

    # Mapping of allowed AST operator nodes to their implementations.
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }

    def _eval(node: ast.AST) -> float:
        # Recursively evaluate the whitelisted expression tree.
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    return float(_eval(ast.parse(expression, mode="eval")))

@tool
def get_weather(city: str) -> str:
    """获取城市天气"""
    # Stub: returns a fixed forecast string. The Chinese docstring is the
    # tool description the LLM sees, so it is left untranslated.
    return f"{city}:晴,25度,东南风3级"

# Create a ReAct agent using LangGraph's prebuilt helper
model = ChatOpenAI(model="gpt-4o")

agent = create_react_agent(
    model=model,
    tools=[search, calculator, get_weather],
    # NOTE(review): `state_modifier` was renamed to `prompt` in newer
    # langgraph releases — confirm against the pinned version.
    state_modifier="你是一个全能助手,善于使用工具来回答问题。",
)

# Run the agent
result = agent.invoke({
    "messages": [HumanMessage(content="北京今天天气如何?如果温度超过30度,计算一下需要开空调几小时(假设每小时2度电)才能把室温降到25度")]
})

Human-in-the-Loop(人工介入)

中断节点

from langgraph.checkpoint.memory import MemorySaver

# Create an in-memory checkpoint store
memory = MemorySaver()

# Compile with an interrupt point
graph = builder.compile(
    checkpointer=memory,
    interrupt_before=["tools"],  # pause before the tools node, await human approval
)

# First run: execution stops right before the "tools" node
config = {"configurable": {"thread_id": "user-123"}}

result = graph.invoke(
    {"messages": [HumanMessage(content="帮我发送一封邮件给张三")]},
    config=config,
)

# Inspect the tool calls the agent intends to make
print("Agent wants to call:", result["messages"][-1].tool_calls)
# [{"name": "send_email", "args": {"to": "zhangsan@...", "content": "..."}}]

# Resume after human review
# Option 1: continue as-is (invoking with None resumes from the checkpoint)
graph.invoke(None, config=config)

# Option 2: patch the state first, then continue
graph.update_state(
    config,
    {"messages": [ToolMessage(content="已确认发送", tool_call_id="xxx")]},
    as_node="tools",
)
graph.invoke(None, config=config)

时间旅行(回溯)

# List every checkpoint recorded for this thread
checkpoints = list(memory.list(config))

for cp in checkpoints:
    print(f"Step: {cp.metadata.get('step')}, "
          f"Node: {cp.metadata.get('source')}")

# Rewind: pin a specific checkpoint_id in the config
old_config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_id": checkpoints[2].config["configurable"]["checkpoint_id"],
    }
}

# Re-run starting from that checkpoint
result = graph.invoke(
    {"messages": [HumanMessage(content="换一个方案")]},
    config=old_config,
)

多 Agent 工作流

监督者模式(Supervisor)

from typing import Literal

class SupervisorState(TypedDict):
    """State for the supervisor-pattern multi-agent workflow."""
    # Conversation history, append-only via the add_messages reducer.
    messages: Annotated[list, add_messages]
    # Name of the agent chosen to run next ("FINISH" routes to END).
    next_agent: str

def supervisor_node(state: SupervisorState) -> dict:
    """Supervisor: ask the LLM which agent should act next."""
    routing_prompt = """你是一个任务调度器。根据当前对话状态,
选择下一个应该执行的 Agent:
- researcher: 需要搜索或调研信息时
- coder: 需要编写或修改代码时
- reviewer: 需要审查代码或方案时
- FINISH: 任务已完成

仅输出 Agent 名称。"""

    # System prompt first, then the full conversation so far.
    conversation = [SystemMessage(content=routing_prompt)]
    conversation.extend(state["messages"])

    decision = llm.invoke(conversation)
    # The model is asked to reply with a bare agent name; strip whitespace.
    return {"next_agent": decision.content.strip()}

def researcher_node(state: SupervisorState) -> dict:
    """Researcher agent: respond to the conversation with a researcher persona.

    Fix: the original called ``llm.bind(system_message=...)``, but ``bind``
    only attaches provider invocation kwargs, and ChatOpenAI has no
    ``system_message`` kwarg — the persona was never applied (and the unknown
    argument can error). The supported approach is to prepend a SystemMessage.
    """
    messages = [SystemMessage(content="你是研究员..."), *state["messages"]]
    response = llm.invoke(messages)
    # Tag the reply with the agent's name so the supervisor can attribute it.
    return {"messages": [AIMessage(content=response.content, name="researcher")]}

def coder_node(state: SupervisorState) -> dict:
    """Coder agent: respond to the conversation with a developer persona.

    Fix: ``llm.bind(system_message=...)`` does not set a system prompt —
    ``bind`` attaches model invocation kwargs and ChatOpenAI accepts no
    ``system_message`` kwarg. Prepend a SystemMessage instead.
    """
    messages = [SystemMessage(content="你是开发者..."), *state["messages"]]
    response = llm.invoke(messages)
    # Tag the reply with the agent's name so the supervisor can attribute it.
    return {"messages": [AIMessage(content=response.content, name="coder")]}

def reviewer_node(state: SupervisorState) -> dict:
    """Reviewer agent: respond to the conversation with a reviewer persona.

    Fix: ``llm.bind(system_message=...)`` does not set a system prompt —
    ``bind`` attaches model invocation kwargs and ChatOpenAI accepts no
    ``system_message`` kwarg. Prepend a SystemMessage instead.
    """
    messages = [SystemMessage(content="你是审查员..."), *state["messages"]]
    response = llm.invoke(messages)
    # Tag the reply with the agent's name so the supervisor can attribute it.
    return {"messages": [AIMessage(content=response.content, name="reviewer")]}

# Build the supervisor graph
builder = StateGraph(SupervisorState)

builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", researcher_node)
builder.add_node("coder", coder_node)
builder.add_node("reviewer", reviewer_node)

builder.add_edge(START, "supervisor")

# Conditional routing driven by the supervisor's decision
def route_supervisor(state: SupervisorState) -> str:
    """Return the agent name the supervisor node stored in the state."""
    return state["next_agent"]

builder.add_conditional_edges(
    "supervisor",
    route_supervisor,
    {
        "researcher": "researcher",
        "coder": "coder",
        "reviewer": "reviewer",
        "FINISH": END,
    },
)

# Every worker agent hands control back to the supervisor
builder.add_edge("researcher", "supervisor")
builder.add_edge("coder", "supervisor")
builder.add_edge("reviewer", "supervisor")

graph = builder.compile()

层级 Agent(Sub-Graph)

# Sub-graph: the research team's internal workflow
def build_research_subgraph():
    """Compile the research team's internal search -> analyze loop."""

    class ResearchState(TypedDict):
        messages: Annotated[list, add_messages]
        research_notes: list[str]

    sg = StateGraph(ResearchState)

    sg.add_node("search", search_node)
    sg.add_node("analyze", analyze_node)
    sg.add_node("summarize", summarize_node)

    sg.add_edge(START, "search")
    sg.add_edge("search", "analyze")

    def needs_more_search(state):
        # Fewer than 3 notes -> loop back and keep searching;
        # otherwise move on to the summary step.
        return "search" if len(state["research_notes"]) < 3 else "summarize"

    sg.add_conditional_edges("analyze", needs_more_search)
    sg.add_edge("summarize", END)

    return sg.compile()

# Use the compiled sub-graph inside the main graph
research_subgraph = build_research_subgraph()

main_builder = StateGraph(MainState)
main_builder.add_node("research", research_subgraph)  # a compiled graph is a valid node
main_builder.add_node("write", write_node)

main_builder.add_edge(START, "research")
main_builder.add_edge("research", "write")
main_builder.add_edge("write", END)

持久化与部署

数据库持久化

# SQLite persistence (single file — good for local development)
from langgraph.checkpoint.sqlite import SqliteSaver

with SqliteSaver.from_conn_string("checkpoints.db") as memory:
    graph = builder.compile(checkpointer=memory)

    result = graph.invoke(
        {"messages": [HumanMessage(content="Hello")]},
        config={"configurable": {"thread_id": "user-456"}},
    )

# PostgreSQL persistence (recommended for production)
# NOTE(review): PostgresSaver typically needs a one-time memory.setup() call
# to create its tables — confirm against the installed checkpoint package.
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/langgraph"
) as memory:
    graph = builder.compile(checkpointer=memory)

LangGraph Platform 部署

# langgraph.json — deployment configuration (shown as a string for reference)
"""
{
    "dependencies": ["langchain-openai", "langchain-community"],
    "graphs": {
        "agent": "./agent.py:graph"
    },
    "env": ".env"
}
"""

# 本地开发
# pip install langgraph-cli
# langgraph dev --config langgraph.json
# langgraph up           # 本地开发服务器

# 部署到 LangGraph Cloud
# langgraph deploy       # 部署到云端

# API 调用
curl -X POST http://localhost:8123/runs/stream \
  -H "Content-Type: application/json" \
  -d '{
    "assistant_id": "agent",
    "input": {"messages": [{"role": "user", "content": "Hello"}]},
    "config": {"configurable": {"thread_id": "abc123"}}
  }'

与其他框架对比

| 特性 | LangGraph | LangChain LCEL | AutoGen | CrewAI |
|------|-----------|----------------|---------|--------|
| 循环支持 | 原生 | 不支持 | 对话循环 | 不支持 |
| 状态管理 | 显式 TypedDict | 隐式 | 消息历史 | 隐式 |
| 持久化 | Checkpointer | | | |
| Human-in-Loop | 原生支持 | | | |
| 可视化 | Mermaid 图 | | | |
| 部署 | LangGraph Platform | LangServe | | |
| 学习曲线 | 中等 | | | |

最佳实践

  1. 状态设计:使用 TypedDict 明确定义状态字段,用 Annotated + reducer 处理列表追加
  2. 节点粒度:每个节点做一件事,保持可测试和可替换
  3. 条件边路由:基于状态字段路由,避免复杂的嵌套逻辑
  4. 检查点:生产环境必须配置持久化 checkpointer,否则重启丢失所有状态
  5. 错误处理:在节点内部处理异常,通过状态字段传递错误信息
  6. 迭代上限:所有循环必须设置 recursion_limit,防止无限循环

Maurice | maurice_wen@proton.me