Agent Tool-Calling Optimization Strategies

From tool selection to parallel execution: building an efficient, reliable tool-calling pipeline for Agents


The Core Problems of Tool Calling

An Agent's capability ceiling is set by the tools available to it, and the efficiency and reliability of tool calling largely determine how the Agent performs in practice. The core challenges:

  1. Selection: with dozens or even hundreds of tools available, how does the Agent reliably pick the most appropriate one?
  2. Parameters: malformed tool parameters are a leading cause of Agent failures.
  3. Orchestration: managing ordering, parallelism, and dependencies across multiple tool calls.
  4. Recovery: retry, fallback, and rollback strategies when a tool call fails.

The full lifecycle of a tool call:

  LLM reasoning ──→ tool selection ──→ parameter construction ──→ permission check ──→ execute
                                                                        │
                                                              ┌─────────┼─────────┐
                                                              v         v         v
                                                           success   failure   timeout
                                                              │         │         │
                                                              v         v         v
                                                        parse result  retry/    interrupt/
                                                                      fallback  roll back
                                                              │
                                                              v
                                              feed result back to the LLM (next reasoning turn)

1. Optimizing Tool Descriptions

Tool descriptions are the Agent's "user manual"

The quality of a tool's description directly affects how accurately the LLM selects it. A good tool description answers three questions:

  • When: when this tool should be used
  • What: what the tool does (inputs/outputs)
  • Why Not: when this tool should NOT be used
# A poor tool description
BAD_TOOL = {
    "name": "search",
    "description": "Search",
    "parameters": {
        "query": {"type": "string"}
    }
}

# A good tool description
GOOD_TOOL = {
    "name": "web_search",
    "description": (
        "Search the internet for real-time information. "
        "Use when: you need fresh data such as news, prices, or weather. "
        "Do not use for: static knowledge the model already has "
        "(e.g. math formulas, dates of historical events). "
        "Returns: a list of search results, each with title, snippet, and URL."
    ),
    "parameters": {
        "query": {
            "type": "string",
            "description": (
                "Search query. Prefer keyword combinations over full sentences. "
                "E.g. 'Python 3.12 new features' rather than "
                "'what are the new features in Python 3.12'"
            )
        },
        "max_results": {
            "type": "integer",
            "description": "Number of results to return; default 5, max 20",
            "default": 5
        },
        "time_range": {
            "type": "string",
            "enum": ["day", "week", "month", "year", "any"],
            "description": "Time-range filter; default 'any'",
            "default": "any"
        }
    }
}

A tool-description template

TOOL_DESCRIPTION_TEMPLATE = """
Name: {name}
Purpose: {purpose}  (one sentence)
When to use: {when_to_use}  (2-3 typical scenarios)
When not to use: {when_not_to_use}  (scenarios where it is easily misused)
Input: {input_description}  (meaning and format of each parameter)
Output: {output_description}  (structure and meaning of the return value)
Caveats: {caveats}  (limits, latency, cost, etc.)
Example: {example}  (one complete call example)
"""

2. Tool Selection Strategies

Strategy 1: Semantic Routing

When the tool count is large (> 20), don't stuff every tool description into the System Prompt. Instead, use semantic matching to pre-select the most relevant subset of tools.

class SemanticToolRouter:
    """Semantic tool routing: pick the tools most relevant to the task"""

    def __init__(self, tools: list[Tool]):
        self.tools = tools
        self.tool_embeddings = self._build_index()

    def _build_index(self) -> dict:
        """Embed each tool's description (embed() is an assumed helper
        that calls an embedding model)"""
        embeddings = {}
        for tool in self.tools:
            text = f"{tool.name}: {tool.description}"
            embeddings[tool.name] = embed(text)
        return embeddings

    def select(self, task: str, top_k: int = 5) -> list[Tool]:
        """Select the tools most relevant to the task description"""
        task_embedding = embed(task)

        similarities = []
        for tool_name, tool_embedding in self.tool_embeddings.items():
            sim = cosine_similarity(task_embedding, tool_embedding)
            similarities.append((tool_name, sim))

        similarities.sort(key=lambda x: x[1], reverse=True)
        selected_names = [name for name, _ in similarities[:top_k]]

        return [t for t in self.tools if t.name in selected_names]
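The router above leans on two helpers that are not defined in the source: embed, which would call whatever embedding model is available, and cosine_similarity, which can be sketched in pure Python:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # a zero vector has no direction to compare
    return dot / (norm_a * norm_b)
```

In production you would typically batch the embedding calls and use a vectorized library instead, but the math is the same.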

Strategy 2: Hierarchical Tool Selection

Level 1: pick a tool category (coarse filter)
  "file ops" / "network" / "database" / "code execution" / "search"

Level 2: pick a concrete tool (fine filter)
  file ops ──→ file_read / file_write / file_list / file_delete

Level 3: construct the parameters
  file_read(path="/src/main.py", offset=0, limit=100)

class HierarchicalToolSelector:
    """Two-stage hierarchical tool selector"""

    def __init__(self, tool_categories: dict[str, list[Tool]]):
        self.categories = tool_categories

    def select(self, task: str, llm) -> Tool:
        """Two-stage selection"""
        # Stage 1: pick a category
        category_descriptions = {
            name: self._summarize_category(tools)
            for name, tools in self.categories.items()
        }

        selected_category = llm.choose(
            prompt=f"Task: {task}\nPick the most suitable tool category:",
            options=category_descriptions
        )

        # Stage 2: pick a concrete tool within that category
        tools_in_category = self.categories[selected_category]
        selected_tool = llm.choose(
            prompt=f"Task: {task}\nPick one of the following tools:",
            options={t.name: t.description for t in tools_in_category}
        )

        return selected_tool

Strategy 3: Learning Tool-Usage Patterns

from collections import Counter, defaultdict

class ToolUsagePatternLearner:
    """Learn tool-usage patterns from historical data"""

    def __init__(self):
        self.patterns = defaultdict(Counter)

    def record(self, task_type: str, tool_sequence: list[str]):
        """Record one sequence of tool calls (bigram counts)"""
        for i, tool in enumerate(tool_sequence):
            if i > 0:
                prev_tool = tool_sequence[i - 1]
                self.patterns[prev_tool][tool] += 1

    def suggest_next(self, current_tool: str,
                     top_k: int = 3) -> list[tuple[str, float]]:
        """Suggest the next tool based on historical patterns"""
        if current_tool not in self.patterns:
            return []

        total = sum(self.patterns[current_tool].values())
        suggestions = [
            (tool, count / total)
            for tool, count in self.patterns[current_tool].most_common(top_k)
        ]
        return suggestions
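A worked example of the bigram counting that record and suggest_next perform; the tool names and recorded sessions are hypothetical:

```python
from collections import Counter, defaultdict

# Hypothetical history: three recorded sessions of a bug-fix workflow.
patterns: dict[str, Counter] = defaultdict(Counter)
sessions = [
    ["file_read", "code_analyze", "file_write"],
    ["file_read", "code_analyze", "test_run"],
    ["file_read", "file_write"],
]
for seq in sessions:
    for prev, nxt in zip(seq, seq[1:]):
        patterns[prev][nxt] += 1

# After "file_read", "code_analyze" followed in 2 of 3 sessions.
total = sum(patterns["file_read"].values())
suggestions = [(t, c / total) for t, c in patterns["file_read"].most_common(2)]
print(suggestions)
```

These frequencies can be injected into the prompt as a hint ("after file_read, code_analyze is the usual next step"), not as a hard constraint.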

3. Parallel Execution

Dependency-graph analysis

import asyncio

class ToolCallScheduler:
    """Tool-call scheduler: parallel execution driven by a dependency graph"""

    def analyze_dependencies(self,
                              tool_calls: list[ToolCall]) -> DependencyGraph:
        """Analyze the dependencies between tool calls"""
        graph = DependencyGraph()

        for call in tool_calls:
            graph.add_node(call.id)

        for i, call_a in enumerate(tool_calls):
            for call_b in tool_calls[i + 1:]:
                # Data dependency: does B's input depend on A's output?
                if self._has_data_dependency(call_a, call_b):
                    graph.add_edge(call_a.id, call_b.id)

                # Resource conflict: do A and B touch the same resource?
                if self._has_resource_conflict(call_a, call_b):
                    graph.add_edge(call_a.id, call_b.id)

        return graph

    def schedule(self, graph: DependencyGraph) -> list[list[ToolCall]]:
        """Build the execution plan: each inner list can run in parallel"""
        levels = graph.topological_sort_levels()
        return levels  # [[parallelizable calls], [next parallel batch], ...]

    async def execute_parallel(self, graph: DependencyGraph,
                                schedule: list[list[ToolCall]]) -> dict:
        """Execute the plan level by level"""
        results = {}

        for level in schedule:
            # Skip calls already marked (e.g. SkippedResult); the rest of
            # the level runs concurrently
            pending = [c for c in level if c.id not in results]
            tasks = [
                self._execute_single(call)
                for call in pending
            ]
            level_results = await asyncio.gather(
                *tasks, return_exceptions=True
            )

            for call, result in zip(pending, level_results):
                results[call.id] = result

                # If a call failed, skip everything that depends on it
                if isinstance(result, Exception):
                    affected = graph.get_dependents(call.id)
                    for dep_id in affected:
                        results[dep_id] = SkippedResult(
                            reason=f"Dependency {call.id} failed"
                        )

        return results

Parallel execution, illustrated

Dependency graph:
  A (file_read)  ──→  C (code_analyze)  ──→  E (generate_report)
  B (web_search) ──→  D (summarize)     ──→  E

Execution plan:
  Level 0: [A, B]      # run in parallel
  Level 1: [C, D]      # C after A, D after B; C and D can run in parallel
  Level 2: [E]         # E runs once both C and D are done
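The scheduler relies on graph.topological_sort_levels(), which is not shown. A minimal level-wise variant of Kahn's algorithm over plain string node IDs might look like this, reproducing the diagram above:

```python
from collections import defaultdict
import math  # not needed here; kept out

def topological_sort_levels(edges: list[tuple[str, str]],
                            nodes: set[str]) -> list[list[str]]:
    """Group nodes into levels: each level depends only on earlier levels."""
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    levels = []
    ready = sorted(n for n in nodes if indegree[n] == 0)
    while ready:
        levels.append(ready)
        next_ready = []
        for node in ready:
            for child in children[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    return levels

# The dependency graph from the diagram above:
edges = [("A", "C"), ("C", "E"), ("B", "D"), ("D", "E")]
print(topological_sort_levels(edges, {"A", "B", "C", "D", "E"}))
# [['A', 'B'], ['C', 'D'], ['E']]
```

If a cycle exists, the loop ends before all nodes are emitted, which is itself a useful signal that the dependency analysis produced contradictory edges.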

4. Parameter Construction

Validation and automatic repair

import os

class ParameterValidator:
    """Validate tool parameters and attempt automatic repair"""

    def validate_and_fix(self, tool: Tool,
                          params: dict) -> tuple[dict, list[str]]:
        """Validate parameters, auto-fixing what can be fixed"""
        fixes = []
        validated = {}

        for param_name, param_schema in tool.parameters.items():
            value = params.get(param_name)

            # Missing required parameter
            if value is None and param_schema.get("required"):
                if "default" in param_schema:
                    validated[param_name] = param_schema["default"]
                    fixes.append(
                        f"Missing '{param_name}', using default: "
                        f"{param_schema['default']}"
                    )
                else:
                    raise ParameterError(
                        f"Required parameter '{param_name}' is missing"
                    )
                continue

            if value is None:
                continue

            # Type coercion
            expected_type = param_schema.get("type")
            if expected_type == "integer" and isinstance(value, str):
                try:
                    validated[param_name] = int(value)
                    fixes.append(
                        f"Converted '{param_name}' from str to int"
                    )
                    continue
                except ValueError:
                    pass

            # Enum repair (fuzzy matching)
            if "enum" in param_schema and value not in param_schema["enum"]:
                closest = self._fuzzy_match(
                    value, param_schema["enum"]
                )
                if closest:
                    validated[param_name] = closest
                    fixes.append(
                        f"Corrected '{param_name}': "
                        f"'{value}' -> '{closest}'"
                    )
                    continue

            # Path normalization
            if param_schema.get("format") == "file_path":
                normalized = os.path.normpath(value)
                if normalized != value:
                    validated[param_name] = normalized
                    fixes.append(
                        f"Normalized path '{param_name}': "
                        f"'{value}' -> '{normalized}'"
                    )
                    continue

            validated[param_name] = value

        return validated, fixes

5. Error Recovery

Tiered retries

class ToolErrorRecovery:
    """Error recovery for tool calls"""

    RETRY_STRATEGIES = {
        "timeout": {
            "max_retries": 3,
            "backoff": "exponential",    # 1s, 2s, 4s
            "action": "retry_same"
        },
        "rate_limit": {
            "max_retries": 5,
            "backoff": "linear",          # wait for rate_limit_reset
            "action": "retry_same"
        },
        "invalid_params": {
            "max_retries": 2,
            "backoff": "none",
            "action": "retry_with_fix"    # repair the parameters, then retry
        },
        "permission_denied": {
            "max_retries": 0,
            "action": "escalate"          # escalate to a human
        },
        "tool_not_found": {
            "max_retries": 1,
            "action": "find_alternative"  # look for a substitute tool
        },
        "unknown": {
            "max_retries": 1,
            "action": "retry_same"
        }
    }

    async def execute_with_recovery(self, tool: Tool,
                                     params: dict) -> ToolResult:
        """Execute a tool call with error recovery"""
        attempt = 0
        last_error = None

        while True:
            try:
                return await tool.execute(params)

            except ToolError as e:
                last_error = e
                error_type = self._classify_error(e)
                strategy = self.RETRY_STRATEGIES.get(
                    error_type, self.RETRY_STRATEGIES["unknown"]
                )

                if strategy["action"] == "escalate":
                    return ToolResult(
                        status="escalated",
                        message=f"Human intervention required: {e}"
                    )

                if attempt >= strategy["max_retries"]:
                    break

                # Apply the recovery action before retrying
                if strategy["action"] == "retry_same":
                    await self._wait(strategy["backoff"], attempt)

                elif strategy["action"] == "retry_with_fix":
                    params = await self._fix_params(tool, params, e)

                elif strategy["action"] == "find_alternative":
                    alt_tool = self._find_alternative(tool)
                    if alt_tool:
                        tool = alt_tool
                    else:
                        break

                attempt += 1

        # All retries exhausted
        return ToolResult(
            status="failed",
            error=str(last_error),
            attempts=attempt + 1
        )

    def _classify_error(self, error: ToolError) -> str:
        """Classify an error by its message"""
        if "timeout" in str(error).lower():
            return "timeout"
        if "rate" in str(error).lower() or "429" in str(error):
            return "rate_limit"
        if "permission" in str(error).lower() or "403" in str(error):
            return "permission_denied"
        if "invalid" in str(error).lower() or "400" in str(error):
            return "invalid_params"
        return "unknown"

Fallback strategy

The tool-call fallback chain:
  Preferred tool fails
      │
      v
  Substitute tool of the same kind
      │  (e.g. web_search fails → news_search)
      │
      v  also fails
  A more basic tool
      │  (e.g. structured_search → simple_search)
      │
      v  also fails
  The LLM's internal knowledge
      │  (tell the user: real-time data is unavailable; the following is based on existing knowledge)
      │
      v
  Ask the user
      (request the needed information from the user)

class FallbackChain:
    """Fallback chain"""

    def __init__(self, registry, chains: dict[str, list[str]]):
        self.registry = registry  # maps tool names to Tool objects
        self.chains = chains      # tool_name -> [fallback_1, fallback_2, ...]

    async def execute(self, tool_name: str,
                       params: dict) -> ToolResult:
        """Walk the fallback chain until a tool succeeds"""
        chain = [tool_name] + self.chains.get(tool_name, [])

        for fallback in chain:
            try:
                tool = self.registry.get(fallback)
                adapted_params = self._adapt_params(
                    params, tool_name, fallback
                )
                result = await tool.execute(adapted_params)
                if fallback != tool_name:
                    result.metadata["fallback_from"] = tool_name
                    result.metadata["fallback_to"] = fallback
                return result
            except ToolError:
                continue

        return ToolResult(
            status="all_fallbacks_failed",
            error=f"All tools in chain failed: {chain}"
        )

# Example fallback-chain configuration
FALLBACK_CHAINS = {
    "web_search": ["cached_search", "llm_knowledge"],
    "database_query": ["cached_query", "file_based_query"],
    "code_execute": ["code_analyze", "llm_reasoning"],
    "image_generate": ["image_search", "text_description"],
}

6. Monitoring Tool Calls

Key metrics

class ToolCallMetrics:
    """Collect tool-call metrics"""

    def record(self, event: ToolCallEvent):
        """Record metrics for one tool call"""
        metrics = {
            "tool_name": event.tool_name,
            "duration_ms": event.duration_ms,
            "status": event.status,           # success/failure/timeout
            "attempt": event.attempt,          # which attempt this was
            "fallback_used": event.fallback,   # whether a fallback was used
            "tokens_in_params": event.param_tokens,
            "tokens_in_result": event.result_tokens,
        }

        # Push to the monitoring system
        self.push_to_prometheus(metrics)

    def get_dashboard_data(self) -> dict:
        """Produce dashboard data"""
        return {
            "success_rate": self._calc_success_rate(),
            "avg_latency_ms": self._calc_avg_latency(),
            "p99_latency_ms": self._calc_p99_latency(),
            "top_failures": self._get_top_failures(),
            "tool_usage_distribution": self._get_usage_dist(),
            "fallback_rate": self._calc_fallback_rate(),
            "cost_by_tool": self._calc_cost_by_tool(),
        }

Monitoring dashboard metrics

  Metric               Meaning                       Alert threshold
  Success Rate         Tool-call success rate        < 95%
  Avg Latency          Average latency               > 5s
  P99 Latency          99th-percentile latency       > 30s
  Fallback Rate        Fallback trigger rate         > 10%
  Token Efficiency     Effective token utilization   < 30%
  Error Rate by Tool   Per-tool error rate           tool-specific
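As a sketch, two of these dashboard metrics computed from raw samples (nearest-rank percentile; the helper names are hypothetical):

```python
import math

def percentile(latencies_ms: list[float], p: float) -> float:
    """Nearest-rank percentile (p in 0..100) over recorded latencies."""
    if not latencies_ms:
        return 0.0
    ordered = sorted(latencies_ms)
    rank = max(1, math.ceil(p / 100 * len(ordered)))  # 1-indexed rank
    return ordered[rank - 1]

def success_rate(statuses: list[str]) -> float:
    """Fraction of recorded calls whose status is 'success'."""
    return statuses.count("success") / len(statuses) if statuses else 0.0

latencies = list(range(1, 101))                      # 1..100 ms
print(percentile(latencies, 99))                     # 99
print(success_rate(["success"] * 19 + ["failure"]))  # 0.95
```

In practice a monitoring backend (e.g. Prometheus histograms) would compute these over sliding windows rather than over a raw in-memory list.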

Engineering Recommendations

  1. Tool descriptions come first: time spent improving tool descriptions pays off far more than tuning the Agent prompt.
  2. Less is more: keep the active tool set under ~15; beyond that, use semantic routing or hierarchical selection.
  3. Parameter examples beat parameter prose: include 1-2 complete call examples in each tool description.
  4. Monitor before you optimize: build observability for tool calls first, then optimize based on the data.
  5. Fallback chains must be tested: an untested fallback strategy is no fallback at all.
  6. Design for idempotency: retries are only safe if the call is idempotent; write operations need an idempotency key.
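Point 6 can be sketched as deriving a stable key from the call's content and skipping writes whose key has already succeeded. The store and helper names here are hypothetical, and a real system would persist the key store rather than keep it in memory:

```python
import hashlib
import json

def idempotency_key(tool_name: str, params: dict) -> str:
    """Stable key for a tool call: same tool + same params -> same key."""
    canonical = json.dumps(params, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(f"{tool_name}:{canonical}".encode()).hexdigest()

# Completed writes, keyed by idempotency key (hypothetical in-memory store).
completed: dict[str, str] = {}

def execute_write(tool_name: str, params: dict) -> str:
    key = idempotency_key(tool_name, params)
    if key in completed:
        return completed[key]       # retry: return cached result, no second write
    result = f"wrote {params}"      # placeholder for the real side effect
    completed[key] = result
    return result
```

With this in place, the retry logic from Section 5 can safely re-issue a write that timed out: if the first attempt actually landed, the second is a no-op.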

References

  • OpenAI Function Calling best practices
  • Anthropic Tool Use documentation
  • LangChain Tools & Toolkits design patterns
  • Gorilla: LLM Connected with Massive APIs

Maurice | maurice_wen@proton.me