Feature Store Design and Implementation

Online/offline feature serving architecture, hands-on Feast, feature engineering patterns, point-in-time correctness, and production best practices

Introduction

In machine learning systems, feature engineering takes up an estimated 60-80% of development time. The thornier problem, though, is consistency: how do you guarantee that the features computed from the data lake with Spark at training time match the features served from Redis at inference time? Once training-serving skew appears, online model quality degrades silently.

A Feature Store exists to solve exactly this problem: it manages the definition, computation, storage, and serving of features in one place, ensuring that offline training and online inference use exactly the same features.

Core Architecture

The Dual-Path Design of a Feature Store

┌──────────────────────────────────────────────────────────────┐
│                        Feature Store                         │
│                                                              │
│  ┌────────────────────────┐  ┌────────────────────────────┐  │
│  │  Offline path (batch)  │  │  Online path (real-time)   │  │
│  │                        │  │                            │  │
│  │  Data lake ──→ Spark   │  │  Event stream ──→ Flink/   │  │
│  │  ETL ──→ offline store │  │  Kafka ──→ online store    │  │
│  │  (Parquet/Delta Lake)  │  │  (Redis/DynamoDB)          │  │
│  │                        │  │                            │  │
│  │  Use: model training   │  │  Use: online inference     │  │
│  │  Latency: min to hours │  │  Latency: <10ms            │  │
│  └───────────┬────────────┘  └───────────┬────────────────┘  │
│              │                           │                   │
│              ▼                           ▼                   │
│  ┌──────────────────────────────────────────────────────┐    │
│  │             Feature Registry (metadata)              │    │
│  │  definitions + lineage + versions + stats + access   │    │
│  └──────────────────────────────────────────────────────┘    │
└──────────────────────────────────────────────────────────────┘

Core Concepts

Concept         | Description                          | Example
----------------|--------------------------------------|------------------------------
Entity          | The subject a feature describes      | user_id, item_id, order_id
Feature View    | A logical group of related features  | user_profile_features
Feature         | A single feature definition          | user_total_orders (INT64)
Data Source     | Physical source of the feature data  | BigQuery table, Kafka topic
Offline Store   | Offline feature storage              | Parquet, Delta Lake, BigQuery
Online Store    | Online feature storage               | Redis, DynamoDB
Materialization | Offline -> online sync               | Scheduled batch writes to Redis

Hands-On with Feast

Project Initialization

# Install Feast with the extras we need (quoted so the shell
# does not glob the square brackets)
pip install 'feast[redis,postgres,aws]'

# Initialize project
feast init recommendation_features
cd recommendation_features

Feature Definitions

# feature_repo/features.py
from feast import Entity, FeatureView, Field, FileSource, PushSource
from feast.types import Float32, Float64, Int64, String
from datetime import timedelta

# Define entities
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique user identifier",
)

item = Entity(
    name="item_id",
    join_keys=["item_id"],
    description="Unique item identifier",
)

# Data sources
user_stats_source = FileSource(
    name="user_stats_source",
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

item_features_source = FileSource(
    name="item_features_source",
    path="data/item_features.parquet",
    timestamp_field="event_timestamp",
)

# Real-time push source for streaming features
user_realtime_source = PushSource(
    name="user_realtime_push",
    batch_source=user_stats_source,
)

# Feature views
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    schema=[
        Field(name="total_orders", dtype=Int64, description="Lifetime order count"),
        Field(name="avg_order_value", dtype=Float64, description="Average order value"),
        Field(name="days_since_last_order", dtype=Int64),
        Field(name="favorite_category", dtype=String),
        Field(name="user_segment", dtype=String),  # high/medium/low value
    ],
    source=user_stats_source,
    ttl=timedelta(days=1),       # Feature freshness
    online=True,                 # Materialize to online store
    tags={"team": "recommendation", "version": "v2"},
)

item_features_fv = FeatureView(
    name="item_features",
    entities=[item],
    schema=[
        Field(name="category", dtype=String),
        Field(name="price", dtype=Float64),
        Field(name="avg_rating", dtype=Float32),
        Field(name="total_reviews", dtype=Int64),
        Field(name="click_through_rate", dtype=Float64),
    ],
    source=item_features_source,
    ttl=timedelta(hours=6),
    online=True,
)

# On-demand feature (computed at request time)
from feast import on_demand_feature_view
import pandas as pd

@on_demand_feature_view(
    sources=[user_stats_fv, item_features_fv],
    schema=[
        Field(name="price_to_avg_ratio", dtype=Float64),
        Field(name="is_high_value_user", dtype=Int64),
    ],
)
def user_item_interaction_features(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["price_to_avg_ratio"] = inputs["price"] / (inputs["avg_order_value"] + 1e-6)
    df["is_high_value_user"] = (inputs["user_segment"] == "high").astype(int)
    return df

Feature Store Configuration

# feature_repo/feature_store.yaml
project: recommendation
provider: local

registry:
  registry_type: sql
  path: postgresql://user:pass@localhost:5432/feast_registry

online_store:
  type: redis
  connection_string: redis://localhost:6379

offline_store:
  type: file

entity_key_serialization_version: 2
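For production, the same file typically points at managed infrastructure instead of localhost. A hedged sketch (hostnames and credentials are placeholders; the Redis cluster keys follow Feast's redis online-store configuration):

```yaml
# feature_store.yaml -- hypothetical production variant
project: recommendation
provider: local

registry:
  registry_type: sql
  path: postgresql://feast:CHANGE_ME@prod-registry-db:5432/feast_registry

online_store:
  type: redis
  redis_type: redis_cluster                      # cluster mode
  connection_string: redis-a:6379,redis-b:6379   # comma-separated nodes

offline_store:
  type: file
```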

Feature Materialization and Serving

from feast import FeatureStore
from datetime import datetime, timedelta

store = FeatureStore(repo_path="feature_repo/")

# Apply feature definitions (register with store)
store.apply([
    user, item,
    user_stats_fv, item_features_fv,
    user_item_interaction_features,
])

# Materialize features to online store
store.materialize(
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now(),
)

# Incremental materialization (for production cron jobs)
store.materialize_incremental(end_date=datetime.now())
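The incremental call above maps to the `feast materialize-incremental` CLI command, so a production sync job can be as simple as a scheduled entry (the repo path is a placeholder):

```shell
# Hypothetical crontab entry: every hour, push the newest feature values
# from the offline store into Redis. The argument is the end timestamp.
0 * * * * cd /srv/feature_repo && feast materialize-incremental "$(date -u +%Y-%m-%dT%H:%M:%S)"
```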

Point-in-Time Correctness

The Problem

The timeline problem (a data-leakage risk):

Timeline:  ────────────────────────────────────────▶

T1 (Jan 1)          T2 (Jan 15)         T3 (Feb 1)
  │                   │                   │
  user signs up       user places order   training happens
  total_orders=0      total_orders=1

Wrong: at training time (T3), use the T3 value total_orders=1 as the feature for the T1 sample
  → Data leakage! Training "peeks" at future data.

Correct (point-in-time join):
  T1 sample → total_orders=0, the value as of T1
  T2 sample → total_orders=1, the value as of T2
Point-in-Time Joins in Feast

# Training data with point-in-time correctness
import pandas as pd

# Entity DataFrame: defines WHAT entities at WHAT timestamps
entity_df = pd.DataFrame({
    "user_id": ["u001", "u001", "u002", "u003"],
    "item_id": ["i100", "i200", "i100", "i300"],
    "event_timestamp": [
        pd.Timestamp("2025-01-01"),    # Point in time for u001
        pd.Timestamp("2025-01-15"),    # Different point for same user
        pd.Timestamp("2025-01-10"),
        pd.Timestamp("2025-01-20"),
    ],
    "label": [1, 0, 1, 0],            # Training labels
})

# Get historical features with point-in-time join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_orders",
        "user_stats:avg_order_value",
        "user_stats:days_since_last_order",
        "item_features:price",
        "item_features:avg_rating",
        "user_item_interaction_features:price_to_avg_ratio",
        "user_item_interaction_features:is_high_value_user",
    ],
).to_df()

print(training_df.columns.tolist())
# ['user_id', 'item_id', 'event_timestamp', 'label',
#  'total_orders', 'avg_order_value', 'days_since_last_order',
#  'price', 'avg_rating', 'price_to_avg_ratio', 'is_high_value_user']

Online Feature Retrieval

# Online feature retrieval (low latency, <10ms)
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Get features for online inference
online_features = store.get_online_features(
    features=[
        "user_stats:total_orders",
        "user_stats:avg_order_value",
        "user_stats:user_segment",
        "item_features:price",
        "item_features:avg_rating",
        "user_item_interaction_features:price_to_avg_ratio",
    ],
    entity_rows=[
        {"user_id": "u001", "item_id": "i100"},
        {"user_id": "u001", "item_id": "i200"},
    ],
).to_dict()

# Returns:
# {
#   "user_id": ["u001", "u001"],
#   "item_id": ["i100", "i200"],
#   "total_orders": [42, 42],
#   "avg_order_value": [89.5, 89.5],
#   "price": [29.99, 149.99],
#   ...
# }

Feature Engineering Patterns

Common Feature Patterns

Pattern     | Description                    | Example                  | Computed by
------------|--------------------------------|--------------------------|------------------------
Aggregation | Time-window aggregates         | 7-day order total        | Batch (Spark)
Count       | Event counts                   | Logins in last 30 days   | Batch/Stream
Ratio       | Ratio of two metrics           | Clicks / impressions     | On-demand
Time delta  | Time since a given event       | Days since last purchase | On-demand
Embedding   | Semantic vector representation | User interest vector     | Batch (model inference)
Cross       | Entity-to-entity interaction   | User-category preference | Batch
Real-time   | Current state                  | Items in the cart        | Stream

Feature Quality Monitoring

# src/feature_quality.py
import pandas as pd
from dataclasses import dataclass

@dataclass
class FeatureStats:
    name: str
    null_rate: float
    mean: float
    std: float
    min_val: float
    max_val: float
    unique_count: int

def compute_feature_stats(df: pd.DataFrame, feature_name: str) -> FeatureStats:
    col = df[feature_name]
    numeric = pd.api.types.is_numeric_dtype(col)  # robust across int/float widths
    return FeatureStats(
        name=feature_name,
        null_rate=float(col.isna().mean()),
        mean=float(col.mean()) if numeric else 0.0,
        std=float(col.std()) if numeric else 0.0,
        min_val=float(col.min()) if numeric else 0.0,
        max_val=float(col.max()) if numeric else 0.0,
        unique_count=int(col.nunique()),
    )

def validate_feature_quality(
    current_stats: FeatureStats,
    reference_stats: FeatureStats,
    thresholds: dict,
) -> list[str]:
    """Validate feature quality against reference baseline."""
    issues = []

    # Null rate check (resolve the threshold once so the message
    # cannot raise KeyError when the key is absent)
    max_null_rate = thresholds.get("max_null_rate", 0.05)
    if current_stats.null_rate > max_null_rate:
        issues.append(
            f"{current_stats.name}: null rate {current_stats.null_rate:.2%} "
            f"exceeds threshold {max_null_rate:.2%}"
        )

    # Distribution drift check
    if reference_stats.std > 0:
        z_score = abs(current_stats.mean - reference_stats.mean) / reference_stats.std
        if z_score > thresholds.get("max_mean_drift_z", 3.0):
            issues.append(
                f"{current_stats.name}: mean shifted by {z_score:.1f} sigma"
            )

    # Cardinality check
    ratio = current_stats.unique_count / max(reference_stats.unique_count, 1)
    if ratio > 2.0 or ratio < 0.5:
        issues.append(
            f"{current_stats.name}: cardinality changed from "
            f"{reference_stats.unique_count} to {current_stats.unique_count}"
        )

    return issues
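The drift check can be exercised on synthetic data. A self-contained sketch of the same null-rate and mean-shift logic, inlined (thresholds and data are made up):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
reference = pd.Series(rng.normal(100.0, 10.0, 5000))  # training-time baseline
current = pd.Series(rng.normal(160.0, 10.0, 5000))    # serving-time values, shifted

# Same checks as validate_feature_quality above.
issues = []
null_rate = current.isna().mean()
if null_rate > 0.05:
    issues.append(f"null rate {null_rate:.2%} exceeds threshold")
z_score = abs(current.mean() - reference.mean()) / reference.std()
if z_score > 3.0:
    issues.append(f"mean shifted by {z_score:.1f} sigma")
print(issues)  # the ~6-sigma mean shift trips the drift check
```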

Production Deployment Architecture

┌─────────────────────────────────────────────────┐
│            Production Feature Store             │
│                                                 │
│  ┌──────────┐     ┌──────────┐     ┌─────────┐ │
│  │ Airflow  │────▶│  Spark   │────▶│ Parquet │ │
│  │(schedule)│     │(compute) │     │(offline)│ │
│  └──────────┘     └──────────┘     └────┬────┘ │
│                                         │      │
│                              Materialize│      │
│                                         ▼      │
│  ┌──────────┐     ┌──────────┐     ┌─────────┐ │
│  │ Kafka    │────▶│ Flink    │────▶│ Redis   │ │
│  │ (events) │     │ (stream) │     │ (online)│ │
│  └──────────┘     └──────────┘     └────┬────┘ │
│                                         │      │
│                                    ┌────▼────┐ │
│                                    │ Feast   │ │
│                                    │ Server  │ │
│                                    │ (gRPC)  │ │
│                                    └─────────┘ │
└─────────────────────────────────────────────────┘
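Downstream services talk to the serving layer over the network; besides gRPC, Feast ships an HTTP feature server (`feast serve`, default port 6566) whose request body for POST /get-online-features looks like this. A sketch that only builds the payload (the actual POST is left commented; feature names follow this article):

```python
import json

# Request body for the Feast feature server's /get-online-features endpoint.
payload = {
    "features": [
        "user_stats:total_orders",
        "item_features:price",
    ],
    "entities": {
        "user_id": ["u001"],
        "item_id": ["i100"],
    },
}
body = json.dumps(payload)
# import requests
# resp = requests.post("http://localhost:6566/get-online-features", data=body)
print(body)
```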

Summary

  1. A Feature Store solves training-serving consistency: unified feature definitions ensure that offline training and online inference use exactly the same feature computation logic.
  2. Point-in-time joins prevent data leakage: training data must take each feature strictly as of the sample's timestamp, never "peeking" at the future.
  3. The dual-path design is the core: the offline (batch) path supports training, the online (real-time) path supports inference, and the two share one set of feature definitions.
  4. Feature quality needs continuous monitoring: null rates, distribution drift, and cardinality changes can all silently degrade a model.
  5. Start with Feast: it is currently the most mature open-source Feature Store, covering the full path from files to Redis.

Maurice | maurice_wen@proton.me