提示词链编排:复杂任务的分治策略

Chain-of-Prompts、条件路由、错误传播与 LangChain LCEL 实现 | 2026-02


一、为什么需要提示词链

单个提示词在面对复杂任务时会遇到瓶颈:上下文窗口不够用、指令过于复杂导致遵循率下降、不同子任务需要不同的模型或参数。提示词链(Chain of Prompts)将复杂任务分解为多个简单步骤,每个步骤使用专门优化的提示词。

Single Prompt (fragile):
  Complex Question -> [Mega Prompt] -> Answer (often wrong)

Chain of Prompts (robust):
  Complex Question -> [Classify] -> [Retrieve] -> [Synthesize] -> [Validate] -> Answer
                         |             |              |              |
                      Simple,        Focused,       Specialized,   Quality
                      reliable       efficient      accurate       checked

二、链式编排模式

2.1 基础模式

模式        描述                            适用场景
顺序链      A -> B -> C                     固定流程
条件链      A -> if(X) B else C             分类后分支
并行链      A -> [B, C] -> D                独立子任务
循环链      A -> B -> (check) -> A          自我修正
树形链      A -> [B -> D, C -> E] -> F      复杂分解
Map-Reduce  Split -> [Process...] -> Merge  批量处理
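
这些模式都可以先用纯 Python 函数做原型验证,再迁移到 LCEL。下面是顺序链与条件链的最小示意(其中的分类函数与分支函数均为占位示例,非真实模型调用):

```python
# Sequential chain: plain function composition
def seq(*steps):
    def run(x):
        for step in steps:
            x = step(x)
        return x
    return run

# Conditional chain: classify first, then dispatch to a branch
def conditional(classify, branches, default):
    def run(x):
        return branches.get(classify(x), default)(x)
    return run

clean = seq(str.strip, str.lower)
print(clean("  Hello "))  # hello

route = conditional(
    classify=lambda q: "billing" if "invoice" in q else "general",
    branches={"billing": lambda q: "BILLING: " + q},
    default=lambda q: "GENERAL: " + q,
)
print(route("my invoice is wrong"))  # BILLING: my invoice is wrong
```

这种先用占位函数验证控制流、再替换为 LLM 调用的做法,能把链路结构问题与提示词质量问题分开调试。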

2.2 架构图

Pattern 1: Sequential Chain
  Input -> [Step 1] -> [Step 2] -> [Step 3] -> Output

Pattern 2: Conditional Chain (Router)
  Input -> [Classifier] --"tech"--> [Tech Support]
                        --"billing"--> [Billing Agent]
                        --"general"--> [General FAQ]

Pattern 3: Parallel Chain
  Input -> [Extract Entities] ---|
        -> [Analyze Sentiment] --+--> [Merge & Synthesize] -> Output
        -> [Detect Language]   ---|

Pattern 4: Self-Correcting Loop
  Input -> [Generate] -> [Validate] --pass--> Output
                             |
                           fail
                             |
                             v
                       [Fix with feedback] ---> [Validate]
                       (max 3 iterations)

Pattern 5: Map-Reduce
  Long Document -> [Split into chunks]
                        |
               +--------+--------+
               |        |        |
          [Summarize] [Summarize] [Summarize]
               |        |        |
               +--------+--------+
                        |
                   [Merge summaries] -> Final Summary

三、LangChain LCEL 实现

3.1 LCEL 基础

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_openai import ChatOpenAI

# LCEL: LangChain Expression Language
# Uses the pipe (|) operator to chain components

# Simple chain: prompt | model | parser
classify_prompt = ChatPromptTemplate.from_messages([
    ("system", "Classify the user query into: tech_support, billing, general"),
    ("user", "{query}"),
])

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

# This creates a Runnable chain
classify_chain = classify_prompt | model | parser

# Execute
result = await classify_chain.ainvoke({"query": "My invoice is wrong"})
# result: "billing"
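
LCEL 的管道组合本质上是函数复合。下面是一个纯 Python 的最小示意(并非 LangChain 的真实实现,仅说明 `|` 运算符如何把两个组件串成一个可调用对象):

```python
# Minimal sketch of pipe-style composition (NOT LangChain's real implementation)
class MiniRunnable:
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, x):
        return self.fn(x)

    def __or__(self, other):
        # a | b => run a, then feed its output into b
        return MiniRunnable(lambda x: other.invoke(self.invoke(x)))

upper = MiniRunnable(str.upper)
exclaim = MiniRunnable(lambda s: s + "!")
chain = upper | exclaim
print(chain.invoke("hello"))  # HELLO!
```

真实的 Runnable 接口在此基础上还提供 ainvoke/stream/batch 等统一入口,这正是链式组件可以自由拼装的原因。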

3.2 条件路由

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# Define specialized chains for each category
tech_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a technical support specialist. Diagnose and solve the issue."),
        ("user", "{query}"),
    ])
    | ChatOpenAI(model="gpt-4o", temperature=0.3)
    | StrOutputParser()
)

billing_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a billing specialist. Help with invoice and payment issues."),
        ("user", "{query}"),
    ])
    | ChatOpenAI(model="gpt-4o-mini", temperature=0.3)  # Cheaper model for billing
    | StrOutputParser()
)

general_chain = (
    ChatPromptTemplate.from_messages([
        ("system", "You are a helpful general assistant."),
        ("user", "{query}"),
    ])
    | ChatOpenAI(model="gpt-4o-mini", temperature=0.5)
    | StrOutputParser()
)

# Router function
def route(info: dict) -> object:
    category = info["category"].strip().lower()
    if "tech" in category:
        return tech_chain
    elif "billing" in category:
        return billing_chain
    else:
        return general_chain

from operator import itemgetter

# Complete routing chain: classify first, then pass the original query string on.
# (RunnablePassthrough here would forward the whole input dict, nesting {"query": ...}
# inside the "query" key, so we extract the string explicitly.)
full_chain = (
    {"category": classify_chain, "query": itemgetter("query")}
    | RunnableLambda(route)
)

# Execute
result = await full_chain.ainvoke({"query": "My invoice is wrong"})

3.3 并行链

from langchain_core.runnables import RunnableParallel

# Run multiple analyses in parallel
parallel_analysis = RunnableParallel(
    entities=(
        ChatPromptTemplate.from_messages([
            ("system", "Extract named entities (people, companies, locations)."),
            ("user", "{text}"),
        ])
        | model
        | JsonOutputParser()
    ),
    sentiment=(
        ChatPromptTemplate.from_messages([
            ("system", "Analyze sentiment. Output: positive/negative/neutral with score."),
            ("user", "{text}"),
        ])
        | model
        | JsonOutputParser()
    ),
    topics=(
        ChatPromptTemplate.from_messages([
            ("system", "Extract main topics (max 5)."),
            ("user", "{text}"),
        ])
        | model
        | JsonOutputParser()
    ),
)

# Merge results
synthesis_chain = (
    parallel_analysis
    | ChatPromptTemplate.from_messages([
        ("system", "Synthesize the analysis into a comprehensive report."),
        ("user", "Entities: {entities}\nSentiment: {sentiment}\nTopics: {topics}"),
    ])
    | model
    | StrOutputParser()
)

report = await synthesis_chain.ainvoke({"text": article_text})  # article_text: the source text to analyze

四、自纠错循环

4.1 验证-修复循环

import json

from pydantic import BaseModel, ValidationError

class AnalysisResult(BaseModel):
    summary: str
    key_points: list[str]
    confidence: float

async def generate_with_validation(
    query: str, max_retries: int = 3,
) -> AnalysisResult:
    """Generate structured output with self-correction loop."""
    generate_prompt = ChatPromptTemplate.from_messages([
        ("system", "Analyze the query and provide structured analysis as JSON."),
        ("user", "{query}"),
    ])

    fix_prompt = ChatPromptTemplate.from_messages([
        ("system", "Fix the JSON output based on the validation error."),
        ("user", "Original output:\n{output}\n\nError:\n{error}\n\nFix it:"),
    ])

    generate_chain = generate_prompt | model | StrOutputParser()
    fix_chain = fix_prompt | model | StrOutputParser()

    output = await generate_chain.ainvoke({"query": query})

    for attempt in range(max_retries):
        try:
            parsed = json.loads(output)
            return AnalysisResult.model_validate(parsed)
        except (json.JSONDecodeError, ValidationError) as e:
            if attempt == max_retries - 1:
                raise
            output = await fix_chain.ainvoke({
                "output": output, "error": str(e),
            })

    raise ValueError("Failed to generate valid output")
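
验证-修复循环的控制流本身可以脱离模型独立测试。下面用纯 Python 模拟上面的循环结构(`fake_fix` 为占位修复函数,非真实模型调用):

```python
import json

# Control flow of the validate-fix loop, isolated from any model call
def repair_loop(output: str, fix_fn, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        try:
            return json.loads(output)        # "validate" step
        except json.JSONDecodeError as e:
            if attempt == max_retries - 1:
                raise
            output = fix_fn(output, str(e))  # "fix with feedback" step

def fake_fix(bad: str, error: str) -> str:   # placeholder fixer
    return bad.replace("'", '"')

print(repair_loop("{'a': 1}", fake_fix))  # {'a': 1}
```

把控制流与模型调用解耦后,可以用确定性的假修复函数验证重试上限、异常透传等边界行为。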

4.2 质量门禁

async def chain_with_quality_gate(query: str) -> str:
    """Chain with intermediate quality check."""
    # Step 1: Generate draft
    draft_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "Write a detailed analysis."),
            ("user", "{query}"),
        ])
        | ChatOpenAI(model="gpt-4o")
        | StrOutputParser()
    )

    # Step 2: Quality check (cheaper model as judge)
    quality_chain = (
        ChatPromptTemplate.from_messages([
            ("system", """Rate this analysis on a scale of 1-10.
Output JSON: {{"score": N, "issues": ["issue1", ...]}}
Score >= 7 = pass. Below 7 = needs improvement."""),
            ("user", "Query: {query}\n\nAnalysis: {draft}"),
        ])
        | ChatOpenAI(model="gpt-4o-mini", temperature=0)
        | JsonOutputParser()
    )

    # Step 3: Improve if needed
    improve_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "Improve this analysis based on the feedback."),
            ("user", "Original: {draft}\n\nIssues: {issues}\n\nImproved version:"),
        ])
        | ChatOpenAI(model="gpt-4o")
        | StrOutputParser()
    )

    draft = await draft_chain.ainvoke({"query": query})
    quality = await quality_chain.ainvoke({"query": query, "draft": draft})

    if quality["score"] >= 7:
        return draft
    else:
        improved = await improve_chain.ainvoke({
            "draft": draft,
            "issues": "\n".join(quality["issues"]),
        })
        return improved

五、Map-Reduce 模式

5.1 文档摘要

from langchain_text_splitters import RecursiveCharacterTextSplitter

async def map_reduce_summary(document: str) -> str:
    """Summarize a long document using map-reduce."""
    # Split document into chunks
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000, chunk_overlap=200,
    )
    chunks = splitter.split_text(document)

    # Map: Summarize each chunk
    map_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "Summarize this text section concisely. Keep key facts and numbers."),
            ("user", "{chunk}"),
        ])
        | ChatOpenAI(model="gpt-4o-mini")
        | StrOutputParser()
    )

    # Process chunks in parallel
    import asyncio
    summaries = await asyncio.gather(*[
        map_chain.ainvoke({"chunk": chunk}) for chunk in chunks
    ])

    # Reduce: Merge summaries
    reduce_chain = (
        ChatPromptTemplate.from_messages([
            ("system", """Merge these section summaries into one coherent summary.
Eliminate redundancy. Preserve key facts. Keep under 500 words."""),
            ("user", "Section summaries:\n\n{summaries}"),
        ])
        | ChatOpenAI(model="gpt-4o")
        | StrOutputParser()
    )

    combined = "\n\n---\n\n".join(
        f"Section {i+1}: {s}" for i, s in enumerate(summaries)
    )
    final = await reduce_chain.ainvoke({"summaries": combined})
    return final
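
真实场景里 map 阶段的并发调用通常需要限流,否则容易触发 API 速率限制。下面是一个纯 Python 的限流示意(`fake_summarize` 为占位函数,非真实模型调用):

```python
import asyncio

# Bounded-concurrency map: at most `limit` chunks in flight at once
async def bounded_map(items, fn, limit: int = 5):
    sem = asyncio.Semaphore(limit)

    async def worker(item):
        async with sem:  # wait for a free slot before calling
            return await fn(item)

    return await asyncio.gather(*(worker(i) for i in items))

# Placeholder for a real model call
async def fake_summarize(chunk: str) -> str:
    await asyncio.sleep(0)  # simulate I/O
    return chunk[:10]

print(asyncio.run(bounded_map(["abcdefghijkl", "xy"], fake_summarize)))
# ['abcdefghij', 'xy']
```

把上文 map_reduce_summary 中的 `asyncio.gather(...)` 换成这种带信号量的版本,即可在不改变结果的前提下控制并发量。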

六、错误传播与处理

6.1 错误处理策略

策略        描述              适用场景
Fail-fast   第一个错误即停止  高精度要求
Fallback    失败时用备选链    高可用要求
Retry       失败时重试        临时性错误
Skip        跳过失败步骤      可选步骤
Default     返回默认值        非关键步骤
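
表中 Fallback 与 Default 两种策略的语义可以用纯 Python 概括如下(示意实现,并非 LangChain API;6.2 节给出 LCEL 的对应写法):

```python
# Sketch of the Fallback + Default strategies (NOT the LangChain API)
def with_fallbacks(primary, fallbacks=(), default=None):
    def run(x):
        for fn in (primary, *fallbacks):
            try:
                return fn(x)
            except Exception:
                continue  # try the next chain in line
        if default is not None:
            return default  # Default strategy: acceptable for non-critical steps
        raise RuntimeError("all chains failed")  # no default => fail loudly
    return run

def always_fails(x):
    raise ValueError("primary chain down")

robust = with_fallbacks(always_fails, fallbacks=(lambda x: x * 2,))
print(robust(3))  # 6
```

选择哪种策略取决于该节点失败的业务代价:关键节点宁可 Fail-fast,非关键节点才允许 Default。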

6.2 LCEL 错误处理

# Fallback chains (`prompt` is any ChatPromptTemplate, `parser` any output parser)
primary_chain = (
    prompt | ChatOpenAI(model="gpt-4o") | parser
)
fallback_chain = (
    prompt | ChatOpenAI(model="gpt-4o-mini") | parser
)

# Use with_fallbacks for automatic failover
robust_chain = primary_chain.with_fallbacks([fallback_chain])

# Use with_retry for transient errors
retrying_chain = primary_chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
    retry_if_exception_type=(TimeoutError, ConnectionError),
)

七、性能优化

7.1 优化策略

策略        效果          实现方式
并行执行    减少总延迟    RunnableParallel
流式输出    减少感知延迟  .astream()
模型分级    减少成本      简单步骤用小模型
缓存        减少重复调用  LangChain Cache
批处理      提高吞吐量    .abatch()
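
缓存策略的核心是用提示词内容做键、命中即跳过模型调用。下面是一个纯 Python 示意(LangChain 自带的 Cache 机制思路类似,此处为简化版,`fake_model` 为占位函数):

```python
import hashlib

# In-memory cache keyed by prompt content (simplified sketch of the caching idea)
_cache: dict[str, str] = {}

def cached_call(prompt: str, call_fn) -> str:
    key = hashlib.sha256(prompt.encode("utf-8")).hexdigest()
    if key not in _cache:            # miss: pay for one real call
        _cache[key] = call_fn(prompt)
    return _cache[key]               # hit: free

calls = []
def fake_model(p: str) -> str:       # placeholder for a real LLM call
    calls.append(p)
    return p.upper()

print(cached_call("hi", fake_model), cached_call("hi", fake_model), len(calls))
# HI HI 1
```

对分类、格式化这类确定性步骤(temperature=0),相同输入必然产生可复用的输出,缓存收益最明显。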

7.2 模型分级策略

# Use different models for different chain steps
CHAIN_CONFIG = {
    "classify": {"model": "gpt-4o-mini", "temp": 0, "reason": "Simple classification"},
    "retrieve": {"model": None, "reason": "No LLM needed, vector search"},
    "synthesize": {"model": "gpt-4o", "temp": 0.3, "reason": "Complex generation"},
    "validate": {"model": "gpt-4o-mini", "temp": 0, "reason": "Binary judgment"},
    "format": {"model": "gpt-4o-mini", "temp": 0, "reason": "Formatting only"},
}

# Cost estimate per 1000 queries (assumed prices: gpt-4o $5, gpt-4o-mini $0.75 per 1000 calls):
# All gpt-4o:     $20.00 (4 LLM calls x $5 each; retrieve uses no LLM)
# Model grading:  $7.25  (1 x $5 + 3 x $0.75)
# Savings: ~64%
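
上面的估算可以写成一个小函数自动算出(价格为示意假设,非真实报价;配置结构沿用 CHAIN_CONFIG 的形式):

```python
# Assumed per-1000-call prices (illustrative, not real pricing)
PRICE_PER_1K = {"gpt-4o": 5.0, "gpt-4o-mini": 0.75}

def estimate_cost(chain_config: dict) -> float:
    """Per-1000-query cost, summed over steps that actually call an LLM."""
    return sum(
        PRICE_PER_1K[step["model"]]
        for step in chain_config.values()
        if step.get("model")  # steps with model=None (e.g. vector search) are free
    )

config = {
    "classify":   {"model": "gpt-4o-mini"},
    "retrieve":   {"model": None},
    "synthesize": {"model": "gpt-4o"},
    "validate":   {"model": "gpt-4o-mini"},
    "format":     {"model": "gpt-4o"},
}
print(estimate_cost(config))
```

把这类计算固化在代码里,改动模型分级方案时可以立刻看到成本影响。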

八、总结

提示词链编排是将 LLM 从"单次调用"提升为"可组合系统"的关键技术。核心设计原则:

  1. 单一职责:每个链节点只做一件事
  2. 模型匹配:按复杂度选择模型,不要所有步骤都用最贵的
  3. 错误隔离:每个节点独立错误处理,不让一个失败拖垮整条链
  4. 可观测:每个节点记录输入/输出/延迟/成本
  5. 并行优先:能并行的步骤不要串行
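
其中第 4 条"可观测"可以用一个轻量包装器落地。下面是纯 Python 示意(`log` 回调为假设接口,实际项目中可换成结构化日志或 tracing 上报):

```python
import time

# Wrap a chain step to record its latency (sketch; `log` callback is an assumption)
def observed(name: str, fn, log=print):
    def wrapper(x):
        start = time.perf_counter()
        out = fn(x)
        log(f"[{name}] latency={time.perf_counter() - start:.3f}s")
        return out
    return wrapper

step = observed("classify", str.upper)
step("my invoice is wrong")  # logs e.g. [classify] latency=0.000s
```

每个节点统一经过这样的包装后,链路中哪一步慢、哪一步贵就一目了然。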

Maurice | maurice_wen@proton.me