MLOps Pipelines: From Experiment to Production
灵阙教研团队 · ~8 min read · Updated 2026-02-28
MLflow experiment tracking, Feature Store integration, model registry and versioning, ML CI/CD pipelines, production monitoring and data drift detection
Introduction
DevOps for traditional software is highly mature, but machine learning systems introduce additional complexity: code, data, and models are all changing at the same time. Between a model in an experiment notebook and a production service lies a wide gap, which Google has called the "hidden technical debt" of ML systems. MLOps is the engineering discipline built to bridge that gap.
This article uses MLflow as the backbone to build a complete MLOps pipeline, from experiment tracking to production monitoring.
MLOps Maturity Model
Level 0: Manual process
Notebook experiments → manual deployment → no monitoring
Problems: not reproducible, no version control, no way to roll back
Level 1: Automated ML pipeline
Automated training → automated validation → manual approval → automated deployment
Problems: data pipelines are still manual, features are not shared
Level 2: CI/CD for ML
Code change → automated training → automated testing → automated deployment → automated monitoring
Feature engineering → Feature Store → online/offline consistency
Level 3: Fully automated (the goal)
Data drift trigger → automated retraining → automated evaluation → progressive rollout
A/B test → automated rollback → closed feedback loop
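The Level 3 trigger can be sketched as a small decision function. This is an illustrative sketch, not a real pipeline API: the function names are hypothetical, and the 0.2 PSI threshold is the conventional "significant drift" cutoff also used later in this article.

```python
def should_retrain(psi: float, threshold: float = 0.2) -> bool:
    """Trigger retraining when the Population Stability Index
    exceeds the conventional 0.2 significant-drift threshold."""
    return psi > threshold

def on_monitoring_tick(psi: float) -> str:
    # In a real Level 3 pipeline this would kick off the training
    # workflow; here it just reports the decision.
    return "retrain" if should_retrain(psi) else "noop"

print(on_monitoring_tick(0.35))  # retrain
print(on_monitoring_tick(0.05))  # noop
```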
Experiment Management
MLflow Core Concepts
| Concept | Description | Analogy |
|---|---|---|
| Experiment | A group of related training runs | Git repository |
| Run | A single training execution | Git commit |
| Parameter | Logged hyperparameters | Config file |
| Metric | Logged evaluation metrics | Test results |
| Artifact | Outputs such as model files and plots | Build artifacts |
| Model Registry | Model versioning and stage transitions | Docker Registry |
Experiment Tracking in Practice
import mlflow
import mlflow.pytorch
from mlflow.models import infer_signature
import torch
from torch import nn, optim
from sklearn.metrics import accuracy_score, f1_score

# Set tracking server
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("intent-classifier-v2")

# Hyperparameter search with tracking
configs = [
    {"lr": 1e-4, "batch_size": 32, "epochs": 10, "dropout": 0.1},
    {"lr": 3e-5, "batch_size": 16, "epochs": 15, "dropout": 0.2},
    {"lr": 1e-4, "batch_size": 32, "epochs": 20, "dropout": 0.15},
]

for config in configs:
    with mlflow.start_run(run_name=f"lr{config['lr']}_bs{config['batch_size']}"):
        # Log parameters
        mlflow.log_params(config)
        mlflow.log_param("model_arch", "bert-base-chinese")
        mlflow.log_param("dataset_version", "v2.3")

        # Train model (train_model, the plotting helpers, and the
        # data/loss variables below are assumed to be defined
        # elsewhere in the project)
        model = train_model(config)

        # Evaluate and log metrics
        predictions = model.predict(test_data)
        accuracy = accuracy_score(test_labels, predictions)
        f1 = f1_score(test_labels, predictions, average="weighted")
        mlflow.log_metrics({
            "accuracy": accuracy,
            "f1_weighted": f1,
            "train_loss": train_loss,
            "val_loss": val_loss,
        })

        # Log training curve as artifact
        fig = plot_training_curve(train_losses, val_losses)
        mlflow.log_figure(fig, "training_curve.png")

        # Log confusion matrix
        cm_fig = plot_confusion_matrix(test_labels, predictions)
        mlflow.log_figure(cm_fig, "confusion_matrix.png")

        # Log model with signature
        sample_input = test_data[:5]
        sample_output = model.predict(sample_input)
        signature = infer_signature(sample_input, sample_output)
        mlflow.pytorch.log_model(
            model,
            "model",
            signature=signature,
            registered_model_name="intent-classifier",
        )

        # Log dataset metadata
        mlflow.log_input(
            mlflow.data.from_pandas(train_df, name="train_set", targets="label")
        )

        print(f"Run: lr={config['lr']}, acc={accuracy:.4f}, f1={f1:.4f}")
Model Registration and Stage Management
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Find the best run
experiment = client.get_experiment_by_name("intent-classifier-v2")
best_run = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.f1_weighted DESC"],
    max_results=1,
)[0]
print(f"Best run: {best_run.info.run_id}, F1: {best_run.data.metrics['f1_weighted']}")

# Register the best model
model_version = client.create_model_version(
    name="intent-classifier",
    source=f"runs:/{best_run.info.run_id}/model",
    run_id=best_run.info.run_id,
    description=f"F1: {best_run.data.metrics['f1_weighted']:.4f}",
)

# Lifecycle promotion via aliases (aliases supersede the deprecated
# None -> Staging -> Production -> Archived stage API)
client.set_registered_model_alias(
    name="intent-classifier",
    alias="staging",
    version=model_version.version,
)

# After validation passes
client.set_registered_model_alias(
    name="intent-classifier",
    alias="production",
    version=model_version.version,
)
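Automated rollback then reduces to re-pointing the production alias at an earlier version. Below is a minimal sketch of the version selection logic; the version numbers are illustrative, and the actual alias update is shown as a comment since it requires a live registry.

```python
def pick_rollback_version(versions: list[int], current: int) -> int:
    """Pick the most recent registered version older than the one
    currently aliased to production."""
    candidates = [v for v in versions if v < current]
    if not candidates:
        raise ValueError("no earlier version to roll back to")
    return max(candidates)

previous = pick_rollback_version([1, 2, 3, 4], current=4)
print(previous)  # 3

# The rollback itself is then a single alias update:
# client.set_registered_model_alias(
#     name="intent-classifier", alias="production", version=str(previous)
# )
```

Because an alias is just a pointer, rollback takes effect as soon as serving re-resolves `models:/intent-classifier@production`.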
ML CI/CD Pipeline
Pipeline Architecture
Code change
    │
    ▼
Data validation ──── schema checks, statistical tests
    │
    ▼
Feature engineering ──── Feature Store, online/offline consistency
    │
    ▼
Model training ──── MLflow experiment logging
    │
    ▼
Model validation ──── quality gates
    │
    ▼
┌─────────────────────┐
│  Production Gate    │
│  - F1 > 0.92        │
│  - Latency < 50 ms  │
│  - No data leakage  │
└─────────────────────┘
    │
    ▼
Progressive rollout (canary)
    │
    ▼
Production monitoring + alerts
GitHub Actions Implementation
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  push:
    paths:
      - 'models/**'
      - 'features/**'
      - 'data/configs/**'
  schedule:
    - cron: '0 2 * * 1'  # Weekly retrain

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data schema
        run: |
          python -m pytest tests/data/test_schema.py -v
      - name: Check for data drift
        run: |
          python scripts/check_data_drift.py \
            --reference data/reference_stats.json \
            --current data/latest/ \
            --threshold 0.05
      - name: Data quality report
        run: |
          python scripts/data_quality_report.py \
            --output reports/data_quality.html

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: |
          python train.py \
            --config configs/production.yaml \
            --experiment "intent-classifier-v2" \
            --run-name "ci-${{ github.sha }}"
      - name: Validate model quality
        run: |
          python scripts/validate_model.py \
            --min-f1 0.92 \
            --max-latency-ms 50 \
            --run-id $(cat .mlflow_run_id)

  deploy-staging:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - name: Promote to staging
        run: |
          python scripts/promote_model.py \
            --model intent-classifier \
            --alias staging \
            --run-id $(cat .mlflow_run_id)
      - name: Integration tests
        run: |
          python -m pytest tests/integration/ -v \
            --endpoint $STAGING_ENDPOINT

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Canary deployment (10%)
        run: |
          python scripts/canary_deploy.py \
            --model intent-classifier \
            --weight 0.10 \
            --duration 30m
      - name: Monitor canary metrics
        run: |
          python scripts/monitor_canary.py \
            --min-success-rate 0.99 \
            --max-latency-p99 100
      - name: Full rollout
        run: |
          python scripts/promote_model.py \
            --model intent-classifier \
            --alias production \
            --run-id $(cat .mlflow_run_id)
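The article does not show the body of `scripts/validate_model.py`. A plausible core of its gate check, using the metric names and thresholds from the pipeline above (`latency_ms` as a metric key is an assumption), might look like:

```python
def passes_gate(
    metrics: dict,
    min_f1: float = 0.92,
    max_latency_ms: float = 50.0,
) -> bool:
    """Production gate check. Fails closed: a missing metric
    counts as a gate failure rather than a pass."""
    f1 = metrics.get("f1_weighted")
    latency = metrics.get("latency_ms")
    if f1 is None or latency is None:
        return False
    return f1 >= min_f1 and latency <= max_latency_ms

print(passes_gate({"f1_weighted": 0.94, "latency_ms": 42.0}))  # True
print(passes_gate({"f1_weighted": 0.90, "latency_ms": 42.0}))  # False
```

In CI, the script would load these metrics from the MLflow run named by `--run-id` and exit non-zero when the gate fails, which stops the pipeline before deployment.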
Data Validation and Drift Detection
Data Schema Validation
# tests/data/test_schema.py
import pandera as pa
from pandera import Column, DataFrameSchema, Check
import pandas as pd

# Define expected schema
intent_schema = DataFrameSchema({
    "text": Column(str, [
        Check.str_length(min_value=1, max_value=512),
        Check(lambda s: s.notna().all(), error="No null texts"),
    ]),
    "label": Column(str, [
        Check.isin([
            "greeting", "farewell", "query", "complaint",
            "order_status", "refund", "other",
        ]),
    ]),
    "confidence": Column(float, [
        Check.in_range(0.0, 1.0),
    ], nullable=True),
    "timestamp": Column("datetime64[ns]"),
})

def test_training_data_schema():
    df = pd.read_parquet("data/latest/train.parquet")
    intent_schema.validate(df)

def test_no_label_leakage():
    """Ensure no test samples leaked into the training data."""
    train = pd.read_parquet("data/latest/train.parquet")
    test = pd.read_parquet("data/latest/test.parquet")
    overlap = set(train["text"]) & set(test["text"])
    assert len(overlap) == 0, f"Found {len(overlap)} overlapping samples"
Data Drift Detection
# src/monitoring/drift_detector.py
from dataclasses import dataclass

import numpy as np
from scipy import stats

@dataclass
class DriftReport:
    feature_name: str
    drift_score: float
    p_value: float
    is_drifted: bool
    method: str

def detect_numerical_drift(
    reference: np.ndarray,
    current: np.ndarray,
    threshold: float = 0.05,
) -> DriftReport:
    """Kolmogorov-Smirnov test for numerical feature drift."""
    statistic, p_value = stats.ks_2samp(reference, current)
    return DriftReport(
        feature_name="",
        drift_score=statistic,
        p_value=p_value,
        is_drifted=p_value < threshold,
        method="KS-test",
    )

def detect_categorical_drift(
    reference: np.ndarray,
    current: np.ndarray,
) -> DriftReport:
    """Population Stability Index (PSI) for categorical feature drift.

    Inputs are integer-encoded category arrays.
    """
    n_bins = max(reference.max(), current.max()) + 1
    ref_counts = np.bincount(reference, minlength=n_bins)
    cur_counts = np.bincount(current, minlength=n_bins)

    # Normalize to proportions
    ref_props = ref_counts / ref_counts.sum()
    cur_props = cur_counts / cur_counts.sum()

    # PSI; epsilon avoids log(0) for empty bins
    epsilon = 1e-10
    psi = np.sum(
        (cur_props - ref_props) * np.log((cur_props + epsilon) / (ref_props + epsilon))
    )
    return DriftReport(
        feature_name="",
        drift_score=psi,
        p_value=0.0,
        is_drifted=psi > 0.2,  # PSI > 0.2 indicates significant drift
        method="PSI",
    )

def detect_embedding_drift(
    reference_embeddings: np.ndarray,
    current_embeddings: np.ndarray,
    threshold: float = 0.1,
) -> DriftReport:
    """Mean-embedding distance (linear-kernel MMD) for embedding drift."""
    ref_mean = reference_embeddings.mean(axis=0)
    cur_mean = current_embeddings.mean(axis=0)
    mmd = np.linalg.norm(ref_mean - cur_mean)
    return DriftReport(
        feature_name="embedding",
        drift_score=mmd,
        p_value=0.0,
        is_drifted=mmd > threshold,
        method="MMD",
    )
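A standalone sanity check of the KS-based approach on synthetic data: a clear mean shift in a numerical feature should be flagged. The 0.8 shift is arbitrary; at this sample size any shift of that magnitude drives the p-value effectively to zero.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, size=5000)   # training-time distribution
shifted = rng.normal(0.8, 1.0, size=5000)     # simulated drifted traffic

statistic, p_value = stats.ks_2samp(reference, shifted)
print(p_value < 0.05)  # True: the shift is detected
```

Note that with large samples the KS test flags even tiny, practically irrelevant shifts, which is why score-based thresholds such as PSI are often preferred for alerting.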
Model Serving
Unified Serving Across Frameworks
# src/serving/model_server.py
import time
from typing import Optional

import mlflow
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="ML Model Server")

# Load the model currently aliased to "production" from the registry
model = mlflow.pyfunc.load_model("models:/intent-classifier@production")

class PredictionRequest(BaseModel):
    text: str
    return_probabilities: bool = False

class PredictionResponse(BaseModel):
    label: str
    confidence: float
    probabilities: Optional[dict] = None
    latency_ms: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    start = time.perf_counter()
    try:
        result = model.predict([request.text])
        latency_ms = (time.perf_counter() - start) * 1000
        return PredictionResponse(
            label=result["label"][0],
            confidence=result["confidence"][0],
            probabilities=(
                result.get("probabilities", [None])[0]
                if request.return_probabilities
                else None
            ),
            latency_ms=round(latency_ms, 2),
            model_version=model.metadata.run_id[:8],
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "model": "intent-classifier",
        "version": model.metadata.run_id[:8],
    }
Monitoring Dashboard Metrics
| Metric category | Metric | Alert threshold | Check frequency |
|---|---|---|---|
| Model quality | Accuracy / F1 | 5% below training metrics | Hourly |
| Prediction distribution | Label distribution shift | PSI > 0.2 | Daily |
| Data quality | Missing value ratio | > 5% | Real-time |
| Latency | p99 inference latency | > 100 ms | Real-time |
| Throughput | QPS | 50% below baseline | Real-time |
| Resources | GPU/CPU utilization | > 90% for 5 sustained minutes | Real-time |
| Feature drift | Input feature distributions | KS p-value < 0.01 | Daily |
| Concept drift | Prediction accuracy | Declining for 3 consecutive days | Daily |
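The p99 latency row can be computed directly from raw per-request latencies. A sketch with synthetic data, using the 100 ms threshold from the table; the latency values themselves are made up:

```python
import numpy as np

# 95 fast requests plus a slow tail (synthetic data)
latencies_ms = np.array([12.0] * 95 + [150.0] * 5)

p99 = float(np.percentile(latencies_ms, 99))
print(p99)          # 150.0
print(p99 > 100.0)  # True: the alert fires
```

Percentile-based alerting catches tail regressions that an average would hide: the mean here is under 20 ms even though 5% of users wait 150 ms.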
Summary
- Experiment tracking is the foundation: without reproducible experiments there are no trustworthy models. MLflow tracks the full chain from parameters to artifacts.
- The Model Registry is the hub: it connects training to deployment, providing version management and stage transitions.
- CI/CD eliminates manual steps: automated training, validation, and deployment pipelines ensure every change passes the quality gates.
- Data validation comes before model training: garbage in, garbage out; schema checks and drift detection are the first line of defense.
- Production monitoring closes the loop: monitoring is not optional, because data drift and concept drift silently degrade models in production.
Maurice | maurice_wen@proton.me