API Gateway and Model Routing in Practice

Building a production-grade LLM API gateway: multi-model routing, semantic caching, rate limiting, fallbacks, and cost monitoring. Maurice | 灵阙学院

Prerequisites

  • Python 3.10+
  • Redis (for caching and rate limiting)
  • At least one LLM API key (OpenAI / Anthropic / Google)

1. Why You Need an LLM API Gateway

Pain points of calling LLM APIs directly:

Problems with connecting straight from the application layer:
- Locked into a single provider, with no way to switch during an outage
- No unified management of quotas and costs across multiple API keys
- Duplicate requests waste tokens and money
- No global rate limiting, so provider rate limits are easy to trip
- No unified usage tracking or cost reporting

The gateway architecture as a solution:

Client --> API Gateway --> [rate limiting] --> [cache] --> [routing] --> Provider A (OpenAI)
                                                                     --> Provider B (Anthropic)
                                                                     --> Provider C (Google)
                                                           --> [logging/billing]

2. Quick Setup with LiteLLM

LiteLLM is one of the most mature open-source LLM proxy gateways, with support for 100+ models.

2.1 Installation

pip install "litellm[proxy]" redis

2.2 Configuration File

Create litellm_config.yaml:

model_list:
  # model alias --> actual model mapping
  - model_name: "fast"
    litellm_params:
      model: "gpt-4o-mini"
      api_key: "os.environ/OPENAI_API_KEY"

  - model_name: "smart"
    litellm_params:
      model: "claude-sonnet-4-20250514"
      api_key: "os.environ/ANTHROPIC_API_KEY"

  - model_name: "smart"  # 同名 = 负载均衡 + 自动降级
    litellm_params:
      model: "gpt-4o"
      api_key: "os.environ/OPENAI_API_KEY"

  - model_name: "search"
    litellm_params:
      model: "gemini/gemini-2.5-flash"
      api_key: "os.environ/GOOGLE_API_KEY"

litellm_settings:
  drop_params: true
  set_verbose: false

router_settings:
  routing_strategy: "latency-based-routing"  # route to the lowest-latency deployment
  num_retries: 3
  timeout: 60
  retry_after: 5
  fallbacks:
    - {"smart": ["fast"]}  # smart 全部失败时降级到 fast

2.3 Start the Proxy

export OPENAI_API_KEY=sk-xxx
export ANTHROPIC_API_KEY=sk-ant-xxx
export GOOGLE_API_KEY=AIza-xxx

litellm --config litellm_config.yaml --port 4000

2.4 Test Call

# call the alias "fast" --> automatically routed to gpt-4o-mini
curl http://localhost:4000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "fast",
    "messages": [{"role": "user", "content": "你好"}]
  }'

Expected output (OpenAI-compatible format):

{
  "id": "chatcmpl-xxx",
  "model": "gpt-4o-mini",
  "choices": [{
    "message": {"role": "assistant", "content": "你好!有什么可以帮你的吗?"}
  }],
  "usage": {"prompt_tokens": 8, "completion_tokens": 12, "total_tokens": 20}
}
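
Because the proxy exposes an OpenAI-compatible API, you can also call it from Python by pointing the official OpenAI SDK at it. A minimal sketch, assuming the proxy runs on localhost:4000 with no virtual keys configured (so the api_key value here is just a placeholder):

from openai import OpenAI

# Point the OpenAI SDK at the local LiteLLM proxy instead of api.openai.com
client = OpenAI(base_url="http://localhost:4000/v1", api_key="not-needed")

resp = client.chat.completions.create(
    model="smart",  # alias defined in litellm_config.yaml
    messages=[{"role": "user", "content": "Hello"}],
)
print(resp.choices[0].message.content)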

3. Building a Custom Python Gateway (Deep Customization)

When LiteLLM can't cover a business requirement, a custom gateway gives you the most flexibility.

3.1 Project Structure

llm-gateway/
  gateway.py          # FastAPI main service
  router.py           # model routing logic
  cache.py            # semantic cache
  rate_limiter.py     # rate limiter
  cost_tracker.py     # cost tracking
  config.py           # configuration
  requirements.txt

3.2 The Core Router

Create router.py:

import time
import random
from dataclasses import dataclass
from openai import OpenAI
from anthropic import Anthropic

@dataclass
class Provider:
    name: str
    client: object
    model: str
    priority: int = 0          # lower value = higher priority
    avg_latency: float = 1.0
    failure_count: int = 0
    last_failure: float = 0
    cost_per_1m_input: float = 0.0     # USD per 1M tokens
    cost_per_1m_output: float = 0.0

class ModelRouter:
    def __init__(self, providers: list[Provider]):
        self.providers = sorted(providers, key=lambda p: p.priority)

    def route(self, messages: list[dict],
              strategy: str = "priority") -> dict:
        """
        路由策略:
        - priority: 按优先级顺序尝试
        - latency: 选延迟最低的
        - cost: 选最便宜的
        - random: 随机负载均衡
        """
        candidates = self._get_healthy_providers()

        if strategy == "latency":
            candidates.sort(key=lambda p: p.avg_latency)
        elif strategy == "cost":
            candidates.sort(key=lambda p: p.cost_per_1m_input)
        elif strategy == "random":
            random.shuffle(candidates)

        for provider in candidates:
            try:
                start = time.time()
                result = self._call_provider(provider, messages)
                latency = time.time() - start

                # update the exponential moving average latency
                provider.avg_latency = (
                    provider.avg_latency * 0.7 + latency * 0.3
                )
                provider.failure_count = 0

                return {
                    "content": result["content"],
                    "provider": provider.name,
                    "model": provider.model,
                    "latency": round(latency, 3),
                    "usage": result.get("usage", {})
                }

            except Exception as e:
                provider.failure_count += 1
                provider.last_failure = time.time()
                print(f"[WARN] {provider.name} failed: {e}")
                continue

        raise RuntimeError("All providers failed")

    def _get_healthy_providers(self) -> list[Provider]:
        """过滤掉处于熔断状态的 provider"""
        now = time.time()
        healthy = []
        for p in self.providers:
            # after 3 consecutive failures, cool down for 60 seconds
            if p.failure_count >= 3:
                if now - p.last_failure < 60:
                    continue
                p.failure_count = 0  # cooldown over, reset the counter
            healthy.append(p)
        return healthy

    def _call_provider(self, provider: Provider,
                       messages: list[dict]) -> dict:
        if isinstance(provider.client, OpenAI):
            resp = provider.client.chat.completions.create(
                model=provider.model,
                messages=messages
            )
            return {
                "content": resp.choices[0].message.content,
                "usage": {
                    "input": resp.usage.prompt_tokens,
                    "output": resp.usage.completion_tokens
                }
            }

        elif isinstance(provider.client, Anthropic):
            resp = provider.client.messages.create(
                model=provider.model,
                max_tokens=4096,
                messages=messages
            )
            return {
                "content": resp.content[0].text,
                "usage": {
                    "input": resp.usage.input_tokens,
                    "output": resp.usage.output_tokens
                }
            }

        raise TypeError(f"Unsupported provider client: {type(provider.client)}")

3.3 Semantic Cache

Create cache.py:

import json
import hashlib
import numpy as np
import redis
from openai import OpenAI

class SemanticCache:
    def __init__(self, redis_url: str = "redis://localhost:6379",
                 threshold: float = 0.92):
        self.redis = redis.from_url(redis_url)
        self.embed_client = OpenAI()
        self.threshold = threshold
        self.cache_prefix = "llm_cache:"

    def _get_embedding(self, text: str) -> list[float]:
        resp = self.embed_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return resp.data[0].embedding

    def _cosine_similarity(self, a: list[float],
                           b: list[float]) -> float:
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def get(self, query: str) -> dict | None:
        """语义匹配查找缓存"""
        query_emb = self._get_embedding(query)

        # iterate over the existing cache entries
        for key in self.redis.scan_iter(f"{self.cache_prefix}*"):
            data = json.loads(self.redis.get(key))
            similarity = self._cosine_similarity(
                query_emb, data["embedding"]
            )
            if similarity >= self.threshold:
                return {
                    "content": data["content"],
                    "cached": True,
                    "similarity": round(similarity, 4)
                }
        return None

    def set(self, query: str, content: str, ttl: int = 3600):
        """写入缓存"""
        embedding = self._get_embedding(query)
        key = f"{self.cache_prefix}{hashlib.md5(query.encode()).hexdigest()}"
        self.redis.setex(key, ttl, json.dumps({
            "query": query,
            "content": content,
            "embedding": embedding
        }))
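
A quick usage sketch, assuming Redis is running locally and OPENAI_API_KEY is set; both queries are made-up examples:

cache = SemanticCache(threshold=0.92)
cache.set("What is an API gateway?",
          "An API gateway is an intermediary layer between clients and backend services...")

# A semantically close query may hit the cache
hit = cache.get("What does an API gateway mean?")
print(hit)  # returns the cached entry if similarity >= threshold, otherwise None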

3.4 Rate Limiter

Create rate_limiter.py:

import time
import redis

class SlidingWindowLimiter:
    """滑动窗口限流器"""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    def is_allowed(self, key: str, max_requests: int,
                   window_seconds: int) -> bool:
        now = time.time()
        window_start = now - window_seconds
        pipe = self.redis.pipeline()

        # drop records that fall outside the window
        pipe.zremrangebyscore(key, 0, window_start)
        # count requests in the current window
        pipe.zcard(key)
        # record the current request
        pipe.zadd(key, {str(now): now})
        # set an expiry on the key
        pipe.expire(key, window_seconds)

        results = pipe.execute()
        current_count = results[1]
        return current_count < max_requests

3.5 Assembling the Gateway Service

Create gateway.py:

import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openai import OpenAI
from anthropic import Anthropic
from router import ModelRouter, Provider
from cache import SemanticCache
from rate_limiter import SlidingWindowLimiter

app = FastAPI(title="LLM API Gateway")

# initialize components
router = ModelRouter([
    Provider(
        name="openai",
        client=OpenAI(),
        model="gpt-4o-mini",
        priority=0,
        cost_per_1m_input=0.15,    # USD per 1M tokens
        cost_per_1m_output=0.60
    ),
    Provider(
        name="anthropic",
        client=Anthropic(),
        model="claude-sonnet-4-20250514",
        priority=1,
        cost_per_1m_input=3.0,
        cost_per_1m_output=15.0
    ),
])

cache = SemanticCache()
limiter = SlidingWindowLimiter()

# usage tracking (use a database in production)
usage_log: list[dict] = []

class ChatRequest(BaseModel):
    messages: list[dict]
    user_id: str = "default"
    strategy: str = "priority"    # priority / latency / cost
    use_cache: bool = True

@app.post("/v1/chat/completions")
async def chat(req: ChatRequest):
    # 1. Rate limit check
    if not limiter.is_allowed(
        f"rate:{req.user_id}", max_requests=60, window_seconds=60
    ):
        raise HTTPException(429, "Rate limit exceeded (60/min)")

    # 2. Cache lookup
    user_query = req.messages[-1]["content"]
    if req.use_cache:
        cached = cache.get(user_query)
        if cached:
            return {
                "content": cached["content"],
                "cached": True,
                "similarity": cached["similarity"]
            }

    # 3. Route the request
    result = router.route(req.messages, strategy=req.strategy)

    # 4. Write to the cache
    if req.use_cache:
        cache.set(user_query, result["content"])

    # 5. Record usage
    usage_log.append({
        "user_id": req.user_id,
        "provider": result["provider"],
        "model": result["model"],
        "input_tokens": result["usage"].get("input", 0),
        "output_tokens": result["usage"].get("output", 0),
        "latency": result["latency"]
    })

    return result

@app.get("/stats")
async def stats():
    """成本和用量统计"""
    total_input = sum(r.get("input_tokens", 0) for r in usage_log)
    total_output = sum(r.get("output_tokens", 0) for r in usage_log)
    by_provider = {}
    for r in usage_log:
        p = r["provider"]
        if p not in by_provider:
            by_provider[p] = {"calls": 0, "input": 0, "output": 0}
        by_provider[p]["calls"] += 1
        by_provider[p]["input"] += r.get("input_tokens", 0)
        by_provider[p]["output"] += r.get("output_tokens", 0)
    return {
        "total_requests": len(usage_log),
        "total_tokens": {"input": total_input, "output": total_output},
        "by_provider": by_provider
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=4000)

4. Launch and Testing

4.1 Start the Services

# start Redis first
docker run -d --name redis -p 6379:6379 redis:alpine

# start the gateway
python gateway.py

4.2 Functional Tests

# a normal request
curl -X POST http://localhost:4000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "什么是 API 网关?"}]}'

# send a semantically similar request (to test a cache hit)
curl -X POST http://localhost:4000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "API 网关是什么意思?"}]}'

# view the statistics
curl http://localhost:4000/stats

Expected output for the second request (cache hit):

{
  "content": "API 网关是位于客户端和后端服务之间的中间层...",
  "cached": true,
  "similarity": 0.9456
}

5. Production Considerations

5.1 Deployment Architecture

Nginx (SSL/TLS)
  --> LLM Gateway (multiple instances, Gunicorn + Uvicorn workers)
        --> Redis Cluster (cache + rate limiting + sessions)
        --> PostgreSQL (usage logs + billing)
        --> Provider APIs (OpenAI / Anthropic / Google)

5.2 Monitoring Metrics

| Metric          | Meaning                 | Alert threshold        |
|-----------------|-------------------------|------------------------|
| p99_latency     | 99th percentile latency | > 10 s                 |
| cache_hit_rate  | Cache hit rate          | < 20%                  |
| error_rate      | Request error rate      | > 5%                   |
| daily_cost      | Daily spend             | > 80% of budget        |
| provider_health | Provider health         | 3 consecutive failures |
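
The project layout in 3.1 lists a cost_tracker.py that the walkthrough doesn't implement. A minimal sketch of the daily_cost metric, reusing the per-1M-token prices configured on each Provider (the price table here is illustrative; always check current official pricing):

# cost_tracker.py -- minimal sketch; prices are USD per 1M tokens
PRICES_PER_1M = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the USD cost of a single call."""
    p = PRICES_PER_1M.get(model, {"input": 0.0, "output": 0.0})
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

def daily_cost(usage_log: list[dict]) -> float:
    """Total estimated cost of every call recorded in the usage log."""
    return sum(
        estimate_cost(r["model"], r.get("input_tokens", 0), r.get("output_tokens", 0))
        for r in usage_log
    )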

5.3 Security Checklist

- [ ] Don't hard-code API keys; use environment variables or a secrets manager
- [ ] Add API key authentication (Bearer token) to the gateway itself (see the sketch after this list)
- [ ] Enable HTTPS; never transmit traffic in plaintext
- [ ] Limit per-user request rates
- [ ] Redact logs (don't store full prompts/responses)
- [ ] Rotate provider API keys regularly
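
A minimal sketch of the Bearer-token item above, implemented as a FastAPI dependency; GATEWAY_API_KEYS is a hypothetical environment variable holding a comma-separated list of accepted keys:

import os
from fastapi import Depends, HTTPException, Request

# Hypothetical env var: comma-separated keys the gateway will accept
VALID_KEYS = set(os.environ.get("GATEWAY_API_KEYS", "").split(","))

async def verify_api_key(request: Request):
    auth = request.headers.get("Authorization", "")
    token = auth.removeprefix("Bearer ")
    if not auth.startswith("Bearer ") or token not in VALID_KEYS:
        raise HTTPException(401, "Invalid or missing API key")

# Attach it to the chat endpoint:
# @app.post("/v1/chat/completions", dependencies=[Depends(verify_api_key)])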

FAQ

Q1: Should I use LiteLLM or build my own gateway? Start with LiteLLM to get online quickly; switch to a custom gateway when you need custom caching strategies, billing logic, or special routing rules. The two can also coexist (the custom gateway calls LiteLLM internally).

Q2: The semantic cache hit rate is too low? Lower the threshold (e.g. from 0.92 to 0.88), but beware that going too low causes semantically different requests to be served the wrong cached answer. You can also mix a hash of the system prompt into the cache key so that different scenarios don't share cache entries.

Q3: How do I cache streaming responses? For streaming requests, collect the complete response first, then write it to the cache. On a later cache hit, replay the cached content to the client as a simulated stream, as in the sketch below.
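
A sketch of the replay half, using FastAPI's StreamingResponse; the chunk size is arbitrary and real SSE framing is omitted for brevity:

import asyncio
from fastapi.responses import StreamingResponse

async def replay_cached(content: str, chunk_size: int = 20):
    """Yield a cached answer in small pieces to mimic a streamed reply."""
    for i in range(0, len(content), chunk_size):
        yield content[i:i + chunk_size]
        await asyncio.sleep(0)  # let the event loop flush each chunk

# On a cache hit inside the endpoint:
# return StreamingResponse(replay_cached(cached["content"]), media_type="text/plain")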

Q4: Token counts differ across providers? Different models use different tokenizers. Either estimate costs uniformly with tiktoken, or simply use the usage field each provider returns. Always price the usage against each provider's official rates. A tiktoken-based estimate is sketched below.
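
A sketch of the tiktoken route; note that tiktoken only ships OpenAI tokenizers, so for other providers the count is an approximation:

import tiktoken

def estimate_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Rough token count using the OpenAI tokenizer for the given model."""
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        enc = tiktoken.get_encoding("cl100k_base")  # generic fallback
    return len(enc.encode(text))

print(estimate_tokens("What is an API gateway?"))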

Q5: How do I do a canary release (testing a new model)? Give the new model a low weight in the routing config (e.g. 10%) and split traffic with strategy: "random". Once latency and quality metrics look good, gradually raise the weight. Note that the sample ModelRouter above only shuffles uniformly, so weighted selection needs a small extension, sketched below.
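
A sketch of weighted selection for the canary split; the stable_provider / canary_provider names are hypothetical and the weights would live in your routing config:

import random

def pick_weighted(providers: list, weights: list[float]):
    """Pick one provider according to traffic weights, e.g. 90/10 for a canary."""
    return random.choices(providers, weights=weights, k=1)[0]

# 90% of traffic to the stable model, 10% to the canary
# provider = pick_weighted([stable_provider, canary_provider], [0.9, 0.1])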


Maurice | maurice_wen@proton.me