分布式 Agent 系统架构

多 Agent 通信、编排、容错与生产部署的工程化实践

Maurice | 灵阙学院


一、为什么需要多 Agent 架构

单个 Agent 在面对复杂任务时存在三个天然瓶颈:上下文窗口有限、单一角色视角片面、串行执行效率低。多 Agent 系统通过分工协作来突破这些限制。

┌─────────────────────────────────────────────────────────────┐
│              单 Agent vs 多 Agent 对比                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Single Agent                 Multi-Agent System            │
│  ┌───────────┐               ┌──────────────────┐          │
│  │           │               │  Orchestrator    │          │
│  │  一个角色  │               │  ┌────┐ ┌────┐  │          │
│  │  一个上下文│               │  │研究│ │分析│  │          │
│  │  串行执行  │     ──→       │  └────┘ └────┘  │          │
│  │  全部工具  │               │  ┌────┐ ┌────┐  │          │
│  │           │               │  │编码│ │审查│  │          │
│  └───────────┘               │  └────┘ └────┘  │          │
│                              └──────────────────┘          │
│  上下文膨胀              分而治之,并行执行                   │
│  工具冲突                专业化分工                          │
│  单点故障                容错隔离                            │
└─────────────────────────────────────────────────────────────┘

二、通信模式

2.1 四种基础通信模式

1. Direct (点对点)              2. Broadcast (广播)
   A ──── B                        A ──┬── B
                                      ├── C
                                      └── D

3. Pub/Sub (发布订阅)           4. Blackboard (黑板)
   Publisher ──→ Topic            ┌─────────────┐
                  │               │  Blackboard  │
              ┌───┼───┐          │  (共享状态)   │
              ▼   ▼   ▼          └──┬──┬──┬──┘
             S1  S2  S3             A  B  C
                                  (读写共享)
模式 耦合度 适用场景 典型实现
Direct Agent A 明确需要 Agent B 的结果 函数调用 / RPC
Broadcast 通知所有 Agent 某个事件 Event Bus
Pub/Sub Agent 按兴趣订阅特定类型消息 Redis Pub/Sub / Kafka
Blackboard 多个 Agent 协作构建共享成果 共享内存 / 数据库

2.2 消息协议设计

from dataclasses import dataclass
from enum import Enum

class MessageType(Enum):
    TASK = "task"           # 任务分配
    RESULT = "result"       # 结果返回
    STATUS = "status"       # 状态更新
    ERROR = "error"         # 错误报告
    HEARTBEAT = "heartbeat" # 心跳

@dataclass
class AgentMessage:
    id: str                     # 消息唯一 ID
    trace_id: str               # 追踪链路 ID
    type: MessageType
    sender: str                 # 发送 Agent ID
    receiver: str | None        # 接收者 (None = broadcast)
    payload: dict               # 业务数据
    parent_id: str | None       # 父消息 ID (用于关联)
    timestamp: float
    ttl_seconds: int = 300      # 消息过期时间

三、编排模式: Orchestration vs Choreography

3.1 Orchestration (中心编排)

一个 Orchestrator Agent 掌控全局,分配任务并汇总结果。

                    Orchestrator
                    ┌─────────┐
                    │ 1. 拆解  │
                    │ 2. 分配  │
                    │ 3. 汇总  │
                    └──┬──┬──┬┘
                  ┌────┘  │  └────┐
                  ▼       ▼       ▼
               Agent A  Agent B  Agent C
               (研究)   (分析)   (编码)
                  │       │       │
                  └───────┼───────┘
                          ▼
                    Orchestrator
                    (汇总 + 输出)

优势:流程可控、易调试、状态集中。 劣势:Orchestrator 是单点瓶颈、扩展性受限。

3.2 Choreography (去中心编排)

每个 Agent 根据事件自主决策,没有中央控制者。

Agent A ──(event: research_done)──→ Agent B
Agent B ──(event: analysis_done)──→ Agent C
Agent C ──(event: code_ready)────→ Agent D (Review)
Agent D ──(event: approved)──────→ Agent A (下一轮)

优势:去中心化、弹性高、天然可扩展。 劣势:全局状态难追踪、调试困难、死锁风险。

