Agent 工具调用优化策略
原创
灵阙教研团队
S 精选 进阶 |
约 10 分钟阅读
更新于 2026-02-28 AI 导读
Agent 工具调用优化策略 从工具选择到并行执行,构建高效可靠的 Agent 工具调用体系 工具调用的核心问题 Agent 的能力边界由其可用的工具集决定。工具调用的效率和可靠性直接决定了 Agent 的实际表现。核心挑战包括: 选择问题:面对数十甚至数百个工具,如何让 Agent 准确选择最合适的工具 参数问题:工具参数的构造错误是 Agent 失败的首要原因...
Agent 工具调用优化策略
从工具选择到并行执行,构建高效可靠的 Agent 工具调用体系
工具调用的核心问题
Agent 的能力边界由其可用的工具集决定。工具调用的效率和可靠性直接决定了 Agent 的实际表现。核心挑战包括:
- 选择问题:面对数十甚至数百个工具,如何让 Agent 准确选择最合适的工具
- 参数问题:工具参数的构造错误是 Agent 失败的首要原因
- 编排问题:多个工具调用之间的顺序、并行和依赖关系管理
- 恢复问题:工具调用失败后的重试、降级和回退策略
工具调用的完整生命周期:
LLM 推理 ──→ 工具选择 ──→ 参数构造 ──→ 权限检查 ──→ 执行
│
┌─────────┼─────────┐
v v v
成功 失败 超时
│ │ │
v v v
结果解析 重试/降级 中断/回退
│
v
反馈给 LLM(下一轮推理)
一、工具描述优化
工具描述是 Agent 的"说明书"
工具描述的质量直接影响 LLM 的工具选择准确率。一个好的工具描述应该回答三个问题:
- When:什么时候应该使用这个工具
- What:这个工具能做什么(输入/输出)
- Why Not:什么时候不应该使用这个工具
# 差的工具描述
BAD_TOOL = {
"name": "search",
"description": "搜索功能",
"parameters": {
"query": {"type": "string"}
}
}
# 好的工具描述
GOOD_TOOL = {
"name": "web_search",
"description": (
"在互联网上搜索实时信息。"
"适用于:需要最新数据、新闻、价格、天气等实时信息时。"
"不适用于:已有的静态知识查询(如数学公式、历史事件日期)。"
"返回:搜索结果列表,每条包含标题、摘要和链接。"
),
"parameters": {
"query": {
"type": "string",
"description": (
"搜索查询词。建议使用关键词组合而非完整句子。"
"例如:'Python 3.12 新特性' 而非 'Python 3.12有什么新特性'"
)
},
"max_results": {
"type": "integer",
"description": "返回结果数量,默认 5,最大 20",
"default": 5
},
"time_range": {
"type": "string",
"enum": ["day", "week", "month", "year", "any"],
"description": "时间范围过滤,默认 any",
"default": "any"
}
}
}
工具描述模板
TOOL_DESCRIPTION_TEMPLATE = """
名称: {name}
用途: {purpose}(一句话说清楚)
适用场景: {when_to_use}(列出 2-3 个典型场景)
不适用场景: {when_not_to_use}(列出容易误用的场景)
输入: {input_description}(每个参数的含义和格式要求)
输出: {output_description}(返回值的结构和含义)
注意事项: {caveats}(限制、耗时、成本等)
示例: {example}(一个完整的调用示例)
"""
二、工具选择策略
策略一:语义路由(Semantic Routing)
当工具数量较多时(> 20),不把所有工具描述塞入 System Prompt,而是先用语义匹配筛选出最相关的工具子集。
class SemanticToolRouter:
"""语义工具路由:根据任务语义选择最相关的工具"""
def __init__(self, tools: list[Tool]):
self.tools = tools
self.tool_embeddings = self._build_index()
def _build_index(self) -> dict:
"""为每个工具的描述生成嵌入向量"""
embeddings = {}
for tool in self.tools:
text = f"{tool.name}: {tool.description}"
embeddings[tool.name] = embed(text)
return embeddings
def select(self, task: str, top_k: int = 5) -> list[Tool]:
"""根据任务描述选择最相关的工具"""
task_embedding = embed(task)
similarities = []
for tool_name, tool_embedding in self.tool_embeddings.items():
sim = cosine_similarity(task_embedding, tool_embedding)
similarities.append((tool_name, sim))
similarities.sort(key=lambda x: x[1], reverse=True)
selected_names = [name for name, _ in similarities[:top_k]]
return [t for t in self.tools if t.name in selected_names]
策略二:分层工具选择
第一层:工具类别选择(粗筛)
"文件操作" / "网络请求" / "数据库" / "代码执行" / "搜索"
第二层:具体工具选择(细筛)
文件操作 ──→ file_read / file_write / file_list / file_delete
第三层:参数构造
file_read(path="/src/main.py", offset=0, limit=100)
class HierarchicalToolSelector:
"""分层工具选择器"""
def __init__(self, tool_categories: dict[str, list[Tool]]):
self.categories = tool_categories
def select(self, task: str, llm) -> Tool:
"""两阶段选择"""
# 阶段 1:选择类别
category_descriptions = {
name: self._summarize_category(tools)
for name, tools in self.categories.items()
}
selected_category = llm.choose(
prompt=f"任务:{task}\n选择最合适的工具类别:",
options=category_descriptions
)
# 阶段 2:在类别内选择具体工具
tools_in_category = self.categories[selected_category]
selected_tool = llm.choose(
prompt=f"任务:{task}\n从以下工具中选择:",
options={t.name: t.description for t in tools_in_category}
)
return selected_tool
策略三:工具使用模式学习
class ToolUsagePatternLearner:
"""从历史数据中学习工具使用模式"""
def __init__(self):
self.patterns = defaultdict(Counter)
def record(self, task_type: str, tool_sequence: list[str]):
"""记录工具使用序列"""
for i, tool in enumerate(tool_sequence):
if i > 0:
prev_tool = tool_sequence[i - 1]
self.patterns[prev_tool][tool] += 1
def suggest_next(self, current_tool: str,
top_k: int = 3) -> list[tuple[str, float]]:
"""根据历史模式推荐下一个工具"""
if current_tool not in self.patterns:
return []
total = sum(self.patterns[current_tool].values())
suggestions = [
(tool, count / total)
for tool, count in self.patterns[current_tool].most_common(top_k)
]
return suggestions
三、并行执行优化
依赖图分析
class ToolCallScheduler:
"""工具调用调度器:基于依赖图的并行执行"""
def analyze_dependencies(self,
tool_calls: list[ToolCall]) -> DependencyGraph:
"""分析工具调用之间的依赖关系"""
graph = DependencyGraph()
for call in tool_calls:
graph.add_node(call.id)
for i, call_a in enumerate(tool_calls):
for call_b in tool_calls[i + 1:]:
# 检查数据依赖:B 的输入是否依赖 A 的输出
if self._has_data_dependency(call_a, call_b):
graph.add_edge(call_a.id, call_b.id)
# 检查资源冲突:A 和 B 是否操作同一资源
if self._has_resource_conflict(call_a, call_b):
graph.add_edge(call_a.id, call_b.id)
return graph
def schedule(self, graph: DependencyGraph) -> list[list[ToolCall]]:
"""生成执行计划:每个内部列表可以并行执行"""
levels = graph.topological_sort_levels()
return levels # [[可并行的调用], [下一批可并行的调用], ...]
async def execute_parallel(self,
schedule: list[list[ToolCall]]) -> dict:
"""按计划并行执行"""
results = {}
for level in schedule:
# 同一层的调用可以并行
tasks = [
self._execute_single(call)
for call in level
]
level_results = await asyncio.gather(
*tasks, return_exceptions=True
)
for call, result in zip(level, level_results):
results[call.id] = result
# 如果某个调用失败,检查是否影响后续调用
if isinstance(result, Exception):
affected = graph.get_dependents(call.id)
for dep_id in affected:
results[dep_id] = SkippedResult(
reason=f"Dependency {call.id} failed"
)
return results
并行执行示意
依赖图:
A (file_read) ──→ C (code_analyze) ──→ E (generate_report)
B (web_search) ──→ D (summarize) ──→ E
执行计划:
Level 0: [A, B] # 并行执行
Level 1: [C, D] # A完成后执行C, B完成后执行D, 两者可并行
Level 2: [E] # C和D都完成后执行E
四、参数构造优化
参数校验与自动修正
class ParameterValidator:
"""工具参数校验与自动修正"""
def validate_and_fix(self, tool: Tool,
params: dict) -> tuple[dict, list[str]]:
"""校验参数并尝试自动修正"""
fixes = []
validated = {}
for param_name, param_schema in tool.parameters.items():
value = params.get(param_name)
# 缺失必填参数
if value is None and param_schema.get("required"):
if "default" in param_schema:
validated[param_name] = param_schema["default"]
fixes.append(
f"Missing '{param_name}', using default: "
f"{param_schema['default']}"
)
else:
raise ParameterError(
f"Required parameter '{param_name}' is missing"
)
continue
if value is None:
continue
# 类型修正
expected_type = param_schema.get("type")
if expected_type == "integer" and isinstance(value, str):
try:
validated[param_name] = int(value)
fixes.append(
f"Converted '{param_name}' from str to int"
)
continue
except ValueError:
pass
# 枚举值修正(模糊匹配)
if "enum" in param_schema and value not in param_schema["enum"]:
closest = self._fuzzy_match(
value, param_schema["enum"]
)
if closest:
validated[param_name] = closest
fixes.append(
f"Corrected '{param_name}': "
f"'{value}' -> '{closest}'"
)
continue
# 路径规范化
if param_schema.get("format") == "file_path":
normalized = os.path.normpath(value)
if normalized != value:
validated[param_name] = normalized
fixes.append(
f"Normalized path '{param_name}': "
f"'{value}' -> '{normalized}'"
)
continue
validated[param_name] = value
return validated, fixes
五、错误恢复策略
分级重试机制
class ToolErrorRecovery:
"""工具调用错误恢复"""
RETRY_STRATEGIES = {
"timeout": {
"max_retries": 3,
"backoff": "exponential", # 1s, 2s, 4s
"action": "retry_same"
},
"rate_limit": {
"max_retries": 5,
"backoff": "linear", # 等待 rate_limit_reset
"action": "retry_same"
},
"invalid_params": {
"max_retries": 2,
"backoff": "none",
"action": "retry_with_fix" # 尝试修正参数后重试
},
"permission_denied": {
"max_retries": 0,
"action": "escalate" # 升级到人工
},
"tool_not_found": {
"max_retries": 1,
"action": "find_alternative" # 查找替代工具
},
"unknown": {
"max_retries": 1,
"action": "retry_same"
}
}
async def execute_with_recovery(self, tool: Tool,
params: dict) -> ToolResult:
"""带错误恢复的工具执行"""
error_type = None
last_error = None
for attempt in range(self._max_retries(error_type) + 1):
try:
result = await tool.execute(params)
return result
except ToolError as e:
error_type = self._classify_error(e)
last_error = e
strategy = self.RETRY_STRATEGIES.get(
error_type, self.RETRY_STRATEGIES["unknown"]
)
if attempt >= strategy["max_retries"]:
break
# 执行恢复动作
if strategy["action"] == "retry_same":
await self._wait(strategy["backoff"], attempt)
elif strategy["action"] == "retry_with_fix":
params = await self._fix_params(tool, params, e)
elif strategy["action"] == "find_alternative":
alt_tool = self._find_alternative(tool)
if alt_tool:
tool = alt_tool
else:
break
elif strategy["action"] == "escalate":
return ToolResult(
status="escalated",
message=f"需要人工介入: {e}"
)
# 所有重试失败
return ToolResult(
status="failed",
error=str(last_error),
attempts=attempt + 1
)
def _classify_error(self, error: ToolError) -> str:
"""错误分类"""
if "timeout" in str(error).lower():
return "timeout"
if "rate" in str(error).lower() or "429" in str(error):
return "rate_limit"
if "permission" in str(error).lower() or "403" in str(error):
return "permission_denied"
if "invalid" in str(error).lower() or "400" in str(error):
return "invalid_params"
return "unknown"
降级策略
工具调用降级链:
首选工具失败
│
v
同类替代工具
│ (例如:web_search 失败 → news_search)
│
v 也失败
更基础的工具
│ (例如:structured_search → simple_search)
│
v 也失败
LLM 内部知识
│ (告知用户:无法获取实时数据,以下基于已有知识)
│
v
向用户求助
(请求用户提供所需信息)
class FallbackChain:
"""降级链"""
def __init__(self, chains: dict[str, list[str]]):
self.chains = chains # tool_name -> [fallback_1, fallback_2, ...]
async def execute(self, tool_name: str,
params: dict) -> ToolResult:
"""按降级链执行"""
chain = [tool_name] + self.chains.get(tool_name, [])
for fallback in chain:
try:
tool = self.registry.get(fallback)
adapted_params = self._adapt_params(
params, tool_name, fallback
)
result = await tool.execute(adapted_params)
if fallback != tool_name:
result.metadata["fallback_from"] = tool_name
result.metadata["fallback_to"] = fallback
return result
except ToolError:
continue
return ToolResult(
status="all_fallbacks_failed",
error=f"All tools in chain failed: {chain}"
)
# 降级链配置示例
FALLBACK_CHAINS = {
"web_search": ["cached_search", "llm_knowledge"],
"database_query": ["cached_query", "file_based_query"],
"code_execute": ["code_analyze", "llm_reasoning"],
"image_generate": ["image_search", "text_description"],
}
六、工具调用监控
关键指标
class ToolCallMetrics:
"""工具调用指标收集"""
def record(self, event: ToolCallEvent):
"""记录工具调用指标"""
metrics = {
"tool_name": event.tool_name,
"duration_ms": event.duration_ms,
"status": event.status, # success/failure/timeout
"attempt": event.attempt, # 第几次尝试
"fallback_used": event.fallback, # 是否使用了降级
"tokens_in_params": event.param_tokens,
"tokens_in_result": event.result_tokens,
}
# 推送到监控系统
self.push_to_prometheus(metrics)
def get_dashboard_data(self) -> dict:
"""生成仪表盘数据"""
return {
"success_rate": self._calc_success_rate(),
"avg_latency_ms": self._calc_avg_latency(),
"p99_latency_ms": self._calc_p99_latency(),
"top_failures": self._get_top_failures(),
"tool_usage_distribution": self._get_usage_dist(),
"fallback_rate": self._calc_fallback_rate(),
"cost_by_tool": self._calc_cost_by_tool(),
}
监控仪表盘指标
| 指标 | 含义 | 告警阈值 |
|---|---|---|
| Success Rate | 工具调用成功率 | < 95% |
| Avg Latency | 平均延迟 | > 5s |
| P99 Latency | 99 分位延迟 | > 30s |
| Fallback Rate | 降级触发率 | > 10% |
| Token Efficiency | Token 有效利用率 | < 30% |
| Error Rate by Tool | 各工具的错误率 | 因工具而异 |
工程实践建议
- 工具描述是第一优先级:投入时间优化工具描述的收益远大于调优 Agent Prompt
- 少即是多:可用工具不超过 15 个;超过时使用语义路由或分层选择
- 参数示例胜过描述:在工具描述中提供 1-2 个完整的参数示例
- 监控先于优化:先建立工具调用的可观测性,再根据数据做优化
- 降级链必须测试:降级策略不测试等于没有降级
- 幂等设计:重试安全的前提是工具调用是幂等的,写操作需要幂等 key
参考资料
- OpenAI Function Calling 最佳实践
- Anthropic Tool Use 文档
- LangChain Tools & Toolkits 设计模式
- Gorilla: LLM Connected with Massive APIs
Maurice | maurice_wen@proton.me