LangGraph in Depth: State-Machine-Driven Agents
Original · 灵阙教研团队 (Lingque Research Team)
~7 min read · Updated 2026-02-28
Graph-structured agent design, state management and type safety, conditional edges and loops, Human-in-the-Loop, and persistence
Introduction
The LangChain team released LangGraph in 2024, restructuring agent control flow from the monolithic AgentExecutor into an explicit state graph (StateGraph). The core insight: an agent's decision process is essentially a state machine. It moves from one state to the next, and each transition is decided by a condition.
LangGraph makes this state machine visible, controllable, and debuggable. You no longer have to guess what the agent will do next, because every possible path is defined explicitly in the graph.
Core Concepts
Anatomy of a StateGraph
┌──────────────────────────────────────────────┐
│ StateGraph                                   │
│                                              │
│ State:                                       │
│   Structured data defined with a TypedDict   │
│   Passed along and updated as the graph runs │
│                                              │
│ Nodes:                                       │
│   Python functions: take the State, return a │
│   partial update; each node is one step      │
│                                              │
│ Edges:                                       │
│   Normal: A → B (unconditional)              │
│   Conditional: A → B|C|D (decided by state)  │
│                                              │
│ Special nodes:                               │
│   START: entry point                         │
│   END: termination                           │
│                                              │
│ Reducers:                                    │
│   Merge strategy when one field is updated   │
│   more than once per step                    │
│   e.g. messages appends instead of replacing │
└──────────────────────────────────────────────┘
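The mechanics above can be illustrated without LangGraph itself. The following plain-Python sketch (all names hypothetical, not LangGraph internals) mimics how a compiled graph repeatedly applies node functions and routes on state:

```python
from typing import Callable

# Minimal illustration of the StateGraph execution model: nodes return
# partial updates, a routing function plays the role of a conditional edge.
State = dict

def node_increment(state: State) -> State:
    # A node receives the full state but returns only the fields it changes
    return {"count": state["count"] + 1}

def route(state: State) -> str:
    # A conditional edge: inspect the state, return the next node's name
    return "increment" if state["count"] < 3 else "END"

def run(state: State) -> State:
    nodes: dict[str, Callable[[State], State]] = {"increment": node_increment}
    current = "increment"  # START -> increment
    while current != "END":
        update = nodes[current](state)
        state = {**state, **update}   # merge the partial update (default: replace)
        current = route(state)        # conditional edge picks the next node
    return state

print(run({"count": 0}))  # {'count': 3}
```

The loop-until-END structure is exactly what the ReAct agent later in this article relies on, just with an LLM call inside the node and tool calls driving the routing decision.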
State Design Principles
| Principle | Description | Example |
|---|---|---|
| Type safety | Define state with TypedDict + Annotated | messages: Annotated[list, add_messages] |
| Immutable semantics | Nodes return incremental updates instead of mutating state | return {"count": state["count"] + 1} |
| Explicit reducers | List fields must declare a merge strategy | operator.add or a custom reducer |
| Minimalism | Keep only the shared state you need | Local variables stay inside nodes |
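The reducer principle can be seen in isolation. This sketch is plain Python, not LangGraph internals; it contrasts the default last-write-wins semantics with an operator.add-style append reducer read from the Annotated metadata:

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class State(TypedDict):
    messages: Annotated[list, operator.add]  # reducer: concatenate updates
    step: int                                # no reducer: last write wins

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update using each field's declared reducer."""
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:                 # a reducer is declared in the annotation
            merged[key] = metadata[0](state[key], value)
        else:                        # default semantics: replace
            merged[key] = value
    return merged

s = {"messages": ["hi"], "step": 1}
s = apply_update(s, {"messages": ["there"], "step": 2})
print(s)  # {'messages': ['hi', 'there'], 'step': 2}
```

This is why add_messages matters: without a reducer, each node's return value would overwrite the conversation history instead of appending to it.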
Hands-On Basics: A ReAct Agent
An Agent with Tool Calling
```python
from typing import TypedDict, Annotated, Literal

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool

# Define state
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    iteration_count: int

# Define tools
@tool
def search_web(query: str) -> str:
    """Search the web for current information."""
    # A real implementation would call a search API
    return f"Search results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)  # Use a safer evaluator in production
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Weather in {city}: Sunny, 22C"

tools = [search_web, calculate, get_weather]

# Configure the LLM with tools
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# Agent node: decides what to do next
async def agent(state: AgentState) -> dict:
    response = await llm_with_tools.ainvoke(state["messages"])
    return {
        "messages": [response],
        "iteration_count": state.get("iteration_count", 0) + 1,
    }

# Router: should we use tools or finish?
def should_continue(state: AgentState) -> Literal["tools", "end"]:
    last_message = state["messages"][-1]
    # Safety valve: prevent infinite loops
    if state.get("iteration_count", 0) > 10:
        return "end"
    # If the LLM made tool calls, route to tool execution
    if getattr(last_message, "tool_calls", None):
        return "tools"
    return "end"

# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent)
workflow.add_node("tools", ToolNode(tools))
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", "end": END},
)
workflow.add_edge("tools", "agent")  # After tool execution, return to the agent

# Compile
app = workflow.compile()

# Execute (top-level await assumes an async context, e.g. a notebook)
result = await app.ainvoke({
    "messages": [HumanMessage(content="What's the weather in Tokyo and what is 42 * 17?")],
    "iteration_count": 0,
})
```
Advanced Patterns
Human-in-the-Loop
```python
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class ApprovalState(TypedDict):
    messages: Annotated[list, add_messages]
    plan: str
    approved: bool
    execution_result: str

async def create_plan(state: ApprovalState) -> dict:
    """The agent drafts an action plan."""
    prompt = f"Create a plan for: {state['messages'][-1].content}"
    plan = await llm.ainvoke(prompt)
    return {"plan": plan.content}

async def execute_plan(state: ApprovalState) -> dict:
    """Execute the plan once a human has approved it."""
    if not state.get("approved"):
        return {"execution_result": "Skipped: plan was not approved"}
    result = await execute(state["plan"])  # `execute` is a placeholder for your executor
    return {"execution_result": result}

# Build the graph; the human pause point sits between plan and execute
workflow = StateGraph(ApprovalState)
workflow.add_node("plan", create_plan)
workflow.add_node("execute", execute_plan)
workflow.add_edge(START, "plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", END)

# Compile with checkpointing so the run can pause and resume
memory = MemorySaver()
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["execute"],  # Pause for human review before execution
)

# Run until the interrupt
config = {"configurable": {"thread_id": "approval-1"}}
result = await app.ainvoke(
    {"messages": [HumanMessage("Deploy new version to production")]},
    config=config,
)

# Show the plan to a human
print(f"Plan: {result['plan']}")

# Human reviews and approves...
await app.aupdate_state(config, {"approved": True})
result = await app.ainvoke(None, config)  # Resume from the checkpoint
print(f"Result: {result['execution_result']}")
```
Composing Sub-Graphs
```python
# Define a sub-graph dedicated to research tasks
class ResearchState(TypedDict):
    query: str
    sources: list[str]
    summary: str

async def search(state: ResearchState) -> dict:
    results = await web_search(state["query"])  # `web_search` is a placeholder for your search client
    return {"sources": results}

async def summarize(state: ResearchState) -> dict:
    summary = await llm.ainvoke(f"Summarize: {state['sources']}")
    return {"summary": summary.content}

research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search)
research_graph.add_node("summarize", summarize)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_subgraph = research_graph.compile()

# The main graph calls the compiled sub-graph inside a node
class MainState(TypedDict):
    messages: Annotated[list, add_messages]
    research_result: str

async def do_research(state: MainState) -> dict:
    query = state["messages"][-1].content
    result = await research_subgraph.ainvoke({"query": query, "sources": [], "summary": ""})
    return {"research_result": result["summary"]}

async def respond(state: MainState) -> dict:
    response = await llm.ainvoke(
        f"Based on research: {state['research_result']}\n"
        f"Answer: {state['messages'][-1].content}"
    )
    return {"messages": [AIMessage(content=response.content)]}

main_graph = StateGraph(MainState)
main_graph.add_node("research", do_research)
main_graph.add_node("respond", respond)
main_graph.add_edge(START, "research")
main_graph.add_edge("research", "respond")
main_graph.add_edge("respond", END)
app = main_graph.compile()
```
Persistence and Recovery
```python
# Production persistence with SQLite/PostgreSQL
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# SQLite (development). Note: in recent langgraph versions from_conn_string
# is a context manager ("with SqliteSaver.from_conn_string(...) as saver:")
sqlite_saver = SqliteSaver.from_conn_string("./checkpoints.db")

# PostgreSQL (production)
postgres_saver = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/langgraph"
)

# Compile with persistent checkpointing
app = workflow.compile(checkpointer=postgres_saver)

# Each thread_id maintains its own conversation state
config_user_1 = {"configurable": {"thread_id": "user-001-conv-1"}}
config_user_2 = {"configurable": {"thread_id": "user-002-conv-1"}}

# User 1's conversation
await app.ainvoke(
    {"messages": [HumanMessage("Hello, I'm Alice")]},
    config_user_1,
)

# User 2's separate conversation
await app.ainvoke(
    {"messages": [HumanMessage("Hello, I'm Bob")]},
    config_user_2,
)

# Resume User 1's conversation later
await app.ainvoke(
    {"messages": [HumanMessage("What's my name?")]},  # Should recall "Alice"
    config_user_1,
)

# Inspect state at any point
state = await app.aget_state(config_user_1)
print(f"Messages: {len(state.values['messages'])}")
print(f"Next node: {state.next}")

# Time travel: walk the state history
async for snapshot in app.aget_state_history(config_user_1):
    print(f"Step: {snapshot.metadata['step']}, Next: {snapshot.next}")
```
Debugging and Observability
Streaming Event Listeners
```python
# Stream execution events for real-time monitoring
async for event in app.astream_events(
    {"messages": [HumanMessage("Research quantum computing")]},
    config={"configurable": {"thread_id": "debug-1"}},
    version="v2",
):
    kind = event["event"]
    if kind == "on_chain_start":
        print(f"[START] {event['name']}")
    elif kind == "on_chain_end":
        print(f"[END] {event['name']}, output keys: {list(event['data']['output'].keys())}")
    elif kind == "on_chat_model_stream":
        # Real-time token streaming
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)
    elif kind == "on_tool_start":
        print(f"[TOOL] {event['name']}: {event['data']['input']}")
    elif kind == "on_tool_end":
        print(f"[TOOL RESULT] {event['name']}: {str(event['data']['output'])[:100]}")
```
Graph Visualization
```python
# Generate a Mermaid diagram of the graph
print(app.get_graph().draw_mermaid())
# Output:
# %%{init: {'flowchart': {'curve': 'linear'}}}%%
# graph TD;
#   __start__([__start__]):::first
#   agent(agent)
#   tools(tools)
#   __end__([__end__]):::last
#   __start__ --> agent;
#   tools --> agent;
#   agent -. tools .-> tools;
#   agent -. end .-> __end__;

# Save as PNG (draw_mermaid_png renders via a Mermaid service or a local
# browser; the graphviz-based variant is draw_png)
img = app.get_graph().draw_mermaid_png()
with open("agent_graph.png", "wb") as f:
    f.write(img)
```
Design Pattern Summary
| Pattern | When to use | Key implementation |
|---|---|---|
| ReAct Loop | Tool-calling agents | Conditional edge + loop |
| Plan-Execute | Decomposing complex tasks | Two phases: plan + execute |
| Reflection | Self-improvement | Reflection node + conditional retry |
| HITL | High-risk operations | interrupt_before + checkpoints |
| Map-Reduce | Parallel batch processing | Fan-out + fan-in edges |
| Sub-graph | Modular reuse | Nested StateGraph |
| Multi-Agent | Division of roles | Multiple agent nodes + routing |
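Of these, Map-Reduce is the only pattern this article does not show in code. The fan-out/fan-in shape can be sketched in plain asyncio (in LangGraph itself, fan-out is usually expressed by a conditional edge returning a list of Send objects; the function names below are hypothetical):

```python
import asyncio

async def summarize_chunk(chunk: str) -> str:
    # Map step: in a real graph this node would call an LLM per chunk
    await asyncio.sleep(0)  # stand-in for I/O latency
    return chunk.upper()

async def map_reduce(chunks: list[str]) -> str:
    # Fan-out: one concurrent task per chunk
    partials = await asyncio.gather(*(summarize_chunk(c) for c in chunks))
    # Fan-in: a reduce step combines the partial results in order
    return " | ".join(partials)

result = asyncio.run(map_reduce(["alpha", "beta", "gamma"]))
print(result)  # ALPHA | BETA | GAMMA
```

gather preserves input order even though the tasks run concurrently, which is the same guarantee a fan-in reducer node relies on when merging branch results.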
Conclusion
- State graphs make agents controllable: every possible execution path is defined explicitly in the graph instead of depending on the LLM's "inspiration".
- TypedDict state is the foundation: type-safe state definitions surface bugs at type-check time (e.g. under mypy) rather than at runtime.
- Conditional edges are the decision points: an agent's "intelligence" lives in the routing functions, which choose the next step based on state.
- Persistence enables pause and resume: a checkpointer lets long-running agents stop, continue, and rewind.
- Sub-graphs enable modularity: complex agent systems should be split into reusable sub-graphs to reduce cognitive load.
Maurice | maurice_wen@proton.me