3.3 选型建议

任务特征是什么?
│
├─ 流程固定、步骤清晰、需要强一致性
│  └─ Orchestration (推荐 LangGraph / CrewAI)
│
├─ 流程动态、Agent 数量可变、需要高弹性
│  └─ Choreography (推荐 Event-driven + Pub/Sub)
│
└─ 混合场景: 核心流程用编排,子任务用自组织
   └─ Hybrid (Orchestrator 委派子任务给自组织团队)

四、状态管理

4.1 共享状态 vs 事件溯源

维度 共享状态 (Shared State) 事件溯源 (Event Sourcing)
数据模型 可变的当前状态 不可变的事件序列
一致性 需要锁/事务 最终一致性
审计能力 弱 (只有最新状态) 强 (完整历史)
调试能力 中等 强 (可回放)
复杂度
适用场景 简单协作 需要审计/回放的场景

4.2 事件溯源实现

class AgentEventStore:
    """基于事件溯源的 Agent 状态管理"""

    def __init__(self, store: EventStoreBackend):
        self.store = store

    async def append(self, event: AgentEvent):
        """追加事件 (不可变)"""
        await self.store.append(
            stream_id=event.task_id,
            event_type=event.type,
            data=event.payload,
            metadata={
                "agent_id": event.agent_id,
                "trace_id": event.trace_id,
                "timestamp": event.timestamp,
            },
        )

    async def get_state(self, task_id: str) -> TaskState:
        """通过回放事件序列重建当前状态"""
        events = await self.store.read_stream(task_id)
        state = TaskState()
        for event in events:
            state.apply(event)
        return state

    async def replay_from(
        self, task_id: str, event_id: str
    ) -> TaskState:
        """从某个事件点重放 (用于调试/恢复)"""
        events = await self.store.read_stream(
            task_id, from_event=event_id
        )
        state = TaskState()
        for event in events:
            state.apply(event)
        return state

五、故障处理与重试

5.1 Agent 级别故障处理

┌─────────────────────────────────────────────────┐
│              故障处理策略                          │
├─────────────────────────────────────────────────┤
│                                                 │
│  Agent 调用失败                                  │
│       │                                         │
│       ▼                                         │
│  重试 (最多 3 次, 指数退避)                       │
│       │                                         │
│  仍然失败                                        │
│       │                                         │
│       ▼                                         │
│  降级策略:                                       │
│  ├── 1. 换用备选模型 (gpt-4o → claude-sonnet)   │
│  ├── 2. 简化任务 (减少上下文/降低要求)           │
│  ├── 3. 跳过并标记 (非关键步骤)                  │
│  └── 4. 上报人工介入 (关键步骤)                  │
│                                                 │
│  所有操作记录到事件日志                           │
└─────────────────────────────────────────────────┘

5.2 重试与熔断

class AgentCircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
    ):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"  # closed / open / half-open
        self.last_failure_time = 0

    async def execute(self, agent_fn, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError("Agent circuit breaker is open")

        try:
            result = await agent_fn(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

六、Agent 生命周期管理

┌──────────────────────────────────────────────────────┐
│                Agent 生命周期                          │
├──────────────────────────────────────────────────────┤
│                                                      │
│  Created → Initializing → Ready → Running → Done     │
│               │                     │        │       │
│               ▼                     ▼        ▼       │
│           Init Failed           Error    Timeout     │
│               │                     │        │       │
│               └─────────────────────┼────────┘       │
│                                     ▼                │
│                               Recovering             │
│                                 │     │              │
│                            成功 ▼     ▼ 失败         │
│                            Ready   Terminated        │
└──────────────────────────────────────────────────────┘

每个状态转换都应该发出事件,便于监控和调试。

七、生产部署

7.1 Kubernetes 部署架构

┌──────────────────────────────────────────────────────┐
│  Kubernetes Cluster                                  │
│                                                      │
│  ┌─────────────────────────────────────────┐        │
│  │  Namespace: agent-system                 │        │
│  │                                         │        │
│  │  ┌──────────┐   ┌──────────────────┐   │        │
│  │  │Orchestrtr│   │  Agent Pool       │   │        │
│  │  │Deployment│   │  ┌────┐ ┌────┐   │   │        │
│  │  │(1 replica)│  │  │ R1 │ │ R2 │   │   │        │
│  │  └──────────┘   │  └────┘ └────┘   │   │        │
│  │                 │  ┌────┐ ┌────┐   │   │        │
│  │  ┌──────────┐   │  │ A1 │ │ A2 │   │   │        │
│  │  │  Redis   │   │  └────┘ └────┘   │   │        │
│  │  │(message) │   │  (HPA: 2-20)     │   │        │
│  │  └──────────┘   └──────────────────┘   │        │
│  │                                         │        │
│  │  ┌──────────┐   ┌──────────────────┐   │        │
│  │  │PostgreSQL│   │  Monitoring       │   │        │
│  │  │(state)   │   │  LangFuse+Grafana│   │        │
│  │  └──────────┘   └──────────────────┘   │        │
│  └─────────────────────────────────────────┘        │
└──────────────────────────────────────────────────────┘

7.2 Serverless 部署

适合突发性负载、按需扩缩的场景:

用户请求 → API Gateway → Lambda/Cloud Function (Orchestrator)
                              │
                         ┌────┼────┐
                         ▼    ▼    ▼
                      Lambda Lambda Lambda
                      (Agent) (Agent) (Agent)
                         │    │    │
                         └────┼────┘
                              ▼
                         DynamoDB / Firestore
                         (状态持久化)

优势:零运维、按调用计费、天然弹性。 限制:冷启动延迟、执行时间上限、状态管理需要外部存储。

八、监控与调试

8.1 关键监控指标

指标 说明 告警阈值
Agent 任务成功率 按 Agent 类型统计 < 90%
E2E 任务完成率 整体任务完成比例 < 95%
Agent 平均响应时间 从接收到返回 P95 > 30s
消息队列深度 未处理消息数 > 100
Agent 实例健康率 健康实例占比 < 80%
LLM 调用失败率 Provider 级别 > 5%
重试率 需要重试的请求比例 > 20%

8.2 调试工具链

问题排查流程:

1. 入口: Grafana Dashboard (全局视图)
   └─ 发现异常指标 (成功率下降/延迟升高)

2. 追踪: LangFuse / LangSmith (调用链路)
   └─ 定位到具体的 Agent + 具体的 LLM 调用

3. 日志: 结构化日志 (ELK / Loki)
   └─ 查看错误详情、输入输出、上下文

4. 回放: 事件溯源回放 (如果启用)
   └─ 从任意断点重放任务,复现问题

8.3 可观测性代码模式

import structlog

logger = structlog.get_logger()

async def execute_agent_task(task: AgentTask) -> AgentResult:
    log = logger.bind(
        task_id=task.id,
        trace_id=task.trace_id,
        agent_type=task.agent_type,
    )

    log.info("agent_task_started", input_tokens=count_tokens(task.input))

    try:
        result = await agent.run(task)
        log.info(
            "agent_task_completed",
            output_tokens=count_tokens(result.output),
            duration_ms=result.duration_ms,
            llm_calls=result.llm_call_count,
        )
        return result

    except AgentError as e:
        log.error(
            "agent_task_failed",
            error_type=type(e).__name__,
            error_message=str(e),
            retry_count=task.retry_count,
        )
        raise

九、架构选型决策树

你的任务复杂度如何?
│
├─ 2-3 个固定角色,串行流程
│  └─ 简单 Orchestration (LangGraph / CrewAI)
│     部署: 单进程,无需消息队列
│
├─ 5-10 个角色,部分可并行
│  └─ Orchestration + Worker Pool
│     部署: K8s Deployment + Redis
│
├─ 10+ 角色,动态组合,高弹性要求
│  └─ Choreography + Event Sourcing
│     部署: K8s + Kafka + PostgreSQL
│
└─ 不确定 / 快速验证
   └─ 从简单 Orchestration 开始
      复杂度上来后再拆分

核心原则:从最简单的架构开始,只在真正遇到瓶颈时才增加复杂度。过度设计比设计不足的代价更大。


Maurice | maurice_wen@proton.me