Multi-Agent 协作架构设计模式

从单体 Agent 到多 Agent 协作系统的架构演进与工程实践


为什么需要多 Agent 协作

单个 Agent 在面对复杂任务时,存在三个本质瓶颈:

  1. 认知负载上限:上下文窗口有限,单 Agent 无法同时持有完整的任务状态、领域知识和执行历史
  2. 能力边界固定:一个 Agent 的 System Prompt 和工具集难以覆盖所有专业领域
  3. 串行执行瓶颈:复杂任务的子任务之间存在并行机会,单 Agent 只能逐步推进

多 Agent 协作的核心思想:将复杂任务分解为多个专业角色,每个角色拥有独立的上下文、工具集和决策逻辑,通过协作协议完成整体目标。

单体 Agent 的能力天花板:
┌─────────────────────────────────────────┐
│  Agent                                  │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐  │
│  │ 规划    │ │ 执行    │ │ 验证    │  │
│  │ Planning│ │ Execute │ │ Verify  │  │
│  └────┬────┘ └────┬────┘ └────┬────┘  │
│       └───────────┼───────────┘        │
│            单一上下文窗口               │
│            单一工具集                   │
│            串行执行                     │
└─────────────────────────────────────────┘

多 Agent 协作的能力扩展:
┌──────────────────────────────────────────────────┐
│  Orchestrator                                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │ Agent-A  │  │ Agent-B  │  │ Agent-C  │       │
│  │ 研究专家 │  │ 代码专家 │  │ 测试专家 │       │
│  │ 独立上下文│  │ 独立上下文│  │ 独立上下文│       │
│  │ 专属工具集│  │ 专属工具集│  │ 专属工具集│       │
│  └──────────┘  └──────────┘  └──────────┘       │
│       可并行 ──── 可串行 ──── 可组合               │
└──────────────────────────────────────────────────┘

五种核心协作模式

模式一:Supervisor(主管模式)

核心思想:一个 Supervisor Agent 负责任务分解、子任务分配和结果汇总,Worker Agent 负责具体执行。

适用场景:任务可以清晰分解为独立子任务,子任务之间依赖关系简单。

         ┌──────────────┐
         │  Supervisor   │
         │  任务分解      │
         │  结果汇总      │
         └──┬───┬───┬───┘
            │   │   │
    ┌───────┘   │   └───────┐
    v           v           v
┌────────┐ ┌────────┐ ┌────────┐
│Worker-A│ │Worker-B│ │Worker-C│
│ 研究   │ │ 编码   │ │ 测试   │
└────────┘ └────────┘ └────────┘

实现要点

# Supervisor 的核心决策逻辑
class SupervisorAgent:
    def __init__(self, workers: list[WorkerAgent]):
        self.workers = {w.name: w for w in workers}
        self.task_queue = []
        self.results = {}

    def decompose(self, task: str) -> list[SubTask]:
        """将复合任务拆解为原子子任务"""
        # LLM 调用:分析任务结构,输出子任务列表
        prompt = f"""
        将以下任务分解为可独立执行的子任务:
        任务:{task}

        输出 JSON 格式:
        [
          {{"id": "t1", "description": "...", "worker": "研究", "depends_on": []}},
          {{"id": "t2", "description": "...", "worker": "编码", "depends_on": ["t1"]}}
        ]
        """
        return self.llm.structured_output(prompt, schema=SubTaskList)

    def dispatch(self, subtasks: list[SubTask]):
        """按依赖关系调度子任务"""
        ready = [t for t in subtasks if all(
            d in self.results for d in t.depends_on
        )]
        for task in ready:
            worker = self.workers[task.worker]
            context = {d: self.results[d] for d in task.depends_on}
            result = worker.execute(task, context)
            self.results[task.id] = result

    def aggregate(self) -> str:
        """汇总所有子任务结果"""
        return self.llm.summarize(self.results)

优势与局限

维度 优势 局限
控制力 Supervisor 拥有全局视野 Supervisor 成为单点瓶颈
可预测性 执行路径确定 灵活性不足
调试性 决策链路清晰 Supervisor 的分解质量决定上限

模式二:Swarm(蜂群模式)

核心思想:没有固定的中心控制者,Agent 之间通过 Handoff(接力)协议自主传递控制权。当前 Agent 判断任务超出自身能力范围时,主动将控制权转交给更合适的 Agent。

适用场景:客服路由、多步骤工作流中角色切换频繁的场景。

用户输入 ──→ Agent-A(分诊)
                │
                ├──→ "这是技术问题" ──→ Handoff to Agent-B(技术支持)
                │                         │
                │                         ├──→ "需要退款" ──→ Handoff to Agent-C(财务)
                │                         │
                │                         └──→ 解决 ──→ 返回结果
                │
                └──→ "这是账单问题" ──→ Handoff to Agent-C(财务)

