分布式 Agent 系统架构
原创
灵阙教研团队
S 精选 提升 |
约 9 分钟阅读
更新于 2026-02-27 AI 导读
分布式 Agent 系统架构 多 Agent 通信、编排、容错与生产部署的工程化实践 Maurice | 灵阙学院 一、为什么需要多 Agent 架构 单个 Agent 在面对复杂任务时存在三个天然瓶颈:上下文窗口有限、单一角色视角片面、串行执行效率低。多 Agent 系统通过分工协作来突破这些限制。...
分布式 Agent 系统架构
多 Agent 通信、编排、容错与生产部署的工程化实践
Maurice | 灵阙学院
一、为什么需要多 Agent 架构
单个 Agent 在面对复杂任务时存在三个天然瓶颈:上下文窗口有限、单一角色视角片面、串行执行效率低。多 Agent 系统通过分工协作来突破这些限制。
┌─────────────────────────────────────────────────────────────┐
│ 单 Agent vs 多 Agent 对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Single Agent Multi-Agent System │
│ ┌───────────┐ ┌──────────────────┐ │
│ │ │ │ Orchestrator │ │
│ │ 一个角色 │ │ ┌────┐ ┌────┐ │ │
│ │ 一个上下文│ │ │研究│ │分析│ │ │
│ │ 串行执行 │ ──→ │ └────┘ └────┘ │ │
│ │ 全部工具 │ │ ┌────┐ ┌────┐ │ │
│ │ │ │ │编码│ │审查│ │ │
│ └───────────┘ │ └────┘ └────┘ │ │
│ └──────────────────┘ │
│ 上下文膨胀 分而治之,并行执行 │
│ 工具冲突 专业化分工 │
│ 单点故障 容错隔离 │
└─────────────────────────────────────────────────────────────┘
二、通信模式
2.1 四种基础通信模式
1. Direct (点对点) 2. Broadcast (广播)
A ──── B A ──┬── B
├── C
└── D
3. Pub/Sub (发布订阅) 4. Blackboard (黑板)
Publisher ──→ Topic ┌─────────────┐
│ │ Blackboard │
┌───┼───┐ │ (共享状态) │
▼ ▼ ▼ └──┬──┬──┬──┘
S1 S2 S3 A B C
(读写共享)
| 模式 | 耦合度 | 适用场景 | 典型实现 |
|---|---|---|---|
| Direct | 高 | Agent A 明确需要 Agent B 的结果 | 函数调用 / RPC |
| Broadcast | 中 | 通知所有 Agent 某个事件 | Event Bus |
| Pub/Sub | 低 | Agent 按兴趣订阅特定类型消息 | Redis Pub/Sub / Kafka |
| Blackboard | 中 | 多个 Agent 协作构建共享成果 | 共享内存 / 数据库 |
2.2 消息协议设计
from dataclasses import dataclass
from enum import Enum
class MessageType(Enum):
TASK = "task" # 任务分配
RESULT = "result" # 结果返回
STATUS = "status" # 状态更新
ERROR = "error" # 错误报告
HEARTBEAT = "heartbeat" # 心跳
@dataclass
class AgentMessage:
id: str # 消息唯一 ID
trace_id: str # 追踪链路 ID
type: MessageType
sender: str # 发送 Agent ID
receiver: str | None # 接收者 (None = broadcast)
payload: dict # 业务数据
parent_id: str | None # 父消息 ID (用于关联)
timestamp: float
ttl_seconds: int = 300 # 消息过期时间
三、编排模式: Orchestration vs Choreography
3.1 Orchestration (中心编排)
一个 Orchestrator Agent 掌控全局,分配任务并汇总结果。
Orchestrator
┌─────────┐
│ 1. 拆解 │
│ 2. 分配 │
│ 3. 汇总 │
└──┬──┬──┬┘
┌────┘ │ └────┐
▼ ▼ ▼
Agent A Agent B Agent C
(研究) (分析) (编码)
│ │ │
└───────┼───────┘
▼
Orchestrator
(汇总 + 输出)
优势:流程可控、易调试、状态集中。 劣势:Orchestrator 是单点瓶颈、扩展性受限。
3.2 Choreography (去中心编排)
每个 Agent 根据事件自主决策,没有中央控制者。
Agent A ──(event: research_done)──→ Agent B
Agent B ──(event: analysis_done)──→ Agent C
Agent C ──(event: code_ready)────→ Agent D (Review)
Agent D ──(event: approved)──────→ Agent A (下一轮)
优势:去中心化、弹性高、天然可扩展。 劣势:全局状态难追踪、调试困难、死锁风险。
3.3 选型建议
任务特征是什么?
│
├─ 流程固定、步骤清晰、需要强一致性
│ └─ Orchestration (推荐 LangGraph / CrewAI)
│
├─ 流程动态、Agent 数量可变、需要高弹性
│ └─ Choreography (推荐 Event-driven + Pub/Sub)
│
└─ 混合场景: 核心流程用编排,子任务用自组织
└─ Hybrid (Orchestrator 委派子任务给自组织团队)
四、状态管理
4.1 共享状态 vs 事件溯源
| 维度 | 共享状态 (Shared State) | 事件溯源 (Event Sourcing) |
|---|---|---|
| 数据模型 | 可变的当前状态 | 不可变的事件序列 |
| 一致性 | 需要锁/事务 | 最终一致性 |
| 审计能力 | 弱 (只有最新状态) | 强 (完整历史) |
| 调试能力 | 中等 | 强 (可回放) |
| 复杂度 | 低 | 高 |
| 适用场景 | 简单协作 | 需要审计/回放的场景 |
4.2 事件溯源实现
class AgentEventStore:
"""基于事件溯源的 Agent 状态管理"""
def __init__(self, store: EventStoreBackend):
self.store = store
async def append(self, event: AgentEvent):
"""追加事件 (不可变)"""
await self.store.append(
stream_id=event.task_id,
event_type=event.type,
data=event.payload,
metadata={
"agent_id": event.agent_id,
"trace_id": event.trace_id,
"timestamp": event.timestamp,
},
)
async def get_state(self, task_id: str) -> TaskState:
"""通过回放事件序列重建当前状态"""
events = await self.store.read_stream(task_id)
state = TaskState()
for event in events:
state.apply(event)
return state
async def replay_from(
self, task_id: str, event_id: str
) -> TaskState:
"""从某个事件点重放 (用于调试/恢复)"""
events = await self.store.read_stream(
task_id, from_event=event_id
)
state = TaskState()
for event in events:
state.apply(event)
return state
五、故障处理与重试
5.1 Agent 级别故障处理
┌─────────────────────────────────────────────────┐
│ 故障处理策略 │
├─────────────────────────────────────────────────┤
│ │
│ Agent 调用失败 │
│ │ │
│ ▼ │
│ 重试 (最多 3 次, 指数退避) │
│ │ │
│ 仍然失败 │
│ │ │
│ ▼ │
│ 降级策略: │
│ ├── 1. 换用备选模型 (gpt-4o → claude-sonnet) │
│ ├── 2. 简化任务 (减少上下文/降低要求) │
│ ├── 3. 跳过并标记 (非关键步骤) │
│ └── 4. 上报人工介入 (关键步骤) │
│ │
│ 所有操作记录到事件日志 │
└─────────────────────────────────────────────────┘
5.2 重试与熔断
class AgentCircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "closed" # closed / open / half-open
self.last_failure_time = 0
async def execute(self, agent_fn, *args, **kwargs):
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
else:
raise CircuitOpenError("Agent circuit breaker is open")
try:
result = await agent_fn(*args, **kwargs)
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
六、Agent 生命周期管理
┌──────────────────────────────────────────────────────┐
│ Agent 生命周期 │
├──────────────────────────────────────────────────────┤
│ │
│ Created → Initializing → Ready → Running → Done │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Init Failed Error Timeout │
│ │ │ │ │
│ └─────────────────────┼────────┘ │
│ ▼ │
│ Recovering │
│ │ │ │
│ 成功 ▼ ▼ 失败 │
│ Ready Terminated │
└──────────────────────────────────────────────────────┘
每个状态转换都应该发出事件,便于监控和调试。
七、生产部署
7.1 Kubernetes 部署架构
┌──────────────────────────────────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ Namespace: agent-system │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │Orchestrtr│ │ Agent Pool │ │ │
│ │ │Deployment│ │ ┌────┐ ┌────┐ │ │ │
│ │ │(1 replica)│ │ │ R1 │ │ R2 │ │ │ │
│ │ └──────────┘ │ └────┘ └────┘ │ │ │
│ │ │ ┌────┐ ┌────┐ │ │ │
│ │ ┌──────────┐ │ │ A1 │ │ A2 │ │ │ │
│ │ │ Redis │ │ └────┘ └────┘ │ │ │
│ │ │(message) │ │ (HPA: 2-20) │ │ │
│ │ └──────────┘ └──────────────────┘ │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │PostgreSQL│ │ Monitoring │ │ │
│ │ │(state) │ │ LangFuse+Grafana│ │ │
│ │ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
7.2 Serverless 部署
适合突发性负载、按需扩缩的场景:
用户请求 → API Gateway → Lambda/Cloud Function (Orchestrator)
│
┌────┼────┐
▼ ▼ ▼
Lambda Lambda Lambda
(Agent) (Agent) (Agent)
│ │ │
└────┼────┘
▼
DynamoDB / Firestore
(状态持久化)
优势:零运维、按调用计费、天然弹性。 限制:冷启动延迟、执行时间上限、状态管理需要外部存储。
八、监控与调试
8.1 关键监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| Agent 任务成功率 | 按 Agent 类型统计 | < 90% |
| E2E 任务完成率 | 整体任务完成比例 | < 95% |
| Agent 平均响应时间 | 从接收到返回 | P95 > 30s |
| 消息队列深度 | 未处理消息数 | > 100 |
| Agent 实例健康率 | 健康实例占比 | < 80% |
| LLM 调用失败率 | Provider 级别 | > 5% |
| 重试率 | 需要重试的请求比例 | > 20% |
8.2 调试工具链
问题排查流程:
1. 入口: Grafana Dashboard (全局视图)
└─ 发现异常指标 (成功率下降/延迟升高)
2. 追踪: LangFuse / LangSmith (调用链路)
└─ 定位到具体的 Agent + 具体的 LLM 调用
3. 日志: 结构化日志 (ELK / Loki)
└─ 查看错误详情、输入输出、上下文
4. 回放: 事件溯源回放 (如果启用)
└─ 从任意断点重放任务,复现问题
8.3 可观测性代码模式
import structlog
logger = structlog.get_logger()
async def execute_agent_task(task: AgentTask) -> AgentResult:
log = logger.bind(
task_id=task.id,
trace_id=task.trace_id,
agent_type=task.agent_type,
)
log.info("agent_task_started", input_tokens=count_tokens(task.input))
try:
result = await agent.run(task)
log.info(
"agent_task_completed",
output_tokens=count_tokens(result.output),
duration_ms=result.duration_ms,
llm_calls=result.llm_call_count,
)
return result
except AgentError as e:
log.error(
"agent_task_failed",
error_type=type(e).__name__,
error_message=str(e),
retry_count=task.retry_count,
)
raise
九、架构选型决策树
你的任务复杂度如何?
│
├─ 2-3 个固定角色,串行流程
│ └─ 简单 Orchestration (LangGraph / CrewAI)
│ 部署: 单进程,无需消息队列
│
├─ 5-10 个角色,部分可并行
│ └─ Orchestration + Worker Pool
│ 部署: K8s Deployment + Redis
│
├─ 10+ 角色,动态组合,高弹性要求
│ └─ Choreography + Event Sourcing
│ 部署: K8s + Kafka + PostgreSQL
│
└─ 不确定 / 快速验证
└─ 从简单 Orchestration 开始
复杂度上来后再拆分
核心原则:从最简单的架构开始,只在真正遇到瓶颈时才增加复杂度。过度设计比设计不足的代价更大。
Maurice | maurice_wen@proton.me