Multi-Agent 协作架构设计模式
AI 导读
Multi-Agent 协作架构设计模式 从单体 Agent 到多 Agent 协作系统的架构演进与工程实践 为什么需要多 Agent 协作 单个 Agent 在面对复杂任务时,存在三个本质瓶颈: 认知负载上限:上下文窗口有限,单 Agent 无法同时持有完整的任务状态、领域知识和执行历史 能力边界固定:一个 Agent 的 System Prompt 和工具集难以覆盖所有专业领域...
Multi-Agent 协作架构设计模式
从单体 Agent 到多 Agent 协作系统的架构演进与工程实践
为什么需要多 Agent 协作
单个 Agent 在面对复杂任务时,存在三个本质瓶颈:
- 认知负载上限:上下文窗口有限,单 Agent 无法同时持有完整的任务状态、领域知识和执行历史
- 能力边界固定:一个 Agent 的 System Prompt 和工具集难以覆盖所有专业领域
- 串行执行瓶颈:复杂任务的子任务之间存在并行机会,单 Agent 只能逐步推进
多 Agent 协作的核心思想:将复杂任务分解为多个专业角色,每个角色拥有独立的上下文、工具集和决策逻辑,通过协作协议完成整体目标。
单体 Agent 的能力天花板:
┌─────────────────────────────────────────┐
│ Agent │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 规划 │ │ 执行 │ │ 验证 │ │
│ │ Planning│ │ Execute │ │ Verify │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ └───────────┼───────────┘ │
│ 单一上下文窗口 │
│ 单一工具集 │
│ 串行执行 │
└─────────────────────────────────────────┘
多 Agent 协作的能力扩展:
┌──────────────────────────────────────────────────┐
│ Orchestrator │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Agent-A │ │ Agent-B │ │ Agent-C │ │
│ │ 研究专家 │ │ 代码专家 │ │ 测试专家 │ │
│ │ 独立上下文│ │ 独立上下文│ │ 独立上下文│ │
│ │ 专属工具集│ │ 专属工具集│ │ 专属工具集│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ 可并行 ──── 可串行 ──── 可组合 │
└──────────────────────────────────────────────────┘
五种核心协作模式
模式一:Supervisor(主管模式)
核心思想:一个 Supervisor Agent 负责任务分解、子任务分配和结果汇总,Worker Agent 负责具体执行。
适用场景:任务可以清晰分解为独立子任务,子任务之间依赖关系简单。
┌──────────────┐
│ Supervisor │
│ 任务分解 │
│ 结果汇总 │
└──┬───┬───┬───┘
│ │ │
┌───────┘ │ └───────┐
v v v
┌────────┐ ┌────────┐ ┌────────┐
│Worker-A│ │Worker-B│ │Worker-C│
│ 研究 │ │ 编码 │ │ 测试 │
└────────┘ └────────┘ └────────┘
实现要点:
# Supervisor 的核心决策逻辑
class SupervisorAgent:
def __init__(self, workers: list[WorkerAgent]):
self.workers = {w.name: w for w in workers}
self.task_queue = []
self.results = {}
def decompose(self, task: str) -> list[SubTask]:
"""将复合任务拆解为原子子任务"""
# LLM 调用:分析任务结构,输出子任务列表
prompt = f"""
将以下任务分解为可独立执行的子任务:
任务:{task}
输出 JSON 格式:
[
{{"id": "t1", "description": "...", "worker": "研究", "depends_on": []}},
{{"id": "t2", "description": "...", "worker": "编码", "depends_on": ["t1"]}}
]
"""
return self.llm.structured_output(prompt, schema=SubTaskList)
def dispatch(self, subtasks: list[SubTask]):
"""按依赖关系调度子任务"""
ready = [t for t in subtasks if all(
d in self.results for d in t.depends_on
)]
for task in ready:
worker = self.workers[task.worker]
context = {d: self.results[d] for d in task.depends_on}
result = worker.execute(task, context)
self.results[task.id] = result
def aggregate(self) -> str:
"""汇总所有子任务结果"""
return self.llm.summarize(self.results)
优势与局限:
| 维度 | 优势 | 局限 |
|---|---|---|
| 控制力 | Supervisor 拥有全局视野 | Supervisor 成为单点瓶颈 |
| 可预测性 | 执行路径确定 | 灵活性不足 |
| 调试性 | 决策链路清晰 | Supervisor 的分解质量决定上限 |
模式二:Swarm(蜂群模式)
核心思想:没有固定的中心控制者,Agent 之间通过 Handoff(接力)协议自主传递控制权。当前 Agent 判断任务超出自身能力范围时,主动将控制权转交给更合适的 Agent。
适用场景:客服路由、多步骤工作流中角色切换频繁的场景。
用户输入 ──→ Agent-A(分诊)
│
├──→ "这是技术问题" ──→ Handoff to Agent-B(技术支持)
│ │
│ ├──→ "需要退款" ──→ Handoff to Agent-C(财务)
│ │
│ └──→ 解决 ──→ 返回结果
│
└──→ "这是账单问题" ──→ Handoff to Agent-C(财务)
实现要点:
# OpenAI Swarm 风格的 Handoff 实现
class SwarmAgent:
def __init__(self, name: str, instructions: str, tools: list):
self.name = name
self.instructions = instructions
self.tools = tools
def run(self, messages: list) -> Response:
"""执行当前 Agent 的逻辑"""
response = self.llm.chat(
system=self.instructions,
messages=messages,
tools=self.tools + self.handoff_tools()
)
# 检查是否触发了 Handoff
if response.tool_call and response.tool_call.name.startswith("transfer_to_"):
target_agent = self.resolve_handoff(response.tool_call)
return HandoffResponse(target=target_agent, context=messages)
return response
def handoff_tools(self) -> list:
"""声明可以转交的目标 Agent"""
return [
{
"name": "transfer_to_tech_support",
"description": "将对话转交给技术支持专家",
"parameters": {"reason": "转交原因"}
},
{
"name": "transfer_to_billing",
"description": "将对话转交给财务专家",
"parameters": {"reason": "转交原因"}
}
]
# 运行时循环
def swarm_loop(initial_agent, messages):
current_agent = initial_agent
while True:
response = current_agent.run(messages)
if isinstance(response, HandoffResponse):
current_agent = response.target
messages = response.context
continue
return response
关键设计决策:
- 上下文传递策略:Handoff 时传递完整对话历史还是摘要?完整历史保留信息但消耗 Token,摘要节省成本但可能丢失细节
- 回退机制:当目标 Agent 也无法处理时,是否允许回退到上一个 Agent 或升级到人工
- 循环检测:必须设置最大 Handoff 次数,防止 Agent 之间无限互相转交
模式三:Pipeline(流水线模式)
核心思想:Agent 按照固定顺序组成流水线,每个 Agent 处理特定阶段,输出作为下一个 Agent 的输入。
适用场景:任务具有明确的阶段性,每个阶段有不同的专业要求。
输入 ──→ [需求分析Agent] ──→ [架构设计Agent] ──→ [代码实现Agent] ──→ [测试Agent] ──→ 输出
│ │ │ │
v v v v
需求文档 架构方案 代码产物 测试报告
实现要点:
class PipelineOrchestrator:
def __init__(self, stages: list[PipelineStage]):
self.stages = stages
def execute(self, input_data: dict) -> dict:
"""顺序执行流水线"""
current_data = input_data
execution_log = []
for stage in self.stages:
# 门禁检查:上一阶段的输出是否满足本阶段的输入要求
if not stage.validate_input(current_data):
raise PipelineGateError(
stage=stage.name,
reason=f"Input validation failed: {stage.input_schema}"
)
# 执行当前阶段
result = stage.agent.execute(current_data)
# 质量门禁:当前阶段的输出是否达标
if stage.quality_gate:
passed, feedback = stage.quality_gate.check(result)
if not passed:
# 允许重试,但有最大次数限制
result = self._retry_with_feedback(
stage, current_data, feedback, max_retries=3
)
execution_log.append({
"stage": stage.name,
"input_hash": hash(str(current_data)),
"output_hash": hash(str(result)),
"duration_ms": stage.last_duration_ms
})
current_data = result
return {"result": current_data, "log": execution_log}
def _retry_with_feedback(self, stage, input_data, feedback, max_retries):
"""带反馈的重试机制"""
for attempt in range(max_retries):
enriched_input = {
**input_data,
"previous_feedback": feedback,
"attempt": attempt + 1
}
result = stage.agent.execute(enriched_input)
passed, feedback = stage.quality_gate.check(result)
if passed:
return result
raise PipelineQualityError(stage=stage.name, attempts=max_retries)
门禁设计(Quality Gate):
| 阶段 | 门禁条件 | 失败处理 |
|---|---|---|
| 需求分析 | 输出包含 user story + 验收标准 | 重试 + 补充提问 |
| 架构设计 | Mermaid 图可解析 + 组件边界清晰 | 重试 + 参考模板 |
| 代码实现 | lint 通过 + 类型检查通过 | 自动修复 + 重试 |
| 测试 | 覆盖率 > 80% + 核心路径全绿 | 补充测试用例 |
模式四:Council(委员会模式)
核心思想:多个 Agent 从不同视角分析同一问题,通过投票、辩论或加权聚合得出最终决策。
适用场景:需要多角度评估的决策问题,如代码审查、方案选型、风险评估。
问题输入
│
┌─────────────┼─────────────┐
v v v
┌─────────┐ ┌─────────┐ ┌─────────┐
│安全专家 │ │性能专家 │ │可维护性 │
│视角 │ │视角 │ │专家视角 │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
v v v
┌──────────────────────────────────┐
│ Judge / Aggregator │
│ 投票 / 加权聚合 / 辩论仲裁 │
└──────────────────────────────────┘
│
v
最终决策
聚合策略对比:
class CouncilAggregator:
"""三种聚合策略"""
def majority_vote(self, opinions: list[Opinion]) -> Decision:
"""多数投票:适用于二元决策"""
votes = Counter(o.recommendation for o in opinions)
winner = votes.most_common(1)[0]
return Decision(
choice=winner[0],
confidence=winner[1] / len(opinions),
dissent=[o for o in opinions if o.recommendation != winner[0]]
)
def weighted_aggregate(self, opinions: list[Opinion],
weights: dict[str, float]) -> Decision:
"""加权聚合:不同专家权重不同"""
scores = {}
for opinion in opinions:
w = weights.get(opinion.agent_name, 1.0)
for option, score in opinion.scores.items():
scores[option] = scores.get(option, 0) + score * w
best = max(scores, key=scores.get)
return Decision(choice=best, scores=scores)
def debate_and_judge(self, opinions: list[Opinion],
judge: Agent) -> Decision:
"""辩论仲裁:专家互相质疑后由 Judge 裁决"""
debate_transcript = []
for i, opinion_a in enumerate(opinions):
for opinion_b in opinions[i+1:]:
challenge = opinion_a.agent.challenge(opinion_b)
rebuttal = opinion_b.agent.rebut(challenge)
debate_transcript.append({
"challenger": opinion_a.agent_name,
"challenged": opinion_b.agent_name,
"challenge": challenge,
"rebuttal": rebuttal
})
return judge.decide(opinions, debate_transcript)
模式五:Watchdog(监督者模式)
核心思想:一个或多个 Watchdog Agent 不参与任务执行,专门负责监控其他 Agent 的行为和输出质量。
适用场景:高风险操作、需要持续质量保证、安全审计。
┌─────────────────────────────────────────┐
│ Execution Layer │
│ Agent-A ──→ Agent-B ──→ Agent-C │
└─────────────┬───────────┬──────────────┘
│ │
v v
┌─────────────────────────────────────────┐
│ Watchdog Layer │
│ ┌──────────┐ ┌──────────┐ │
│ │ 安全审计 │ │ 质量监控 │ │
│ │ Watchdog │ │ Watchdog │ │
│ └──────────┘ └──────────┘ │
│ │
│ 检查项: │
│ - 工具调用是否在白名单内 │
│ - 输出是否包含敏感信息 │
│ - 执行时间是否异常 │
│ - 结果是否满足质量标准 │
└─────────────────────────────────────────┘
实现要点:
class WatchdogAgent:
def __init__(self, rules: list[WatchdogRule]):
self.rules = rules
self.violations = []
def inspect(self, event: AgentEvent) -> WatchdogVerdict:
"""检查 Agent 行为是否合规"""
for rule in self.rules:
if rule.matches(event):
result = rule.evaluate(event)
if not result.passed:
self.violations.append(result)
if result.severity == "critical":
return WatchdogVerdict.HALT # 立即停止
elif result.severity == "warning":
return WatchdogVerdict.WARN # 警告但继续
return WatchdogVerdict.PASS
# Watchdog 规则示例
RULES = [
WatchdogRule(
name="no_secret_in_output",
pattern=r"(sk-[a-zA-Z0-9]{48}|ghp_[a-zA-Z0-9]{36})",
severity="critical",
action="halt_and_redact"
),
WatchdogRule(
name="execution_time_limit",
condition=lambda e: e.duration_ms > 30000,
severity="warning",
action="log_and_notify"
),
WatchdogRule(
name="tool_whitelist",
condition=lambda e: e.tool_name not in ALLOWED_TOOLS,
severity="critical",
action="deny_execution"
),
]
通信协议设计
多 Agent 之间的通信是协作的基础。核心需要解决三个问题:消息格式、路由机制、状态同步。
消息格式标准化
@dataclass
class AgentMessage:
"""Agent 间通信的标准消息格式"""
id: str # 消息唯一 ID
sender: str # 发送者 Agent 名称
receiver: str # 接收者 Agent 名称(或 "broadcast")
type: MessageType # REQUEST / RESPONSE / EVENT / HANDOFF
payload: dict # 业务数据
context: dict # 共享上下文(任务 ID、对话历史摘要等)
metadata: MessageMetadata # 时间戳、优先级、过期时间
trace_id: str # 分布式追踪 ID
class MessageType(Enum):
REQUEST = "request" # 请求执行
RESPONSE = "response" # 执行结果
EVENT = "event" # 事件通知
HANDOFF = "handoff" # 控制权转交
FEEDBACK = "feedback" # 反馈(用于迭代改进)
共享状态管理
class SharedBlackboard:
"""黑板模式:Agent 通过共享黑板交换信息"""
def __init__(self):
self._store = {}
self._locks = {}
self._history = []
def write(self, agent: str, key: str, value: any):
"""写入共享数据"""
with self._locks.setdefault(key, Lock()):
old_value = self._store.get(key)
self._store[key] = value
self._history.append({
"agent": agent,
"key": key,
"old": old_value,
"new": value,
"timestamp": time.time()
})
def read(self, key: str) -> any:
"""读取共享数据"""
return self._store.get(key)
def subscribe(self, key: str, callback: Callable):
"""订阅数据变更"""
# 当 key 对应的值被修改时触发回调
pass
容错与恢复策略
单 Agent 失败处理
Agent 执行失败
│
v
┌─────────────────┐
│ 失败类型判断 │
├─────────────────┤
│ 可重试错误? │──→ Yes ──→ 指数退避重试(最多 3 次)
│ (超时/限流/网络) │
├─────────────────┤
│ 逻辑错误? │──→ Yes ──→ 附带错误反馈重新执行
│ (输出不符合格式) │
├─────────────────┤
│ 能力不足? │──→ Yes ──→ 升级到更强模型 / Handoff 到专家
│ (hallucination) │
├─────────────────┤
│ 不可恢复? │──→ Yes ──→ 标记失败 + 通知 Supervisor
└─────────────────┘
全局回滚机制
class CheckpointManager:
"""检查点管理器:支持全局回滚"""
def __init__(self):
self.checkpoints = []
def save(self, state: dict, label: str):
"""保存检查点"""
self.checkpoints.append({
"label": label,
"state": deepcopy(state),
"timestamp": time.time()
})
def rollback(self, label: str) -> dict:
"""回滚到指定检查点"""
for cp in reversed(self.checkpoints):
if cp["label"] == label:
return cp["state"]
raise CheckpointNotFoundError(label)
模式选型决策树
任务特征分析
│
├── 子任务是否独立?
│ ├── Yes + 需要中心协调 ──→ Supervisor
│ └── Yes + 角色切换频繁 ──→ Swarm
│
├── 是否有严格的阶段顺序?
│ └── Yes ──→ Pipeline
│
├── 是否需要多视角评估?
│ └── Yes ──→ Council
│
├── 是否需要持续质量监控?
│ └── Yes ──→ 在任何模式上叠加 Watchdog
│
└── 复杂度极高?
└── 组合使用:Pipeline(Supervisor(Workers)) + Watchdog
| 决策因素 | Supervisor | Swarm | Pipeline | Council | Watchdog |
|---|---|---|---|---|---|
| 任务分解明确 | 必需 | 不需要 | 必需 | 不需要 | N/A |
| 角色切换频繁 | 不适合 | 最佳 | 不适合 | 不适合 | N/A |
| 阶段性强 | 可以 | 不适合 | 最佳 | 不适合 | N/A |
| 需要多视角 | 不适合 | 不适合 | 不适合 | 最佳 | N/A |
| 高风险操作 | 需叠加 | 需叠加 | 需叠加 | 需叠加 | 必需 |
| 实现复杂度 | 中 | 低 | 低 | 高 | 中 |
工程实践建议
- 从简单开始:先用 Supervisor + 2-3 个 Worker 验证可行性,再逐步增加复杂度
- 消息格式先行:在写任何 Agent 逻辑之前,先定义好 Agent 间的通信协议
- 可观测性内建:每条 Agent 间消息都携带 trace_id,支持端到端追踪
- 容错设计:每个 Agent 都必须能优雅处理上游失败,不能因为一个 Agent 挂掉而导致全系统停滞
- Token 预算管理:多 Agent 系统的 Token 消耗是单 Agent 的数倍,必须设置预算上限和监控告警
- 测试策略:单 Agent 单测 + Agent 间集成测试 + 端到端场景测试,三层覆盖
参考资料
- LangGraph Multi-Agent 文档:Agent 编排框架的参考实现
- OpenAI Swarm:轻量级 Handoff 协议的开源实现
- CrewAI:基于角色的多 Agent 框架
- AutoGen:微软的多 Agent 对话框架
- Anthropic Claude Agent SDK:支持 Multi-Agent Orchestrator 模式
Maurice | maurice_wen@proton.me