实现要点

# OpenAI Swarm 风格的 Handoff 实现
class SwarmAgent:
    def __init__(self, name: str, instructions: str, tools: list):
        self.name = name
        self.instructions = instructions
        self.tools = tools

    def run(self, messages: list) -> Response:
        """执行当前 Agent 的逻辑"""
        response = self.llm.chat(
            system=self.instructions,
            messages=messages,
            tools=self.tools + self.handoff_tools()
        )

        # 检查是否触发了 Handoff
        if response.tool_call and response.tool_call.name.startswith("transfer_to_"):
            target_agent = self.resolve_handoff(response.tool_call)
            return HandoffResponse(target=target_agent, context=messages)

        return response

    def handoff_tools(self) -> list:
        """声明可以转交的目标 Agent"""
        return [
            {
                "name": "transfer_to_tech_support",
                "description": "将对话转交给技术支持专家",
                "parameters": {"reason": "转交原因"}
            },
            {
                "name": "transfer_to_billing",
                "description": "将对话转交给财务专家",
                "parameters": {"reason": "转交原因"}
            }
        ]

# 运行时循环
def swarm_loop(initial_agent, messages):
    current_agent = initial_agent
    while True:
        response = current_agent.run(messages)
        if isinstance(response, HandoffResponse):
            current_agent = response.target
            messages = response.context
            continue
        return response

关键设计决策

  • 上下文传递策略:Handoff 时传递完整对话历史还是摘要?完整历史保留信息但消耗 Token,摘要节省成本但可能丢失细节
  • 回退机制:当目标 Agent 也无法处理时,是否允许回退到上一个 Agent 或升级到人工
  • 循环检测:必须设置最大 Handoff 次数,防止 Agent 之间无限互相转交

模式三:Pipeline(流水线模式)

核心思想:Agent 按照固定顺序组成流水线,每个 Agent 处理特定阶段,输出作为下一个 Agent 的输入。

适用场景:任务具有明确的阶段性,每个阶段有不同的专业要求。

输入 ──→ [需求分析Agent] ──→ [架构设计Agent] ──→ [代码实现Agent] ──→ [测试Agent] ──→ 输出
              │                    │                    │                  │
              v                    v                    v                  v
          需求文档              架构方案              代码产物            测试报告

实现要点

class PipelineOrchestrator:
    def __init__(self, stages: list[PipelineStage]):
        self.stages = stages

    def execute(self, input_data: dict) -> dict:
        """顺序执行流水线"""
        current_data = input_data
        execution_log = []

        for stage in self.stages:
            # 门禁检查:上一阶段的输出是否满足本阶段的输入要求
            if not stage.validate_input(current_data):
                raise PipelineGateError(
                    stage=stage.name,
                    reason=f"Input validation failed: {stage.input_schema}"
                )

            # 执行当前阶段
            result = stage.agent.execute(current_data)

            # 质量门禁:当前阶段的输出是否达标
            if stage.quality_gate:
                passed, feedback = stage.quality_gate.check(result)
                if not passed:
                    # 允许重试,但有最大次数限制
                    result = self._retry_with_feedback(
                        stage, current_data, feedback, max_retries=3
                    )

            execution_log.append({
                "stage": stage.name,
                "input_hash": hash(str(current_data)),
                "output_hash": hash(str(result)),
                "duration_ms": stage.last_duration_ms
            })

            current_data = result

        return {"result": current_data, "log": execution_log}

    def _retry_with_feedback(self, stage, input_data, feedback, max_retries):
        """带反馈的重试机制"""
        for attempt in range(max_retries):
            enriched_input = {
                **input_data,
                "previous_feedback": feedback,
                "attempt": attempt + 1
            }
            result = stage.agent.execute(enriched_input)
            passed, feedback = stage.quality_gate.check(result)
            if passed:
                return result
        raise PipelineQualityError(stage=stage.name, attempts=max_retries)

门禁设计(Quality Gate)

阶段 门禁条件 失败处理
需求分析 输出包含 user story + 验收标准 重试 + 补充提问
架构设计 Mermaid 图可解析 + 组件边界清晰 重试 + 参考模板
代码实现 lint 通过 + 类型检查通过 自动修复 + 重试
测试 覆盖率 > 80% + 核心路径全绿 补充测试用例

模式四:Council(委员会模式)

核心思想:多个 Agent 从不同视角分析同一问题,通过投票、辩论或加权聚合得出最终决策。

