API Gateway and Model Routing in Practice
Building a production-grade LLM API gateway: multi-model routing, semantic caching, rate limiting, fallbacks, and cost monitoring
Maurice | 灵阙学院
Prerequisites
- Python 3.10+
- Redis (for caching and rate limiting)
- At least one LLM API key (OpenAI / Anthropic / Google)
1. Why You Need an LLM API Gateway
Pain points of calling LLM APIs directly from the application layer:
- Lock-in to a single provider, with no way to fail over during an outage
- No unified management of quotas and costs across multiple API keys
- Repeated requests waste tokens and money
- No global rate limiting, so provider rate limits are easy to trip
- No unified usage tracking or cost reporting
The gateway architecture addresses this:
Client --> API Gateway --> [Rate limiting] --> [Cache] --> [Routing] --> Provider A (OpenAI)
                                                                     --> Provider B (Anthropic)
                                                                     --> Provider C (Google)
                       --> [Logging / billing]
2. Quick Setup with LiteLLM
LiteLLM is currently the most mature open-source LLM proxy gateway, supporting 100+ models.
2.1 Installation
pip install 'litellm[proxy]' redis
2.2 Configuration File
Create litellm_config.yaml:
model_list:
  # Model alias --> actual model mapping
  - model_name: "fast"
    litellm_params:
      model: "gpt-4o-mini"
      api_key: "os.environ/OPENAI_API_KEY"
  - model_name: "smart"
    litellm_params:
      model: "claude-sonnet-4-20250514"
      api_key: "os.environ/ANTHROPIC_API_KEY"
  - model_name: "smart"   # same alias = load balancing + automatic fallback
    litellm_params:
      model: "gpt-4o"
      api_key: "os.environ/OPENAI_API_KEY"
  - model_name: "search"
    litellm_params:
      model: "gemini/gemini-2.5-flash"
      api_key: "os.environ/GOOGLE_API_KEY"

litellm_settings:
  drop_params: true
  set_verbose: false

router_settings:
  routing_strategy: "latency-based-routing"  # pick the fastest provider by latency
  num_retries: 3
  timeout: 60
  retry_after: 5
  fallbacks:
    - {"smart": ["fast"]}  # if all "smart" deployments fail, fall back to "fast"
2.3 Start the Proxy
export OPENAI_API_KEY=sk-xxx
export ANTHROPIC_API_KEY=sk-ant-xxx
export GOOGLE_API_KEY=AIza-xxx
litellm --config litellm_config.yaml --port 4000
2.4 Test a Call
# Call the alias "fast" --> automatically routed to gpt-4o-mini
curl http://localhost:4000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "fast",
    "messages": [{"role": "user", "content": "Hello"}]
  }'
Expected output (OpenAI-compatible format):
{
  "id": "chatcmpl-xxx",
  "model": "gpt-4o-mini",
  "choices": [{
    "message": {"role": "assistant", "content": "Hello! How can I help you today?"}
  }],
  "usage": {"prompt_tokens": 8, "completion_tokens": 12, "total_tokens": 20}
}
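Because the proxy exposes an OpenAI-compatible endpoint, existing SDK code only needs a different base_url. A minimal sketch (the placeholder api_key assumes no LiteLLM master key or virtual keys are configured):

# Minimal sketch: calling the LiteLLM proxy through the official OpenAI SDK.
# "sk-anything" is a placeholder; it assumes no master key / virtual keys are set.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:4000/v1", api_key="sk-anything")

resp = client.chat.completions.create(
    model="smart",  # alias from litellm_config.yaml, load-balanced across both "smart" entries
    messages=[{"role": "user", "content": "Explain what an API gateway does in one sentence."}],
)
print(resp.model, resp.choices[0].message.content)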
3. Building Your Own Python Gateway (Deep Customization)
When LiteLLM cannot meet your business requirements, a self-built gateway offers maximum flexibility.
3.1 Project Structure
llm-gateway/
    gateway.py        # FastAPI main service
    router.py         # model routing logic
    cache.py          # semantic cache
    rate_limiter.py   # rate limiter
    cost_tracker.py   # cost tracking
    config.py         # configuration
    requirements.txt
3.2 Core Router
Create router.py:
import time
import random
from dataclasses import dataclass
from openai import OpenAI
from anthropic import Anthropic


@dataclass
class Provider:
    name: str
    client: object
    model: str
    priority: int = 0          # lower = higher priority
    avg_latency: float = 1.0
    failure_count: int = 0
    last_failure: float = 0
    cost_per_1k_input: float = 0.0
    cost_per_1k_output: float = 0.0


class ModelRouter:
    def __init__(self, providers: list[Provider]):
        self.providers = sorted(providers, key=lambda p: p.priority)

    def route(self, messages: list[dict],
              strategy: str = "priority") -> dict:
        """
        Routing strategies:
        - priority: try providers in priority order
        - latency: pick the lowest-latency provider
        - cost: pick the cheapest provider
        - random: random load balancing
        """
        candidates = self._get_healthy_providers()
        if strategy == "latency":
            candidates.sort(key=lambda p: p.avg_latency)
        elif strategy == "cost":
            candidates.sort(key=lambda p: p.cost_per_1k_input)
        elif strategy == "random":
            random.shuffle(candidates)

        for provider in candidates:
            try:
                start = time.time()
                result = self._call_provider(provider, messages)
                latency = time.time() - start
                # Update the moving average of latency
                provider.avg_latency = (
                    provider.avg_latency * 0.7 + latency * 0.3
                )
                provider.failure_count = 0
                return {
                    "content": result["content"],
                    "provider": provider.name,
                    "model": provider.model,
                    "latency": round(latency, 3),
                    "usage": result.get("usage", {})
                }
            except Exception as e:
                provider.failure_count += 1
                provider.last_failure = time.time()
                print(f"[WARN] {provider.name} failed: {e}")
                continue
        raise RuntimeError("All providers failed")

    def _get_healthy_providers(self) -> list[Provider]:
        """Filter out providers that are in a circuit-breaker cooldown."""
        now = time.time()
        healthy = []
        for p in self.providers:
            # After 3 consecutive failures, cool down for 60 seconds
            if p.failure_count >= 3:
                if now - p.last_failure < 60:
                    continue
                p.failure_count = 0  # cooldown over, reset
            healthy.append(p)
        return healthy

    def _call_provider(self, provider: Provider,
                       messages: list[dict]) -> dict:
        if isinstance(provider.client, OpenAI):
            resp = provider.client.chat.completions.create(
                model=provider.model,
                messages=messages
            )
            return {
                "content": resp.choices[0].message.content,
                "usage": {
                    "input": resp.usage.prompt_tokens,
                    "output": resp.usage.completion_tokens
                }
            }
        elif isinstance(provider.client, Anthropic):
            resp = provider.client.messages.create(
                model=provider.model,
                max_tokens=4096,
                messages=messages
            )
            return {
                "content": resp.content[0].text,
                "usage": {
                    "input": resp.usage.input_tokens,
                    "output": resp.usage.output_tokens
                }
            }
        raise ValueError(f"Unsupported client type for provider {provider.name}")
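Before wiring the router into FastAPI, it can be exercised on its own. A minimal smoke test (assumes OPENAI_API_KEY and ANTHROPIC_API_KEY are exported; prices are illustrative USD per 1K tokens):

# Standalone smoke test for ModelRouter.
from openai import OpenAI
from anthropic import Anthropic
from router import ModelRouter, Provider

router = ModelRouter([
    Provider(name="openai", client=OpenAI(), model="gpt-4o-mini",
             priority=0, cost_per_1k_input=0.15, cost_per_1k_output=0.60),
    Provider(name="anthropic", client=Anthropic(), model="claude-sonnet-4-20250514",
             priority=1, cost_per_1k_input=3.0, cost_per_1k_output=15.0),
])

result = router.route([{"role": "user", "content": "ping"}], strategy="cost")
print(result["provider"], result["latency"], result["content"][:80])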
3.3 Semantic Cache
Create cache.py:
import json
import hashlib
import numpy as np
import redis
from openai import OpenAI


class SemanticCache:
    def __init__(self, redis_url: str = "redis://localhost:6379",
                 threshold: float = 0.92):
        self.redis = redis.from_url(redis_url)
        self.embed_client = OpenAI()
        self.threshold = threshold
        self.cache_prefix = "llm_cache:"

    def _get_embedding(self, text: str) -> list[float]:
        resp = self.embed_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return resp.data[0].embedding

    def _cosine_similarity(self, a: list[float],
                           b: list[float]) -> float:
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def get(self, query: str) -> dict | None:
        """Look up the cache by semantic similarity."""
        query_emb = self._get_embedding(query)
        # Scan existing cache entries
        for key in self.redis.scan_iter(f"{self.cache_prefix}*"):
            data = json.loads(self.redis.get(key))
            similarity = self._cosine_similarity(
                query_emb, data["embedding"]
            )
            if similarity >= self.threshold:
                return {
                    "content": data["content"],
                    "cached": True,
                    "similarity": round(similarity, 4)
                }
        return None

    def set(self, query: str, content: str, ttl: int = 3600):
        """Write an entry to the cache."""
        embedding = self._get_embedding(query)
        key = f"{self.cache_prefix}{hashlib.md5(query.encode()).hexdigest()}"
        self.redis.setex(key, ttl, json.dumps({
            "query": query,
            "content": content,
            "embedding": embedding
        }))
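A quick standalone check of the cache (assumes a local Redis and OPENAI_API_KEY, since lookups call the embeddings API):

from cache import SemanticCache

cache = SemanticCache(threshold=0.92)
cache.set("What is an API gateway?",
          "An API gateway is an intermediate layer between clients and backend services ...")

hit = cache.get("What does an API gateway mean?")  # semantically similar query
print(hit["similarity"] if hit else "cache miss")

Note that get() embeds the query and then linearly scans every cached entry, which is fine for small caches; at larger scale the scan would need to be replaced with a vector index, which is beyond the scope of this sketch.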
3.4 Rate Limiter
Create rate_limiter.py:
import time
import redis


class SlidingWindowLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    def is_allowed(self, key: str, max_requests: int,
                   window_seconds: int) -> bool:
        now = time.time()
        window_start = now - window_seconds
        pipe = self.redis.pipeline()
        # Drop entries that have fallen out of the window
        pipe.zremrangebyscore(key, 0, window_start)
        # Count requests currently in the window
        pipe.zcard(key)
        # Record the current request
        pipe.zadd(key, {str(now): now})
        # Expire the key along with the window
        pipe.expire(key, window_seconds)
        results = pipe.execute()
        current_count = results[1]
        return current_count < max_requests
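Exercising the limiter on its own (assumes a local Redis; the key name is arbitrary):

from rate_limiter import SlidingWindowLimiter

limiter = SlidingWindowLimiter()
for i in range(65):
    if not limiter.is_allowed("rate:demo-user", max_requests=60, window_seconds=60):
        print(f"request {i + 1}: throttled")  # requests beyond 60 in the window are rejected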
3.5 Assembling the Gateway Service
Create gateway.py:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openai import OpenAI
from anthropic import Anthropic
from router import ModelRouter, Provider
from cache import SemanticCache
from rate_limiter import SlidingWindowLimiter

app = FastAPI(title="LLM API Gateway")

# Initialize components
router = ModelRouter([
    Provider(
        name="openai",
        client=OpenAI(),
        model="gpt-4o-mini",
        priority=0,
        cost_per_1k_input=0.15,
        cost_per_1k_output=0.60
    ),
    Provider(
        name="anthropic",
        client=Anthropic(),
        model="claude-sonnet-4-20250514",
        priority=1,
        cost_per_1k_input=3.0,
        cost_per_1k_output=15.0
    ),
])
cache = SemanticCache()
limiter = SlidingWindowLimiter()

# Usage tracking (use a database in production)
usage_log: list[dict] = []


class ChatRequest(BaseModel):
    messages: list[dict]
    user_id: str = "default"
    strategy: str = "priority"  # priority / latency / cost / random
    use_cache: bool = True


@app.post("/v1/chat/completions")
async def chat(req: ChatRequest):
    # 1. Rate-limit check
    if not limiter.is_allowed(
        f"rate:{req.user_id}", max_requests=60, window_seconds=60
    ):
        raise HTTPException(429, "Rate limit exceeded (60/min)")

    # 2. Cache lookup
    user_query = req.messages[-1]["content"]
    if req.use_cache:
        cached = cache.get(user_query)
        if cached:
            return {
                "content": cached["content"],
                "cached": True,
                "similarity": cached["similarity"]
            }

    # 3. Route the call
    result = router.route(req.messages, strategy=req.strategy)

    # 4. Write to the cache
    if req.use_cache:
        cache.set(user_query, result["content"])

    # 5. Record usage
    usage_log.append({
        "user_id": req.user_id,
        "provider": result["provider"],
        "model": result["model"],
        "input_tokens": result["usage"].get("input", 0),
        "output_tokens": result["usage"].get("output", 0),
        "latency": result["latency"]
    })
    return result


@app.get("/stats")
async def stats():
    """Cost and usage statistics."""
    total_input = sum(r.get("input_tokens", 0) for r in usage_log)
    total_output = sum(r.get("output_tokens", 0) for r in usage_log)
    by_provider = {}
    for r in usage_log:
        p = r["provider"]
        if p not in by_provider:
            by_provider[p] = {"calls": 0, "input": 0, "output": 0}
        by_provider[p]["calls"] += 1
        by_provider[p]["input"] += r.get("input_tokens", 0)
        by_provider[p]["output"] += r.get("output_tokens", 0)
    return {
        "total_requests": len(usage_log),
        "total_tokens": {"input": total_input, "output": total_output},
        "by_provider": by_provider
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=4000)
4. Launch and Test
4.1 Start the Services
# Start Redis first
docker run -d --name redis -p 6379:6379 redis:alpine
# Start the gateway
python gateway.py
4.2 Functional Tests
# Plain request
curl -X POST http://localhost:4000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "What is an API gateway?"}]}'
# Send a semantically equivalent request again (tests a cache hit)
curl -X POST http://localhost:4000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "What does an API gateway mean?"}]}'
# Check the statistics
curl http://localhost:4000/stats
Expected output for the second request (cache hit):
{
  "content": "An API gateway is an intermediate layer between clients and backend services...",
  "cached": true,
  "similarity": 0.9456
}
5. Production Considerations
5.1 Deployment Architecture
Nginx (SSL/TLS)
  --> LLM Gateway (multiple instances, Gunicorn + Uvicorn workers)
  --> Redis Cluster (cache + rate limiting + sessions)
  --> PostgreSQL (usage logs + billing)
  --> Provider APIs (OpenAI / Anthropic / Google)
5.2 Monitoring Metrics
| Metric | Meaning | Alert threshold |
|---|---|---|
| p99_latency | 99th-percentile latency | > 10 s |
| cache_hit_rate | Cache hit rate | < 20% |
| error_rate | Request error rate | > 5% |
| daily_cost | Daily spend | > 80% of budget |
| provider_health | Provider health | 3 consecutive failures |
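A minimal sketch of how these thresholds could be evaluated from request records; the records structure (with cached/error/cost_usd/latency fields) is illustrative and not produced by the gateway above:

import numpy as np

def evaluate_alerts(records: list[dict], daily_budget_usd: float) -> dict:
    """Evaluate the alert thresholds from the table above over a list of request records."""
    latencies = [r["latency"] for r in records if "latency" in r]
    total = len(records)
    hits = sum(1 for r in records if r.get("cached"))
    errors = sum(1 for r in records if r.get("error"))
    cost = sum(r.get("cost_usd", 0.0) for r in records)
    return {
        "p99_latency": bool(latencies) and float(np.percentile(latencies, 99)) > 10,
        "cache_hit_rate": total > 0 and hits / total < 0.20,
        "error_rate": total > 0 and errors / total > 0.05,
        "daily_cost": cost > daily_budget_usd * 0.8,
    }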
5.3 Security Checklist
- [ ] No hard-coded API keys; use environment variables or a secrets manager
- [ ] Add API key authentication (Bearer token) to the gateway itself, as sketched after this list
- [ ] Enforce HTTPS; no plaintext transport
- [ ] Limit per-user request rates
- [ ] Redact logs (do not store full prompts/responses)
- [ ] Rotate provider API keys regularly
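A minimal sketch of the second checklist item for the self-built gateway; GATEWAY_API_KEYS is an assumed comma-separated environment variable, not part of the code in section 3:

import os
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

bearer = HTTPBearer()
VALID_KEYS = set(filter(None, os.environ.get("GATEWAY_API_KEYS", "").split(",")))

def verify_key(cred: HTTPAuthorizationCredentials = Depends(bearer)) -> None:
    """Reject requests whose Bearer token is not in the allow-list."""
    if cred.credentials not in VALID_KEYS:
        raise HTTPException(401, "Invalid API key")

# Attach the dependency to the endpoint, e.g.:
# @app.post("/v1/chat/completions", dependencies=[Depends(verify_key)])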
FAQ
Q1: LiteLLM or a self-built gateway? Start with LiteLLM to ship quickly; switch to a self-built gateway once you need custom caching strategies, billing logic, or special routing rules. The two can also coexist (the self-built gateway calling LiteLLM internally).
Q2: The semantic cache hit rate is too low? Lower the threshold (e.g. from 0.92 to 0.88), but note that going too low will serve cached answers for semantically different requests. You can also include a hash of the system prompt in the cache key so that different scenarios do not share cache entries; see the sketch below.
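A hedged sketch of that idea; scoped_prefix is a hypothetical helper, not part of SemanticCache in section 3.3:

import hashlib

def scoped_prefix(base_prefix: str, system_prompt: str) -> str:
    """Derive a per-scenario cache prefix from the system prompt."""
    scope = hashlib.md5(system_prompt.encode()).hexdigest()[:12]
    return f"{base_prefix}{scope}:"

# SemanticCache would then scan and write keys under "llm_cache:<scope>:"
# instead of the shared "llm_cache:" prefix.
print(scoped_prefix("llm_cache:", "You are a strict legal assistant."))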
Q3: How do I cache streaming responses? For streaming requests, collect the complete response first, then write it to the cache. On the next cache hit, replay the cached content to the client as a simulated stream; see the sketch below.
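A hedged sketch of replaying a cached answer as a stream; the chunking and media type are illustrative (real clients typically expect SSE-formatted chunks):

import asyncio
from fastapi.responses import StreamingResponse

def replay_cached_stream(cached_text: str, chunk_size: int = 20) -> StreamingResponse:
    """Return a cached answer to the client as a simulated stream."""
    async def gen():
        for i in range(0, len(cached_text), chunk_size):
            yield cached_text[i:i + chunk_size]
            await asyncio.sleep(0)  # yield control between chunks
    return StreamingResponse(gen(), media_type="text/plain")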
Q4: Token counts differ across providers? Different models use different tokenizers. Either estimate costs uniformly with tiktoken (see the sketch below) or use the usage field returned by each provider directly; always compute costs from each provider's official pricing.
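A hedged sketch of uniform cost estimation with tiktoken; cl100k_base only approximates non-OpenAI tokenizers, and the prices must come from each provider's pricing page:

import tiktoken

def estimate_cost(prompt: str, completion: str,
                  price_in_per_1k: float, price_out_per_1k: float) -> float:
    """Rough USD cost estimate based on tiktoken token counts."""
    enc = tiktoken.get_encoding("cl100k_base")
    n_in = len(enc.encode(prompt))
    n_out = len(enc.encode(completion))
    return n_in / 1000 * price_in_per_1k + n_out / 1000 * price_out_per_1k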
Q5: How do I run a canary release (testing a new model)?
Give the new model a low weight (e.g. 10%) in the routing configuration and split traffic with strategy: "random". Once latency and quality metrics look good, gradually increase the weight; a weighted-selection sketch follows.
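A hedged sketch of weighted canary selection on the self-built side; the weights are illustrative and not part of the Provider dataclass in section 3.2:

import random

def pick_weighted(stable, canary, canary_share: float = 0.10):
    """Send roughly canary_share of traffic to the new model, the rest to the stable one."""
    return random.choices([stable, canary], weights=[1 - canary_share, canary_share], k=1)[0]

# e.g. provider = pick_weighted(stable_provider, canary_provider, canary_share=0.10)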
Maurice | maurice_wen@proton.me