Multi-Agent Orchestration Patterns: From Sequential to Swarm
Original · 灵阙教研团队 · ~8 minute read · Updated 2026-02-28
The four orchestration patterns (sequential, parallel, hierarchical, swarm), LangGraph state-machine orchestration, CrewAI role collaboration, and failure handling in practice
Introduction
A single agent's capability ceiling is set by its context window and tool set. When task complexity exceeds what one agent can handle, multiple agents must collaborate to complete it, just as a team can take on more complex projects than any individual.
The core question of multi-agent orchestration is: how do you get multiple agents to collaborate efficiently while preserving controllability and observability? This article systematically compares four orchestration patterns and provides hands-on code for LangGraph and CrewAI.
Orchestration Patterns at a Glance
The Four Core Patterns
Pattern 1: Sequential (Pipeline)
Agent A ──→ Agent B ──→ Agent C ──→ Result
Characteristics: simple and predictable; total latency = sum of all stages
Pattern 2: Parallel (Fan-out / Fan-in)
          ┌──→ Agent B ──┐
Agent A ──┤              ├──→ Agent D (aggregator)
          └──→ Agent C ──┘
Characteristics: high throughput; total latency = slowest branch
Pattern 3: Hierarchical (Manager-Worker)
            ┌── Agent B
Agent A ────┤── Agent C
(Manager)   └── Agent D
Characteristics: dynamic dispatch; the Manager coordinates globally
Pattern 4: Swarm (Handoff)
Agent A ←──→ Agent B
   ↕            ↕
Agent C ←──→ Agent D
Characteristics: decentralized; any node can hand off to any other
Pattern Selection Comparison
| Dimension | Sequential | Parallel | Hierarchical | Swarm |
|---|---|---|---|---|
| Complexity | Low | Medium | Medium | High |
| Latency | High (stages accumulate) | Low (slowest branch) | Medium | Unpredictable |
| Controllability | High | High | Medium | Low |
| Flexibility | Low | Medium | High | Very high |
| Debugging difficulty | Low | Medium | Medium | High |
| Best fit | Fixed workflows | Independent subtasks | Complex project management | Conversational collaboration |
| Example | Translation pipeline | Multi-source research | Software dev team | Support routing |
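Of the four patterns, swarm is the only one that gets no framework code later in this article, so here is a minimal pure-Python sketch of its handoff mechanic: each agent returns its output plus the name of the agent to hand off to (or None to stop), and a driver loop follows the handoffs with a step cap. The agent names and the `run_swarm` driver are illustrative stand-ins, not any framework's API.

```python
# Minimal swarm/handoff sketch: each agent decides who runs next.
# Agents and the driver are illustrative stand-ins, not a framework API.

def triage(task: str):
    # Decentralized routing: the agent itself chooses the next hop
    if "refund" in task:
        return "triage: routing refund", "billing"
    return "triage: routing question", "support"

def billing(task: str):
    return f"billing: processed '{task}'", None  # None = terminate

def support(task: str):
    return f"support: answered '{task}'", None

AGENTS = {"triage": triage, "billing": billing, "support": support}

def run_swarm(task: str, start: str = "triage", max_steps: int = 10) -> list[str]:
    """Follow handoffs from agent to agent, with a step cap to avoid loops."""
    trace, current = [], start
    for _ in range(max_steps):
        output, next_agent = AGENTS[current](task)
        trace.append(output)
        if next_agent is None:
            return trace
        current = next_agent
    raise RuntimeError(f"No agent terminated within {max_steps} handoffs")
```

The `max_steps` cap matters: with no central coordinator, nothing else guarantees termination, which is exactly the "hard to debug" trade-off from the table.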
LangGraph Orchestration in Practice
Sequential Pipeline
```python
# Multi-agent translation pipeline
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class TranslationState(TypedDict):
    source_text: str
    source_lang: str
    target_lang: str
    initial_translation: str
    review_feedback: str
    final_translation: str
    quality_score: float
    messages: Annotated[list, operator.add]

# `call_llm` is an async LLM-client helper assumed to be defined elsewhere.

# Agent 1: Translator
async def translate(state: TranslationState) -> dict:
    prompt = f"""Translate the following {state['source_lang']} text to {state['target_lang']}.
Focus on accuracy and naturalness.
Text: {state['source_text']}"""
    result = await call_llm(prompt, model="gpt-4o")
    return {"initial_translation": result, "messages": ["Translator: completed"]}

# Agent 2: Reviewer
async def review(state: TranslationState) -> dict:
    prompt = f"""Review this translation for accuracy, fluency, and cultural appropriateness.
Source ({state['source_lang']}): {state['source_text']}
Translation ({state['target_lang']}): {state['initial_translation']}
Provide specific feedback on:
1. Accuracy errors
2. Fluency issues
3. Cultural adaptation needs"""
    feedback = await call_llm(prompt, model="claude-sonnet-4-20250514")
    return {"review_feedback": feedback, "messages": ["Reviewer: completed"]}

# Agent 3: Editor
async def edit(state: TranslationState) -> dict:
    prompt = f"""Improve this translation based on the reviewer's feedback.
Original: {state['source_text']}
Current translation: {state['initial_translation']}
Feedback: {state['review_feedback']}
Produce the final polished translation."""
    final = await call_llm(prompt, model="gpt-4o")
    return {"final_translation": final, "messages": ["Editor: completed"]}

# Agent 4: Quality Assessor
async def assess_quality(state: TranslationState) -> dict:
    prompt = f"""Rate this translation on a scale of 0-1.
Source: {state['source_text']}
Translation: {state['final_translation']}
Respond with only a number."""
    score = float(await call_llm(prompt, model="gpt-4o-mini"))
    return {"quality_score": score, "messages": [f"QA: score={score:.2f}"]}

# Build graph
workflow = StateGraph(TranslationState)
workflow.add_node("translate", translate)
workflow.add_node("review", review)
workflow.add_node("edit", edit)
workflow.add_node("assess", assess_quality)
workflow.add_edge(START, "translate")
workflow.add_edge("translate", "review")
workflow.add_edge("review", "edit")
workflow.add_edge("edit", "assess")

# Conditional: re-translate if quality too low
# (unbounded as written; add a retry counter to state in production)
def should_retry(state: TranslationState) -> str:
    if state.get("quality_score", 0) < 0.85:
        return "translate"  # Retry from scratch
    return END

workflow.add_conditional_edges("assess", should_retry)
app = workflow.compile()
```
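The `should_retry` edge above will loop forever if the score never clears 0.85. The usual safeguard is a bounded retry that keeps the best attempt seen; the sketch below shows that logic in plain Python, independent of LangGraph, with a hypothetical `run_pipeline` stub standing in for one full translate→review→edit→assess pass:

```python
# Bounded retry around a quality gate. `run_pipeline` is a hypothetical stub
# standing in for one full pass of the translation graph.

def quality_gate(run_pipeline, threshold: float = 0.85, max_retries: int = 2):
    """Re-run the pipeline until the score clears the threshold or retries run out."""
    best = None
    for attempt in range(max_retries + 1):
        # Expected to return {"final_translation": ..., "quality_score": ...}
        result = run_pipeline(attempt)
        if best is None or result["quality_score"] > best["quality_score"]:
            best = result  # keep the best attempt, not just the last one
        if result["quality_score"] >= threshold:
            return result
    return best  # retries exhausted: return the best attempt seen
```

In the LangGraph version, the equivalent would be an attempt-counter field in `TranslationState` that `should_retry` checks before returning `"translate"`.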
Parallel Research
```python
# Parallel research with fan-out/fan-in
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class ResearchState(TypedDict):
    query: str
    web_results: str
    academic_results: str
    code_results: str
    synthesis: str
    messages: Annotated[list, operator.add]

# `call_llm`, `web_search`, `arxiv_search`, and `github_search` are async
# helpers assumed to be defined elsewhere.

# Parallel research agents
async def search_web(state: ResearchState) -> dict:
    results = await web_search(state["query"])
    summary = await call_llm(
        f"Summarize these web results for: {state['query']}\n{results}"
    )
    return {"web_results": summary, "messages": ["Web researcher: done"]}

async def search_academic(state: ResearchState) -> dict:
    papers = await arxiv_search(state["query"])
    summary = await call_llm(
        f"Summarize these academic papers for: {state['query']}\n{papers}"
    )
    return {"academic_results": summary, "messages": ["Academic researcher: done"]}

async def search_code(state: ResearchState) -> dict:
    repos = await github_search(state["query"])
    summary = await call_llm(
        f"Summarize these code repositories for: {state['query']}\n{repos}"
    )
    return {"code_results": summary, "messages": ["Code researcher: done"]}

# Synthesis agent (fan-in)
async def synthesize(state: ResearchState) -> dict:
    prompt = f"""Synthesize findings from multiple sources about: {state['query']}
Web findings: {state['web_results']}
Academic findings: {state['academic_results']}
Code examples: {state['code_results']}
Produce a comprehensive analysis."""
    result = await call_llm(prompt, model="claude-sonnet-4-20250514")
    return {"synthesis": result, "messages": ["Synthesizer: done"]}

# Build parallel graph
workflow = StateGraph(ResearchState)
workflow.add_node("search_web", search_web)
workflow.add_node("search_academic", search_academic)
workflow.add_node("search_code", search_code)
workflow.add_node("synthesize", synthesize)

# Fan-out: all three search agents run in parallel
workflow.add_edge(START, "search_web")
workflow.add_edge(START, "search_academic")
workflow.add_edge(START, "search_code")

# Fan-in: all three must complete before synthesis
workflow.add_edge("search_web", "synthesize")
workflow.add_edge("search_academic", "synthesize")
workflow.add_edge("search_code", "synthesize")
workflow.add_edge("synthesize", END)
app = workflow.compile()
```
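The comparison table's claim that fan-out/fan-in latency is set by the slowest branch can be seen in a framework-free sketch using `asyncio.gather`; the `fake_search` stub below simulates the three researchers with sleeps of different lengths (all durations illustrative):

```python
import asyncio
import time

# Framework-free fan-out/fan-in: three simulated researchers run concurrently,
# then a synthesizer merges their outputs. Durations are illustrative.

async def fake_search(source: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{source} results"

async def research(query: str) -> str:
    # Fan-out: launch all branches at once; gather waits for the slowest
    web, academic, code = await asyncio.gather(
        fake_search("web", 0.10),
        fake_search("academic", 0.20),  # slowest branch dominates latency
        fake_search("code", 0.05),
    )
    # Fan-in: one synthesis step sees all three results together
    return f"synthesis of [{web} | {academic} | {code}] for '{query}'"

async def main() -> tuple[str, float]:
    start = time.perf_counter()
    summary = await research("agent orchestration")
    return summary, time.perf_counter() - start
```

Total wall time is roughly 0.2 s (the slowest branch), not the 0.35 s the sleeps would cost sequentially; the same property holds for the LangGraph graph above.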
Hierarchical Manager Pattern
```python
# Hierarchical: Manager delegates to specialized workers
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class ProjectState(TypedDict):
    task_description: str
    plan: str
    subtasks: list[dict]
    results: dict[str, str]
    final_output: str

# `call_llm`, `parse_json`, and `get_current_task` are helpers assumed to be
# defined elsewhere (parse_json extracts the JSON array from the LLM output;
# get_current_task returns the next unfinished subtask for a given assignee).

async def manager_plan(state: ProjectState) -> dict:
    """Manager analyzes the task and creates an execution plan."""
    prompt = f"""You are a project manager. Analyze this task and break it into subtasks.
Assign each subtask to the most appropriate specialist: researcher, coder, writer.
Task: {state['task_description']}
Output as JSON: [{{"subtask": "...", "assignee": "researcher|coder|writer", "priority": 1}}]"""
    plan = await call_llm(prompt, model="claude-sonnet-4-20250514")
    subtasks = parse_json(plan)
    return {"plan": plan, "subtasks": subtasks}

async def route_to_worker(state: ProjectState) -> str:
    """Route to the next unfinished subtask's assignee."""
    for task in state.get("subtasks", []):
        if task["subtask"] not in state.get("results", {}):
            return task["assignee"]
    return "manager_review"

async def researcher(state: ProjectState) -> dict:
    task = get_current_task(state, "researcher")
    result = await call_llm(f"Research: {task['subtask']}")
    results = state.get("results", {})
    results[task["subtask"]] = result
    return {"results": results}

async def coder(state: ProjectState) -> dict:
    task = get_current_task(state, "coder")
    result = await call_llm(f"Write code for: {task['subtask']}")
    results = state.get("results", {})
    results[task["subtask"]] = result
    return {"results": results}

async def writer(state: ProjectState) -> dict:
    task = get_current_task(state, "writer")
    result = await call_llm(f"Write documentation for: {task['subtask']}")
    results = state.get("results", {})
    results[task["subtask"]] = result
    return {"results": results}

async def manager_review(state: ProjectState) -> dict:
    """Manager reviews all results and produces the final output."""
    prompt = f"""Review all completed subtasks and produce the final deliverable.
Original task: {state['task_description']}
Plan: {state['plan']}
Results: {state['results']}"""
    final = await call_llm(prompt, model="claude-sonnet-4-20250514")
    return {"final_output": final}

# Build hierarchical graph. Note: every assignee the manager can emit must
# have a matching node, which is why the prompt restricts the specialist list.
workflow = StateGraph(ProjectState)
workflow.add_node("manager_plan", manager_plan)
workflow.add_node("researcher", researcher)
workflow.add_node("coder", coder)
workflow.add_node("writer", writer)
workflow.add_node("manager_review", manager_review)
workflow.add_edge(START, "manager_plan")
workflow.add_conditional_edges("manager_plan", route_to_worker)
workflow.add_conditional_edges("researcher", route_to_worker)
workflow.add_conditional_edges("coder", route_to_worker)
workflow.add_conditional_edges("writer", route_to_worker)
workflow.add_edge("manager_review", END)
app = workflow.compile()
```
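The routing logic above condenses to a small framework-free loop: a manager emits a subtask list, the dispatcher routes each unfinished subtask to its assignee, and review happens once everything is done. The worker behaviors below are trivial stand-ins for the LLM calls:

```python
# Manager-worker dispatch without a framework: the manager's plan is a list
# of subtasks; the loop routes until every subtask has a result.

WORKERS = {
    "researcher": lambda sub: f"research notes on {sub}",
    "coder": lambda sub: f"code for {sub}",
    "writer": lambda sub: f"docs for {sub}",
}

def run_project(subtasks: list[dict]) -> dict:
    """subtasks: [{"subtask": ..., "assignee": ...}], as the manager would emit."""
    results: dict[str, str] = {}
    while True:
        # Route to the first unfinished subtask (mirrors route_to_worker)
        pending = [t for t in subtasks if t["subtask"] not in results]
        if not pending:
            break  # everything done: hand control back to the manager
        task = pending[0]
        results[task["subtask"]] = WORKERS[task["assignee"]](task["subtask"])
    # Manager review: assemble the final deliverable from all results
    return {"results": results, "final_output": " / ".join(results.values())}
```

The key structural point is that only the manager ever sees the whole plan; each worker sees one subtask at a time.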
Failure Handling
Agent-Level Fault Tolerance
```python
# Resilient agent wrapper
import asyncio

class AgentError(Exception):
    def __init__(self, agent_name: str, original_error: Exception):
        self.agent_name = agent_name
        self.original_error = original_error
        super().__init__(f"Agent {agent_name} failed: {original_error}")

def resilient_agent(
    agent_fn,
    name: str,
    max_retries: int = 2,
    timeout_seconds: int = 60,
    fallback_fn=None,
):
    """Wrap an agent function with retry, timeout, and fallback."""
    async def wrapper(state):
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                result = await asyncio.wait_for(
                    agent_fn(state),
                    timeout=timeout_seconds,
                )
                return result
            except asyncio.TimeoutError:
                last_error = TimeoutError(f"Timeout after {timeout_seconds}s")
            except Exception as e:
                last_error = e
            if attempt < max_retries:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
        # All retries exhausted, try fallback
        if fallback_fn:
            try:
                return await fallback_fn(state)
            except Exception:
                pass
        raise AgentError(name, last_error)
    wrapper.__name__ = name
    return wrapper
```
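To see the retry semantics in isolation, here is a self-contained demo: a stand-in agent that fails twice before succeeding, wrapped by a compact adaptation of the same retry-then-fallback logic (the backoff base is made configurable so the demo finishes quickly; in the article's wrapper it is fixed at 2**attempt seconds):

```python
import asyncio

# Compact adaptation of the resilient_agent wrapper above, with the
# backoff base parameterized so the demo runs in milliseconds.

def with_retries(agent_fn, max_retries: int = 2, timeout: float = 1.0,
                 fallback_fn=None, backoff: float = 0.01):
    async def wrapper(state):
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                return await asyncio.wait_for(agent_fn(state), timeout=timeout)
            except Exception as e:
                last_error = e
                if attempt < max_retries:
                    await asyncio.sleep(backoff * (2 ** attempt))
        if fallback_fn is not None:
            return await fallback_fn(state)
        raise last_error
    return wrapper

attempts = {"n": 0}

async def flaky_agent(state):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient LLM error")  # fails on calls 1 and 2
    return {"answer": "ok"}

async def demo():
    resilient = with_retries(flaky_agent, max_retries=2)
    return await resilient({})
```

With `max_retries=2` the agent is called up to three times, so two transient failures followed by a success still yield a result rather than an error.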
Global State Recovery
```python
# Checkpoint and recovery with LangGraph persistence
from langgraph.checkpoint.sqlite import SqliteSaver

# Create a SQLite-backed checkpointer.
# Note: in recent langgraph-checkpoint-sqlite releases, from_conn_string is a
# context manager (`with SqliteSaver.from_conn_string(...) as checkpointer:`).
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")

# Compile the graph with checkpointing
app = workflow.compile(checkpointer=checkpointer)

# Run with a thread_id so state is persisted per run
config = {"configurable": {"thread_id": "project-123"}}

# If interrupted, resume from the last checkpoint
try:
    result = await app.ainvoke(initial_state, config)
except Exception as e:
    print(f"Interrupted at: {e}")
    # Inspect the saved state, then resume from the checkpoint
    state = await app.aget_state(config)
    print(f"Next step(s) to run: {state.next}")
    result = await app.ainvoke(None, config)  # Passing None resumes the run
```
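The checkpoint/resume mechanic itself is framework-independent. This sketch persists each completed step's output to a JSON file and skips finished steps on rerun; the file path, step names, and `run_with_checkpoints` helper are all illustrative, not LangGraph APIs:

```python
import json
from pathlib import Path

# Minimal checkpoint/resume: completed steps are persisted after each one,
# so a rerun after a crash resumes from the last finished step.

def run_with_checkpoints(steps: dict, ckpt_path: Path) -> dict:
    """steps: ordered {name: fn(state) -> value}; results saved after each step."""
    state = json.loads(ckpt_path.read_text()) if ckpt_path.exists() else {}
    for name, fn in steps.items():
        if name in state:
            continue  # already completed in a previous run
        state[name] = fn(state)
        ckpt_path.write_text(json.dumps(state))  # checkpoint after every step
    return state
```

This is the same contract LangGraph's checkpointer provides: durable per-step state keyed by a run identifier, so resumption never repeats finished work.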
Choosing an Orchestration Pattern
Decision tree:
Task analysis
│
├─ Fixed steps in a known order?
│   └─ YES → Sequential pipeline
│
├─ Multiple independent subtasks?
│   └─ YES → Parallel (fan-out/fan-in)
│
├─ Needs dynamic dispatch and coordination?
│   ├─ Is there a clear coordinator role?
│   │   └─ YES → Hierarchical (Manager-Worker)
│   └─ Do tasks flow between agents?
│       └─ YES → Swarm (handoff)
│
└─ Hybrid?
    └─ Most real systems combine several patterns
Summary
- Start with the simplest pattern: a sequential pipeline is the easiest to understand and debug; use it unless you have a clear need for parallelism.
- Parallelism needs an aggregation point: fan-out must be paired with fan-in, or results end up scattered and impossible to integrate.
- The hierarchical pattern most resembles a human team: the Manager plans and coordinates while Workers execute; it suits complex projects.
- The swarm pattern is flexible but hard to debug: decentralization means nondeterministic execution paths, so thorough logging and tracing are essential.
- Fault tolerance is mandatory: every agent can fail, so retries, timeouts, and fallbacks must be designed in from the start.
Maurice | maurice_wen@proton.me