适用场景:需要多角度评估的决策问题,如代码审查、方案选型、风险评估。

                    问题输入
                       │
         ┌─────────────┼─────────────┐
         v             v             v
    ┌─────────┐  ┌─────────┐  ┌─────────┐
    │安全专家  │  │性能专家  │  │可维护性  │
    │视角     │  │视角     │  │专家视角  │
    └────┬────┘  └────┬────┘  └────┬────┘
         │             │             │
         v             v             v
    ┌──────────────────────────────────┐
    │         Judge / Aggregator       │
    │  投票 / 加权聚合 / 辩论仲裁      │
    └──────────────────────────────────┘
                       │
                       v
                   最终决策

聚合策略对比

class CouncilAggregator:
    """三种聚合策略"""

    def majority_vote(self, opinions: list[Opinion]) -> Decision:
        """多数投票:适用于二元决策"""
        votes = Counter(o.recommendation for o in opinions)
        winner = votes.most_common(1)[0]
        return Decision(
            choice=winner[0],
            confidence=winner[1] / len(opinions),
            dissent=[o for o in opinions if o.recommendation != winner[0]]
        )

    def weighted_aggregate(self, opinions: list[Opinion],
                           weights: dict[str, float]) -> Decision:
        """加权聚合:不同专家权重不同"""
        scores = {}
        for opinion in opinions:
            w = weights.get(opinion.agent_name, 1.0)
            for option, score in opinion.scores.items():
                scores[option] = scores.get(option, 0) + score * w
        best = max(scores, key=scores.get)
        return Decision(choice=best, scores=scores)

    def debate_and_judge(self, opinions: list[Opinion],
                         judge: Agent) -> Decision:
        """辩论仲裁:专家互相质疑后由 Judge 裁决"""
        debate_transcript = []
        for i, opinion_a in enumerate(opinions):
            for opinion_b in opinions[i+1:]:
                challenge = opinion_a.agent.challenge(opinion_b)
                rebuttal = opinion_b.agent.rebut(challenge)
                debate_transcript.append({
                    "challenger": opinion_a.agent_name,
                    "challenged": opinion_b.agent_name,
                    "challenge": challenge,
                    "rebuttal": rebuttal
                })

        return judge.decide(opinions, debate_transcript)

模式五:Watchdog(监督者模式)

核心思想:一个或多个 Watchdog Agent 不参与任务执行,专门负责监控其他 Agent 的行为和输出质量。

适用场景:高风险操作、需要持续质量保证、安全审计。

┌─────────────────────────────────────────┐
│            Execution Layer              │
│  Agent-A ──→ Agent-B ──→ Agent-C       │
└─────────────┬───────────┬──────────────┘
              │           │
              v           v
┌─────────────────────────────────────────┐
│           Watchdog Layer                │
│  ┌──────────┐  ┌──────────┐            │
│  │ 安全审计  │  │ 质量监控  │            │
│  │ Watchdog │  │ Watchdog │            │
│  └──────────┘  └──────────┘            │
│                                         │
│  检查项:                               │
│  - 工具调用是否在白名单内               │
│  - 输出是否包含敏感信息                 │
│  - 执行时间是否异常                     │
│  - 结果是否满足质量标准                 │
└─────────────────────────────────────────┘

实现要点

class WatchdogAgent:
    def __init__(self, rules: list[WatchdogRule]):
        self.rules = rules
        self.violations = []

    def inspect(self, event: AgentEvent) -> WatchdogVerdict:
        """检查 Agent 行为是否合规"""
        for rule in self.rules:
            if rule.matches(event):
                result = rule.evaluate(event)
                if not result.passed:
                    self.violations.append(result)
                    if result.severity == "critical":
                        return WatchdogVerdict.HALT  # 立即停止
                    elif result.severity == "warning":
                        return WatchdogVerdict.WARN  # 警告但继续

        return WatchdogVerdict.PASS

# Watchdog 规则示例
RULES = [
    WatchdogRule(
        name="no_secret_in_output",
        pattern=r"(sk-[a-zA-Z0-9]{48}|ghp_[a-zA-Z0-9]{36})",
        severity="critical",
        action="halt_and_redact"
    ),
    WatchdogRule(
        name="execution_time_limit",
        condition=lambda e: e.duration_ms > 30000,
        severity="warning",
        action="log_and_notify"
    ),
    WatchdogRule(
        name="tool_whitelist",
        condition=lambda e: e.tool_name not in ALLOWED_TOOLS,
        severity="critical",
        action="deny_execution"
    ),
]

通信协议设计

多 Agent 之间的通信是协作的基础。核心需要解决三个问题:消息格式、路由机制、状态同步。

消息格式标准化

@dataclass
class AgentMessage:
    """Agent 间通信的标准消息格式"""
    id: str                          # 消息唯一 ID
    sender: str                      # 发送者 Agent 名称
    receiver: str                    # 接收者 Agent 名称(或 "broadcast")
    type: MessageType                # REQUEST / RESPONSE / EVENT / HANDOFF
    payload: dict                    # 业务数据
    context: dict                    # 共享上下文(任务 ID、对话历史摘要等)
    metadata: MessageMetadata        # 时间戳、优先级、过期时间
    trace_id: str                    # 分布式追踪 ID

