MLOps Pipelines: From Experiment to Production

MLflow experiment management, Feature Store integration, model registration and versioning, ML CI/CD pipelines, production monitoring and data drift detection

Introduction

DevOps for traditional software is highly mature, but machine learning systems add extra complexity: code, data, and models all change at the same time. Between an experiment notebook and a production service lies a wide gap, one that Google has called the "hidden technical debt" of ML systems. MLOps is the engineering discipline built to bridge that gap.

This article builds a complete MLOps pipeline around MLflow, from experiment tracking to production monitoring.

MLOps Maturity Model

Level 0: Manual process
  Notebook experiments → manual deployment → no monitoring
  Problems: not reproducible, no version control, no rollback

Level 1: ML pipeline automation
  Automated training → automated validation → manual approval → automated deployment
  Problems: data pipelines still manual, features not shared

Level 2: CI/CD for ML
  Code change → automated training → automated testing → automated deployment → automated monitoring
  Feature engineering → Feature Store → online/offline consistency

Level 3: Fully automated (the goal)
  Data drift trigger → automated retraining → automated evaluation → progressive rollout
  A/B testing → automated rollback → feedback loop
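The Level 3 loop above can be sketched as plain control flow. This is a minimal illustration, not a real orchestrator API: `retrain`, `evaluate`, and `promote` are hypothetical hooks the platform would supply, and the thresholds are placeholders.

```python
# Minimal sketch of one Level 3 automation cycle: a drift signal triggers
# retraining, and an evaluation gate decides whether to promote the candidate.
# retrain/evaluate/promote are hypothetical platform hooks, not a real API.

def automation_cycle(drift_score, retrain, evaluate, promote,
                     drift_threshold=0.2, min_f1=0.92):
    """Run one cycle; return which action was taken."""
    if drift_score <= drift_threshold:
        return "no_action"             # distribution stable, keep serving
    candidate = retrain()              # automated retraining on fresh data
    if evaluate(candidate) >= min_f1:  # automated quality gate
        promote(candidate)             # progressive rollout would start here
        return "promoted"
    return "rejected"                  # gate failed: keep the current model
```

In practice `drift_score` would come from the drift detectors described later, and `promote` would re-point a registry alias rather than deploy directly.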

Experiment Management

MLflow Core Concepts

| Concept        | Description                            | Analogy         |
|----------------|----------------------------------------|-----------------|
| Experiment     | A group of related training runs       | Git repository  |
| Run            | A single training execution            | Git commit      |
| Parameter      | Logged hyperparameters                 | Config file     |
| Metric         | Logged evaluation metrics              | Test results    |
| Artifact       | Outputs such as model files and plots  | Build artifacts |
| Model Registry | Model versioning and stage transitions | Docker Registry |

Experiment Tracking in Practice

import mlflow
import mlflow.pytorch
from mlflow.models import infer_signature
import torch
from torch import nn, optim
from sklearn.metrics import accuracy_score, f1_score

# Set tracking server
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("intent-classifier-v2")

# Hyperparameter search with tracking
configs = [
    {"lr": 1e-4, "batch_size": 32, "epochs": 10, "dropout": 0.1},
    {"lr": 3e-5, "batch_size": 16, "epochs": 15, "dropout": 0.2},
    {"lr": 1e-4, "batch_size": 32, "epochs": 20, "dropout": 0.15},
]

for config in configs:
    with mlflow.start_run(run_name=f"lr{config['lr']}_bs{config['batch_size']}"):
        # Log parameters
        mlflow.log_params(config)
        mlflow.log_param("model_arch", "bert-base-chinese")
        mlflow.log_param("dataset_version", "v2.3")

        # Train model (train_model is a project-specific helper; it returns the
        # model plus the final and per-epoch losses logged below)
        model, train_loss, val_loss, train_losses, val_losses = train_model(config)

        # Evaluate and log metrics
        predictions = model.predict(test_data)
        accuracy = accuracy_score(test_labels, predictions)
        f1 = f1_score(test_labels, predictions, average="weighted")

        mlflow.log_metrics({
            "accuracy": accuracy,
            "f1_weighted": f1,
            "train_loss": train_loss,
            "val_loss": val_loss,
        })

        # Log training curve as artifact
        fig = plot_training_curve(train_losses, val_losses)
        mlflow.log_figure(fig, "training_curve.png")

        # Log confusion matrix
        cm_fig = plot_confusion_matrix(test_labels, predictions)
        mlflow.log_figure(cm_fig, "confusion_matrix.png")

        # Log model with signature
        sample_input = test_data[:5]
        sample_output = model.predict(sample_input)
        signature = infer_signature(sample_input, sample_output)

        mlflow.pytorch.log_model(
            model,
            "model",
            signature=signature,
            registered_model_name="intent-classifier",
        )

        # Log dataset metadata
        mlflow.log_input(
            mlflow.data.from_pandas(train_df, name="train_set", targets="label")
        )

        print(f"Run: lr={config['lr']}, acc={accuracy:.4f}, f1={f1:.4f}")

Model Registration and Stage Management

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Find the best run
experiment = client.get_experiment_by_name("intent-classifier-v2")
best_run = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.f1_weighted DESC"],
    max_results=1,
)[0]

print(f"Best run: {best_run.info.run_id}, F1: {best_run.data.metrics['f1_weighted']}")

# Register the best model
model_version = client.create_model_version(
    name="intent-classifier",
    source=f"runs:/{best_run.info.run_id}/model",
    run_id=best_run.info.run_id,
    description=f"F1: {best_run.data.metrics['f1_weighted']:.4f}",
)

# Alias-based promotion (aliases replace the deprecated None -> Staging -> Production stage flow)
client.set_registered_model_alias(
    name="intent-classifier",
    alias="staging",
    version=model_version.version,
)

# After validation passes
client.set_registered_model_alias(
    name="intent-classifier",
    alias="production",
    version=model_version.version,
)
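With aliases in place, consumers resolve a model by alias instead of by version number, which turns rollback into a metadata change. A minimal sketch, using the model name from the examples above (`model_uri` and `load_by_alias` are hypothetical helpers, not MLflow APIs):

```python
def model_uri(name: str, alias: str) -> str:
    """Build the alias-based model URI understood by mlflow.pyfunc.load_model."""
    return f"models:/{name}@{alias}"

def load_by_alias(name: str, alias: str):
    # Resolves whichever version currently holds the alias. Re-pointing the
    # alias (e.g. back to a previous version) changes what this returns,
    # so rollback needs no redeploy of the serving code.
    import mlflow  # deferred so model_uri stays dependency-free
    return mlflow.pyfunc.load_model(model_uri(name, alias))
```

For example, `load_by_alias("intent-classifier", "production")` loads from `models:/intent-classifier@production`, the same URI used by the model server later in this article.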

ML CI/CD Pipeline

Pipeline Architecture

┌──────────────────────────────────────────────────────┐
│                  ML CI/CD Pipeline                   │
├──────────────────────────────────────────────────────┤
│                                                      │
│  Code change                                         │
│       │                                              │
│       ▼                                              │
│  Data validation (schema checks, statistical tests)  │
│       │                                              │
│       ▼                                              │
│  Feature engineering (Feature Store,                 │
│                       online/offline consistency)    │
│       │                                              │
│       ▼                                              │
│  Model training (MLflow experiment tracking)         │
│       │                                              │
│       ▼                                              │
│  Model validation (quality gates)                    │
│       │                                              │
│  ┌────▼─────────────┐                                │
│  │ Production Gate  │                                │
│  │ - F1 > 0.92      │                                │
│  │ - Latency < 50ms │                                │
│  │ - No data leak   │                                │
│  └────┬─────────────┘                                │
│       │                                              │
│       ▼                                              │
│  Progressive rollout (canary)                        │
│       │                                              │
│       ▼                                              │
│  Production monitoring + alerts                      │
└──────────────────────────────────────────────────────┘

GitHub Actions Implementation

# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  push:
    paths:
      - 'models/**'
      - 'features/**'
      - 'data/configs/**'
  schedule:
    - cron: '0 2 * * 1'  # Weekly retrain

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate data schema
        run: |
          python -m pytest tests/data/test_schema.py -v

      - name: Check for data drift
        run: |
          python scripts/check_data_drift.py \
            --reference data/reference_stats.json \
            --current data/latest/ \
            --threshold 0.05

      - name: Data quality report
        run: |
          python scripts/data_quality_report.py \
            --output reports/data_quality.html

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4

      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: |
          python train.py \
            --config configs/production.yaml \
            --experiment "intent-classifier-v2" \
            --run-name "ci-${{ github.sha }}"

      - name: Validate model quality
        # train.py is assumed to write its MLflow run id to .mlflow_run_id;
        # between jobs this file would be shared via upload/download-artifact
        run: |
          python scripts/validate_model.py \
            --min-f1 0.92 \
            --max-latency-ms 50 \
            --run-id $(cat .mlflow_run_id)

  deploy-staging:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - name: Promote to staging
        run: |
          python scripts/promote_model.py \
            --model intent-classifier \
            --alias staging \
            --run-id $(cat .mlflow_run_id)

      - name: Integration tests
        run: |
          python -m pytest tests/integration/ -v \
            --endpoint $STAGING_ENDPOINT

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Canary deployment (10%)
        run: |
          python scripts/canary_deploy.py \
            --model intent-classifier \
            --weight 0.10 \
            --duration 30m

      - name: Monitor canary metrics
        run: |
          python scripts/monitor_canary.py \
            --min-success-rate 0.99 \
            --max-latency-p99 100

      - name: Full rollout
        run: |
          python scripts/promote_model.py \
            --model intent-classifier \
            --alias production \
            --run-id $(cat .mlflow_run_id)
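The `validate_model.py` gate invoked above is not shown in this article; a hypothetical sketch of its core check, using the thresholds from the workflow:

```python
# Hypothetical core of validate_model.py: compare a run's logged metrics
# against the production-gate thresholds used in the workflow above.
# The metric names (f1_weighted, latency_p99_ms) follow this article's examples.

def passes_gate(metrics: dict,
                min_f1: float = 0.92,
                max_latency_ms: float = 50.0) -> bool:
    """Return True only when every gate criterion is satisfied.

    Missing metrics count as failures: no F1 defaults to 0.0 and no
    latency defaults to infinity, so an incomplete run cannot pass.
    """
    f1_ok = metrics.get("f1_weighted", 0.0) >= min_f1
    latency_ok = metrics.get("latency_p99_ms", float("inf")) <= max_latency_ms
    return f1_ok and latency_ok
```

In the real script the metrics dict would come from `MlflowClient().get_run(run_id).data.metrics`, and exiting non-zero on a failed gate is what makes the CI step fail.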

Data Validation and Drift Detection

Data Schema Validation

# tests/data/test_schema.py
import pandera as pa
from pandera import Column, DataFrameSchema, Check
import pandas as pd

# Define expected schema
intent_schema = DataFrameSchema({
    "text": Column(str, [
        Check.str_length(min_value=1, max_value=512),
        Check(lambda s: s.notna().all(), error="No null texts"),
    ]),
    "label": Column(str, [
        Check.isin([
            "greeting", "farewell", "query", "complaint",
            "order_status", "refund", "other",
        ]),
    ]),
    "confidence": Column(float, [
        Check.in_range(0.0, 1.0),
    ], nullable=True),
    "timestamp": Column("datetime64[ns]"),
})

def test_training_data_schema():
    df = pd.read_parquet("data/latest/train.parquet")
    intent_schema.validate(df)

def test_no_label_leakage():
    """Ensure no test labels leaked into training data."""
    train = pd.read_parquet("data/latest/train.parquet")
    test = pd.read_parquet("data/latest/test.parquet")
    overlap = set(train["text"]) & set(test["text"])
    assert len(overlap) == 0, f"Found {len(overlap)} overlapping samples"

Data Drift Detection

# src/monitoring/drift_detector.py
from scipy import stats
import numpy as np
from dataclasses import dataclass

@dataclass
class DriftReport:
    feature_name: str
    drift_score: float
    p_value: float
    is_drifted: bool
    method: str

def detect_numerical_drift(
    reference: np.ndarray,
    current: np.ndarray,
    threshold: float = 0.05,
) -> DriftReport:
    """Kolmogorov-Smirnov test for numerical feature drift."""
    statistic, p_value = stats.ks_2samp(reference, current)
    return DriftReport(
        feature_name="",
        drift_score=statistic,
        p_value=p_value,
        is_drifted=p_value < threshold,
        method="KS-test",
    )

def detect_categorical_drift(
    reference: np.ndarray,
    current: np.ndarray,
    threshold: float = 0.2,
) -> DriftReport:
    """Population Stability Index (PSI) for categorical feature drift.

    Expects integer-encoded categories; PSI > 0.2 is a common rule of
    thumb for significant drift.
    """
    n_bins = max(reference.max(), current.max()) + 1
    ref_counts = np.bincount(reference, minlength=n_bins)
    cur_counts = np.bincount(current, minlength=n_bins)

    # Normalize to proportions
    ref_props = ref_counts / ref_counts.sum()
    cur_props = cur_counts / cur_counts.sum()

    # PSI (Population Stability Index); epsilon guards empty categories
    epsilon = 1e-10
    psi = np.sum(
        (cur_props - ref_props) * np.log((cur_props + epsilon) / (ref_props + epsilon))
    )

    return DriftReport(
        feature_name="",
        drift_score=psi,
        p_value=float("nan"),  # PSI is a score, not a hypothesis test
        is_drifted=psi > threshold,
        method="PSI",
    )

def detect_embedding_drift(
    reference_embeddings: np.ndarray,
    current_embeddings: np.ndarray,
    threshold: float = 0.1,
) -> DriftReport:
    """Mean-embedding distance as a lightweight proxy for embedding drift.

    Full Maximum Mean Discrepancy (MMD) compares all pairs under a kernel;
    the distance between mean embeddings used here is its simplest (linear)
    special case, cheap enough to run on every batch.
    """
    ref_mean = reference_embeddings.mean(axis=0)
    cur_mean = current_embeddings.mean(axis=0)

    mmd = np.linalg.norm(ref_mean - cur_mean)

    return DriftReport(
        feature_name="embedding",
        drift_score=mmd,
        p_value=float("nan"),  # distance-based score, no p-value
        is_drifted=mmd > threshold,
        method="MMD (linear)",
    )
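A quick usage sketch of the numerical detector on synthetic data. It calls scipy directly so the snippet stands alone; a shifted mean of half a standard deviation should be flagged at the 0.05 threshold used above.

```python
# Synthetic check: reference N(0, 1) vs. current N(0.5, 1) should be
# flagged as drifted by the two-sample KS test.
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=5000)
current = rng.normal(loc=0.5, scale=1.0, size=5000)  # mean shifted by 0.5

statistic, p_value = stats.ks_2samp(reference, current)
is_drifted = bool(p_value < 0.05)
print(f"KS statistic={statistic:.3f}, drifted={is_drifted}")
```

At this sample size the shift is unambiguous; for small windows the same test becomes noisy, which is why the CI step above batches a day of data before checking.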

Model Serving and Deployment

Unified Multi-Framework Deployment

# src/serving/model_server.py
import mlflow
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import time

app = FastAPI(title="ML Model Server")

# Load model from registry
model = mlflow.pyfunc.load_model("models:/intent-classifier@production")

class PredictionRequest(BaseModel):
    text: str
    return_probabilities: bool = False

class PredictionResponse(BaseModel):
    label: str
    confidence: float
    probabilities: Optional[dict] = None
    latency_ms: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    start = time.perf_counter()

    try:
        result = model.predict([request.text])
        latency_ms = (time.perf_counter() - start) * 1000

        return PredictionResponse(
            label=result["label"][0],
            confidence=result["confidence"][0],
            probabilities=result.get("probabilities", [None])[0] if request.return_probabilities else None,
            latency_ms=round(latency_ms, 2),
            model_version=model.metadata.run_id[:8],
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "model": "intent-classifier",
        "version": model.metadata.run_id[:8],
    }

Monitoring Dashboard Metrics

| Category                | Metric                      | Alert threshold              | Check frequency |
|-------------------------|-----------------------------|------------------------------|-----------------|
| Model quality           | Accuracy / F1               | > 5% below training metrics  | Hourly          |
| Prediction distribution | Label distribution shift    | PSI > 0.2                    | Daily           |
| Data quality            | Missing-value ratio         | > 5%                         | Real-time       |
| Latency                 | p99 inference latency       | > 100 ms                     | Real-time       |
| Throughput              | QPS                         | < 50% of baseline            | Real-time       |
| Resources               | GPU/CPU utilization         | > 90% sustained for 5 min    | Real-time       |
| Feature drift           | Input feature distributions | KS p-value < 0.01            | Daily           |
| Concept drift           | Prediction accuracy         | Declining 3 consecutive days | Daily           |
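The "label distribution shift" row reuses the same PSI formula as the drift detector earlier; a sketch with illustrative proportions (the two distributions here are made up for the example):

```python
# PSI between the training-time label distribution and the last day of
# production predictions; alert when PSI > 0.2, as in the table above.
import numpy as np

def label_psi(ref_props: np.ndarray, cur_props: np.ndarray,
              eps: float = 1e-10) -> float:
    """Population Stability Index over two label proportion vectors."""
    return float(np.sum(
        (cur_props - ref_props) * np.log((cur_props + eps) / (ref_props + eps))
    ))

train_dist = np.array([0.30, 0.25, 0.20, 0.15, 0.10])  # illustrative
prod_dist = np.array([0.10, 0.15, 0.20, 0.25, 0.30])   # illustrative

psi = label_psi(train_dist, prod_dist)
alert = psi > 0.2
print(f"PSI={psi:.3f}, alert={alert}")
```

In production the proportion vectors would be aggregated from the `/predict` logs of the model server shown above.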

Summary

  1. Experiment tracking is the foundation: without reproducible experiments there are no trustworthy models. MLflow tracks the full chain from parameters to artifacts.
  2. The model registry is the hub: the Model Registry connects training and deployment, providing version management and stage transitions.
  3. CI/CD eliminates manual steps: automating the training, validation, and deployment pipeline ensures every change passes a quality gate.
  4. Data validation comes before model training: garbage in, garbage out; schema checks and drift detection are the first line of defense for quality.
  5. Production monitoring closes the loop: monitoring is not optional; data drift and concept drift will silently degrade a model in production.

Maurice | maurice_wen@proton.me