AI Observability: Monitoring, Tracing, and Alerting

LLM observability platforms (Langfuse/Phoenix), token usage tracking, latency monitoring, quality evaluation metrics, and alerting

Introduction

Observability for traditional software rests on three pillars: logs, metrics, and traces. LLM applications add unique challenges on top of these: non-deterministic output (the same input can produce different outputs), quality that is hard to quantify (there is no clear standard for a "good" answer), and cost tied directly to tokens (every call has a price).

This article builds an end-to-end observability stack for LLM applications, from data collection to a closed alerting loop.
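
The cost-to-token coupling is plain arithmetic. As a sketch (the prices below are illustrative placeholders, not current list prices; always read them from your provider's price sheet):

```python
# Illustrative per-1M-token prices (USD); real prices change often.
PRICES_PER_1M = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def request_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one request: tokens x per-1M price, per direction, summed."""
    p = PRICES_PER_1M[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

# 1,500 prompt tokens + 400 completion tokens on gpt-4o:
# (1500 x 2.50 + 400 x 10.00) / 1M = 0.00375 + 0.004 = 0.00775 USD
```

Per-request numbers look negligible; multiplied by millions of requests per day they are not, which is why this arithmetic has to run on every call, not in a monthly spreadsheet.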

Observability Architecture

The Four Pillars of LLM Observability

The traditional three pillars, plus one that is specific to LLMs:

┌────────────────────────────────────────────────────────┐
│                  LLM Observability                      │
├─────────────┬─────────────┬─────────┬──────────────────┤
│   Traces    │   Metrics   │  Logs   │  Evaluations     │
│             │             │         │  (LLM-specific)  │
│ End-to-end  │ Token/      │ Raw I/O │ Quality scores   │
│ Span tree   │  latency    │ Errors  │ Hallucination    │
│ Context     │ Cost/       │ Audit   │  detection       │
│ propagation │  throughput │ logs    │ Relevance        │
│ Causality   │ Error rate  │         │ Safety filtering │
│             │ Cache hits  │         │                  │
└─────────────┴─────────────┴─────────┴──────────────────┘

Data Collection Architecture

┌─────────────────────────────────────────────────────────┐
│                 Application Layer                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ RAG App  │  │ Agent    │  │ ChatBot  │              │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘              │
│       │              │              │                    │
│  ┌────▼──────────────▼──────────────▼────────────────┐  │
│  │            Instrumentation SDK                     │  │
│  │  (OpenTelemetry + LLM-specific spans)             │  │
│  └────────────────────┬──────────────────────────────┘  │
└───────────────────────┼──────────────────────────────────┘
                        │
              ┌─────────▼──────────┐
              │  Collector/Gateway  │
              └────┬────┬────┬─────┘
                   │    │    │
          ┌────────▼┐ ┌─▼──┐ ┌▼────────┐
          │Langfuse │ │ PG │ │Prometheus│
          │(Traces) │ │(DB)│ │(Metrics) │
          └─────────┘ └────┘ └─────────┘

Langfuse Integration in Practice

Python SDK Integration

# src/observability/langfuse_setup.py
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import os

# Initialize Langfuse client
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)

@observe(as_type="generation")
def call_llm(messages: list[dict], model: str = "gpt-4o") -> str:
    """Automatically traced LLM call with Langfuse."""
    import openai

    client = openai.OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.7,
    )

    # The decorator captures function input, output, and latency on its own;
    # model name and token usage must be reported explicitly (the
    # langfuse.openai drop-in wrapper, shown below, records them automatically):
    langfuse_context.update_current_observation(
        model=model,
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens,
        },
    )

    return response.choices[0].message.content

@observe()
def embed_query(query: str):
    """Embed the query; runs as a nested span under the caller's trace."""
    # embed_text is an application helper defined elsewhere
    langfuse_context.update_current_observation(
        metadata={"model": "text-embedding-3-small"},
    )
    return embed_text(query)

@observe()
def vector_search(query_embedding, limit: int = 5):
    """Retrieve documents; runs as a nested span under the caller's trace."""
    # vector_store is an application helper defined elsewhere
    docs = vector_store.search(query_embedding, limit=limit)
    langfuse_context.update_current_observation(
        metadata={"num_results": len(docs)},
        output={"doc_ids": [d.id for d in docs]},
    )
    return docs

@observe()
def rag_pipeline(query: str) -> dict:
    """Full RAG pipeline with nested tracing."""

    # Step 1: Embed query (traced as span)
    query_embedding = embed_query(query)

    # Step 2: Retrieve documents (traced as span)
    docs = vector_search(query_embedding, limit=5)

    # Step 3: Generate answer (traced as generation)
    context = "\n".join([d.content for d in docs])
    messages = [
        {"role": "system", "content": f"Answer based on context:\n{context}"},
        {"role": "user", "content": query},
    ]

    answer = call_llm(messages)

    # Step 4: Score the trace
    langfuse_context.score_current_trace(
        name="relevance",
        value=evaluate_relevance(query, answer, docs),
        comment="Auto-evaluated by relevance model",
    )

    return {"answer": answer, "sources": [d.id for d in docs]}

OpenAI SDK Auto-Tracing

# Zero-code instrumentation with OpenAI wrapper
from langfuse.openai import openai

# Drop-in replacement: just change the import
client = openai.OpenAI()

# All calls automatically traced
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    # Langfuse-specific metadata (optional)
    langfuse_prompt=langfuse.get_prompt("greeting"),
    metadata={"user_id": "user_123", "session_id": "sess_456"},
)

LangChain Integration

import os

from langfuse.callback import CallbackHandler

# Create Langfuse callback handler
langfuse_handler = CallbackHandler(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    user_id="user_123",
    session_id="sess_456",
    tags=["production", "rag-v2"],
)

# Pass to LangChain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

chain = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("user", "{input}"),
]) | ChatOpenAI(model="gpt-4o")

# Automatically traces the full chain
result = chain.invoke(
    {"input": "What is RAG?"},
    config={"callbacks": [langfuse_handler]},
)

Key Monitoring Metrics

Metric Catalog

Category      Metric                   Collected via               Alert threshold               Dashboard
Latency       TTFT (first token)       SDK instrumentation         >2 s (P95)                    Overview
Latency       End-to-end latency       SDK instrumentation         >10 s (P95)                   Overview
Cost          Cost per request         token count x unit price    daily spend >20% over budget  Cost
Cost          Cost per user            aggregation                 >$1 per user per day          Cost
Quality       Hallucination rate       judge model                 >5%                           Quality
Quality       Relevance score          judge model                 <0.7 (mean)                   Quality
Quality       User satisfaction        user feedback               negative ratings >10%         Quality
Reliability   Error rate               HTTP status codes           >1%                           Overview
Reliability   Timeout rate             SDK instrumentation         >2%                           Overview
Reliability   Fallback trigger rate    routing logs                >10%                          Reliability
Cache         Cache hit rate           cache layer                 <20%                          Performance

Prometheus Metrics Collection

# src/observability/metrics.py
from prometheus_client import (
    Counter, Histogram, Gauge, Summary, start_http_server,
)

# Call start_http_server(9100) once at process startup to expose /metrics for scraping.

# Request counters
llm_requests_total = Counter(
    "llm_requests_total",
    "Total LLM API requests",
    ["provider", "model", "scene", "status"],
)

# Token usage
llm_tokens_total = Counter(
    "llm_tokens_total",
    "Total tokens consumed",
    ["provider", "model", "direction"],  # direction: input/output
)

# Latency histograms
llm_latency_seconds = Histogram(
    "llm_latency_seconds",
    "LLM request latency",
    ["provider", "model"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
)

llm_ttft_seconds = Histogram(
    "llm_ttft_seconds",
    "Time to First Token",
    ["provider", "model"],
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0],
)

# Cost tracking
llm_cost_usd = Counter(
    "llm_cost_usd_total",
    "Cumulative LLM cost in USD",
    ["provider", "model", "team"],
)

# Quality scores
llm_quality_score = Summary(
    "llm_quality_score",
    "LLM response quality score (0-1)",
    ["evaluation_type"],
)

# Cache metrics
cache_requests_total = Counter(
    "cache_requests_total",
    "Total cache lookups",
    ["cache_type", "result"],  # result: hit/miss
)

# Circuit breaker state (referenced by the LLMProviderDown alert rule)
llm_circuit_breaker_state = Gauge(
    "llm_circuit_breaker_state",
    "Circuit breaker state per provider (1 = currently in this state)",
    ["provider", "state"],  # state: closed/open/half_open
)

# Middleware to record metrics
def record_llm_metrics(
    provider: str,
    model: str,
    scene: str,
    status: str,
    latency: float,
    ttft: float,
    input_tokens: int,
    output_tokens: int,
    cost: float,
    team: str,
):
    llm_requests_total.labels(provider, model, scene, status).inc()
    llm_latency_seconds.labels(provider, model).observe(latency)
    llm_ttft_seconds.labels(provider, model).observe(ttft)
    llm_tokens_total.labels(provider, model, "input").inc(input_tokens)
    llm_tokens_total.labels(provider, model, "output").inc(output_tokens)
    llm_cost_usd.labels(provider, model, team).inc(cost)

Quality Evaluation

Automated Evaluation Pipeline

# src/observability/evaluators.py
from langfuse import Langfuse

langfuse = Langfuse()

def evaluate_hallucination(
    query: str,
    answer: str,
    context: list[str],
) -> float:
    """Use LLM-as-Judge to detect hallucination. Returns 0-1 score."""
    judge_prompt = f"""Evaluate if the answer is grounded in the provided context.
Score from 0 to 1:
- 1.0: Fully grounded, all claims supported by context
- 0.5: Partially grounded, some claims unsupported
- 0.0: Hallucinated, major claims not in context

Context:
{chr(10).join(context)}

Question: {query}
Answer: {answer}

Respond with only a number between 0 and 1."""

    import openai
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": judge_prompt}],
        temperature=0,
    )

    try:
        score = float(response.choices[0].message.content.strip())
        return max(0.0, min(1.0, score))
    except ValueError:
        return 0.5  # Default if parsing fails

def evaluate_relevance(
    query: str,
    answer: str,
    sources: list[str],
) -> float:
    """Evaluate answer relevance to query. Returns 0-1 score."""
    judge_prompt = f"""Rate how relevant and helpful the answer is to the question.
Score from 0 to 1:
- 1.0: Directly answers the question, comprehensive
- 0.5: Partially relevant, missing key information
- 0.0: Irrelevant or unhelpful

Question: {query}
Answer: {answer}

Respond with only a number between 0 and 1."""

    import openai
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": judge_prompt}],
        temperature=0,
    )

    try:
        score = float(response.choices[0].message.content.strip())
        return max(0.0, min(1.0, score))  # clamp, as in the hallucination judge
    except ValueError:
        return 0.5

# Batch evaluation job (runs daily)
from datetime import datetime, timedelta, timezone

def run_daily_evaluation():
    """Evaluate a sample of yesterday's traces."""
    now = datetime.now(timezone.utc)
    traces = langfuse.fetch_traces(
        from_timestamp=now - timedelta(days=1),
        to_timestamp=now,
        limit=100,
    )

    for item in traces.data:
        # The list endpoint returns score/observation IDs only;
        # fetch each trace in full to get their contents.
        trace = langfuse.fetch_trace(item.id).data

        # Skip already evaluated traces
        if any(s.name == "hallucination" for s in trace.scores):
            continue

        # Extract data from trace
        generations = [o for o in trace.observations if o.type == "GENERATION"]
        if not generations:
            continue

        gen = generations[-1]  # Last generation in the trace
        if not gen.input or not gen.output:
            continue

        # Run evaluations; extract_user_query / extract_context are
        # application-specific helpers that pull the question and the
        # retrieved context back out of the stored prompt messages.
        hall_score = evaluate_hallucination(
            query=extract_user_query(gen.input),
            answer=gen.output,
            context=extract_context(gen.input),
        )

        langfuse.score(
            trace_id=trace.id,
            name="hallucination",
            value=hall_score,
            comment="Daily batch evaluation",
        )

Alert Rule Design

Prometheus Alerting Rules

# alerting-rules.yml
groups:
  - name: llm_alerts
    rules:
      # High error rate
      - alert: LLMHighErrorRate
        expr: |
          sum(rate(llm_requests_total{status="error"}[5m]))
          /
          sum(rate(llm_requests_total[5m]))
          > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate above 5%"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"

      # High latency
      - alert: LLMHighLatency
        expr: |
          histogram_quantile(0.95, rate(llm_latency_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM P95 latency above 10s"

      # Cost spike
      - alert: LLMCostSpike
        expr: |
          sum(increase(llm_cost_usd_total[1h]))
          >
          1.5 * avg_over_time(sum(increase(llm_cost_usd_total[1h]))[7d:1h])
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "LLM cost 50% above 7-day average"

      # Quality degradation
      - alert: LLMQualityDrop
        expr: |
          avg(llm_quality_score{evaluation_type="relevance"}) < 0.7
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Average relevance score below 0.7"

      # Provider down (circuit breaker open)
      - alert: LLMProviderDown
        expr: |
          llm_circuit_breaker_state{state="open"} == 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "LLM provider {{ $labels.provider }} circuit breaker is open"
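
Threshold logic like the cost-spike rule is easy to get wrong in PromQL alone; mirroring it in plain code makes it testable offline. A sketch of the same comparison (last hour's spend vs. 1.5x the trailing hourly average):

```python
def cost_spike(last_hour_cost: float, hourly_costs: list[float],
               factor: float = 1.5) -> bool:
    """Fire when the last hour's spend exceeds factor x the trailing average,
    mirroring the LLMCostSpike rule above."""
    if not hourly_costs:
        return False  # no baseline yet -- don't alert
    baseline = sum(hourly_costs) / len(hourly_costs)
    return last_hour_cost > factor * baseline

# 7 days (168 hours) of stable $1/hour spend, then a $3 hour -> fires
```

Unit-testing the mirrored function against known scenarios (steady spend, gradual growth, a genuine spike) is a cheap way to tune `factor` before touching the live rule.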

Trace Data Analysis

Common Analysis Scenarios

-- 1. Top 10 users by token cost (this month)
SELECT
    user_id,
    COUNT(*) AS request_count,
    SUM(input_tokens + output_tokens) AS total_tokens,
    SUM(cost_usd) AS total_cost
FROM llm_traces
WHERE timestamp >= DATE_TRUNC('month', NOW())
GROUP BY user_id
ORDER BY total_cost DESC
LIMIT 10;

-- 2. Latency by model (average and percentiles)
SELECT
    model,
    COUNT(*) AS requests,
    AVG(latency_ms) AS avg_latency,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY latency_ms) AS p50,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY latency_ms) AS p95,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99
FROM llm_traces
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY model
ORDER BY avg_latency DESC;

-- 3. Low-quality answers for review
SELECT
    t.id,
    t.input,
    t.output,
    s.value AS quality_score,
    t.model,
    t.timestamp
FROM llm_traces t
JOIN trace_scores s ON t.id = s.trace_id
WHERE s.name = 'relevance'
  AND s.value < 0.5
  AND t.timestamp >= NOW() - INTERVAL '7 days'
ORDER BY s.value ASC
LIMIT 20;

Summary

  1. Tracing is the core of observability: every LLM call should emit a structured trace containing input, output, token usage, latency, and evaluation scores.
  2. Quality evaluation cannot be skipped: non-deterministic LLM output demands continuous quality monitoring, and LLM-as-Judge is currently the most practical automated approach.
  3. Cost must be visible: per-request token metering and cost attribution are prerequisites for controlling LLM spend.
  4. Tier your alerts: error rate and provider outages are Critical; latency, quality degradation, and cost spikes are Warning.
  5. Start with sampling, then ramp up: trace only ~10% of requests at first, and raise the sampling rate once the collection pipeline is proven stable.
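
The sampling strategy above works best when it is deterministic, so that a request's spans are either all kept or all dropped: hash the trace ID into [0, 1) and compare against the rate. A minimal sketch:

```python
import hashlib

def should_trace(trace_id: str, sample_rate: float = 0.10) -> bool:
    """Deterministic head sampling: the same trace_id always gets the same
    keep/drop decision, so a trace is never half-collected."""
    digest = hashlib.sha256(trace_id.encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # uniform in [0, 1)
    return bucket < sample_rate
```

Raising the rate later is then a one-line config change, and because the decision is a pure function of the trace ID, every service in the pipeline can make it independently and still agree.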

Maurice | maurice_wen@proton.me