class MessageType(Enum):
    REQUEST = "request"              # 请求执行
    RESPONSE = "response"            # 执行结果
    EVENT = "event"                  # 事件通知
    HANDOFF = "handoff"              # 控制权转交
    FEEDBACK = "feedback"            # 反馈(用于迭代改进)

共享状态管理

class SharedBlackboard:
    """黑板模式:Agent 通过共享黑板交换信息"""

    def __init__(self):
        self._store = {}
        self._locks = {}
        self._history = []

    def write(self, agent: str, key: str, value: any):
        """写入共享数据"""
        with self._locks.setdefault(key, Lock()):
            old_value = self._store.get(key)
            self._store[key] = value
            self._history.append({
                "agent": agent,
                "key": key,
                "old": old_value,
                "new": value,
                "timestamp": time.time()
            })

    def read(self, key: str) -> any:
        """读取共享数据"""
        return self._store.get(key)

    def subscribe(self, key: str, callback: Callable):
        """订阅数据变更"""
        # 当 key 对应的值被修改时触发回调
        pass

容错与恢复策略

单 Agent 失败处理

Agent 执行失败
     │
     v
┌─────────────────┐
│ 失败类型判断     │
├─────────────────┤
│ 可重试错误?     │──→ Yes ──→ 指数退避重试(最多 3 次)
│ (超时/限流/网络) │
├─────────────────┤
│ 逻辑错误?       │──→ Yes ──→ 附带错误反馈重新执行
│ (输出不符合格式) │
├─────────────────┤
│ 能力不足?       │──→ Yes ──→ 升级到更强模型 / Handoff 到专家
│ (hallucination) │
├─────────────────┤
│ 不可恢复?       │──→ Yes ──→ 标记失败 + 通知 Supervisor
└─────────────────┘

全局回滚机制

class CheckpointManager:
    """检查点管理器:支持全局回滚"""

    def __init__(self):
        self.checkpoints = []

    def save(self, state: dict, label: str):
        """保存检查点"""
        self.checkpoints.append({
            "label": label,
            "state": deepcopy(state),
            "timestamp": time.time()
        })

    def rollback(self, label: str) -> dict:
        """回滚到指定检查点"""
        for cp in reversed(self.checkpoints):
            if cp["label"] == label:
                return cp["state"]
        raise CheckpointNotFoundError(label)

模式选型决策树

任务特征分析
     │
     ├── 子任务是否独立?
     │    ├── Yes + 需要中心协调 ──→ Supervisor
     │    └── Yes + 角色切换频繁 ──→ Swarm
     │
     ├── 是否有严格的阶段顺序?
     │    └── Yes ──→ Pipeline
     │
     ├── 是否需要多视角评估?
     │    └── Yes ──→ Council
     │
     ├── 是否需要持续质量监控?
     │    └── Yes ──→ 在任何模式上叠加 Watchdog
     │
     └── 复杂度极高?
          └── 组合使用:Pipeline(Supervisor(Workers)) + Watchdog
决策因素 Supervisor Swarm Pipeline Council Watchdog
任务分解明确 必需 不需要 必需 不需要 N/A
角色切换频繁 不适合 最佳 不适合 不适合 N/A
阶段性强 可以 不适合 最佳 不适合 N/A
需要多视角 不适合 不适合 不适合 最佳 N/A
高风险操作 需叠加 需叠加 需叠加 需叠加 必需
实现复杂度

工程实践建议

  1. 从简单开始:先用 Supervisor + 2-3 个 Worker 验证可行性,再逐步增加复杂度
  2. 消息格式先行:在写任何 Agent 逻辑之前,先定义好 Agent 间的通信协议
  3. 可观测性内建:每条 Agent 间消息都携带 trace_id,支持端到端追踪
  4. 容错设计:每个 Agent 都必须能优雅处理上游失败,不能因为一个 Agent 挂掉而导致全系统停滞
  5. Token 预算管理:多 Agent 系统的 Token 消耗是单 Agent 的数倍,必须设置预算上限和监控告警
  6. 测试策略:单 Agent 单测 + Agent 间集成测试 + 端到端场景测试,三层覆盖

参考资料

  • LangGraph Multi-Agent 文档:Agent 编排框架的参考实现
  • OpenAI Swarm:轻量级 Handoff 协议的开源实现
  • CrewAI:基于角色的多 Agent 框架
  • AutoGen:微软的多 Agent 对话框架
  • Anthropic Claude Agent SDK:支持 Multi-Agent Orchestrator 模式

Maurice | maurice_wen@proton.me