Multi-Agent Orchestration Patterns: From Sequential to Swarm

The four orchestration patterns (sequential, parallel, hierarchical, swarm), LangGraph state-machine orchestration, and hands-on fault handling

Introduction

A single agent's capability ceiling is set by its context window and tool set. When task complexity exceeds what one agent can handle, multiple agents must collaborate, just as a team can take on more complex projects than any individual.

The core question of multi-agent orchestration is: how do you get multiple agents to collaborate efficiently while keeping the system controllable and observable? This article systematically compares four orchestration patterns and works through hands-on LangGraph code.

Orchestration Patterns at a Glance

The Four Core Patterns

Pattern 1: Sequential (Pipeline)
  Agent A ──→ Agent B ──→ Agent C ──→ result
  Traits: simple and predictable; total latency = sum of all stages

Pattern 2: Parallel (Fan-out / Fan-in)
            ┌──→ Agent B ──┐
  Agent A ──┤              ├──→ Agent D (aggregator)
            └──→ Agent C ──┘
  Traits: high throughput; total latency = slowest branch

Pattern 3: Hierarchical (Manager-Worker)
              ┌── Agent B
  Agent A ────┤── Agent C
  (Manager)   └── Agent D
  Traits: dynamic dispatch; the Manager coordinates globally

Pattern 4: Swarm (Handoff)
  Agent A ←──→ Agent B
     ↕            ↕
  Agent C ←──→ Agent D
  Traits: decentralized; any node can hand off to any other

Pattern Selection Comparison

Dimension        Sequential             Parallel               Hierarchical                Swarm
Complexity       Low                    Medium                 High                        High
Latency          High (stages add up)   Low (slowest branch)   Varies (dispatch-driven)    Unpredictable
Controllability  High                   High                   Medium                      Low
Flexibility      Low                    Medium                 High                        Very high
Debugging        Easy                   Moderate               Hard                        Hardest
Best for         Fixed workflows        Independent subtasks   Complex project management  Conversational collaboration
Example          Translation pipeline   Multi-source research  Software development team   Customer-support routing

Hands-On Orchestration with LangGraph

Sequential Pipeline

# Multi-agent translation pipeline
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class TranslationState(TypedDict):
    source_text: str
    source_lang: str
    target_lang: str
    initial_translation: str
    review_feedback: str
    final_translation: str
    quality_score: float
    attempts: int  # how many times the translate node has run (bounds retries)
    messages: Annotated[list, operator.add]

# Agent 1: Translator
# (call_llm is a placeholder for your LLM client; it is assumed, not defined here)
async def translate(state: TranslationState) -> dict:
    prompt = f"""Translate the following {state['source_lang']} text to {state['target_lang']}.
Focus on accuracy and naturalness.

Text: {state['source_text']}"""

    result = await call_llm(prompt, model="gpt-4o")
    return {
        "initial_translation": result,
        "attempts": state.get("attempts", 0) + 1,
        "messages": ["Translator: completed"],
    }

# Agent 2: Reviewer
async def review(state: TranslationState) -> dict:
    prompt = f"""Review this translation for accuracy, fluency, and cultural appropriateness.

Source ({state['source_lang']}): {state['source_text']}
Translation ({state['target_lang']}): {state['initial_translation']}

Provide specific feedback on:
1. Accuracy errors
2. Fluency issues
3. Cultural adaptation needs"""

    feedback = await call_llm(prompt, model="claude-sonnet-4-20250514")
    return {"review_feedback": feedback, "messages": [f"Reviewer: completed"]}

# Agent 3: Editor
async def edit(state: TranslationState) -> dict:
    prompt = f"""Improve this translation based on the reviewer's feedback.

Original: {state['source_text']}
Current translation: {state['initial_translation']}
Feedback: {state['review_feedback']}

Produce the final polished translation."""

    final = await call_llm(prompt, model="gpt-4o")
    return {"final_translation": final, "messages": [f"Editor: completed"]}

# Agent 4: Quality Assessor
async def assess_quality(state: TranslationState) -> dict:
    prompt = f"""Rate this translation on a scale of 0-1.

Source: {state['source_text']}
Translation: {state['final_translation']}

Respond with only a number."""

    score = float((await call_llm(prompt, model="gpt-4o-mini")).strip())
    return {"quality_score": score, "messages": [f"QA: score={score:.2f}"]}

# Build graph
workflow = StateGraph(TranslationState)
workflow.add_node("translate", translate)
workflow.add_node("review", review)
workflow.add_node("edit", edit)
workflow.add_node("assess", assess_quality)

workflow.add_edge(START, "translate")
workflow.add_edge("translate", "review")
workflow.add_edge("review", "edit")
workflow.add_edge("edit", "assess")

# Conditional: re-translate if quality is too low, capped so the loop
# cannot run forever (LangGraph's recursion limit would otherwise trip)
def should_retry(state: TranslationState) -> str:
    if state.get("quality_score", 0) < 0.85 and state.get("attempts", 0) < 3:
        return "translate"  # Retry from scratch
    return END

workflow.add_conditional_edges("assess", should_retry)

app = workflow.compile()
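
A minimal invocation sketch (the field values below are placeholders):

# Example run of the compiled pipeline
result = await app.ainvoke({
    "source_text": "The quick brown fox jumps over the lazy dog.",
    "source_lang": "English",
    "target_lang": "French",
    "attempts": 0,
    "messages": [],
})
print(result["final_translation"], result["quality_score"])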

Parallel Research

# Parallel research with fan-out/fan-in
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class ResearchState(TypedDict):
    query: str
    web_results: str
    academic_results: str
    code_results: str
    synthesis: str
    messages: Annotated[list, operator.add]

# Parallel research agents (web_search, arxiv_search, and github_search are
# assumed search helpers, not defined here)
async def search_web(state: ResearchState) -> dict:
    results = await web_search(state["query"])
    summary = await call_llm(
        f"Summarize these web results for: {state['query']}\n{results}"
    )
    return {"web_results": summary, "messages": ["Web researcher: done"]}

async def search_academic(state: ResearchState) -> dict:
    papers = await arxiv_search(state["query"])
    summary = await call_llm(
        f"Summarize these academic papers for: {state['query']}\n{papers}"
    )
    return {"academic_results": summary, "messages": ["Academic researcher: done"]}

async def search_code(state: ResearchState) -> dict:
    repos = await github_search(state["query"])
    summary = await call_llm(
        f"Summarize these code repositories for: {state['query']}\n{repos}"
    )
    return {"code_results": summary, "messages": ["Code researcher: done"]}

# Synthesis agent (fan-in)
async def synthesize(state: ResearchState) -> dict:
    prompt = f"""Synthesize findings from multiple sources about: {state['query']}

Web findings: {state['web_results']}
Academic findings: {state['academic_results']}
Code examples: {state['code_results']}

Produce a comprehensive analysis."""

    result = await call_llm(prompt, model="claude-sonnet-4-20250514")
    return {"synthesis": result, "messages": ["Synthesizer: done"]}

# Build parallel graph
workflow = StateGraph(ResearchState)
workflow.add_node("search_web", search_web)
workflow.add_node("search_academic", search_academic)
workflow.add_node("search_code", search_code)
workflow.add_node("synthesize", synthesize)

# Fan-out: all three search agents run in parallel
workflow.add_edge(START, "search_web")
workflow.add_edge(START, "search_academic")
workflow.add_edge(START, "search_code")

# Fan-in: all three must complete before synthesis
workflow.add_edge("search_web", "synthesize")
workflow.add_edge("search_academic", "synthesize")
workflow.add_edge("search_code", "synthesize")
workflow.add_edge("synthesize", END)

app = workflow.compile()
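
To watch the fan-out happen, you can stream per-node updates; each search branch surfaces as soon as it finishes (a sketch; the query value is a placeholder):

# Stream node updates: the three search nodes complete in whatever order
# their backends return, then synthesize runs exactly once
async for update in app.astream(
    {"query": "vector databases", "messages": []},
    stream_mode="updates",
):
    print(list(update.keys()))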

Hierarchical Manager Pattern

# Hierarchical: Manager delegates to specialized workers
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal

class ProjectState(TypedDict):
    task_description: str
    plan: str
    subtasks: list[dict]
    results: dict[str, str]
    final_output: str

async def manager_plan(state: ProjectState) -> dict:
    """Manager analyzes task and creates execution plan."""
    prompt = f"""You are a project manager. Analyze this task and break it into subtasks.
Assign each subtask to the most appropriate specialist: researcher, coder, writer.

Task: {state['task_description']}

Output as JSON: [{{"subtask": "...", "assignee": "researcher|coder|writer", "priority": 1}}]"""

    plan = await call_llm(prompt, model="claude-sonnet-4-20250514")
    subtasks = parse_json(plan)  # parse_json: assumed JSON-extraction helper
    return {"plan": plan, "subtasks": subtasks}

def route_to_worker(state: ProjectState) -> str:
    """Route to the next unfinished subtask's assignee (sync: no awaits needed)."""
    for task in state.get("subtasks", []):
        if task["subtask"] not in state.get("results", {}):
            return task["assignee"]
    return "manager_review"

async def researcher(state: ProjectState) -> dict:
    task = get_current_task(state, "researcher")
    result = await call_llm(f"Research: {task['subtask']}")
    # Copy before updating: mutating graph state in place is unsafe
    results = {**state.get("results", {}), task["subtask"]: result}
    return {"results": results}

async def coder(state: ProjectState) -> dict:
    task = get_current_task(state, "coder")
    result = await call_llm(f"Write code for: {task['subtask']}")
    results = {**state.get("results", {}), task["subtask"]: result}
    return {"results": results}

async def writer(state: ProjectState) -> dict:
    task = get_current_task(state, "writer")
    result = await call_llm(f"Write documentation for: {task['subtask']}")
    results = {**state.get("results", {}), task["subtask"]: result}
    return {"results": results}

async def manager_review(state: ProjectState) -> dict:
    """Manager reviews all results and produces final output."""
    prompt = f"""Review all completed subtasks and produce the final deliverable.

Original task: {state['task_description']}
Plan: {state['plan']}
Results: {state['results']}"""

    final = await call_llm(prompt, model="claude-sonnet-4-20250514")
    return {"final_output": final}

# Build hierarchical graph
workflow = StateGraph(ProjectState)
workflow.add_node("manager_plan", manager_plan)
workflow.add_node("researcher", researcher)
workflow.add_node("coder", coder)
workflow.add_node("writer", writer)
workflow.add_node("manager_review", manager_review)

workflow.add_edge(START, "manager_plan")
workflow.add_conditional_edges("manager_plan", route_to_worker)
workflow.add_conditional_edges("researcher", route_to_worker)
workflow.add_conditional_edges("coder", route_to_worker)
workflow.add_conditional_edges("writer", route_to_worker)
workflow.add_edge("manager_review", END)

app = workflow.compile()
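
Swarm Handoff

The swarm pattern deserves a concrete sketch as well. The version below is deliberately framework-agnostic: each agent either finishes or names a peer to hand off to, and a small runner enforces a hop limit. The agent roles, routing keywords, and the call_llm helper are illustrative assumptions, not a fixed API.

# Minimal swarm/handoff sketch: decentralized routing with a hop limit
AGENTS = {}

def agent(name: str):
    """Register a coroutine as a named swarm agent."""
    def decorator(fn):
        AGENTS[name] = fn
        return fn
    return decorator

@agent("triage")
async def triage(state: dict) -> tuple[dict, str | None]:
    label = await call_llm(f"Classify as 'billing' or 'tech': {state['request']}")
    return state, ("billing" if "billing" in label.lower() else "tech")

@agent("billing")
async def billing(state: dict) -> tuple[dict, str | None]:
    state["answer"] = await call_llm(f"Answer this billing question: {state['request']}")
    if "technical" in state["answer"].lower():
        return state, "tech"  # Hand off sideways: any node can route to any other
    return state, None  # Done

@agent("tech")
async def tech(state: dict) -> tuple[dict, str | None]:
    state["answer"] = await call_llm(f"Answer this technical question: {state['request']}")
    return state, None  # Done

async def run_swarm(entry: str, state: dict, max_hops: int = 8) -> dict:
    """Follow handoffs until an agent finishes or the hop limit is hit."""
    current = entry
    for _ in range(max_hops):
        state, current = await AGENTS[current](state)
        if current is None:
            return state
    raise RuntimeError("Handoff loop exceeded max_hops")

The hop limit is what keeps a decentralized topology manageable: every run terminates, and logging each handoff yields the execution trace that swarm-style systems otherwise lack.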

Fault Handling

Agent-Level Fault Tolerance

# Resilient agent wrapper
import asyncio

class AgentError(Exception):
    def __init__(self, agent_name: str, original_error: Exception):
        self.agent_name = agent_name
        self.original_error = original_error
        super().__init__(f"Agent {agent_name} failed: {original_error}")

def resilient_agent(
    agent_fn,
    name: str,
    max_retries: int = 2,
    timeout_seconds: int = 60,
    fallback_fn=None,
):
    """Wrap an agent function with retry, timeout, and fallback."""
    async def wrapper(state):
        last_error = None

        for attempt in range(max_retries + 1):
            try:
                result = await asyncio.wait_for(
                    agent_fn(state),
                    timeout=timeout_seconds,
                )
                return result
            except asyncio.TimeoutError:
                last_error = TimeoutError(f"Timeout after {timeout_seconds}s")
            except Exception as e:
                last_error = e

            if attempt < max_retries:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        # All retries exhausted, try fallback
        if fallback_fn:
            try:
                return await fallback_fn(state)
            except Exception:
                pass

        raise AgentError(name, last_error)

    wrapper.__name__ = name
    return wrapper
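
Usage sketch: wrap a node before registering it. The fallback function below is hypothetical, standing in for a cheaper backup model.

# Hypothetical fallback: a cheaper model used only after the primary
# agent exhausts its retries
async def translate_with_fallback_model(state):
    result = await call_llm(f"Translate: {state['source_text']}", model="gpt-4o-mini")
    return {"initial_translation": result, "messages": ["Fallback translator: completed"]}

workflow.add_node(
    "translate",
    resilient_agent(
        translate,
        name="translate",
        max_retries=2,
        timeout_seconds=60,
        fallback_fn=translate_with_fallback_model,
    ),
)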

Global State Recovery

# Checkpoint and recovery with LangGraph persistence
# Note: in recent langgraph versions, SqliteSaver.from_conn_string returns a
# context manager, and async execution (ainvoke) needs AsyncSqliteSaver from
# the langgraph-checkpoint-sqlite package; adjust to your installed version
from langgraph.checkpoint.sqlite import SqliteSaver

# Create checkpointer
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")

# Compile graph with checkpointing
app = workflow.compile(checkpointer=checkpointer)

# Run with thread_id for persistence
config = {"configurable": {"thread_id": "project-123"}}

# If interrupted, resume from last checkpoint
try:
    result = await app.ainvoke(initial_state, config)
except Exception as e:
    print(f"Interrupted at: {e}")
    # Resume from checkpoint
    state = await app.aget_state(config)
    print(f"Last completed step: {state.next}")
    result = await app.ainvoke(None, config)  # Resume
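
You can also walk the checkpoint history to see exactly where a run stalled (a sketch against the compiled app above):

# Walk checkpoints from newest to oldest; each snapshot carries the saved
# state values plus the nodes scheduled to run next
async for snapshot in app.aget_state_history(config):
    print(snapshot.created_at, snapshot.next)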

Choosing an Orchestration Pattern

Decision tree:

Task profile
  │
  ├─ Fixed steps in a known order?
  │   └─ YES → Sequential pipeline
  │
  ├─ Multiple independent subtasks?
  │   └─ YES → Parallel (Fan-out/Fan-in)
  │
  ├─ Needs dynamic dispatch and coordination?
  │   ├─ Is there a clear coordinator role?
  │   │   └─ YES → Hierarchical (Manager-Worker)
  │   └─ Do tasks flow between agents?
  │       └─ YES → Swarm (Handoff)
  │
  └─ Hybrid?
      └─ Most real-world systems combine several patterns

Summary

  1. Start with the simplest pattern: the sequential pipeline is the easiest to understand and debug; unless you have a clear need for parallelism, start sequential.
  2. Parallelism needs an aggregation point: fan-out must be paired with fan-in, or results stay scattered and never get integrated.
  3. The hierarchical pattern most resembles a human team: the Manager plans and coordinates while Workers execute, which suits complex projects.
  4. The swarm pattern is flexible but hard to debug: decentralization means nondeterministic execution paths, so thorough logging and tracing are essential.
  5. Fault tolerance is not optional: every agent can fail, so retries, timeouts, and fallbacks must be designed in from the start.

Maurice | maurice_wen@proton.me