# Multi-Tenant AI Platform Architecture Design
灵阙教研团队 · Updated 2026-02-28
## Overview

A multi-tenant AI platform serves multiple customers (tenants) with independent AI services on shared infrastructure. The core challenge: how to guarantee tenant isolation while maximizing the utilization of expensive resources such as GPUs.

This article covers five core design dimensions of a multi-tenant AI platform: tenant isolation, resource scheduling, cost allocation, inference optimization, and security & compliance.

## Architecture Overview

Multi-tenant AI platform architecture:
```
┌─────────────────────────────────────────────────────┐
│                     API Gateway                     │
│     (auth + rate limiting + routing + metering)     │
└────────────────┬────────────────────────────────────┘
                 │
┌────────────────┼────────────────────────────────────┐
│                v                                    │
│  ┌──────────────────────────┐                       │
│  │   Tenant routing layer   │                       │
│  │ tenant_id -> model pool  │                       │
│  └──────┬────────┬────────┬─┘                       │
│         │        │        │                         │
│  ┌──────v──┐ ┌───v─────┐ ┌v────────┐                │
│  │Tenant A │ │Tenant B │ │Tenant C │ Model serving  │
│  │dedicated│ │ shared  │ │ shared  │ layer          │
│  └─────────┘ └─────────┘ └─────────┘                │
│                                                     │
│  ┌──────────────────────────────────────┐           │
│  │          GPU resource pool           │           │
│  │  ┌──────┐ ┌──────┐ ┌──────┐          │           │
│  │  │A100-1│ │A100-2│ │A100-3│  ...     │           │
│  │  └──────┘ └──────┘ └──────┘          │           │
│  └──────────────────────────────────────┘           │
└─────────────────────────────────────────────────────┘
```
## Tenant Isolation Strategies

### Isolation levels

| Level | Isolation mechanism | Cost | Security | Typical use case |
|---|---|---|---|---|
| L1 | Logical isolation (same process) | Lowest | Low | Internal teams / dev environments |
| L2 | Process isolation (containers) | Medium | Medium | General SaaS |
| L3 | VM / node isolation | High | High | Enterprises with strict compliance requirements |
| L4 | Physical isolation (dedicated cluster) | Highest | Highest | Finance / government / healthcare |
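The choice of isolation level is usually driven by the tenant's commercial tier. A minimal sketch of that mapping (the tier names and the mapping itself are illustrative assumptions, not prescribed by the table above):

```python
# Map a tenant's commercial tier to the isolation level it is served at.
# Tier names and the mapping are illustrative assumptions.
ISOLATION_BY_TIER = {
    "free": "L1",        # logical isolation, same process
    "pro": "L2",         # container isolation
    "enterprise": "L3",  # VM / node isolation
    "regulated": "L4",   # physically separate cluster
}

def isolation_level(tier: str) -> str:
    """Return the isolation level for a tier, defaulting to the safest."""
    return ISOLATION_BY_TIER.get(tier, "L4")

print(isolation_level("pro"))  # L2
```

Defaulting unknown tiers to the strictest level fails safe rather than open.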
### L2 container-level isolation

```yaml
# One dedicated inference Pod (Deployment) per tenant
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-tenant-acme
  labels:
    tenant: acme
    tier: dedicated
spec:
  replicas: 2
  selector:
    matchLabels:
      app: inference
      tenant: acme
  template:
    metadata:
      labels:
        app: inference
        tenant: acme
    spec:
      # Namespace-scoped service account for this tenant
      serviceAccountName: tenant-acme-sa
      containers:
        - name: inference
          image: inference-server:v3
          # Per-container resource requests and limits
          resources:
            requests:
              cpu: "4"
              memory: "16Gi"
              nvidia.com/gpu: "1"
            limits:
              cpu: "8"
              memory: "32Gi"
              nvidia.com/gpu: "1"
          env:
            - name: TENANT_ID
              value: "acme"
            - name: MODEL_PATH
              value: "/models/acme/"
      # Node affinity: schedule onto nodes matching the tenant tier
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: tenant-tier
                    operator: In
                    values: ["dedicated"]
---
# Per-tenant namespace resource quota
apiVersion: v1
kind: ResourceQuota
metadata:
  name: tenant-acme-quota
  namespace: tenant-acme
spec:
  hard:
    requests.cpu: "32"
    requests.memory: "128Gi"
    requests.nvidia.com/gpu: "4"
    pods: "20"
```
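Container isolation is usually paired with network isolation, so Pods of one tenant cannot reach another tenant's services. A sketch of a Kubernetes NetworkPolicy under the same namespace-per-tenant scheme as above (the namespace name is an assumption carried over from the example):

```yaml
# Allow traffic into tenant-acme Pods only from within the same namespace
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: tenant-acme-isolation
  namespace: tenant-acme
spec:
  podSelector: {}          # applies to all Pods in the namespace
  policyTypes: ["Ingress"]
  ingress:
    - from:
        - podSelector: {}  # only Pods in the same namespace may connect
```

Note this requires a CNI plugin that enforces NetworkPolicy (e.g. Calico or Cilium); the default kubenet networking ignores it.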
### Logical isolation (shared inference service)

```python
from dataclasses import dataclass, field
from typing import Optional


class TenantNotFoundError(Exception):
    """Raised when a tenant_id has no configuration."""


class ModelAccessDeniedError(Exception):
    """Raised when a tenant requests a model outside its allow-list."""


@dataclass
class TenantConfig:
    tenant_id: str
    tier: str                              # free / pro / enterprise
    models: list[str]                      # models this tenant may call
    max_rpm: int = 60                      # requests per minute
    max_tokens_per_request: int = 4096
    max_concurrent: int = 5                # max concurrent requests
    monthly_token_budget: int = 1_000_000  # monthly token allowance
    custom_system_prompt: Optional[str] = None
    data_region: str = "cn"                # data-residency region
    allowed_tools: list[str] = field(default_factory=list)


class TenantRouter:
    """Route inference requests according to tenant configuration."""

    def __init__(self, tenant_store):
        self.tenant_store = tenant_store
        self.model_pools = {}

    async def route(self, tenant_id: str, request):
        config = await self.tenant_store.get(tenant_id)
        if not config:
            raise TenantNotFoundError(tenant_id)
        # Check model access rights
        if request.model not in config.models:
            raise ModelAccessDeniedError(
                f"Tenant {tenant_id} has no access to {request.model}"
            )
        # Clamp the token limit to the tenant's per-request cap
        if request.max_tokens > config.max_tokens_per_request:
            request.max_tokens = config.max_tokens_per_request
        # Pick a model pool
        pool = self._get_pool(config.tier, request.model)
        # Inject tenant context
        if config.custom_system_prompt:
            request.messages.insert(0, {
                "role": "system",
                "content": config.custom_system_prompt,
            })
        return await pool.infer(request, tenant_id=tenant_id)

    def _get_pool(self, tier: str, model: str):
        if tier == "enterprise":
            return self.model_pools.get(f"dedicated_{model}")
        return self.model_pools.get(f"shared_{model}")
```
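Two of the router's decisions, the token clamp and the pool selection, are pure functions and easy to verify in isolation. A trimmed, standalone restatement (function names here are illustrative, not part of the router above):

```python
def clamp_max_tokens(requested: int, tenant_cap: int) -> int:
    """Clamp a request's max_tokens to the tenant's per-request cap."""
    return min(requested, tenant_cap)

def select_pool_key(tier: str, model: str) -> str:
    """Enterprise tenants hit a dedicated pool; all other tiers share."""
    prefix = "dedicated" if tier == "enterprise" else "shared"
    return f"{prefix}_{model}"

print(clamp_max_tokens(9999, 4096))             # 4096
print(select_pool_key("enterprise", "gpt-4o"))  # dedicated_gpt-4o
print(select_pool_key("free", "gpt-4o"))        # shared_gpt-4o
```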
## Rate Limiting and Quota Management

### Multi-dimensional rate limiting

```python
import time
from collections import defaultdict


class TenantRateLimiter:
    """Hybrid sliding-window / token-budget rate limiter."""

    def __init__(self):
        self.windows = defaultdict(list)
        self.token_budgets = {}

    async def check_and_consume(
        self,
        tenant_id: str,
        config: TenantConfig,
        estimated_tokens: int,
    ) -> tuple[bool, str]:
        """Check whether the request is allowed; returns (allowed, reason)."""
        now = time.time()

        # Dimension 1: RPM (requests per minute)
        window = self.windows[tenant_id]
        window[:] = [t for t in window if now - t < 60]  # drop expired entries
        if len(window) >= config.max_rpm:
            return False, f"Rate limit exceeded: {config.max_rpm} RPM"

        # Dimension 2: concurrency (approximated here as requests started
        # within the last 30 s; a production system would track completions
        # explicitly)
        active = sum(1 for t in window if now - t < 30)
        if active >= config.max_concurrent:
            return False, f"Concurrency limit: {config.max_concurrent}"

        # Dimension 3: monthly token budget
        budget = self.token_budgets.get(tenant_id, config.monthly_token_budget)
        if budget < estimated_tokens:
            return False, "Monthly token budget exhausted"

        # All checks passed
        window.append(now)
        self.token_budgets[tenant_id] = budget - estimated_tokens
        return True, "ok"

    def get_usage_stats(self, tenant_id: str) -> dict:
        now = time.time()
        window = self.windows.get(tenant_id, [])
        recent = [t for t in window if now - t < 60]
        return {
            "current_rpm": len(recent),
            "tokens_remaining": self.token_budgets.get(tenant_id, 0),
        }
```
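The sliding-window arithmetic at the heart of the RPM check can be verified on its own. A trimmed, self-contained version (the function name is illustrative):

```python
import time

def allowed_under_rpm(window: list[float], now: float, max_rpm: int,
                      window_s: float = 60.0) -> bool:
    """Sliding-window RPM check: drop expired timestamps, then compare."""
    window[:] = [t for t in window if now - t < window_s]
    if len(window) >= max_rpm:
        return False
    window.append(now)
    return True

# Three requests one second apart against a 2 RPM limit: the third is rejected.
w: list[float] = []
t0 = time.time()
print([allowed_under_rpm(w, t0 + i, 2) for i in range(3)])  # [True, True, False]
```

Because expired timestamps are pruned on every call, a request arriving after the window has rolled over is admitted again without any background cleanup task.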
### Distributed rate limiting with Redis

```python
import time

import redis.asyncio as redis


class DistributedRateLimiter:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def check_rpm(self, tenant_id: str, limit: int) -> bool:
        """Redis-backed sliding-window rate limiting."""
        key = f"ratelimit:{tenant_id}:rpm"
        now = int(time.time() * 1000)
        window_ms = 60_000

        pipe = self.redis.pipeline()
        # Remove entries that fell outside the window
        pipe.zremrangebyscore(key, 0, now - window_ms)
        # Count requests inside the current window
        pipe.zcard(key)
        # Record the current request
        pipe.zadd(key, {str(now): now})
        # Expire the key so idle tenants cost nothing
        pipe.expire(key, 120)
        results = await pipe.execute()

        current_count = results[1]
        if current_count >= limit:
            # Roll back the entry we just added
            await self.redis.zrem(key, str(now))
            return False
        return True
```
## Cost Allocation and Metering

### Usage metering system

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class UsageRecord:
    tenant_id: str
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    gpu_seconds: float
    cost_usd: float


class UsageMeter:
    """Real-time usage metering.

    The storage helpers (_write_to_timeseries, _update_aggregates,
    _query_records) are backend-specific and omitted here.
    """

    # Price table (USD per 1M tokens)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet": {"input": 3.00, "output": 15.00},
        "llama-3.1-8b": {"input": 0.05, "output": 0.08},  # self-hosted cost
    }

    # GPU cost (USD per hour)
    GPU_COST_PER_HOUR = {
        "A100-80GB": 3.50,
        "A10G": 1.00,
        "T4": 0.35,
    }

    def calculate_cost(self, model: str, input_tokens: int,
                       output_tokens: int, gpu_type: str,
                       gpu_seconds: float) -> float:
        """Cost of a single request."""
        pricing = self.PRICING.get(model, {"input": 1.0, "output": 2.0})
        # Token cost
        token_cost = (
            input_tokens * pricing["input"] / 1_000_000
            + output_tokens * pricing["output"] / 1_000_000
        )
        # GPU cost (self-hosted models)
        if model.startswith("llama") or model.startswith("mistral"):
            gpu_hourly = self.GPU_COST_PER_HOUR.get(gpu_type, 1.0)
            gpu_cost = gpu_seconds * gpu_hourly / 3600
            return max(token_cost, gpu_cost)  # charge the larger of the two
        return token_cost

    async def record(self, record: UsageRecord):
        """Write a metering record (async, off the inference path)."""
        # Write to a time-series store
        await self._write_to_timeseries(record)
        # Update real-time aggregates
        await self._update_aggregates(record)

    async def get_monthly_report(self, tenant_id: str) -> dict:
        """Monthly usage report."""
        records = await self._query_records(tenant_id, period="month")
        total_cost = sum(r.cost_usd for r in records)
        total_requests = len(records)
        total_tokens = sum(r.input_tokens + r.output_tokens for r in records)
        by_model = {}
        for r in records:
            if r.model not in by_model:
                by_model[r.model] = {"requests": 0, "tokens": 0, "cost": 0}
            by_model[r.model]["requests"] += 1
            by_model[r.model]["tokens"] += r.input_tokens + r.output_tokens
            by_model[r.model]["cost"] += r.cost_usd
        return {
            "tenant_id": tenant_id,
            "period": "2026-02",
            "total_cost_usd": round(total_cost, 4),
            "total_requests": total_requests,
            "total_tokens": total_tokens,
            "by_model": by_model,
        }
```
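The token-cost arithmetic in calculate_cost can be checked with a small standalone example, using the gpt-4o-mini prices from the PRICING table above:

```python
# Standalone version of the token-cost formula in calculate_cost,
# with the gpt-4o-mini prices from the PRICING table (USD per 1M tokens).
INPUT_PRICE = 0.15
OUTPUT_PRICE = 0.60

def token_cost(input_tokens: int, output_tokens: int) -> float:
    return (input_tokens * INPUT_PRICE + output_tokens * OUTPUT_PRICE) / 1_000_000

# 1,000 input + 500 output tokens:
# (1000 * 0.15 + 500 * 0.60) / 1e6 = 450 / 1e6 = 0.00045 USD
print(f"{token_cost(1000, 500):.5f} USD")  # 0.00045 USD
```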
## Shared Inference Optimization

### Request batching

```python
import asyncio
from dataclasses import dataclass


@dataclass
class PendingRequest:
    tenant_id: str
    input_data: dict
    future: asyncio.Future


class BatchInferenceServer:
    """Batch requests across tenants to raise GPU utilization."""

    def __init__(self, model, max_batch_size=32, max_wait_ms=50):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending: list[PendingRequest] = []
        self.lock = asyncio.Lock()
        self._batch_task = None

    async def predict(self, tenant_id: str, input_data: dict) -> dict:
        """Submit a request and await its batched result."""
        future = asyncio.get_running_loop().create_future()
        request = PendingRequest(tenant_id, input_data, future)
        async with self.lock:
            self.pending.append(request)
            if len(self.pending) >= self.max_batch_size:
                # Batch is full: flush immediately
                await self._process_batch()
            elif self._batch_task is None:
                # Otherwise flush after at most max_wait_ms
                self._batch_task = asyncio.create_task(self._wait_and_process())
        return await future

    async def _wait_and_process(self):
        await asyncio.sleep(self.max_wait_ms / 1000)
        async with self.lock:
            if self.pending:
                await self._process_batch()
            self._batch_task = None

    async def _process_batch(self):
        batch = self.pending[:self.max_batch_size]
        self.pending = self.pending[self.max_batch_size:]
        # One batched forward pass
        inputs = [r.input_data for r in batch]
        results = await self.model.batch_predict(inputs)
        # Fan results back out to the waiting callers
        for request, result in zip(batch, results):
            request.future.set_result(result)
```
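The core trick — many awaiting callers, one batched model call — can be demonstrated standalone with asyncio futures. A trimmed sketch independent of the class above (the queue-based worker and the `x * 2` stand-in for `batch_predict` are illustrative assumptions):

```python
import asyncio

async def batch_worker(queue: asyncio.Queue):
    """Collect queued requests, run one batched 'model call', resolve futures."""
    while True:
        item = await queue.get()
        batch = [item]
        while not queue.empty():           # drain everything already waiting
            batch.append(queue.get_nowait())
        inputs = [x for x, _ in batch]
        results = [x * 2 for x in inputs]  # stand-in for model.batch_predict
        for (_, fut), res in zip(batch, results):
            fut.set_result(res)

async def predict(queue: asyncio.Queue, x: int) -> int:
    """Submit one input and await its slice of the batched result."""
    fut = asyncio.get_running_loop().create_future()
    await queue.put((x, fut))
    return await fut

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(batch_worker(queue))
    # Four concurrent callers; the worker serves them in one or few batches
    results = await asyncio.gather(*(predict(queue, i) for i in range(4)))
    worker.cancel()
    return results

print(asyncio.run(main()))  # [0, 2, 4, 6]
```

Each caller gets its own result in submission order regardless of how the worker happens to split the batch, which is exactly the property the class above relies on.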
### Model multiplexing

```python
class OutOfMemoryError(RuntimeError):
    pass


class ModelMultiplexer:
    """Share model instances across tenants; load/unload on demand.

    `LoadedModel`, `_load_model`, and `_estimate_size` are backend-specific
    (e.g. a vLLM or Triton wrapper) and omitted here.
    """

    def __init__(self, gpu_memory_limit_gb=80):
        self.loaded_models: dict[str, "LoadedModel"] = {}
        self.gpu_memory_limit = gpu_memory_limit_gb * 1024 ** 3
        self.gpu_memory_used = 0
        self.lru_order: list[str] = []

    async def get_model(self, model_name: str) -> "LoadedModel":
        if model_name in self.loaded_models:
            # Refresh LRU position
            self.lru_order.remove(model_name)
            self.lru_order.append(model_name)
            return self.loaded_models[model_name]

        # A new model must be loaded
        model_size = self._estimate_size(model_name)

        # Evict least-recently-used models until the new one fits
        while self.gpu_memory_used + model_size > self.gpu_memory_limit:
            if not self.lru_order:
                raise OutOfMemoryError("Cannot load model, GPU memory full")
            evict_name = self.lru_order.pop(0)
            evicted = self.loaded_models.pop(evict_name)
            self.gpu_memory_used -= evicted.size
            await evicted.unload()

        # Load the new model
        model = await self._load_model(model_name)
        self.loaded_models[model_name] = model
        self.lru_order.append(model_name)
        self.gpu_memory_used += model.size
        return model
```
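The eviction policy itself can be checked in isolation with a plain LRU capacity model. A standalone sketch with sizes in arbitrary units (class and model names are illustrative):

```python
from collections import OrderedDict

class LruCapacity:
    """Track loaded model sizes; evict least-recently-used until a new one fits."""

    def __init__(self, limit: int):
        self.limit = limit
        self.loaded: OrderedDict[str, int] = OrderedDict()
        self.used = 0

    def touch_or_load(self, name: str, size: int) -> list[str]:
        """Return the names evicted to make room for `name`."""
        if name in self.loaded:
            self.loaded.move_to_end(name)   # LRU refresh on hit
            return []
        evicted = []
        while self.used + size > self.limit:
            if not self.loaded:
                raise MemoryError("model larger than total capacity")
            old, old_size = self.loaded.popitem(last=False)  # oldest first
            self.used -= old_size
            evicted.append(old)
        self.loaded[name] = size
        self.used += size
        return evicted

cap = LruCapacity(limit=100)
cap.touch_or_load("llama-8b", 40)
cap.touch_or_load("mistral-7b", 40)
cap.touch_or_load("llama-8b", 40)         # refresh: mistral-7b is now LRU
print(cap.touch_or_load("qwen-14b", 60))  # ['mistral-7b']
```

The refresh on hit is what makes this LRU rather than FIFO: the recently reused llama-8b survives while the idle mistral-7b is evicted.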
## Data Isolation and Compliance

### Tenant data isolation

Database layer: Row Level Security (PostgreSQL).

```sql
-- Enable RLS
ALTER TABLE ai_conversations ENABLE ROW LEVEL SECURITY;

-- Policy: a tenant can only see its own rows
CREATE POLICY tenant_isolation ON ai_conversations
    USING (tenant_id = current_setting('app.tenant_id')::uuid);

-- The application sets the tenant context per connection/request
SET app.tenant_id = 'tenant-uuid-here';
```

Vector database layer: collection-per-tenant isolation.

```python
class TenantVectorStore:
    def __init__(self, qdrant_client):
        self.client = qdrant_client

    async def ensure_collection(self, tenant_id: str):
        """One dedicated collection per tenant."""
        collection_name = f"tenant_{tenant_id}_docs"
        if not await self.client.collection_exists(collection_name):
            await self.client.create_collection(
                collection_name=collection_name,
                vectors_config={"size": 768, "distance": "Cosine"},
            )
        return collection_name

    async def search(self, tenant_id: str, query_vector, limit=10):
        collection = await self.ensure_collection(tenant_id)
        # Search is automatically scoped to the tenant's collection
        return await self.client.query_points(
            collection_name=collection,
            query=query_vector,
            limit=limit,
        )
```
## Observability

Multi-tenant metrics are collected with a tenant dimension on every series:

```python
from prometheus_client import Histogram, Counter, Gauge

INFERENCE_LATENCY = Histogram(
    "inference_latency_seconds",
    "Inference latency by tenant and model",
    ["tenant_id", "model", "tier"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
)

TOKEN_USAGE = Counter(
    "token_usage_total",
    "Total tokens consumed",
    ["tenant_id", "model", "direction"],  # direction: input / output
)

ACTIVE_TENANTS = Gauge(
    "active_tenants",
    "Number of tenants with active requests",
)

GPU_UTILIZATION_BY_TENANT = Gauge(
    "gpu_utilization_by_tenant",
    "GPU time allocation per tenant",
    ["tenant_id", "gpu_id"],
)
```
## Summary

Core design principles for a multi-tenant AI platform:

- Balance isolation against sharing: choose the isolation level by tenant tier, and raise utilization of shared inference through request batching
- Cost transparency: precisely meter every tenant's tokens, GPU time, and API calls
- Elastic scaling: mix shared and dedicated pools, scaling each on demand
- Data security: database RLS, per-tenant vector collections, and strict boundaries around injected context
- Observability: collect all metrics with a tenant_id dimension to support anomaly detection and SLA reporting
Maurice | maurice_wen@proton.me