Multi-Tenant AI Platform Architecture Design

Overview

A multi-tenant AI platform serves multiple customers (tenants) with independent AI services on shared infrastructure. The core challenge: how to guarantee tenant isolation while maximizing utilization of expensive resources such as GPUs.

This article covers five core design dimensions of a multi-tenant AI platform: tenant isolation, resource scheduling, cost allocation, inference optimization, and security/compliance.

Architecture Overview

                 Multi-Tenant AI Platform Architecture
    ┌─────────────────────────────────────────────────────┐
    │                     API Gateway                     │
    │     (auth + rate limiting + routing + metering)     │
    └────────────────┬────────────────────────────────────┘
                     │
    ┌────────────────┼────────────────────────────────────┐
    │                v                                    │
    │  ┌───────────────────────────────────┐              │
    │  │        Tenant Routing Layer       │              │
    │  │      tenant_id -> model pool      │              │
    │  └──────┬────────────┬────────────┬──┘              │
    │         │            │            │                 │
    │    ┌────v────┐  ┌────v────┐  ┌────v────┐            │
    │    │Tenant A │  │Tenant B │  │Tenant C │            │
    │    │dedicated│  │ shared  │  │ shared  │            │
    │    └─────────┘  └─────────┘  └─────────┘            │
    │              Model Serving Layer                    │
    │                                                     │
    │  ┌──────────────────────────────────────┐           │
    │  │           GPU Resource Pool          │           │
    │  │  ┌──────┐ ┌──────┐ ┌──────┐          │           │
    │  │  │A100-1│ │A100-2│ │A100-3│ ...      │           │
    │  │  └──────┘ └──────┘ └──────┘          │           │
    │  └──────────────────────────────────────┘           │
    └─────────────────────────────────────────────────────┘

Tenant Isolation Strategies

Isolation Levels

  Level  Isolation approach            Cost     Security  Typical use case
  L1     Logical (same process)        Lowest   Lowest    Internal teams / dev environments
  L2     Process (containers)          Low      Medium    General-purpose SaaS
  L3     VM / node isolation           High     High      Enterprises with strict compliance requirements
  L4     Physical (dedicated cluster)  Highest  Highest   Finance / government / healthcare
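
As an illustrative sketch (the tier names and this particular mapping are assumptions, not a fixed standard), the table above can be encoded as a lookup that the provisioning layer consults:

from enum import Enum

class IsolationLevel(Enum):
    LOGICAL = 1    # L1: same process
    CONTAINER = 2  # L2: per-tenant pods
    NODE = 3       # L3: dedicated VMs / nodes
    PHYSICAL = 4   # L4: dedicated cluster

# Hypothetical plan-to-isolation mapping; adjust to your own tiers
TIER_ISOLATION = {
    "free": IsolationLevel.LOGICAL,
    "pro": IsolationLevel.CONTAINER,
    "enterprise": IsolationLevel.NODE,
    "regulated": IsolationLevel.PHYSICAL,
}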

Implementing L2 Container-Level Isolation

# One dedicated inference Deployment per tenant
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-tenant-acme
  labels:
    tenant: acme
    tier: dedicated
spec:
  replicas: 2
  selector:
    matchLabels:
      app: inference
      tenant: acme
  template:
    metadata:
      labels:
        app: inference
        tenant: acme
    spec:
      # Namespace isolation (per-tenant service account)
      serviceAccountName: tenant-acme-sa

      # Resource quotas
      containers:
        - name: inference
          image: inference-server:v3
          resources:
            requests:
              cpu: "4"
              memory: "16Gi"
              nvidia.com/gpu: "1"
            limits:
              cpu: "8"
              memory: "32Gi"
              nvidia.com/gpu: "1"
          env:
            - name: TENANT_ID
              value: "acme"
            - name: MODEL_PATH
              value: "/models/acme/"

      # Node affinity (schedule by tenant tier)
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: tenant-tier
                    operator: In
                    values: ["dedicated"]

---
# Resource quota for the tenant namespace
apiVersion: v1
kind: ResourceQuota
metadata:
  name: tenant-acme-quota
  namespace: tenant-acme
spec:
  hard:
    requests.cpu: "32"
    requests.memory: "128Gi"
    requests.nvidia.com/gpu: "4"
    pods: "20"

Logical Isolation (Shared Inference Service)

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class TenantConfig:
    tenant_id: str
    tier: str                     # free / pro / enterprise
    models: list[str]             # models the tenant may call
    max_rpm: int = 60             # requests-per-minute cap
    max_tokens_per_request: int = 4096
    max_concurrent: int = 5       # max concurrent requests
    monthly_token_budget: int = 1_000_000  # assumed default; consumed by the rate limiter below
    custom_system_prompt: Optional[str] = None
    data_region: str = "cn"       # data residency region
    allowed_tools: list[str] = field(default_factory=list)

class TenantNotFoundError(Exception):
    pass


class ModelAccessDeniedError(Exception):
    pass


class TenantRouter:
    """Route inference requests according to per-tenant configuration"""

    def __init__(self, tenant_store):
        self.tenant_store = tenant_store
        self.model_pools = {}

    async def route(self, tenant_id: str, request):
        config = await self.tenant_store.get(tenant_id)
        if not config:
            raise TenantNotFoundError(tenant_id)

        # Check model access
        if request.model not in config.models:
            raise ModelAccessDeniedError(
                f"Tenant {tenant_id} has no access to {request.model}"
            )

        # Clamp max_tokens to the tenant's per-request cap
        if request.max_tokens > config.max_tokens_per_request:
            request.max_tokens = config.max_tokens_per_request

        # Pick a model pool by tier
        pool = self._get_pool(config.tier, request.model)

        # Inject tenant context (custom system prompt)
        if config.custom_system_prompt:
            request.messages.insert(0, {
                "role": "system",
                "content": config.custom_system_prompt,
            })

        return await pool.infer(request, tenant_id=tenant_id)

    def _get_pool(self, tier: str, model: str):
        if tier == "enterprise":
            return self.model_pools.get(f"dedicated_{model}")
        return self.model_pools.get(f"shared_{model}")

Rate Limiting and Quota Management

Multi-Dimensional Rate Limiting

import time
from collections import defaultdict

class TenantRateLimiter:
    """滑动窗口 + 令牌桶混合限流"""

    def __init__(self):
        self.windows = defaultdict(list)
        self.token_budgets = {}

    async def check_and_consume(
        self,
        tenant_id: str,
        config: TenantConfig,
        estimated_tokens: int,
    ) -> tuple[bool, str]:
        """检查是否允许请求,返回 (allowed, reason)"""

        now = time.time()

        # Dimension 1: RPM (requests per minute)
        window = self.windows[tenant_id]
        window[:] = [t for t in window if now - t < 60]  # drop expired entries

        if len(window) >= config.max_rpm:
            return False, f"Rate limit exceeded: {config.max_rpm} RPM"

        # Dimension 2: concurrency, approximated as requests started in the last 30s
        active = sum(1 for t in window if now - t < 30)
        if active >= config.max_concurrent:
            return False, f"Concurrency limit: {config.max_concurrent}"

        # Dimension 3: monthly token budget
        budget = self.token_budgets.get(tenant_id, config.monthly_token_budget)
        if budget < estimated_tokens:
            return False, "Monthly token budget exhausted"

        # All checks passed: record the request and consume the budget
        window.append(now)
        self.token_budgets[tenant_id] = budget - estimated_tokens
        return True, "ok"

    def get_usage_stats(self, tenant_id: str) -> dict:
        now = time.time()
        window = self.windows.get(tenant_id, [])
        recent = [t for t in window if now - t < 60]

        return {
            "current_rpm": len(recent),
            "tokens_remaining": self.token_budgets.get(tenant_id, 0),
        }
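
How this plugs into the request path, as a sketch (the handler and RateLimitError are illustrative, reusing the tenant store and router from the previous section):

limiter = TenantRateLimiter()

async def handle_request(tenant_id: str, request):
    config = await tenant_store.get(tenant_id)
    allowed, reason = await limiter.check_and_consume(
        tenant_id, config, estimated_tokens=request.max_tokens
    )
    if not allowed:
        raise RateLimitError(reason)  # hypothetical error, mapped to HTTP 429 at the gateway
    return await router.route(tenant_id, request)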

Distributed Rate Limiting with Redis

import time
import uuid

import redis.asyncio as redis

class DistributedRateLimiter:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def check_rpm(self, tenant_id: str, limit: int) -> bool:
        """Redis-backed sliding-window rate limiting"""
        key = f"ratelimit:{tenant_id}:rpm"
        now = int(time.time() * 1000)
        window_ms = 60_000
        # Unique member, so requests landing in the same millisecond don't collide
        member = f"{now}-{uuid.uuid4().hex[:8]}"

        pipe = self.redis.pipeline()

        # Drop entries that fell out of the window
        pipe.zremrangebyscore(key, 0, now - window_ms)

        # Count requests currently in the window
        pipe.zcard(key)

        # Optimistically record this request
        pipe.zadd(key, {member: now})

        # Let idle keys expire
        pipe.expire(key, 120)

        results = await pipe.execute()
        current_count = results[1]

        if current_count >= limit:
            # Roll back the optimistic insert
            await self.redis.zrem(key, member)
            return False

        return True
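
The pipeline above leaves a small race window: two concurrent requests can both observe a ZCARD below the limit before either ZADD lands, and the rollback costs a second round trip. A hedged alternative sketch moves check-and-insert into a single Lua script so Redis executes it atomically:

import time
import uuid

SLIDING_WINDOW_LUA = """
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1] - ARGV[2])
local count = redis.call('ZCARD', KEYS[1])
if count >= tonumber(ARGV[3]) then
  return 0
end
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[1] .. '-' .. ARGV[4])
redis.call('EXPIRE', KEYS[1], 120)
return 1
"""

class AtomicRateLimiter(DistributedRateLimiter):
    async def check_rpm(self, tenant_id: str, limit: int) -> bool:
        now = int(time.time() * 1000)
        allowed = await self.redis.eval(
            SLIDING_WINDOW_LUA,
            1,                             # number of keys
            f"ratelimit:{tenant_id}:rpm",  # KEYS[1]
            now, 60_000, limit,            # ARGV[1..3]: now, window, limit
            uuid.uuid4().hex[:8],          # ARGV[4]: uniqueness suffix
        )
        return allowed == 1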

Cost Allocation and Metering

Usage Metering System

from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class UsageRecord:
    tenant_id: str
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    gpu_seconds: float
    cost_usd: float

class UsageMeter:
    """实时用量计量"""

    # Price table (per 1M tokens)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet": {"input": 3.00, "output": 15.00},
        "llama-3.1-8b": {"input": 0.05, "output": 0.08},  # 自托管成本
    }

    # GPU cost (per hour)
    GPU_COST_PER_HOUR = {
        "A100-80GB": 3.50,
        "A10G": 1.00,
        "T4": 0.35,
    }

    def calculate_cost(self, model: str, input_tokens: int,
                       output_tokens: int, gpu_type: str,
                       gpu_seconds: float) -> float:
        """计算单次请求成本"""
        pricing = self.PRICING.get(model, {"input": 1.0, "output": 2.0})

        # Token cost
        token_cost = (
            input_tokens * pricing["input"] / 1_000_000
            + output_tokens * pricing["output"] / 1_000_000
        )

        # GPU cost (self-hosted models)
        if model.startswith("llama") or model.startswith("mistral"):
            gpu_hourly = self.GPU_COST_PER_HOUR.get(gpu_type, 1.0)
            gpu_cost = gpu_seconds * gpu_hourly / 3600
            return max(token_cost, gpu_cost)  # charge whichever is higher

        return token_cost

    async def record(self, record: UsageRecord):
        """写入计量记录(异步,不阻塞推理)"""
        # Write to the time-series store
        await self._write_to_timeseries(record)

        # Update real-time aggregates
        await self._update_aggregates(record)

    async def get_monthly_report(self, tenant_id: str) -> dict:
        """月度用量报告"""
        records = await self._query_records(tenant_id, period="month")

        total_cost = sum(r.cost_usd for r in records)
        total_requests = len(records)
        total_tokens = sum(r.input_tokens + r.output_tokens for r in records)

        by_model = {}
        for r in records:
            if r.model not in by_model:
                by_model[r.model] = {"requests": 0, "tokens": 0, "cost": 0}
            by_model[r.model]["requests"] += 1
            by_model[r.model]["tokens"] += r.input_tokens + r.output_tokens
            by_model[r.model]["cost"] += r.cost_usd

        return {
            "tenant_id": tenant_id,
            "period": "2026-02",
            "total_cost_usd": round(total_cost, 4),
            "total_requests": total_requests,
            "total_tokens": total_tokens,
            "by_model": by_model,
        }
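
A worked example of calculate_cost: a gpt-4o call with 1,200 input and 800 output tokens costs 1200 × 2.50/1M + 800 × 10.00/1M = $0.003 + $0.008 = $0.011. For self-hosted models the GPU floor can dominate:

meter = UsageMeter()

# API model: pure token pricing -> $0.011
api_cost = meter.calculate_cost("gpt-4o", 1200, 800,
                                gpu_type="A100-80GB", gpu_seconds=0)

# Self-hosted: token cost is ~$0.000124, but 2.5 GPU-seconds on an A100
# costs 2.5 * 3.50 / 3600 ~= $0.00243, so the GPU floor is what gets charged
gpu_cost = meter.calculate_cost("llama-3.1-8b", 1200, 800,
                                gpu_type="A100-80GB", gpu_seconds=2.5)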

Shared Inference Optimization

Request Batching

import asyncio
from dataclasses import dataclass

@dataclass
class PendingRequest:
    tenant_id: str
    input_data: dict
    future: asyncio.Future

class BatchInferenceServer:
    """跨租户请求合批,提高 GPU 利用率"""

    def __init__(self, model, max_batch_size=32, max_wait_ms=50):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending: list[PendingRequest] = []
        self.lock = asyncio.Lock()
        self._batch_task = None

    async def predict(self, tenant_id: str, input_data: dict) -> dict:
        """提交推理请求,等待批处理结果"""
        future = asyncio.get_running_loop().create_future()
        request = PendingRequest(tenant_id, input_data, future)

        async with self.lock:
            self.pending.append(request)

            if len(self.pending) >= self.max_batch_size:
                await self._process_batch()
            elif self._batch_task is None:
                self._batch_task = asyncio.create_task(self._wait_and_process())

        return await future

    async def _wait_and_process(self):
        await asyncio.sleep(self.max_wait_ms / 1000)
        async with self.lock:
            if self.pending:
                await self._process_batch()
            self._batch_task = None

    async def _process_batch(self):
        batch = self.pending[:self.max_batch_size]
        self.pending = self.pending[self.max_batch_size:]

        # Batched inference
        inputs = [r.input_data for r in batch]
        try:
            results = await self.model.batch_predict(inputs)
        except Exception as exc:
            # Propagate the failure to every waiter instead of hanging their futures
            for request in batch:
                request.future.set_exception(exc)
            return

        # Fan results back out to the per-request futures
        for request, result in zip(batch, results):
            request.future.set_result(result)
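
Concurrent callers from different tenants then share forward passes; a usage sketch (the model object with an async batch_predict is assumed):

server = BatchInferenceServer(model, max_batch_size=32, max_wait_ms=50)

# 100 concurrent requests across 10 tenants collapse into ~4 GPU batches
results = await asyncio.gather(*[
    server.predict(f"tenant-{i % 10}", {"prompt": f"request {i}"})
    for i in range(100)
])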

Model Multiplexing

class ModelMultiplexer:
    """多租户共享模型实例,按需加载/卸载"""

    def __init__(self, gpu_memory_limit_gb=80):
        self.loaded_models: dict[str, LoadedModel] = {}
        self.gpu_memory_limit = gpu_memory_limit_gb * 1024 ** 3
        self.gpu_memory_used = 0
        self.lru_order: list[str] = []

    async def get_model(self, model_name: str) -> LoadedModel:
        if model_name in self.loaded_models:
            # Refresh LRU position
            self.lru_order.remove(model_name)
            self.lru_order.append(model_name)
            return self.loaded_models[model_name]

        # Model not resident: load it
        model_size = self._estimate_size(model_name)

        # Evict least-recently-used models until the new one fits
        while self.gpu_memory_used + model_size > self.gpu_memory_limit:
            if not self.lru_order:
                raise OutOfMemoryError("Cannot load model, GPU memory full")

            evict_name = self.lru_order.pop(0)
            evicted = self.loaded_models.pop(evict_name)
            self.gpu_memory_used -= evicted.size
            await evicted.unload()

        # Load and register the new model
        model = await self._load_model(model_name)
        self.loaded_models[model_name] = model
        self.lru_order.append(model_name)
        self.gpu_memory_used += model.size

        return model
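
The multiplexer assumes a small contract on loaded models. Neither LoadedModel nor OutOfMemoryError comes from a library here; a sketch of what that contract would look like:

from typing import Protocol

class LoadedModel(Protocol):
    size: int  # bytes of GPU memory held by this model

    async def unload(self) -> None:
        """Release the model's GPU memory."""
        ...

class OutOfMemoryError(Exception):
    pass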

Data Isolation and Compliance

Tenant Data Isolation

# Database layer: Row Level Security (PostgreSQL)
"""
-- Enable RLS
ALTER TABLE ai_conversations ENABLE ROW LEVEL SECURITY;

-- Policy: a tenant can only see its own rows
CREATE POLICY tenant_isolation ON ai_conversations
    USING (tenant_id = current_setting('app.tenant_id')::uuid);

-- The application layer sets the tenant context per session
SET app.tenant_id = 'tenant-uuid-here';
"""

# Vector DB layer: per-tenant collection isolation
from qdrant_client import models

class TenantVectorStore:
    def __init__(self, qdrant_client):
        self.client = qdrant_client

    async def ensure_collection(self, tenant_id: str):
        """One dedicated collection per tenant"""
        collection_name = f"tenant_{tenant_id}_docs"

        if not await self.client.collection_exists(collection_name):
            await self.client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=768, distance=models.Distance.COSINE
                ),
            )

        return collection_name

    async def search(self, tenant_id: str, query_vector, limit=10):
        collection = await self.ensure_collection(tenant_id)
        # The search is automatically scoped to the tenant's collection
        return await self.client.query_points(
            collection_name=collection,
            query=query_vector,
            limit=limit,
        )
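
Per-tenant collections give hard separation but can strain the cluster once tenants number in the thousands. The other common pattern is one shared collection with a mandatory tenant_id payload filter; a sketch with qdrant-client's filter types (the shared_docs collection name is illustrative):

from qdrant_client import models

async def search_shared(client, tenant_id: str, query_vector, limit=10):
    # Every query must carry the tenant filter; enforce it in one choke point
    return await client.query_points(
        collection_name="shared_docs",
        query=query_vector,
        query_filter=models.Filter(
            must=[
                models.FieldCondition(
                    key="tenant_id",
                    match=models.MatchValue(value=tenant_id),
                )
            ]
        ),
        limit=limit,
    )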

Observability

# Collect metrics with per-tenant label dimensions
from prometheus_client import Histogram, Counter, Gauge

INFERENCE_LATENCY = Histogram(
    "inference_latency_seconds",
    "Inference latency by tenant and model",
    ["tenant_id", "model", "tier"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
)

TOKEN_USAGE = Counter(
    "token_usage_total",
    "Total tokens consumed",
    ["tenant_id", "model", "direction"],  # direction: input/output
)

ACTIVE_TENANTS = Gauge(
    "active_tenants",
    "Number of tenants with active requests",
)

GPU_UTILIZATION_BY_TENANT = Gauge(
    "gpu_utilization_by_tenant",
    "GPU time allocation per tenant",
    ["tenant_id", "gpu_id"],
)
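
Recording from the inference path, as a sketch (the helper name is illustrative):

import time

def record_inference(tenant_id: str, model: str, tier: str,
                     input_tokens: int, output_tokens: int, started: float):
    INFERENCE_LATENCY.labels(tenant_id=tenant_id, model=model, tier=tier) \
        .observe(time.time() - started)
    TOKEN_USAGE.labels(tenant_id=tenant_id, model=model, direction="input") \
        .inc(input_tokens)
    TOKEN_USAGE.labels(tenant_id=tenant_id, model=model, direction="output") \
        .inc(output_tokens)

Note that per-tenant labels multiply time-series cardinality; with thousands of tenants it is common to restrict such labels to high-traffic tenants or aggregate the long tail.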

Summary

Core design principles for multi-tenant AI platforms:

  1. Balance isolation and sharing: pick the isolation level per tenant tier, and raise utilization of shared inference through batching
  2. Cost transparency: meter every tenant's tokens, GPU time, and API calls precisely
  3. Elastic scaling: mix shared and dedicated pools and scale them on demand
  4. Data security: database RLS + per-tenant vector collections + context-injection boundaries
  5. Observability: collect all metrics with a tenant_id dimension to support anomaly detection and SLA reporting

Maurice | maurice_wen@proton.me