LangGraph 工作流编排:从 DAG 到循环 Agent
原创
灵阙教研团队
S 精选 进阶 |
约 7 分钟阅读
更新于 2026-02-28 AI 导读
LangGraph 工作流编排:从 DAG 到循环 Agent 概述 LangGraph 是 LangChain 团队推出的 Agent 工作流编排框架。与传统的 DAG(有向无环图)工作流引擎不同,LangGraph 原生支持循环(Cycles),这使得它能够表达"Agent 反复推理直到满意"的模式。 核心设计理念:将 Agent 的行为建模为一个状态机(State...
LangGraph 工作流编排:从 DAG 到循环 Agent
概述
LangGraph 是 LangChain 团队推出的 Agent 工作流编排框架。与传统的 DAG(有向无环图)工作流引擎不同,LangGraph 原生支持循环(Cycles),这使得它能够表达"Agent 反复推理直到满意"的模式。
核心设计理念:将 Agent 的行为建模为一个状态机(State Machine),每个节点是一个计算步骤,边是条件转移,状态在节点之间流转。
核心概念
LangGraph 核心概念
|
├── State(状态)
│ └── 在节点之间传递的数据容器
|
├── Node(节点)
│ └── 执行一个计算步骤(LLM 调用/工具执行/数据处理)
|
├── Edge(边)
│ ├── Normal Edge - 无条件转移
│ └── Conditional Edge - 基于状态的条件路由
|
├── Graph(图)
│ └── 由节点和边组成的工作流
|
└── Checkpointer(检查点)
└── 持久化状态,支持断点恢复和 Human-in-the-Loop
安装
pip install langgraph langchain-openai langchain-core
基础:构建第一个 Graph
定义状态
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# 状态定义:使用 TypedDict
class AgentState(TypedDict):
# add_messages 是一个 reducer:新消息追加到列表,而非替换
messages: Annotated[list, add_messages]
# 普通字段:直接覆盖
current_step: str
iteration_count: int
定义节点
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def chatbot_node(state: AgentState) -> dict:
"""聊天节点:调用 LLM 生成回复"""
messages = state["messages"]
response = llm.invoke(messages)
return {
"messages": [response],
"current_step": "chatbot",
}
def tool_executor_node(state: AgentState) -> dict:
"""工具执行节点:执行 LLM 请求的工具调用"""
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
result = execute_tool(tool_call["name"], tool_call["args"])
results.append(ToolMessage(
content=str(result),
tool_call_id=tool_call["id"],
))
return {"messages": results}
构建 Graph
from langgraph.graph import StateGraph, START, END
# 创建 graph builder
builder = StateGraph(AgentState)
# 添加节点
builder.add_node("chatbot", chatbot_node)
builder.add_node("tools", tool_executor_node)
# 添加边
builder.add_edge(START, "chatbot") # 入口 -> 聊天
# 条件边:根据 LLM 输出决定下一步
def should_use_tools(state: AgentState) -> str:
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools" # 有工具调用 -> 执行工具
return END # 无工具调用 -> 结束
builder.add_conditional_edges("chatbot", should_use_tools)
builder.add_edge("tools", "chatbot") # 工具执行后 -> 回到聊天(循环)
# 编译 graph
graph = builder.compile()
# 可视化
print(graph.get_graph().draw_mermaid())
运行 Graph
# 同步执行
result = graph.invoke({
"messages": [HumanMessage(content="北京今天天气怎么样?")],
"current_step": "",
"iteration_count": 0,
})
print(result["messages"][-1].content)
# 流式执行
for event in graph.stream({
"messages": [HumanMessage(content="分析一下特斯拉的股价趋势")],
}):
for node_name, output in event.items():
print(f"[{node_name}]", output)
# 异步执行
import asyncio
async def run():
result = await graph.ainvoke({
"messages": [HumanMessage(content="Hello")],
})
return result
asyncio.run(run())
进阶:带工具的 ReAct Agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# 定义工具
@tool
def search(query: str) -> str:
"""搜索网络获取最新信息"""
return f"搜索结果:关于 '{query}' 的最新信息..."
@tool
def calculator(expression: str) -> float:
"""计算数学表达式"""
return eval(expression)
@tool
def get_weather(city: str) -> str:
"""获取城市天气"""
return f"{city}:晴,25度,东南风3级"
# 创建 ReAct Agent(LangGraph 内置)
model = ChatOpenAI(model="gpt-4o")
agent = create_react_agent(
model=model,
tools=[search, calculator, get_weather],
state_modifier="你是一个全能助手,善于使用工具来回答问题。",
)
# 运行
result = agent.invoke({
"messages": [HumanMessage(content="北京今天天气如何?如果温度超过30度,计算一下需要开空调几小时(假设每小时2度电)才能把室温降到25度")]
})
Human-in-the-Loop(人工介入)
中断节点
from langgraph.checkpoint.memory import MemorySaver
# 创建检查点存储
memory = MemorySaver()
# 编译时指定中断点
graph = builder.compile(
checkpointer=memory,
interrupt_before=["tools"], # 在执行工具前中断,等待人工确认
)
# 第一次运行:会在 tools 节点前停下
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke(
{"messages": [HumanMessage(content="帮我发送一封邮件给张三")]},
config=config,
)
# 查看 Agent 准备执行的工具调用
print("Agent wants to call:", result["messages"][-1].tool_calls)
# [{"name": "send_email", "args": {"to": "zhangsan@...", "content": "..."}}]
# 人工审核后继续执行
# 方式一:直接继续
graph.invoke(None, config=config)
# 方式二:修改后继续
graph.update_state(
config,
{"messages": [ToolMessage(content="已确认发送", tool_call_id="xxx")]},
as_node="tools",
)
graph.invoke(None, config=config)
时间旅行(回溯)
# 查看所有检查点
checkpoints = list(memory.list(config))
for cp in checkpoints:
print(f"Step: {cp.metadata.get('step')}, "
f"Node: {cp.metadata.get('source')}")
# 回溯到特定检查点
old_config = {
"configurable": {
"thread_id": "user-123",
"checkpoint_id": checkpoints[2].config["configurable"]["checkpoint_id"],
}
}
# 从该检查点重新执行
result = graph.invoke(
{"messages": [HumanMessage(content="换一个方案")]},
config=old_config,
)
多 Agent 工作流
监督者模式(Supervisor)
from typing import Literal
class SupervisorState(TypedDict):
messages: Annotated[list, add_messages]
next_agent: str
def supervisor_node(state: SupervisorState) -> dict:
"""监督者:决定下一个执行的 Agent"""
system_prompt = """你是一个任务调度器。根据当前对话状态,
选择下一个应该执行的 Agent:
- researcher: 需要搜索或调研信息时
- coder: 需要编写或修改代码时
- reviewer: 需要审查代码或方案时
- FINISH: 任务已完成
仅输出 Agent 名称。"""
messages = [
SystemMessage(content=system_prompt),
*state["messages"],
]
response = llm.invoke(messages)
next_agent = response.content.strip()
return {"next_agent": next_agent}
def researcher_node(state: SupervisorState) -> dict:
researcher_llm = llm.bind(system_message="你是研究员...")
response = researcher_llm.invoke(state["messages"])
return {"messages": [AIMessage(content=response.content, name="researcher")]}
def coder_node(state: SupervisorState) -> dict:
coder_llm = llm.bind(system_message="你是开发者...")
response = coder_llm.invoke(state["messages"])
return {"messages": [AIMessage(content=response.content, name="coder")]}
def reviewer_node(state: SupervisorState) -> dict:
reviewer_llm = llm.bind(system_message="你是审查员...")
response = reviewer_llm.invoke(state["messages"])
return {"messages": [AIMessage(content=response.content, name="reviewer")]}
# 构建图
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", researcher_node)
builder.add_node("coder", coder_node)
builder.add_node("reviewer", reviewer_node)
builder.add_edge(START, "supervisor")
# 监督者条件路由
def route_supervisor(state: SupervisorState) -> str:
return state["next_agent"]
builder.add_conditional_edges(
"supervisor",
route_supervisor,
{
"researcher": "researcher",
"coder": "coder",
"reviewer": "reviewer",
"FINISH": END,
},
)
# 所有 Agent 执行后回到监督者
builder.add_edge("researcher", "supervisor")
builder.add_edge("coder", "supervisor")
builder.add_edge("reviewer", "supervisor")
graph = builder.compile()
层级 Agent(Sub-Graph)
# 子图:研究团队内部的工作流
def build_research_subgraph():
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
research_notes: list[str]
builder = StateGraph(ResearchState)
builder.add_node("search", search_node)
builder.add_node("analyze", analyze_node)
builder.add_node("summarize", summarize_node)
builder.add_edge(START, "search")
builder.add_edge("search", "analyze")
def needs_more_search(state):
if len(state["research_notes"]) < 3:
return "search" # 信息不足,继续搜索
return "summarize" # 信息充足,生成摘要
builder.add_conditional_edges("analyze", needs_more_search)
builder.add_edge("summarize", END)
return builder.compile()
# 在主图中使用子图
research_subgraph = build_research_subgraph()
main_builder = StateGraph(MainState)
main_builder.add_node("research", research_subgraph) # 子图作为节点
main_builder.add_node("write", write_node)
main_builder.add_edge(START, "research")
main_builder.add_edge("research", "write")
main_builder.add_edge("write", END)
持久化与部署
数据库持久化
# SQLite 持久化
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as memory:
graph = builder.compile(checkpointer=memory)
result = graph.invoke(
{"messages": [HumanMessage(content="Hello")]},
config={"configurable": {"thread_id": "user-456"}},
)
# PostgreSQL 持久化(生产推荐)
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/langgraph"
) as memory:
graph = builder.compile(checkpointer=memory)
LangGraph Platform 部署
# langgraph.json - 部署配置
"""
{
"dependencies": ["langchain-openai", "langchain-community"],
"graphs": {
"agent": "./agent.py:graph"
},
"env": ".env"
}
"""
# 部署到 LangGraph Cloud
# langgraph up # 本地开发服务器
# langgraph deploy # 部署到云端
# 本地开发
pip install langgraph-cli
langgraph dev --config langgraph.json
# API 调用
curl -X POST http://localhost:8123/runs/stream \
-H "Content-Type: application/json" \
-d '{
"assistant_id": "agent",
"input": {"messages": [{"role": "user", "content": "Hello"}]},
"config": {"configurable": {"thread_id": "abc123"}}
}'
与其他框架对比
| 特性 | LangGraph | LangChain LCEL | AutoGen | CrewAI |
|---|---|---|---|---|
| 循环支持 | 原生 | 不支持 | 对话循环 | 不支持 |
| 状态管理 | 显式 TypedDict | 隐式 | 消息历史 | 隐式 |
| 持久化 | Checkpointer | 无 | 无 | 无 |
| Human-in-Loop | 原生支持 | 无 | 有 | 无 |
| 可视化 | Mermaid 图 | 无 | 无 | 无 |
| 部署 | LangGraph Platform | LangServe | 无 | 无 |
| 学习曲线 | 中等 | 低 | 低 | 低 |
最佳实践
- 状态设计:使用 TypedDict 明确定义状态字段,用 Annotated + reducer 处理列表追加
- 节点粒度:每个节点做一件事,保持可测试和可替换
- 条件边路由:基于状态字段路由,避免复杂的嵌套逻辑
- 检查点:生产环境必须配置持久化 checkpointer,否则重启丢失所有状态
- 错误处理:在节点内部处理异常,通过状态字段传递错误信息
- 迭代上限:所有循环必须设置
recursion_limit,防止无限循环
Maurice | maurice_wen@proton.me