Designing CI/CD Pipelines for AI Applications

Introduction

Traditional software CI/CD is concerned with compiling, testing, and deploying code. AI applications add the model as a core artifact on top of that: a model has its own versions, training-data dependencies, and performance metrics, and it is released on a different cadence than code.

This article lays out how to design an AI CI/CD pipeline that treats code, models, and data as one unit, covering model version management, evaluation gates, A/B testing, canary releases, and other key stages.

How AI CI/CD Differs from Traditional CI/CD

| Dimension | Traditional software | AI application |
| --- | --- | --- |
| Build artifact | Binary/image | Binary/image + model weights |
| Testing | Unit/integration tests | Unit/integration tests + model evaluation/benchmarks |
| Version management | Git | Git + model registry + data versions |
| Release strategy | Blue-green/canary | Blue-green/canary + A/B testing + shadow mode |
| Rollback granularity | Code version | Code version + model version |
| Build time | Minutes | Can reach hours (model training) |
| Artifact size | MB range | GB-TB range (large models) |

Overall Pipeline Architecture

                      CI Pipeline                                CD Pipeline
               ┌───────────────────────┐               ┌──────────────────────────────┐
               │                       │               │                              │
  Code Push -> │ Lint -> Test -> Build │ -> Staging -> │ Eval -> Canary -> Production │
               │                       │               │                              │
               └───────────────────────┘               └──────────────────────────────┘
                           │                                          │
               ┌───────────┴─────────┐                ┌───────────────┴───────┐
               │   Model Pipeline    │                │     Model Release     │
Data Change -> │ Train -> Validate   │ -> Registry -> │ Shadow -> A/B -> Full │
               │                     │                │                       │
               └─────────────────────┘                └───────────────────────┘

Model Version Management

Option 1: MLflow Model Registry

import mlflow
from mlflow.tracking import MlflowClient

# Log the training run
with mlflow.start_run(run_name="llm-finetune-v3") as run:
    # Log hyperparameters
    mlflow.log_params({
        "base_model": "llama-3.1-8b",
        "lora_rank": 16,
        "learning_rate": 2e-4,
        "epochs": 3,
        "dataset_version": "v2.1",
    })

    # Train...
    model = train_model()

    # Log metrics
    mlflow.log_metrics({
        "eval_loss": 0.45,
        "accuracy": 0.92,
        "latency_p95_ms": 150,
        "throughput_qps": 50,
    })

    # Register the model
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=model,
        registered_model_name="compliance-classifier",
    )

# Model stage management
client = MlflowClient()
client.transition_model_version_stage(
    name="compliance-classifier",
    version=3,
    stage="Staging",  # None -> Staging -> Production -> Archived
)
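
At serving or deploy time, the application can resolve whichever version currently holds a given stage through a models:/ URI, so promoting or rolling back a version in the registry changes what gets loaded without any code change. A minimal sketch, reusing the model name from above:

import mlflow.pyfunc

# Resolves to whichever registered version is currently in the "Production" stage
model = mlflow.pyfunc.load_model("models:/compliance-classifier/Production")
predictions = model.predict(["example input"])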

Option 2: DVC + Git (Lightweight)

# Initialize DVC
dvc init

# Track the model file
dvc add models/classifier_v3.onnx
git add models/classifier_v3.onnx.dvc models/.gitignore
git commit -m "model: classifier v3, accuracy=0.92"

# Push the model to remote storage
dvc remote add -d s3store s3://my-bucket/models
dvc push

# Switch model versions
git checkout v2.0
dvc pull  # fetch and check out the matching model version from the remote

Option 3: Hugging Face Hub

from huggingface_hub import HfApi

api = HfApi()

# Upload the model
api.upload_folder(
    folder_path="./lora_adapter",
    repo_id="myorg/compliance-classifier",
    repo_type="model",
    commit_message="v3: improved accuracy on edge cases",
)

# Pull a specific version by commit hash or tag
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
    "myorg/compliance-classifier",
    revision="v3.0",  # Git tag
)

CI Stage: Automated Evaluation Gates

GitHub Actions Example

# .github/workflows/ai-ci.yml
name: AI CI Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: pip install -r requirements.txt -r requirements-dev.txt

      - name: Lint
        run: ruff check . && mypy src/

      - name: Unit tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml

      - name: Integration tests
        run: pytest tests/integration/ -v
        env:
          DATABASE_URL: ${{ secrets.TEST_DB_URL }}

  model-eval:
    runs-on: [self-hosted, gpu]  # requires a GPU runner
    needs: code-quality
    steps:
      - uses: actions/checkout@v4

      - name: Pull model from registry
        run: |
          python scripts/pull_model.py \
            --model compliance-classifier \
            --stage staging \
            --output ./models/

      - name: Run evaluation suite
        run: |
          python scripts/evaluate.py \
            --model ./models/classifier_v3 \
            --dataset ./eval_data/benchmark_v2.jsonl \
            --output ./eval_results.json

      - name: Quality gate check
        run: |
          python scripts/quality_gate.py \
            --results ./eval_results.json \
            --min-accuracy 0.90 \
            --max-latency-p95 200 \
            --max-regression 0.02

      - name: Upload eval report
        uses: actions/upload-artifact@v4
        with:
          name: eval-report
          path: ./eval_results.json
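
The workflow calls scripts/pull_model.py, which is not shown above; a minimal sketch, assuming the MLflow registry from Option 1 (the flag names mirror the workflow's invocation):

# scripts/pull_model.py - hypothetical sketch: fetch the latest model in a given stage
import argparse

from mlflow.artifacts import download_artifacts
from mlflow.tracking import MlflowClient

def pull_model(name: str, stage: str, output: str) -> None:
    client = MlflowClient()
    # Latest registered version currently in the requested stage (e.g. "Staging")
    latest = client.get_latest_versions(name, stages=[stage.capitalize()])[0]
    # Download its artifacts into the local output directory
    download_artifacts(artifact_uri=latest.source, dst_path=output)
    print(f"Pulled {name} v{latest.version} ({stage}) into {output}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", required=True)
    parser.add_argument("--stage", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()
    pull_model(args.model, args.stage, args.output)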

Evaluation Gate Script

# scripts/quality_gate.py
import json
import sys

def check_quality_gate(results_path, thresholds):
    with open(results_path) as f:
        results = json.load(f)

    failures = []

    # Absolute-threshold checks
    if results["accuracy"] < thresholds["min_accuracy"]:
        failures.append(
            f"Accuracy {results['accuracy']:.4f} < "
            f"threshold {thresholds['min_accuracy']}"
        )

    if results["latency_p95_ms"] > thresholds["max_latency_p95"]:
        failures.append(
            f"Latency P95 {results['latency_p95_ms']}ms > "
            f"threshold {thresholds['max_latency_p95']}ms"
        )

    # Regression check (compared against the baseline)
    if "baseline" in results:
        regression = results["baseline"]["accuracy"] - results["accuracy"]
        if regression > thresholds["max_regression"]:
            failures.append(
                f"Regression {regression:.4f} > "
                f"threshold {thresholds['max_regression']}"
            )

    if failures:
        print("QUALITY GATE FAILED:")
        for f in failures:
            print(f"  - {f}")
        sys.exit(1)
    else:
        print("QUALITY GATE PASSED")
        print(f"  Accuracy: {results['accuracy']:.4f}")
        print(f"  Latency P95: {results['latency_p95_ms']}ms")

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--results", required=True)
    parser.add_argument("--min-accuracy", type=float, default=0.90)
    parser.add_argument("--max-latency-p95", type=float, default=200)
    parser.add_argument("--max-regression", type=float, default=0.02)
    args = parser.parse_args()

    check_quality_gate(args.results, {
        "min_accuracy": args.min_accuracy,
        "max_latency_p95": args.max_latency_p95,
        "max_regression": args.max_regression,
    })

CD Stage: Release Strategies

Canary Releases

# k8s/canary-deployment.yaml
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: ai-inference-service
spec:
  replicas: 10
  strategy:
    canary:
      steps:
        - setWeight: 5      # route 5% of traffic to the new version
        - pause:
            duration: 30m    # observe for 30 minutes
        - analysis:
            templates:
              - templateName: success-rate
              - templateName: latency-check
        - setWeight: 25      # scale to 25% after the checks pass
        - pause:
            duration: 30m
        - setWeight: 50
        - pause:
            duration: 30m
        - setWeight: 100     # full rollout

---
apiVersion: argoproj.io/v1alpha1
kind: AnalysisTemplate
metadata:
  name: success-rate
spec:
  metrics:
    - name: success-rate
      interval: 5m
      successCondition: result[0] >= 0.99
      provider:
        prometheus:
          address: http://prometheus:9090
          query: |
            sum(rate(http_requests_total{status=~"2.."}[5m]))
            /
            sum(rate(http_requests_total[5m]))

A/B Testing (Model Comparison)

# Feature-flag-based A/B traffic splitting
import hashlib

class ModelRouter:
    def __init__(self, config):
        self.models = config["models"]
        self.traffic_split = config["traffic_split"]

    def route(self, user_id: str) -> str:
        """确定性分流:同一用户始终路由到同一模型"""
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        bucket = hash_val % 100

        cumulative = 0
        for model_name, percentage in self.traffic_split.items():
            cumulative += percentage
            if bucket < cumulative:
                return model_name

        return list(self.traffic_split.keys())[-1]

# Example configuration
router = ModelRouter({
    "models": {
        "model_v2": "models/classifier_v2.onnx",
        "model_v3": "models/classifier_v3.onnx",
    },
    "traffic_split": {
        "model_v2": 80,   # 80% 流量
        "model_v3": 20,   # 20% 流量
    },
})
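
In the request path, the router picks a variant, and the assignment should be recorded so the two arms can be compared offline later (log_ab_assignment is a hypothetical helper):

variant = router.route(user_id="user-42")   # deterministic, e.g. "model_v2"
model_path = router.models[variant]

# Record which arm served the request so outcomes can be attributed later
log_ab_assignment(user_id="user-42", variant=variant)  # hypothetical logging helper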

Shadow Mode

Test a new model on production traffic without affecting what users see:

import asyncio
from dataclasses import dataclass

@dataclass
class ShadowResult:
    primary_output: str
    shadow_output: str
    primary_latency_ms: float
    shadow_latency_ms: float
    outputs_match: bool

async def shadow_inference(request, primary_model, shadow_model):
    """The primary model answers the user; the shadow model's output is only logged"""
    primary_task = asyncio.create_task(primary_model.predict(request))
    shadow_task = asyncio.create_task(shadow_model.predict(request))

    # The primary result is returned to the user as soon as it is ready
    primary_result = await primary_task

    async def _collect_shadow():
        # Wait for the shadow result (with a timeout) and log the comparison,
        # entirely off the request path so the user response is never blocked
        try:
            shadow_result = await asyncio.wait_for(shadow_task, timeout=5.0)
        except asyncio.TimeoutError:
            shadow_result = None
        await log_comparison(primary_result, shadow_result)

    # Fire-and-forget background task; the response does not wait for it
    asyncio.create_task(_collect_shadow())

    return primary_result
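
log_comparison above is left undefined; a sketch that fills in the ShadowResult dataclass from earlier might look like this (it assumes predict() returns an object with output and latency_ms fields, and where the record is shipped is deployment-specific):

async def log_comparison(primary_result, shadow_result):
    """Assemble a primary/shadow pair for offline analysis (sketch)."""
    record = ShadowResult(
        primary_output=primary_result.output,
        shadow_output=shadow_result.output if shadow_result else "",
        primary_latency_ms=primary_result.latency_ms,
        shadow_latency_ms=shadow_result.latency_ms if shadow_result else -1.0,
        outputs_match=(
            shadow_result is not None
            and primary_result.output == shadow_result.output
        ),
    )
    # Ship the record to a log file, message queue, or metrics store as appropriate
    print(record)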

Data Version Management

# dvc.yaml: defines the data-processing pipeline, with training data version-controlled by DVC
stages:
  prepare_data:
    cmd: python scripts/prepare_data.py
    deps:
      - scripts/prepare_data.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - prepare.min_length
      - prepare.max_length

  train:
    cmd: python scripts/train.py
    deps:
      - scripts/train.py
      - data/processed/
    outs:
      - models/latest/
    metrics:
      - metrics.json:
          cache: false
    params:
      - train.epochs
      - train.learning_rate

# Run the pipeline
dvc repro          # reproduce the whole pipeline
dvc metrics show   # show the current metrics
dvc metrics diff   # compare metrics across versions
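
Training code can also read a pinned data version programmatically instead of depending on the working tree; a minimal sketch using dvc.api (the path, revision, and handler are illustrative):

import dvc.api

# Stream the processed dataset exactly as it existed at Git revision v2.1;
# DVC fetches it from the configured remote if it is not in the local cache
with dvc.api.open("data/processed/train.jsonl", rev="v2.1") as f:
    for line in f:
        process(line)  # hypothetical per-record handler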

Monitoring and Alerting

Model Performance Monitoring

# Monitor the inference service with Prometheus + Grafana
from prometheus_client import Histogram, Counter, Gauge

# Latency distribution
INFERENCE_LATENCY = Histogram(
    "model_inference_latency_seconds",
    "Model inference latency",
    ["model_name", "model_version"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

# Request counter
INFERENCE_REQUESTS = Counter(
    "model_inference_requests_total",
    "Total inference requests",
    ["model_name", "model_version", "status"],
)

# Model drift metric
MODEL_DRIFT_SCORE = Gauge(
    "model_drift_score",
    "Data drift detection score",
    ["model_name", "feature_name"],
)

# Usage in the inference code
async def predict(request):
    with INFERENCE_LATENCY.labels(
        model_name="classifier",
        model_version="v3",
    ).time():
        result = model.predict(request.input)

    INFERENCE_REQUESTS.labels(
        model_name="classifier",
        model_version="v3",
        status="success",
    ).inc()

    return result
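
MODEL_DRIFT_SCORE above is declared but never set; one common way to feed it is a Population Stability Index (PSI) between a training-time reference sample and a recent production window. A sketch, where train_sample, recent_sample, and the feature name are assumptions:

import numpy as np

def population_stability_index(reference, current, bins=10):
    """PSI between two samples of a single numeric feature."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Clip empty bins to avoid log(0)
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

# Compare a recent production window against the training distribution (assumed samples)
psi = population_stability_index(train_sample, recent_sample)
MODEL_DRIFT_SCORE.labels(model_name="classifier", feature_name="input_length").set(psi)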

Full Pipeline Example (GitLab CI)

# .gitlab-ci.yml
stages:
  - test
  - build
  - evaluate
  - staging
  - canary
  - production

variables:
  MODEL_REGISTRY: "s3://ml-models"
  EVAL_THRESHOLD_ACCURACY: "0.90"

test:
  stage: test
  script:
    - pip install -r requirements.txt
    - ruff check .
    - pytest tests/ -v --cov

build-image:
  stage: build
  script:
    - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA

evaluate-model:
  stage: evaluate
  tags: [gpu]
  script:
    - python scripts/pull_model.py --stage staging
    - python scripts/evaluate.py --output results.json
    - python scripts/quality_gate.py --results results.json
  artifacts:
    paths: [results.json]

deploy-staging:
  stage: staging
  script:
    - kubectl apply -f k8s/staging/ --namespace staging
    - kubectl rollout status deployment/ai-service -n staging
  environment:
    name: staging

deploy-canary:
  stage: canary
  script:
    - kubectl apply -f k8s/canary/
    - sleep 1800  # 30-minute observation window
    - python scripts/check_canary_health.py
  when: manual

deploy-production:
  stage: production
  script:
    - kubectl apply -f k8s/production/
    - kubectl rollout status deployment/ai-service -n production
  when: manual
  only:
    - main
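
scripts/check_canary_health.py in the canary job is likewise not shown; a minimal sketch that queries the Prometheus HTTP API for the canary's error rate (the metric labels and the 1% threshold are assumptions):

# scripts/check_canary_health.py - hypothetical sketch
import sys

import requests

PROMETHEUS = "http://prometheus:9090"
QUERY = (
    'sum(rate(http_requests_total{deployment="canary",status=~"5.."}[30m]))'
    ' / sum(rate(http_requests_total{deployment="canary"}[30m]))'
)

resp = requests.get(f"{PROMETHEUS}/api/v1/query", params={"query": QUERY}, timeout=10)
resp.raise_for_status()
result = resp.json()["data"]["result"]

# An empty result usually means no canary traffic; treat it as a failure, not a pass
error_rate = float(result[0]["value"][1]) if result else 1.0
print(f"Canary error rate over 30m: {error_rate:.4%}")
sys.exit(0 if error_rate < 0.01 else 1)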

Practical Recommendations

  1. Version code and models separately: tag code with Git, track models in a model registry, and link the two through a config file (see the sketch after this list)
  2. Pin the evaluation set version: the evaluation dataset needs version control too, to avoid a drifting baseline
  3. Gate before release: a model that fails evaluation must never reach staging
  4. Roll back models along with code: a rollback must revert not only the code but also the matching model version
  5. Monitor data drift: continuously watch the input distribution in production and alert automatically when drift exceeds a threshold
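
For recommendation 1, the link between code, model, and data versions can be a small manifest checked into the repo and read at deploy time; a sketch (the file name and fields are illustrative):

# release_manifest.json (illustrative):
# {
#   "code_version": "v1.8.2",
#   "model_name": "compliance-classifier",
#   "model_version": 3,
#   "dataset_version": "v2.1"
# }
import json
from dataclasses import dataclass

@dataclass
class ReleaseManifest:
    code_version: str     # Git tag of the service code
    model_name: str       # name in the model registry
    model_version: int    # registry version to deploy
    dataset_version: str  # data version used for training and eval

def load_manifest(path: str = "release_manifest.json") -> ReleaseManifest:
    with open(path) as f:
        return ReleaseManifest(**json.load(f))

Rolling back then means checking out an older manifest, which reverts the code and the model together (recommendation 4).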

Maurice | maurice_wen@proton.me