Feature Store Design and Implementation
灵阙教研团队
Updated 2026-02-28
Online/offline feature serving architecture, Feast in practice, feature engineering patterns, point-in-time correctness, and production best practices
Introduction
In a machine learning system, feature engineering takes up 60-80% of development time. The thornier problem, however, is consistency: how do you guarantee that the features computed offline with Spark from the data lake match the features served online from Redis at inference time? Once training-serving skew appears, online model quality degrades silently.
A Feature Store was built to solve exactly this problem: it unifies how features are defined, computed, stored, and served, so that offline training and online inference consume identical features.
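A minimal sketch of how skew creeps in (the imputation policies below are hypothetical): the offline pipeline imputes missing values as 0 before averaging, while the online pipeline silently drops them, so the "same" feature takes different values at training and serving time.

```python
import pandas as pd

# Same raw order amounts, one of which is missing.
orders = pd.Series([100.0, None, 200.0])

# Hypothetical offline pipeline: impute nulls as 0, then average.
offline_avg = orders.fillna(0).mean()  # (100 + 0 + 200) / 3 = 100.0

# Hypothetical online pipeline: pandas mean() skips nulls by default.
online_avg = orders.mean()             # (100 + 200) / 2 = 150.0

# The "same" avg_order_value feature disagrees between the two paths.
assert offline_avg != online_avg
```

A Feature Store removes this class of bug by having both paths read one materialized value instead of recomputing the feature in two codebases.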
Core Architecture
The Dual-Channel Design of a Feature Store
┌──────────────────────────────────────────────────────────────┐
│                        Feature Store                         │
│                                                              │
│  ┌────────────────────────┐  ┌────────────────────────────┐  │
│  │ Offline channel (Batch)│  │ Online channel (Real-time) │  │
│  │                        │  │                            │  │
│  │ Data lake ──→ Spark ETL│  │ Event stream ──→ Flink/    │  │
│  │  ──→ Offline store     │  │ Kafka ──→ Online store     │  │
│  │ (Parquet/Delta Lake)   │  │ (Redis/DynamoDB)           │  │
│  │                        │  │                            │  │
│  │ Use: model training    │  │ Use: online inference      │  │
│  │ Latency: minutes~hours │  │ Latency: <10ms             │  │
│  └───────────┬────────────┘  └───────────┬────────────────┘  │
│              │                           │                   │
│              ▼                           ▼                   │
│  ┌──────────────────────────────────────────────────────┐    │
│  │             Feature Registry (metadata)              │    │
│  │Definitions + lineage + versions + stats + permissions│    │
│  └──────────────────────────────────────────────────────┘    │
└──────────────────────────────────────────────────────────────┘
Core Concepts
| Concept | Description | Example |
|---|---|---|
| Entity | The subject a feature describes (whose feature it is) | user_id, item_id, order_id |
| Feature View | A logical group of related features | user_profile_features |
| Feature | A single feature definition | user_total_orders (INT64) |
| Data Source | The physical origin of the feature data | BigQuery table, Kafka topic |
| Offline Store | Storage for offline features | Parquet, Delta Lake, BigQuery |
| Online Store | Storage for online features | Redis, DynamoDB |
| Materialization | Syncing features from offline to online | Scheduled batch writes to Redis |
Feast in Practice
Project Initialization
# Install Feast
pip install feast[redis,postgres,aws]
# Initialize project
feast init recommendation_features
cd recommendation_features
Feature Definitions
# feature_repo/features.py
from feast import (
    Entity, FeatureView, Field, FileSource, PushSource,
)
from feast.types import Float32, Float64, Int64, String
from datetime import timedelta

# Define entities
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique user identifier",
)
item = Entity(
    name="item_id",
    join_keys=["item_id"],
    description="Unique item identifier",
)

# Data sources
user_stats_source = FileSource(
    name="user_stats_source",
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)
item_features_source = FileSource(
    name="item_features_source",
    path="data/item_features.parquet",
    timestamp_field="event_timestamp",
)

# Real-time push source for streaming features
user_realtime_source = PushSource(
    name="user_realtime_push",
    batch_source=user_stats_source,
)

# Feature views
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    schema=[
        Field(name="total_orders", dtype=Int64, description="Lifetime order count"),
        Field(name="avg_order_value", dtype=Float64, description="Average order value"),
        Field(name="days_since_last_order", dtype=Int64),
        Field(name="favorite_category", dtype=String),
        Field(name="user_segment", dtype=String),  # high/medium/low value
    ],
    source=user_stats_source,
    ttl=timedelta(days=1),  # Feature freshness
    online=True,  # Materialize to online store
    tags={"team": "recommendation", "version": "v2"},
)
item_features_fv = FeatureView(
    name="item_features",
    entities=[item],
    schema=[
        Field(name="category", dtype=String),
        Field(name="price", dtype=Float64),
        Field(name="avg_rating", dtype=Float32),
        Field(name="total_reviews", dtype=Int64),
        Field(name="click_through_rate", dtype=Float64),
    ],
    source=item_features_source,
    ttl=timedelta(hours=6),
    online=True,
)

# On-demand feature (computed at request time)
from feast import on_demand_feature_view
import pandas as pd

@on_demand_feature_view(
    sources=[user_stats_fv, item_features_fv],
    schema=[
        Field(name="price_to_avg_ratio", dtype=Float64),
        Field(name="is_high_value_user", dtype=Int64),
    ],
)
def user_item_interaction_features(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["price_to_avg_ratio"] = inputs["price"] / (inputs["avg_order_value"] + 1e-6)
    df["is_high_value_user"] = (inputs["user_segment"] == "high").astype(int)
    return df
Feature Store Configuration
# feature_repo/feature_store.yaml
project: recommendation
provider: local
registry:
  registry_type: sql
  path: postgresql://user:pass@localhost:5432/feast_registry
online_store:
  type: redis
  connection_string: redis://localhost:6379
offline_store:
  type: file
entity_key_serialization_version: 2
Materialization and Serving
from feast import FeatureStore
from datetime import datetime, timedelta

store = FeatureStore(repo_path="feature_repo/")

# Apply feature definitions (register with store)
store.apply([
    user, item,
    user_stats_fv, item_features_fv,
    user_item_interaction_features,
])

# Materialize features to online store
store.materialize(
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now(),
)

# Incremental materialization (for production cron jobs)
store.materialize_incremental(end_date=datetime.now())
Point-in-Time Correctness
The Problem
The timeline problem (a data-leakage risk):
Timeline: ────────────────────────────────────────▶
   T1 (Jan 1)          T2 (Jan 15)          T3 (Feb 1)
   │                   │                    │
   User signs up       User places order    Training time
   total_orders=0      total_orders=1
Wrong: at training time (T3), use the T3 value total_orders=1 as the feature for the T1 sample
→ Data leakage! Training "peeks" at future data
Correct (point-in-time join):
   T1 sample → use total_orders=0 as of T1
   T2 sample → use total_orders=1 as of T2
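The correct behaviour can be reproduced by hand with a pandas `merge_asof`, which for each training sample picks the most recent feature value at or before that sample's timestamp (the data below is illustrative):

```python
import pandas as pd

# Feature history: total_orders for u001 as it changed over time.
feature_history = pd.DataFrame({
    "user_id": ["u001", "u001"],
    "feature_timestamp": pd.to_datetime(["2025-01-01", "2025-01-15"]),
    "total_orders": [0, 1],
}).sort_values("feature_timestamp")

# Training samples: one row per (entity, point in time).
samples = pd.DataFrame({
    "user_id": ["u001", "u001"],
    "event_timestamp": pd.to_datetime(["2025-01-05", "2025-01-20"]),
}).sort_values("event_timestamp")

# Point-in-time join: direction="backward" forbids looking into the future.
joined = pd.merge_asof(
    samples,
    feature_history,
    left_on="event_timestamp",
    right_on="feature_timestamp",
    by="user_id",
    direction="backward",
)
print(joined["total_orders"].tolist())  # [0, 1] -- no leakage
```

Conceptually this is what `get_historical_features` does at scale, per feature view and with TTL handling on top.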
Point-in-Time Join in Feast
# Training data with point-in-time correctness
import pandas as pd

# Entity DataFrame: defines WHAT entities at WHAT timestamps
entity_df = pd.DataFrame({
    "user_id": ["u001", "u001", "u002", "u003"],
    "item_id": ["i100", "i200", "i100", "i300"],
    "event_timestamp": [
        pd.Timestamp("2025-01-01"),  # Point in time for u001
        pd.Timestamp("2025-01-15"),  # Different point for same user
        pd.Timestamp("2025-01-10"),
        pd.Timestamp("2025-01-20"),
    ],
    "label": [1, 0, 1, 0],  # Training labels
})

# Get historical features with point-in-time join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_orders",
        "user_stats:avg_order_value",
        "user_stats:days_since_last_order",
        "item_features:price",
        "item_features:avg_rating",
        "user_item_interaction_features:price_to_avg_ratio",
        "user_item_interaction_features:is_high_value_user",
    ],
).to_df()

print(training_df.columns.tolist())
# ['user_id', 'item_id', 'event_timestamp', 'label',
#  'total_orders', 'avg_order_value', 'days_since_last_order',
#  'price', 'avg_rating', 'price_to_avg_ratio', 'is_high_value_user']
Fetching Features for Online Inference
# Online feature retrieval (low latency, <10ms)
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Get features for online inference
online_features = store.get_online_features(
    features=[
        "user_stats:total_orders",
        "user_stats:avg_order_value",
        "user_stats:user_segment",
        "item_features:price",
        "item_features:avg_rating",
        "user_item_interaction_features:price_to_avg_ratio",
    ],
    entity_rows=[
        {"user_id": "u001", "item_id": "i100"},
        {"user_id": "u001", "item_id": "i200"},
    ],
).to_dict()

# Returns:
# {
#     "user_id": ["u001", "u001"],
#     "item_id": ["i100", "i200"],
#     "total_orders": [42, 42],
#     "avg_order_value": [89.5, 89.5],
#     "price": [29.99, 149.99],
#     ...
# }
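Downstream, the returned dict still has to be assembled into a model input. A hedged sketch (the values below are illustrative, not from a real store): pin an explicit feature order so training and serving agree on column positions.

```python
import pandas as pd

# Illustrative copy of a get_online_features(...).to_dict() result.
online_features = {
    "user_id": ["u001", "u001"],
    "item_id": ["i100", "i200"],
    "total_orders": [42, 42],
    "avg_order_value": [89.5, 89.5],
    "price": [29.99, 149.99],
}

# The model was trained with a fixed column order; pin it explicitly
# instead of relying on dict ordering.
FEATURE_ORDER = ["total_orders", "avg_order_value", "price"]
X = pd.DataFrame(online_features)[FEATURE_ORDER].to_numpy()
print(X.shape)  # (2, 3): two candidate items, three features each

# X can now be passed to model.predict(X) or an equivalent scorer.
```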
Feature Engineering Patterns
Common Feature Patterns
| Pattern | Description | Example | Computation |
|---|---|---|---|
| Aggregation | Time-window aggregates | 7-day order total | Batch (Spark) |
| Count | Event counts | Logins in the last 30 days | Batch/Stream |
| Ratio | Ratio of two metrics | Clicks / impressions | On-demand |
| Time-diff | Time elapsed since an event | Days since last purchase | On-demand |
| Embedding | Semantic vector representation | User interest vector | Batch (model inference) |
| Cross | Interaction between entities | User-category affinity | Batch |
| Real-time | Current state | Items in the cart | Stream |
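As a concrete sketch of two rows from the table, assuming pandas in the batch layer: an aggregation feature via a trailing time window, and a time-diff feature via timestamp differences (the event data is made up).

```python
import pandas as pd

# Order events for a single user (illustrative data).
orders = pd.DataFrame({
    "event_timestamp": pd.to_datetime(
        ["2025-01-01", "2025-01-03", "2025-01-09", "2025-01-11"]
    ),
    "amount": [10.0, 20.0, 30.0, 40.0],
}).set_index("event_timestamp")

# Aggregation feature: order total over a trailing 7-day window.
orders["amount_7d"] = orders["amount"].rolling("7D").sum()

# Time-diff feature: days since the previous order.
orders["days_since_prev"] = orders.index.to_series().diff().dt.days

print(orders["amount_7d"].tolist())        # [10.0, 30.0, 50.0, 70.0]
print(orders["days_since_prev"].tolist())  # [nan, 2.0, 6.0, 2.0]
```

In production the same aggregations typically run in Spark or Flink, but the window semantics are the same.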
Feature Quality Monitoring
# src/feature_quality.py
import pandas as pd
from dataclasses import dataclass

@dataclass
class FeatureStats:
    name: str
    null_rate: float
    mean: float
    std: float
    min_val: float
    max_val: float
    unique_count: int

def compute_feature_stats(df: pd.DataFrame, feature_name: str) -> FeatureStats:
    """Summarize one feature column; non-numeric columns get zeroed moments."""
    col = df[feature_name]
    numeric = pd.api.types.is_numeric_dtype(col)
    return FeatureStats(
        name=feature_name,
        null_rate=col.isna().mean(),
        mean=col.mean() if numeric else 0.0,
        std=col.std() if numeric else 0.0,
        min_val=col.min() if numeric else 0.0,
        max_val=col.max() if numeric else 0.0,
        unique_count=col.nunique(),
    )

def validate_feature_quality(
    current_stats: FeatureStats,
    reference_stats: FeatureStats,
    thresholds: dict,
) -> list[str]:
    """Validate feature quality against a reference baseline."""
    issues = []

    # Null rate check
    max_null_rate = thresholds.get("max_null_rate", 0.05)
    if current_stats.null_rate > max_null_rate:
        issues.append(
            f"{current_stats.name}: null rate {current_stats.null_rate:.2%} "
            f"exceeds threshold {max_null_rate:.2%}"
        )

    # Distribution drift check
    if reference_stats.std > 0:
        z_score = abs(current_stats.mean - reference_stats.mean) / reference_stats.std
        if z_score > thresholds.get("max_mean_drift_z", 3.0):
            issues.append(
                f"{current_stats.name}: mean shifted by {z_score:.1f} sigma"
            )

    # Cardinality check
    ratio = current_stats.unique_count / max(reference_stats.unique_count, 1)
    if ratio > 2.0 or ratio < 0.5:
        issues.append(
            f"{current_stats.name}: cardinality changed from "
            f"{reference_stats.unique_count} to {current_stats.unique_count}"
        )
    return issues
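A self-contained demo of the mean-drift check on synthetic data, where the current window has clearly shifted relative to the reference:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)

# Reference window: the feature distribution the model was trained on.
reference = pd.Series(rng.normal(loc=100.0, scale=10.0, size=10_000))

# Current window: an upstream change shifted the values upward.
current = pd.Series(rng.normal(loc=160.0, scale=10.0, size=10_000))

# Same drift check as above: distance of the current mean from the
# reference mean, measured in reference standard deviations.
z_score = abs(current.mean() - reference.mean()) / reference.std()
drifted = bool(z_score > 3.0)
print(f"z={z_score:.1f} drifted={drifted}")  # far beyond the 3-sigma threshold
```

Wired into the materialization cron job, a check like this can gate writes so that a drifted feature never reaches the online store.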
Production Deployment Architecture
┌─────────────────────────────────────────────────┐
│            Production Feature Store             │
│                                                 │
│  ┌──────────┐    ┌──────────┐    ┌─────────┐    │
│  │ Airflow  │───▶│  Spark   │───▶│ Parquet │    │
│  │(schedule)│    │(compute) │    │(offline)│    │
│  └──────────┘    └──────────┘    └────┬────┘    │
│                                       │         │
│                           Materialize │         │
│                                       ▼         │
│  ┌──────────┐    ┌──────────┐    ┌─────────┐    │
│  │  Kafka   │───▶│  Flink   │───▶│  Redis  │    │
│  │ (events) │    │(streams) │    │(online) │    │
│  └──────────┘    └──────────┘    └────┬────┘    │
│                                       │         │
│                                  ┌────▼────┐    │
│                                  │  Feast  │    │
│                                  │ Server  │    │
│                                  │ (gRPC)  │    │
│                                  └─────────┘    │
└─────────────────────────────────────────────────┘
Summary
- A Feature Store solves training-serving consistency: a single set of feature definitions guarantees that offline training and online inference share exactly the same feature computation logic.
- Point-in-time joins prevent data leakage: training data must fetch features strictly as of each sample's timestamp, never "peeking" at the future.
- The dual-channel design is the core: the offline (batch) channel serves training, the online (real-time) channel serves inference, and both share the same feature definitions.
- Feature quality needs continuous monitoring: null rates, distribution drift, and cardinality changes can all silently degrade a model.
- Start with Feast: it is currently the most mature open-source Feature Store, covering the full path from files to Redis.
Maurice | maurice_wen@proton.me