Prompt Version Control and A/B Testing

Git-based prompt management, online evaluation, statistical significance, and staged rollout strategies | 2026-02


1. Why Version Control Is Needed

Prompts are the "source code" of an LLM application, yet in most teams prompt management looks like this: prompts pasted into the codebase, passed around in Slack, edited with no record of who changed what, and with no way to find the previous version when a rollback is needed.

Without Version Control:
  Developer A edits prompt in code -> deploys
  Developer B edits same prompt -> deploys (overwrites A's changes)
  Bug reported -> "Which version caused this?" -> Nobody knows
  Rollback? -> "What was the previous version?" -> Lost

With Version Control:
  v1.0.0 -> tested, scored 0.82 -> deployed
  v1.1.0 -> tested, scored 0.87 -> deployed (A/B tested first)
  v1.2.0 -> tested, scored 0.79 -> REJECTED (regression detected)
  Bug in v1.1.0 -> rollback to v1.0.0 in 30 seconds

2. Versioning Strategies

2.1 Three Versioning Schemes

Scheme                     | Suited for  | Pros               | Cons
---------------------------|-------------|--------------------|----------------------------------
Embedded in code + Git     | Small teams | No extra tooling   | Prompt change requires a release
Dedicated prompt repo      | Mid teams   | Decoupled releases | Needs a sync mechanism
Prompt management platform | Large teams | Full-featured GUI  | Learning curve / cost
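To make scheme 1 concrete, here is a minimal sketch (all names hypothetical) of prompts embedded in code: recent versions stay in the registry, so rolling back is a one-line config change that still ships through a normal Git release.

```python
# Scheme 1 sketch: prompts live in the codebase and are versioned with the
# application via Git. Keeping recent versions in the registry makes rollback
# a one-line change instead of a revert + redeploy.
PROMPT_REGISTRY: dict[tuple[str, str], str] = {
    ("customer-support", "1.0.0"): "You are a helpful support agent.",
    ("customer-support", "1.1.0"): "You are a helpful support agent. Be concise.",
}

ACTIVE_VERSIONS = {"customer-support": "1.1.0"}  # flip this line to roll back

def get_prompt(name: str) -> tuple[str, str]:
    """Return (version, prompt_text) for the currently active version."""
    version = ACTIVE_VERSIONS[name]
    return version, PROMPT_REGISTRY[(name, version)]
```

The obvious drawback (the table's "requires a release") is also visible here: promoting a new version means editing code and deploying.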

2.2 File Structure (Dedicated Repository Scheme)

prompts/
├── README.md
├── schemas/
│   └── prompt-schema.json          # JSON Schema for validation
├── customer-support/
│   ├── metadata.yaml               # Version, author, changelog
│   ├── system.txt                  # System prompt content
│   ├── variants/
│   │   ├── concise.txt             # A/B test variant
│   │   └── detailed.txt            # A/B test variant
│   ├── examples/
│   │   ├── few-shot-1.json
│   │   └── few-shot-2.json
│   ├── tests/
│   │   ├── golden-set.jsonl        # Evaluation dataset
│   │   └── adversarial.jsonl       # Injection test cases
│   └── eval-results/
│       ├── v1.0.0-eval.json
│       └── v1.1.0-eval.json
├── code-review/
│   ├── metadata.yaml
│   ├── system.txt
│   └── ...
└── shared/
    ├── safety-preamble.txt         # Shared safety rules
    └── output-format-json.txt      # Shared format instructions
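The shared/ files are intended to be prepended to each prompt's system.txt when the final prompt is assembled, so safety rules update in one place. A minimal composition sketch over the layout above (compose_prompt is a hypothetical helper):

```python
from pathlib import Path

def compose_prompt(repo_root: Path, name: str, shared: list[str]) -> str:
    """Concatenate shared preamble files with a prompt's system.txt.

    `shared` lists filenames under prompts/shared/, in the order they
    should appear before the prompt-specific content.
    """
    parts = [
        (repo_root / "prompts" / "shared" / f).read_text().strip()
        for f in shared
    ]
    parts.append((repo_root / "prompts" / name / "system.txt").read_text().strip())
    return "\n\n".join(parts)
```

In the dedicated-repo scheme this composition typically runs at deploy time, driven by each prompt's `dependencies` list in metadata.yaml.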

2.3 Metadata Specification

# prompts/customer-support/metadata.yaml
name: customer-support
version: 2.1.0
created: 2026-01-15
updated: 2026-02-20
author: maurice
reviewers:
  - alice
  - bob

# Semantic versioning rules:
# Major: Behavioral change (different output for same input)
# Minor: New capability (handles new input types)
# Patch: Bug fix (corrects wrong behavior without changing spec)

changelog:
  - version: 2.1.0
    date: 2026-02-20
    changes: "Add multi-language support"
    eval_score: 0.87
  - version: 2.0.0
    date: 2026-02-01
    changes: "Restructure with Constitution pattern"
    eval_score: 0.85
  - version: 1.2.0
    date: 2026-01-20
    changes: "Add tool use instructions"
    eval_score: 0.82

config:
  model: gpt-4o
  temperature: 0.3
  max_tokens: 2048
  stop_sequences: []

labels:
  production: v2.1.0      # Currently serving traffic
  staging: v2.2.0-rc1     # Being tested
  canary: null             # Not in canary

dependencies:
  - shared/safety-preamble.txt@v1.0
  - shared/output-format-json.txt@v2.0

evaluation:
  dataset: tests/golden-set.jsonl
  metrics:
    - name: helpfulness
      threshold: 0.85
      current: 0.87
    - name: safety
      threshold: 0.99
      current: 1.00
    - name: format_compliance
      threshold: 0.95
      current: 0.97
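At serving time the `labels` block is the entry point: a deployment label maps to a concrete version. A small resolver sketch over the already-parsed metadata dict (field names as in the YAML above; resolve_label is a hypothetical helper):

```python
def resolve_label(metadata: dict, label: str) -> str:
    """Map a deployment label (production/staging/canary) to a version.

    Falls back to the metadata's own `version` field when the label is
    missing or explicitly null (as `canary` is in the example above).
    """
    version = (metadata.get("labels") or {}).get(label)
    return version if version else metadata["version"]
```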

3. Git Workflow

3.1 Branch Strategy

Branch Strategy for Prompts:

main ─────────────────────────────────────────────────> (production)
  │
  ├── prompt/customer-support/v2.2 ──> PR ──> review ──> merge
  │     │
  │     └── (eval results attached to PR)
  │
  ├── prompt/code-review/v1.3 ──> PR ──> review ──> merge
  │
  └── experiment/few-shot-selection ──> (long-running experiment)

Rules:
1. Every prompt change = PR with eval results
2. No direct commits to main
3. Breaking changes = Major version bump
4. Eval regression = PR blocked
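Rule 3 can be enforced mechanically in CI. A sketch of the version-bump check, comparison logic only (reading metadata.yaml at the base and head commits is omitted; function names are hypothetical):

```python
def parse_version(v: str) -> tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' (optional leading 'v' and pre-release tag stripped)."""
    core = v.lstrip("v").split("-")[0]
    major, minor, patch = (int(x) for x in core.split("."))
    return major, minor, patch

def check_version_bump(base_version: str, head_version: str) -> bool:
    """A prompt change must bump exactly one semver component upward."""
    b, h = parse_version(base_version), parse_version(head_version)
    if h <= b:
        return False  # version must move forward
    valid_bumps = {
        (b[0] + 1, 0, 0),        # major: behavioral change
        (b[0], b[1] + 1, 0),     # minor: new capability
        (b[0], b[1], b[2] + 1),  # patch: bug fix
    }
    return h in valid_bumps
```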

3.2 CI/CD Pipeline

# .github/workflows/prompt-ci.yml
name: Prompt CI/CD

on:
  pull_request:
    paths:
      - 'prompts/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate prompt format
        run: |
          python scripts/validate_prompts.py \
            --schema prompts/schemas/prompt-schema.json \
            --changed-only

      - name: Check version bump
        run: |
          python scripts/check_version_bump.py \
            --base ${{ github.event.pull_request.base.sha }} \
            --head ${{ github.event.pull_request.head.sha }}

  evaluate:
    runs-on: ubuntu-latest
    needs: validate
    steps:
      - uses: actions/checkout@v4

      - name: Run evaluation
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python scripts/evaluate_prompts.py \
            --changed-only \
            --output eval-results.json

      - name: Check for regression
        run: |
          python scripts/check_regression.py \
            --results eval-results.json \
            --threshold-file prompts/schemas/thresholds.json

      - name: Post eval results to PR
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('eval-results.json'));
            const body = formatEvalResults(results);
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: body,
            });

  deploy:
    runs-on: ubuntu-latest
    needs: evaluate
    if: github.event.pull_request.merged == true
    steps:
      - name: Deploy to prompt service
        run: |
          python scripts/deploy_prompts.py \
            --env production \
            --changed-only

3.3 Automated Evaluation Script

# scripts/evaluate_prompts.py (excerpt; run_metric, get_previous_version,
# and save_eval_results are project-specific helpers not shown here)
import json
import yaml
from pathlib import Path
from dataclasses import dataclass

@dataclass
class EvalMetric:
    name: str
    threshold: float
    score: float
    passed: bool

@dataclass
class PromptEvalResult:
    prompt_name: str
    version: str
    metrics: list[EvalMetric]
    overall_passed: bool
    regression_detected: bool
    previous_version: str | None
    previous_scores: dict[str, float] | None

async def evaluate_changed_prompts(
    repo_root: Path,
    changed_files: list[str],
) -> list[PromptEvalResult]:
    """Evaluate all prompts that changed in this PR."""
    results = []

    # Find which prompts changed
    changed_prompts = set()
    for f in changed_files:
        parts = Path(f).parts
        if len(parts) >= 2 and parts[0] == "prompts":
            changed_prompts.add(parts[1])

    for prompt_name in changed_prompts:
        prompt_dir = repo_root / "prompts" / prompt_name
        if not prompt_dir.exists():
            continue

        # Load metadata
        metadata = yaml.safe_load(
            (prompt_dir / "metadata.yaml").read_text()
        )

        # Load prompt content
        system_prompt = (prompt_dir / "system.txt").read_text()

        # Load test dataset
        test_file = prompt_dir / "tests" / "golden-set.jsonl"
        if not test_file.exists():
            continue

        test_cases = [
            json.loads(line)
            for line in test_file.read_text().strip().split("\n")
        ]

        # Run evaluation
        metrics = []
        for metric_config in metadata["evaluation"]["metrics"]:
            score = await run_metric(
                metric_name=metric_config["name"],
                prompt=system_prompt,
                test_cases=test_cases,
                model=metadata["config"]["model"],
            )
            metrics.append(EvalMetric(
                name=metric_config["name"],
                threshold=metric_config["threshold"],
                score=score,
                passed=score >= metric_config["threshold"],
            ))

        # Check for regression against previous version
        prev_scores = load_previous_eval(prompt_dir, metadata["version"])
        regression = False
        if prev_scores:
            for m in metrics:
                prev = prev_scores.get(m.name, 0)
                if m.score < prev - 0.02:  # 2% tolerance
                    regression = True

        results.append(PromptEvalResult(
            prompt_name=prompt_name,
            version=metadata["version"],
            metrics=metrics,
            overall_passed=all(m.passed for m in metrics),
            regression_detected=regression,
            previous_version=get_previous_version(metadata),
            previous_scores=prev_scores,
        ))

        # Save eval results
        save_eval_results(prompt_dir, metadata["version"], metrics)

    return results

def load_previous_eval(
    prompt_dir: Path, current_version: str,
) -> dict[str, float] | None:
    """Load evaluation results from the previous version."""
    eval_dir = prompt_dir / "eval-results"
    if not eval_dir.exists():
        return None

    # Find the most recent eval that isn't the current version.
    # Sort by parsed version, not lexicographically (v1.10.0 > v1.9.0).
    def version_key(p: Path) -> tuple[int, ...]:
        core = p.name.removeprefix("v").split("-")[0]
        return tuple(int(x) for x in core.split("."))

    results = sorted(eval_dir.glob("v*-eval.json"), key=version_key, reverse=True)
    for result_file in results:
        if current_version not in result_file.name:
            data = json.loads(result_file.read_text())
            return {m["name"]: m["score"] for m in data["metrics"]}
    return None

4. A/B Testing Framework

4.1 A/B Test Architecture

A/B Test Architecture:

User Request
     |
     v
[Traffic Router]
     |                          |
     |--90%-->                  |--10%-->
     v                          v
[Variant A (control)]    [Variant B (treatment)]
     |                          |
     v                          v
[Log: request + response + variant_id + user_feedback]
     |                          |
     +----------+---------------+
                |
                v
     [Statistical Analysis]
                |
                v
     [Decision: ship B / keep A / inconclusive]

4.2 Traffic Allocation Implementation

import hashlib
import time
from dataclasses import dataclass
from typing import Any

@dataclass
class ABTestConfig:
    test_id: str
    prompt_name: str
    control_version: str       # Variant A
    treatment_version: str     # Variant B
    traffic_percentage: float  # 0.0 - 1.0 for treatment
    start_time: float
    end_time: float | None
    min_samples: int           # Minimum samples before analysis
    metrics: list[str]         # Metrics to track

class ABTestRouter:
    """Deterministic A/B test routing based on user/request ID."""

    def __init__(self, tests: list[ABTestConfig]):
        self.active_tests = {t.test_id: t for t in tests}

    def get_variant(
        self, test_id: str, user_id: str,
    ) -> str:
        """Determine which variant a user should see.

        Uses consistent hashing so the same user always gets
        the same variant (no flip-flopping).
        """
        test = self.active_tests.get(test_id)
        if not test:
            return "control"

        # Check if test is active
        now = time.time()
        if now < test.start_time:
            return "control"
        if test.end_time and now > test.end_time:
            return "control"

        # Consistent hash: same user always gets same variant
        hash_input = f"{test_id}:{user_id}".encode()
        hash_value = int(hashlib.sha256(hash_input).hexdigest(), 16)
        bucket = (hash_value % 10000) / 10000.0

        if bucket < test.traffic_percentage:
            return "treatment"
        return "control"

    def get_prompt_version(
        self, test_id: str, user_id: str,
    ) -> str | None:
        """Get the prompt version for this user (None if the test is unknown)."""
        test = self.active_tests.get(test_id)
        if test is None:
            return None
        variant = self.get_variant(test_id, user_id)
        if variant == "treatment":
            return test.treatment_version
        return test.control_version


# Usage
router = ABTestRouter([
    ABTestConfig(
        test_id="cs-prompt-v2.2",
        prompt_name="customer-support",
        control_version="v2.1.0",
        treatment_version="v2.2.0-rc1",
        traffic_percentage=0.1,  # 10% traffic to new version
        start_time=time.time(),
        end_time=None,
        min_samples=500,
        metrics=["helpfulness", "resolution_rate", "user_satisfaction"],
    ),
])

# For each request
version = router.get_prompt_version("cs-prompt-v2.2", user_id="user_123")
prompt = load_prompt("customer-support", version=version)

4.3 Metric Collection

import time
from dataclasses import dataclass, field

@dataclass
class ABTestEvent:
    test_id: str
    variant: str         # "control" | "treatment"
    user_id: str
    request_id: str
    timestamp: float
    prompt_version: str
    model: str
    # Quality metrics
    latency_ms: float = 0.0
    token_count: int = 0
    cost_usd: float = 0.0
    # Business metrics (filled asynchronously)
    user_rating: int | None = None       # 1-5 stars
    resolution: bool | None = None       # Issue resolved?
    escalation: bool | None = None       # Escalated to human?
    # Automated quality metrics
    format_valid: bool = True
    safety_passed: bool = True
    relevance_score: float | None = None  # LLM-as-Judge

class ABTestCollector:
    """Collect and store A/B test events for analysis."""

    def __init__(self, storage_backend: Any):
        self.storage = storage_backend

    async def log_event(self, event: ABTestEvent) -> None:
        """Log an A/B test event."""
        await self.storage.insert("ab_test_events", {
            "test_id": event.test_id,
            "variant": event.variant,
            "user_id": event.user_id,
            "request_id": event.request_id,
            "timestamp": event.timestamp,
            "prompt_version": event.prompt_version,
            "latency_ms": event.latency_ms,
            "token_count": event.token_count,
            "cost_usd": event.cost_usd,
            "format_valid": event.format_valid,
            "safety_passed": event.safety_passed,
            "relevance_score": event.relevance_score,
        })

    async def update_user_feedback(
        self, request_id: str,
        rating: int | None = None,
        resolution: bool | None = None,
        escalation: bool | None = None,
    ) -> None:
        """Update event with delayed user feedback."""
        updates = {}
        if rating is not None:
            updates["user_rating"] = rating
        if resolution is not None:
            updates["resolution"] = resolution
        if escalation is not None:
            updates["escalation"] = escalation

        await self.storage.update(
            "ab_test_events",
            {"request_id": request_id},
            updates,
        )

5. Statistical Significance Analysis

5.1 Analysis Methods

Method             | Use case                    | Pros              | Cons
-------------------|-----------------------------|-------------------|---------------------------
Two-sample t-test  | Continuous metrics (scores) | Simple, intuitive | Assumes normality
Chi-square test    | Binary metrics (pass/fail)  | Natural for rates | Not for continuous values
Mann-Whitney U     | Non-normal distributions    | Distribution-free | Slightly lower power
Bootstrap          | Any metric                  | Most flexible     | Computationally heavy
Bayesian methods   | Real-time decisions         | Can stop any time | Requires a prior
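For binary metrics (the chi-square row above), the 2x2 chi-square test is equivalent to a two-proportion z-test, which needs only the standard library. A sketch, with hypothetical counts in the usage line:

```python
import math

def two_proportion_test(
    successes_a: int, n_a: int,
    successes_b: int, n_b: int,
) -> tuple[float, float]:
    """Two-proportion z-test for binary metrics (e.g. resolution rate).

    For a 2x2 table this is equivalent to the chi-square test (z^2 == chi^2).
    Returns (z statistic, two-sided p-value).
    """
    p_a, p_b = successes_a / n_a, successes_b / n_b
    p_pool = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    p_value = math.erfc(abs(z) / math.sqrt(2))  # two-sided normal tail
    return z, p_value

# Hypothetical numbers: control resolves 420/500 cases, treatment 455/500
z, p = two_proportion_test(420, 500, 455, 500)
```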

5.2 Statistical Analysis Implementation

import numpy as np
from scipy import stats
from dataclasses import dataclass

@dataclass
class ABTestAnalysis:
    test_id: str
    metric_name: str
    control_mean: float
    treatment_mean: float
    relative_improvement: float  # percentage
    p_value: float
    confidence_interval: tuple[float, float]
    sample_size_control: int
    sample_size_treatment: int
    is_significant: bool
    recommendation: str  # "ship" | "keep_control" | "inconclusive"
    power: float  # Statistical power achieved

def analyze_ab_test(
    control_values: list[float],
    treatment_values: list[float],
    alpha: float = 0.05,
    min_effect_size: float = 0.02,  # Minimum meaningful improvement
) -> ABTestAnalysis:
    """Analyze A/B test results with proper statistical rigor."""
    control = np.array(control_values)
    treatment = np.array(treatment_values)

    control_mean = np.mean(control)
    treatment_mean = np.mean(treatment)
    relative_improvement = (
        (treatment_mean - control_mean) / control_mean
        if control_mean != 0 else 0
    )

    # Two-sample t-test (Welch's t-test, no equal variance assumption)
    t_stat, p_value = stats.ttest_ind(treatment, control, equal_var=False)

    # Confidence interval for the difference (normal approximation,
    # sample variances with ddof=1)
    diff = treatment_mean - control_mean
    se = np.sqrt(
        np.var(treatment, ddof=1) / len(treatment)
        + np.var(control, ddof=1) / len(control)
    )
    ci_low = diff - stats.norm.ppf(1 - alpha / 2) * se
    ci_high = diff + stats.norm.ppf(1 - alpha / 2) * se

    # Statistical power (Cohen's d with pooled sample variance)
    pooled_var = (np.var(control, ddof=1) + np.var(treatment, ddof=1)) / 2
    effect_size = diff / np.sqrt(pooled_var) if pooled_var > 0 else 0

    from statsmodels.stats.power import TTestIndPower
    power_analysis = TTestIndPower()
    achieved_power = power_analysis.solve_power(
        effect_size=abs(effect_size),
        nobs1=len(control),
        ratio=len(treatment) / len(control),
        alpha=alpha,
    ) if abs(effect_size) > 0 else 0

    # Decision logic
    is_significant = p_value < alpha
    if is_significant and relative_improvement > min_effect_size:
        recommendation = "ship"
    elif is_significant and relative_improvement < -min_effect_size:
        recommendation = "keep_control"
    elif not is_significant and achieved_power < 0.8:
        recommendation = "inconclusive"  # Need more data
    else:
        recommendation = "keep_control"  # No meaningful difference

    return ABTestAnalysis(
        test_id="",        # filled in by the caller
        metric_name="",    # filled in by the caller
        control_mean=float(control_mean),
        treatment_mean=float(treatment_mean),
        relative_improvement=float(relative_improvement),
        p_value=float(p_value),
        confidence_interval=(float(ci_low), float(ci_high)),
        sample_size_control=len(control),
        sample_size_treatment=len(treatment),
        is_significant=is_significant,
        recommendation=recommendation,
        power=float(achieved_power),
    )


def required_sample_size(
    baseline_mean: float,
    baseline_std: float,
    min_detectable_effect: float,  # e.g., 0.05 for 5% improvement
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """Calculate required sample size per variant."""
    from statsmodels.stats.power import TTestIndPower
    analysis = TTestIndPower()

    # Convert relative effect to Cohen's d
    absolute_effect = baseline_mean * min_detectable_effect
    cohens_d = absolute_effect / baseline_std

    n = analysis.solve_power(
        effect_size=cohens_d,
        alpha=alpha,
        power=power,
        ratio=1.0,  # Equal group sizes
    )
    return int(np.ceil(n))


# Example: How many samples do we need?
n = required_sample_size(
    baseline_mean=0.85,       # Current score
    baseline_std=0.15,        # Score standard deviation
    min_detectable_effect=0.05,  # Want to detect 5% improvement
)
print(f"Need {n} samples per variant")  # Typically ~500-2000

5.3 Multi-Metric Decisions

def multi_metric_decision(
    analyses: list[ABTestAnalysis],
    primary_metric: str,
    guardrail_metrics: list[str],
) -> dict:
    """Make a decision based on multiple metrics.

    Rules:
    1. Primary metric must improve significantly
    2. No guardrail metric may regress significantly
    3. If primary improves but guardrail regresses -> HOLD
    """
    primary = None
    guardrails = []

    for a in analyses:
        if a.metric_name == primary_metric:
            primary = a
        elif a.metric_name in guardrail_metrics:
            guardrails.append(a)

    if not primary:
        return {"decision": "error", "reason": "Primary metric not found"}

    # Check guardrails first
    for g in guardrails:
        if g.is_significant and g.relative_improvement < -0.01:
            return {
                "decision": "hold",
                "reason": f"Guardrail regression: {g.metric_name} "
                         f"dropped {g.relative_improvement:.1%}",
                "primary_result": primary.recommendation,
                "guardrail_violations": [g.metric_name],
            }

    # Check primary metric
    return {
        "decision": primary.recommendation,
        "reason": f"Primary metric {primary_metric}: "
                 f"{primary.relative_improvement:+.1%} "
                 f"(p={primary.p_value:.4f})",
        "guardrail_violations": [],
        "all_metrics": {
            a.metric_name: {
                "improvement": f"{a.relative_improvement:+.1%}",
                "significant": a.is_significant,
            }
            for a in analyses
        },
    }

6. Staged Rollout Strategy

6.1 Rollout Stages

Rollout Strategy:

Stage 1: Shadow Mode (0% live traffic)
  - Run new prompt in parallel, log results, don't serve to users
  - Compare outputs with production prompt
  - Duration: 24-48 hours

Stage 2: Canary (1-5% traffic)
  - Serve to small percentage of real users
  - Monitor error rates, latency, user feedback
  - Automated rollback if error rate > threshold
  - Duration: 48-72 hours

Stage 3: A/B Test (10-50% traffic)
  - Full statistical comparison
  - Collect enough samples for significance
  - Duration: depends on traffic (typically 1-2 weeks)

Stage 4: Ramp Up (50% -> 100%)
  - Gradually increase traffic
  - Monitor for long-tail issues
  - Duration: 24-48 hours

Stage 5: Full Deployment (100%)
  - Update production label
  - Archive old version
  - Update documentation
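Stage 1 (shadow mode) deserves a sketch: the candidate prompt runs on a copy of live traffic, but users only ever see the control output. The names call_llm and log are placeholders for your model client and logging backend:

```python
import asyncio
from typing import Awaitable, Callable

async def shadow_compare(
    request: str,
    call_llm: Callable[[str, str], Awaitable[str]],  # (prompt_version, request) -> response
    control_version: str,
    shadow_version: str,
    log: Callable[[dict], None],
) -> str:
    """Serve the control response; run the shadow version only for logging."""
    control_task = asyncio.create_task(call_llm(control_version, request))
    shadow_task = asyncio.create_task(call_llm(shadow_version, request))
    control_resp = await control_task
    try:
        shadow_resp = await shadow_task
        log({
            "request": request,
            "control": control_resp,
            "shadow": shadow_resp,
            "match": control_resp == shadow_resp,
        })
    except Exception as exc:  # a shadow failure must never reach the user
        log({"request": request, "shadow_error": repr(exc)})
    return control_resp  # users always receive the control output
```

The logged control/shadow pairs are what the 24-48 hour comparison in Stage 1 is computed from.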

6.2 Automatic Rollback Implementation

from dataclasses import dataclass
from typing import Callable

@dataclass
class RollbackRule:
    metric: str
    threshold: float
    comparison: str  # "above" | "below"
    window_minutes: int
    description: str

class AutoRollback:
    """Automatic rollback system for prompt deployments."""

    DEFAULT_RULES = [
        RollbackRule(
            metric="error_rate",
            threshold=0.05,
            comparison="above",
            window_minutes=15,
            description="Error rate exceeds 5% in 15-minute window",
        ),
        RollbackRule(
            metric="p99_latency_ms",
            threshold=10000,
            comparison="above",
            window_minutes=10,
            description="P99 latency exceeds 10s",
        ),
        RollbackRule(
            metric="safety_violation_rate",
            threshold=0.001,
            comparison="above",
            window_minutes=5,
            description="Safety violations detected",
        ),
        RollbackRule(
            metric="format_compliance_rate",
            threshold=0.90,
            comparison="below",
            window_minutes=15,
            description="Format compliance drops below 90%",
        ),
    ]

    def __init__(
        self,
        metrics_backend: object,
        rollback_callback: Callable,
        rules: list[RollbackRule] | None = None,
    ):
        self.metrics = metrics_backend
        self.rollback = rollback_callback
        self.rules = rules or self.DEFAULT_RULES

    async def check_and_rollback(
        self, deployment_id: str,
    ) -> dict:
        """Check all rollback rules and trigger if needed."""
        violations = []

        for rule in self.rules:
            value = await self.metrics.get_metric(
                rule.metric,
                window_minutes=rule.window_minutes,
                deployment_id=deployment_id,
            )

            triggered = (
                (rule.comparison == "above" and value > rule.threshold) or
                (rule.comparison == "below" and value < rule.threshold)
            )

            if triggered:
                violations.append({
                    "rule": rule.description,
                    "metric": rule.metric,
                    "value": value,
                    "threshold": rule.threshold,
                })

        if violations:
            await self.rollback(deployment_id)
            return {
                "action": "rolled_back",
                "violations": violations,
                "deployment_id": deployment_id,
            }

        return {"action": "healthy", "deployment_id": deployment_id}

7. Prompt Management Platform Comparison

7.1 Tool Selection

Platform         | Versioning      | A/B testing | Evaluation | Deployment | Pricing
-----------------|-----------------|-------------|------------|------------|---------------------------
Langfuse         | Git integration | Manual      | Built-in   | API        | Open source / self-hosted
PromptLayer      | Built-in        | Built-in    | Basic      | API        | $29+/mo
Humanloop        | Built-in        | Built-in    | Full       | API + SDK  | $99+/mo
Weights & Biases | Git             | Manual      | Full       | Manual     | $50+/mo
DIY (Git + CI)   | Git             | DIY         | DIY        | CI/CD      | Free

7.2 Recommendations

Decision Matrix:

Team Size   | Budget   | Recommendation
------------|----------|------------------
1-3 devs    | < $50/mo | Git + CI (DIY)
3-10 devs   | $50-200  | Langfuse (self-hosted) + custom A/B
10-50 devs  | $200+    | Humanloop / PromptLayer
50+ devs    | Custom   | Custom platform on top of Langfuse

Key factors:
- Volume: < 10K requests/day -> Git + CI is enough
- Iteration speed: need to test > 5 variants/week -> platform
- Compliance: need audit trail -> platform with logging
- Multi-model: need to compare across providers -> Langfuse

8. Summary

Prompt version control and A/B testing are the infrastructure that takes an LLM application from "artisanal workshop" to "engineered production". Core principles:

  1. Every change is traceable: each prompt change must carry a version number, a changelog entry, and evaluation results.
  2. Data-driven decisions: A/B tests use statistics to prove a new version is actually better, not just that it "feels better".
  3. Safe releases: staged rollout plus automatic rollback keeps the blast radius of a bad change to a minimum.
  4. Evaluation first: version control without automated evaluation is just file management; evaluation is the soul of prompt versioning.

Maurice | maurice_wen@proton.me