LangGraph in Depth: State-Machine-Driven Agents

Graph-structured agent design, state management and type safety, conditional edges and loops, human-in-the-loop and persistence

Introduction

The LangChain team released LangGraph in 2024, refactoring agent control flow from the monolithic AgentExecutor into an explicit state graph (StateGraph). The core insight: an agent's decision process is fundamentally a state machine. It moves from one state to the next, and each transition is governed by a condition.

LangGraph makes this state machine visible, controllable, and debuggable. You no longer have to guess what the agent will do next, because every possible path is defined explicitly in the graph.

Core Concepts

Anatomy of a StateGraph

┌──────────────────────────────────────────┐
│              StateGraph                  │
│                                          │
│  State:                                  │
│    Structured data defined via TypedDict │
│    Passed and updated across the run     │
│                                          │
│  Nodes:                                  │
│    Python functions that take the State  │
│    and return a partial update           │
│                                          │
│  Edges:                                  │
│    Normal: A → B (unconditional)         │
│    Conditional: A → B|C|D (state-driven) │
│                                          │
│  Special nodes:                          │
│    START: entry point                    │
│    END: termination                      │
│                                          │
│  Reducers:                               │
│    How multiple updates to one field     │
│    are merged, e.g. messages uses        │
│    append rather than replace            │
└──────────────────────────────────────────┘
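The components above can be sketched, independently of LangGraph itself, as a small state-machine loop. This is plain illustrative Python, not the LangGraph API: nodes return partial updates, a router function plays the role of a conditional edge, and the merge step is a trivial "replace" reducer.

```python
from typing import Callable

State = dict  # toy stand-in for a TypedDict state

def run_graph(state: State,
              nodes: dict[str, Callable[[State], dict]],
              router: Callable[[State], str],
              entry: str) -> State:
    """Run nodes until the router returns 'END', merging partial updates."""
    current = entry
    while current != "END":
        update = nodes[current](state)  # node returns a partial update
        state = {**state, **update}     # merge (trivial 'replace' reducer)
        current = router(state)         # conditional edge picks the next node
    return state

# A one-node graph that loops: increment until count reaches 3, then finish
nodes = {"inc": lambda s: {"count": s["count"] + 1}}
router = lambda s: "END" if s["count"] >= 3 else "inc"

final = run_graph({"count": 0}, nodes, router, entry="inc")
print(final)  # {'count': 3}
```

The loop-with-router shape is exactly what `add_conditional_edges` expresses declaratively in the real API.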

State Design Principles

| Principle | Description | Example |
|---|---|---|
| Type safety | Define state with TypedDict + Annotated | messages: Annotated[list, add_messages] |
| Immutable semantics | Nodes return incremental updates, never mutate in place | return {"count": state["count"] + 1} |
| Explicit reducers | List fields must declare a merge strategy | operator.add or a custom reducer |
| Minimal state | Keep only shared state that nodes actually need | Local variables stay inside nodes |
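To see why explicit reducers matter, here is a minimal sketch (plain Python using typing metadata, not LangGraph internals) contrasting the default replace semantics with an append-style reducer when the same field is updated twice:

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class State(TypedDict):
    step: int                                # no reducer: last write wins
    messages: Annotated[list, operator.add]  # reducer: updates are concatenated

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update using each field's declared reducer."""
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:                  # a reducer was declared via Annotated
            merged[key] = metadata[0](state[key], value)
        else:                         # default behavior: replace
            merged[key] = value
    return merged

s = {"step": 0, "messages": []}
s = apply_update(s, {"step": 1, "messages": ["hi"]})
s = apply_update(s, {"step": 2, "messages": ["there"]})
print(s)  # {'step': 2, 'messages': ['hi', 'there']}
```

Without the reducer annotation, the second update would have replaced `messages` entirely, silently dropping conversation history.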

Hands-On Basics: A ReAct Agent

A Tool-Calling Agent

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages

# Define state
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    iteration_count: int

# Define tools
from langchain_core.tools import tool

@tool
def search_web(query: str) -> str:
    """Search the web for current information."""
    # Real implementation would call a search API
    return f"Search results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)  # Use safer evaluation in production
        return str(result)
    except Exception as e:
        return f"Error: {e}"

@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Weather in {city}: Sunny, 22C"

tools = [search_web, calculate, get_weather]

# Configure LLM with tools
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# Agent node: decides what to do
async def agent(state: AgentState) -> dict:
    messages = state["messages"]
    response = await llm_with_tools.ainvoke(messages)
    return {
        "messages": [response],
        "iteration_count": state.get("iteration_count", 0) + 1,
    }

# Router: should we use tools or finish?
def should_continue(state: AgentState) -> Literal["tools", "end"]:
    last_message = state["messages"][-1]

    # Safety: prevent infinite loops
    if state.get("iteration_count", 0) > 10:
        return "end"

    # If the LLM made tool calls, route to tool execution
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"

    return "end"

# Build graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("agent", agent)
workflow.add_node("tools", ToolNode(tools))

# Add edges
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", "end": END},
)
workflow.add_edge("tools", "agent")  # After tools, go back to agent

# Compile
app = workflow.compile()

# Execute (run inside an async context, e.g. via asyncio.run)
result = await app.ainvoke({
    "messages": [HumanMessage(content="What's the weather in Tokyo and what is 42 * 17?")],
    "iteration_count": 0,
})

Advanced Patterns

Human-in-the-Loop

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class ApprovalState(TypedDict):
    messages: Annotated[list, add_messages]
    plan: str
    approved: bool
    execution_result: str

async def create_plan(state: ApprovalState) -> dict:
    """Agent creates an action plan."""
    prompt = f"Create a plan for: {state['messages'][-1].content}"
    plan = await llm.ainvoke(prompt)
    return {"plan": plan.content}

async def execute_plan(state: ApprovalState) -> dict:
    """Execute the plan only after a human has approved it."""
    if not state.get("approved"):
        return {"execution_result": "Rejected by reviewer"}
    result = await execute(state["plan"])  # execute() is a placeholder for your deployment logic
    return {"execution_result": result}

# Build graph; the pause point is declared at compile time
workflow = StateGraph(ApprovalState)
workflow.add_node("plan", create_plan)
workflow.add_node("execute", execute_plan)

workflow.add_edge(START, "plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", END)

# Compile with checkpointing for pause/resume
memory = MemorySaver()
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["execute"],  # Pause before execution for human review
)

# Run until interrupt
config = {"configurable": {"thread_id": "approval-1"}}
result = await app.ainvoke(
    {"messages": [HumanMessage("Deploy new version to production")]},
    config=config,
)

# Show plan to human
print(f"Plan: {result['plan']}")
# Human reviews and approves...

# Resume with approval
await app.aupdate_state(config, {"approved": True})
result = await app.ainvoke(None, config)  # Resume from checkpoint
print(f"Result: {result['execution_result']}")

Composing Sub-Graphs

# Define a sub-graph for research tasks
class ResearchState(TypedDict):
    query: str
    sources: list[str]
    summary: str

async def search(state: ResearchState) -> dict:
    results = await web_search(state["query"])  # web_search() is a placeholder for your search API
    return {"sources": results}

async def summarize(state: ResearchState) -> dict:
    summary = await llm.ainvoke(f"Summarize: {state['sources']}")
    return {"summary": summary.content}

research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search)
research_graph.add_node("summarize", summarize)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_subgraph = research_graph.compile()

# Main graph uses sub-graph as a node
class MainState(TypedDict):
    messages: Annotated[list, add_messages]
    research_result: str

async def do_research(state: MainState) -> dict:
    query = state["messages"][-1].content
    result = await research_subgraph.ainvoke({"query": query, "sources": [], "summary": ""})
    return {"research_result": result["summary"]}

async def respond(state: MainState) -> dict:
    response = await llm.ainvoke(
        f"Based on research: {state['research_result']}\n"
        f"Answer: {state['messages'][-1].content}"
    )
    return {"messages": [AIMessage(content=response.content)]}

main_graph = StateGraph(MainState)
main_graph.add_node("research", do_research)
main_graph.add_node("respond", respond)
main_graph.add_edge(START, "research")
main_graph.add_edge("research", "respond")
main_graph.add_edge("respond", END)
app = main_graph.compile()

Persistence and Recovery

# Production persistence with SQLite/PostgreSQL
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver

# SQLite (development)
# Note: in recent langgraph releases, from_conn_string is a context manager;
# there you would write `with SqliteSaver.from_conn_string(...) as saver:`.
sqlite_saver = SqliteSaver.from_conn_string("./checkpoints.db")

# PostgreSQL (production); run saver.setup() once to create the tables
postgres_saver = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/langgraph"
)

# Compile with persistent checkpointing
app = workflow.compile(checkpointer=postgres_saver)

# Each thread_id maintains its own conversation state
config_user_1 = {"configurable": {"thread_id": "user-001-conv-1"}}
config_user_2 = {"configurable": {"thread_id": "user-002-conv-1"}}

# User 1's conversation
await app.ainvoke(
    {"messages": [HumanMessage("Hello, I'm Alice")]},
    config_user_1,
)

# User 2's separate conversation
await app.ainvoke(
    {"messages": [HumanMessage("Hello, I'm Bob")]},
    config_user_2,
)

# Resume User 1's conversation later
await app.ainvoke(
    {"messages": [HumanMessage("What's my name?")]},  # Should know "Alice"
    config_user_1,
)

# Inspect state at any point
state = await app.aget_state(config_user_1)
print(f"Messages: {len(state.values['messages'])}")
print(f"Next node: {state.next}")

# Time travel: get state history
async for snapshot in app.aget_state_history(config_user_1):
    print(f"Step: {snapshot.metadata['step']}, Nodes: {snapshot.next}")

Debugging and Observability

Streaming Event Monitoring

# Stream execution events for real-time monitoring
async for event in app.astream_events(
    {"messages": [HumanMessage("Research quantum computing")]},
    config={"configurable": {"thread_id": "debug-1"}},
    version="v2",
):
    kind = event["event"]

    if kind == "on_chain_start":
        print(f"[START] {event['name']}")

    elif kind == "on_chain_end":
        print(f"[END] {event['name']}, output keys: {list(event['data']['output'].keys())}")

    elif kind == "on_chat_model_stream":
        # Real-time token streaming
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)

    elif kind == "on_tool_start":
        print(f"[TOOL] {event['name']}: {event['data']['input']}")

    elif kind == "on_tool_end":
        # Tool output may be a message object rather than a str, so stringify first
        print(f"[TOOL RESULT] {event['name']}: {str(event['data']['output'])[:100]}")

Graph Visualization

# Generate Mermaid diagram of the graph
print(app.get_graph().draw_mermaid())

# Output:
# %%{init: {'flowchart': {'curve': 'linear'}}}%%
# graph TD;
#   __start__([__start__]):::first
#   agent(agent)
#   tools(tools)
#   __end__([__end__]):::last
#   __start__ --> agent;
#   tools --> agent;
#   agent -. tools .-> tools;
#   agent -. end .-> __end__;

# Save as PNG (draw_mermaid_png renders via the mermaid.ink API by default)
from IPython.display import Image
img = app.get_graph().draw_mermaid_png()
with open("agent_graph.png", "wb") as f:
    f.write(img)

Design Pattern Summary

| Pattern | When to Use | Key Implementation |
|---|---|---|
| ReAct Loop | Tool-calling agents | Conditional edge + loop |
| Plan-Execute | Decomposing complex tasks | Two phases: plan, then execute |
| Reflection | Self-improvement | Reflection node + conditional retry |
| HITL | High-risk operations | interrupt_before + checkpoints |
| Map-Reduce | Parallel batch processing | Fan-out + fan-in edges |
| Sub-graph | Modular reuse | Nested StateGraph |
| Multi-Agent | Division of roles | Multiple agent nodes + routing |
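The Map-Reduce row deserves a sketch. LangGraph expresses fan-out declaratively (its Send API dispatches one branch per item), but the underlying shape is ordinary concurrent fan-out followed by a single fan-in reduce step, shown here in plain asyncio as a conceptual illustration, not LangGraph code:

```python
import asyncio

async def map_worker(chunk: str) -> str:
    """Process one chunk independently (a fan-out branch)."""
    await asyncio.sleep(0)  # stand-in for an LLM or tool call per chunk
    return chunk.upper()

async def map_reduce(chunks: list[str]) -> str:
    # Fan-out: one concurrent task per chunk
    results = await asyncio.gather(*(map_worker(c) for c in chunks))
    # Fan-in: a single reduce step merges all branch results
    return " | ".join(results)

print(asyncio.run(map_reduce(["alpha", "beta", "gamma"])))
# ALPHA | BETA | GAMMA
```

In a real graph, the reduce node's state field would use a list reducer (e.g. operator.add) so that branch results merge instead of overwriting each other.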

Summary

  1. State graphs make agents controllable: every possible execution path is defined explicitly in the graph instead of depending on the LLM's "inspiration".
  2. TypedDict state is the foundation: type-safe state definitions surface bugs during static type checking rather than at runtime.
  3. Conditional edges are the decision points: an agent's "intelligence" lives in its routing functions, which pick the next step based on state.
  4. Persistence enables pause-and-resume: a checkpointer lets long-running agents pause, resume, and rewind through history.
  5. Sub-graphs enable modularity: complex agent systems should be split into reusable sub-graphs to keep cognitive complexity down.

Maurice | maurice_wen@proton.me