Prompt Chain Orchestration: A Divide-and-Conquer Strategy for Complex Tasks
Original | 灵阙教研团队
Updated 2026-02-28
Chain-of-Prompts, Conditional Routing, Error Propagation, and a LangChain LCEL Implementation | 2026-02
1. Why Prompt Chains?
A single prompt hits bottlenecks on complex tasks: the context window runs out, overly complex instructions reduce instruction-following accuracy, and different sub-tasks call for different models or parameters. A chain of prompts decomposes a complex task into several simple steps, each driven by a prompt optimized for that one step.
Single prompt (fragile):
  Complex Question -> [Mega Prompt] -> Answer (often wrong)

Chain of prompts (robust):
  Complex Question -> [Classify] -> [Retrieve] -> [Synthesize] -> [Validate] -> Answer
                          |             |              |              |
                       Simple,       Focused,     Specialized,     Quality-
                      reliable      efficient      accurate        checked
2. Chain Orchestration Patterns
2.1 Basic Patterns
| Pattern | Description | When to use |
|---|---|---|
| Sequential chain | A -> B -> C | Fixed workflows |
| Conditional chain | A -> if(X) B else C | Branching after classification |
| Parallel chain | A -> [B, C] -> D | Independent sub-tasks |
| Loop chain | A -> B -> (check) -> A | Self-correction |
| Tree chain | A -> [B -> D, C -> E] -> F | Complex decomposition |
| Map-Reduce | Split -> [Process...] -> Merge | Batch processing |
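Before reaching for a framework, the core of these patterns fits in a few lines of plain asyncio. The sketch below illustrates the sequential, conditional, and parallel rows of the table; the step functions are hypothetical stand-ins for LLM calls, not part of any library:

```python
import asyncio

# Hypothetical stand-ins for LLM-backed chain nodes.
async def classify(q: str) -> str:
    # Sequential step 1: route the query into a category
    return "billing" if "invoice" in q else "general"

async def billing_agent(q: str) -> str:
    return f"[billing] {q}"

async def general_faq(q: str) -> str:
    return f"[general] {q}"

async def run_conditional(q: str) -> str:
    # Conditional chain: A -> if(X) B else C
    category = await classify(q)
    branch = billing_agent if category == "billing" else general_faq
    return await branch(q)

async def run_parallel(q: str) -> dict:
    # Parallel chain: A -> [B, C] -> D; independent sub-tasks run concurrently
    billing, general = await asyncio.gather(billing_agent(q), general_faq(q))
    return {"billing": billing, "general": general}
```

Section 3 shows the same patterns expressed with LangChain's LCEL primitives.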
2.2 Architecture Diagrams
Pattern 1: Sequential Chain
  Input -> [Step 1] -> [Step 2] -> [Step 3] -> Output

Pattern 2: Conditional Chain (Router)
  Input -> [Classifier] --"tech"-----> [Tech Support]
                         --"billing"--> [Billing Agent]
                         --"general"--> [General FAQ]

Pattern 3: Parallel Chain
  Input -+-> [Extract Entities] ---+
         +-> [Analyze Sentiment] --+--> [Merge & Synthesize] -> Output
         +-> [Detect Language] ----+

Pattern 4: Self-Correcting Loop
  Input -> [Generate] -> [Validate] --pass--> Output
                             |
                            fail
                             |
                             v
                 [Fix with feedback] --> [Validate]
                      (max 3 iterations)

Pattern 5: Map-Reduce
  Long Document -> [Split into chunks]
                          |
               +----------+----------+
               |          |          |
          [Summarize] [Summarize] [Summarize]
               |          |          |
               +----------+----------+
                          |
                 [Merge summaries] -> Final Summary
3. LangChain LCEL Implementation
3.1 LCEL Basics
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_openai import ChatOpenAI
# LCEL: LangChain Expression Language
# Uses the pipe (|) operator to chain components
# Simple chain: prompt | model | parser
classify_prompt = ChatPromptTemplate.from_messages([
("system", "Classify the user query into: tech_support, billing, general"),
("user", "{query}"),
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# This creates a Runnable chain
classify_chain = classify_prompt | model | parser
# Execute (ainvoke must be awaited from inside an async context;
# use classify_chain.invoke(...) in synchronous code)
result = await classify_chain.ainvoke({"query": "My invoice is wrong"})
# result: "billing"
3.2 Conditional Routing
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
# Define specialized chains for each category
tech_chain = (
ChatPromptTemplate.from_messages([
("system", "You are a technical support specialist. Diagnose and solve the issue."),
("user", "{query}"),
])
| ChatOpenAI(model="gpt-4o", temperature=0.3)
| StrOutputParser()
)
billing_chain = (
ChatPromptTemplate.from_messages([
("system", "You are a billing specialist. Help with invoice and payment issues."),
("user", "{query}"),
])
| ChatOpenAI(model="gpt-4o-mini", temperature=0.3) # Cheaper model for billing
| StrOutputParser()
)
general_chain = (
ChatPromptTemplate.from_messages([
("system", "You are a helpful general assistant."),
("user", "{query}"),
])
| ChatOpenAI(model="gpt-4o-mini", temperature=0.5)
| StrOutputParser()
)
# Router function
def route(info: dict) -> object:
category = info["category"].strip().lower()
if "tech" in category:
return tech_chain
elif "billing" in category:
return billing_chain
else:
return general_chain
# Complete routing chain; itemgetter keeps "query" as the raw string
# instead of forwarding the whole input dict to the routed chain
from operator import itemgetter
full_chain = (
    {"category": classify_chain, "query": itemgetter("query")}
    | RunnableLambda(route)
)
# Execute (inside an async context)
result = await full_chain.ainvoke({"query": "My invoice is wrong"})
3.3 Parallel Chains
from langchain_core.runnables import RunnableParallel
# Run multiple analyses in parallel
parallel_analysis = RunnableParallel(
entities=(
ChatPromptTemplate.from_messages([
("system", "Extract named entities (people, companies, locations) as JSON."),
("user", "{text}"),
])
| model
| JsonOutputParser()
),
sentiment=(
ChatPromptTemplate.from_messages([
("system", "Analyze sentiment. Output JSON with a positive/negative/neutral label and a score."),
("user", "{text}"),
])
| model
| JsonOutputParser()
),
topics=(
ChatPromptTemplate.from_messages([
("system", "Extract the main topics (max 5) as a JSON list."),
("user", "{text}"),
])
| model
| JsonOutputParser()
),
)
# Merge results
synthesis_chain = (
parallel_analysis
| ChatPromptTemplate.from_messages([
("system", "Synthesize the analysis into a comprehensive report."),
("user", "Entities: {entities}\nSentiment: {sentiment}\nTopics: {topics}"),
])
| model
| StrOutputParser()
)
report = await synthesis_chain.ainvoke({"text": article_text})  # article_text: the document to analyze
4. Self-Correcting Loops
4.1 The Validate-and-Fix Loop
import json
from pydantic import BaseModel, ValidationError
class AnalysisResult(BaseModel):
summary: str
key_points: list[str]
confidence: float
async def generate_with_validation(
query: str, max_retries: int = 3,
) -> AnalysisResult:
"""Generate structured output with self-correction loop."""
generate_prompt = ChatPromptTemplate.from_messages([
("system", "Analyze the query and provide structured analysis as JSON."),
("user", "{query}"),
])
fix_prompt = ChatPromptTemplate.from_messages([
("system", "Fix the JSON output based on the validation error."),
("user", "Original output:\n{output}\n\nError:\n{error}\n\nFix it:"),
])
generate_chain = generate_prompt | model | StrOutputParser()
fix_chain = fix_prompt | model | StrOutputParser()
output = await generate_chain.ainvoke({"query": query})
for attempt in range(max_retries):
try:
parsed = json.loads(output)
return AnalysisResult.model_validate(parsed)
except (json.JSONDecodeError, ValidationError) as e:
if attempt == max_retries - 1:
raise
output = await fix_chain.ainvoke({
"output": output, "error": str(e),
})
raise ValueError("Failed to generate valid output")
4.2 Quality Gates
async def chain_with_quality_gate(query: str) -> str:
"""Chain with intermediate quality check."""
# Step 1: Generate draft
draft_chain = (
ChatPromptTemplate.from_messages([
("system", "Write a detailed analysis."),
("user", "{query}"),
])
| ChatOpenAI(model="gpt-4o")
| StrOutputParser()
)
# Step 2: Quality check (cheaper model as judge)
quality_chain = (
ChatPromptTemplate.from_messages([
# Literal braces are doubled ({{ }}) so ChatPromptTemplate
# doesn't parse them as template variables
("system", """Rate this analysis on a scale of 1-10.
Output JSON: {{"score": N, "issues": ["issue1", ...]}}
Score >= 7 = pass. Below 7 = needs improvement."""),
("user", "Query: {query}\n\nAnalysis: {draft}"),
])
| ChatOpenAI(model="gpt-4o-mini", temperature=0)
| JsonOutputParser()
)
# Step 3: Improve if needed
improve_chain = (
ChatPromptTemplate.from_messages([
("system", "Improve this analysis based on the feedback."),
("user", "Original: {draft}\n\nIssues: {issues}\n\nImproved version:"),
])
| ChatOpenAI(model="gpt-4o")
| StrOutputParser()
)
draft = await draft_chain.ainvoke({"query": query})
quality = await quality_chain.ainvoke({"query": query, "draft": draft})
if quality["score"] >= 7:
return draft
else:
improved = await improve_chain.ainvoke({
"draft": draft,
"issues": "\n".join(quality["issues"]),
})
return improved
5. The Map-Reduce Pattern
5.1 Document Summarization
from langchain_text_splitters import RecursiveCharacterTextSplitter
async def map_reduce_summary(document: str) -> str:
"""Summarize a long document using map-reduce."""
# Split document into chunks
splitter = RecursiveCharacterTextSplitter(
chunk_size=4000, chunk_overlap=200,
)
chunks = splitter.split_text(document)
# Map: Summarize each chunk
map_chain = (
ChatPromptTemplate.from_messages([
("system", "Summarize this text section concisely. Keep key facts and numbers."),
("user", "{chunk}"),
])
| ChatOpenAI(model="gpt-4o-mini")
| StrOutputParser()
)
# Process chunks in parallel
import asyncio
summaries = await asyncio.gather(*[
map_chain.ainvoke({"chunk": chunk}) for chunk in chunks
])
# Reduce: Merge summaries
reduce_chain = (
ChatPromptTemplate.from_messages([
("system", """Merge these section summaries into one coherent summary.
Eliminate redundancy. Preserve key facts. Keep under 500 words."""),
("user", "Section summaries:\n\n{summaries}"),
])
| ChatOpenAI(model="gpt-4o")
| StrOutputParser()
)
combined = "\n\n---\n\n".join(
f"Section {i+1}: {s}" for i, s in enumerate(summaries)
)
final = await reduce_chain.ainvoke({"summaries": combined})
return final
6. Error Propagation and Handling
6.1 Error-Handling Strategies
| Strategy | Description | When to use |
|---|---|---|
| Fail-fast | Stop at the first error | High-precision requirements |
| Fallback | Switch to a backup chain on failure | High-availability requirements |
| Retry | Retry on failure | Transient errors |
| Skip | Skip the failed step | Optional steps |
| Default | Return a default value | Non-critical steps |
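The Retry, Fallback, and Default rows can be sketched without any framework; a minimal set of async wrappers (the `step` arguments are hypothetical stand-ins for chain nodes):

```python
import asyncio

async def with_retry(step, x, attempts=3, base_delay=0.01):
    # Retry: re-run on transient errors with exponential backoff
    for i in range(attempts):
        try:
            return await step(x)
        except (TimeoutError, ConnectionError):
            if i == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** i)

async def with_fallback(primary, backup, x):
    # Fallback: switch to a backup chain on any failure
    try:
        return await primary(x)
    except Exception:
        return await backup(x)

async def with_default(step, x, default="N/A"):
    # Default: a non-critical step degrades to a fixed value
    try:
        return await step(x)
    except Exception:
        return default
```

Section 6.2 shows the Fallback and Retry strategies expressed with LCEL's built-in helpers.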
6.2 Error Handling in LCEL
# `prompt` and `parser` here stand for any prompt template / output parser,
# e.g. classify_prompt and StrOutputParser() from section 3.1
# Fallback chains
primary_chain = (
    prompt | ChatOpenAI(model="gpt-4o") | parser
)
fallback_chain = (
prompt | ChatOpenAI(model="gpt-4o-mini") | parser
)
# Use with_fallbacks for automatic failover
robust_chain = primary_chain.with_fallbacks([fallback_chain])
# Use with_retry for transient errors
retrying_chain = primary_chain.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True,
retry_if_exception_type=(TimeoutError, ConnectionError),
)
7. Performance Optimization
7.1 Optimization Strategies
| Strategy | Effect | How |
|---|---|---|
| Parallel execution | Lower end-to-end latency | RunnableParallel |
| Streaming output | Lower perceived latency | .astream() |
| Model tiering | Lower cost | Small models for simple steps |
| Caching | Fewer repeated calls | LangChain cache |
| Batching | Higher throughput | .abatch() |
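The caching row in particular is easy to see framework-free: memoize LLM calls by a stable hash of (model, prompt) so identical queries never hit the API twice. A minimal sketch (the `llm` argument is a hypothetical stand-in for a real model call; LangChain's own cache works similarly via `set_llm_cache`):

```python
import asyncio
import hashlib
import json

# In-memory cache keyed by a hash of (model, prompt)
_cache: dict[str, str] = {}

def _key(model: str, prompt: str) -> str:
    return hashlib.sha256(json.dumps([model, prompt]).encode()).hexdigest()

async def cached_call(llm, model: str, prompt: str) -> str:
    k = _key(model, prompt)
    if k not in _cache:
        _cache[k] = await llm(prompt)  # only executed on a cache miss
    return _cache[k]
```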
7.2 Model Tiering Strategy
# Use different models for different chain steps
CHAIN_CONFIG = {
"classify": {"model": "gpt-4o-mini", "temp": 0, "reason": "Simple classification"},
"retrieve": {"model": None, "reason": "No LLM needed, vector search"},
"synthesize": {"model": "gpt-4o", "temp": 0.3, "reason": "Complex generation"},
"validate": {"model": "gpt-4o-mini", "temp": 0, "reason": "Binary judgment"},
"format": {"model": "gpt-4o-mini", "temp": 0, "reason": "Formatting only"},
}
# Cost estimate per 1000 queries:
# All gpt-4o: $25 (5 calls x $5 each)
# Model grading: $8 (1 x $5 + 4 x $0.75)
# Savings: 68%
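The estimate above checks out arithmetically; reproducing it with the article's illustrative per-call figures ($5 per gpt-4o call, $0.75 per gpt-4o-mini call, per 1000 queries):

```python
# Cost per 1000 queries under the two configurations
all_big = 5 * 5.00              # five steps, all on gpt-4o
tiered = 1 * 5.00 + 4 * 0.75    # one gpt-4o step + four small-model steps
savings = 1 - tiered / all_big
print(f"${all_big:.0f} vs ${tiered:.0f} -> {savings:.0%} saved")  # $25 vs $8 -> 68% saved
```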
8. Summary
Prompt chain orchestration is the key technique for turning an LLM from a "single call" into a "composable system". Core design principles:
- Single responsibility: each chain node does exactly one thing
- Model matching: pick models by step complexity; don't run every step on the most expensive one
- Error isolation: handle errors at each node independently so one failure doesn't take down the whole chain
- Observability: log input/output/latency/cost at every node
- Parallel first: never run sequentially what can run in parallel
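The observability principle can be sketched as a thin wrapper around any chain node (a minimal, framework-free illustration; `traced` and the `LOG` structure are invented here, not a standard API):

```python
import asyncio
import time

# Per-node observability records: input size, output size, latency
LOG: list[dict] = []

def traced(name, step):
    # Wrap an async chain node so every call is recorded
    async def wrapper(x):
        t0 = time.perf_counter()
        out = await step(x)
        LOG.append({
            "node": name,
            "input_chars": len(str(x)),
            "output_chars": len(str(out)),
            "latency_ms": (time.perf_counter() - t0) * 1000,
        })
        return out
    return wrapper
```

In LangChain the same data is available through callbacks or LangSmith tracing; the point is that every node's cost and latency should be visible, whatever the mechanism.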
Maurice | maurice_wen@proton.me