# Streaming Architectures: SSE, WebSocket, and gRPC in AI Applications

By 灵阙教研团队 · Updated 2026-02-28
## Why AI Applications Need Streaming

Large language models generate output token by token, and a full response can take 5-30 seconds. Waiting for the entire response before returning anything makes for a terrible user experience. Streaming lets users see content as soon as the first token is generated, cutting perceived latency from tens of seconds to a few hundred milliseconds.

Beyond text generation, AI applications have several other streaming scenarios:
- LLM chat (token-by-token output)
- Agent tool-call progress notifications
- Model inference progress (image generation, video processing)
- Real-time speech transcription (streaming STT)
- Multi-step pipeline status updates
## Comparing the Three Streaming Technologies

| Dimension | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| Protocol | HTTP/1.1 | HTTP upgrade to WebSocket | HTTP/2 |
| Direction | One-way (server push) | Full duplex | Unary / server / client / bidirectional streams |
| Data format | Text (UTF-8) | Text or binary | Protocol Buffers |
| Auto-reconnect | Native in browsers | Manual | Manual |
| Proxy/CDN compatibility | Excellent | Moderate | Poor |
| Browser support | Native EventSource | Native WebSocket | Requires grpc-web |
| Connection overhead | Low (plain HTTP) | Medium (long-lived connection) | Low (HTTP/2 multiplexing) |
| Best for | One-way server push | Real-time bidirectional communication | Service-to-service communication |
## SSE (Server-Sent Events)

### Architecture

SSE is the simplest streaming option: a plain HTTP response with the `text/event-stream` content type.

Strengths:
- Native browser support via the EventSource API
- Plain HTTP, so firewalls, proxies, and CDNs handle it well
- Built-in reconnection (with Last-Event-ID resume)
- Trivial to implement: the server just keeps writing to the response stream

Limitations:
- One-way only (server to client)
- UTF-8 text only
- Some proxies buffer the response (buffering must be disabled)
- Browser connection limit (6 per origin over HTTP/1.1)
服务端实现(Node.js)
// Next.js Route Handler
export async function POST(request: Request) {
const { messages } = await request.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
// 连接 OpenAI
const response = await openai.chat.completions.create({
model: "gpt-4o",
messages,
stream: true,
});
// 心跳定时器(防止 Cloudflare 超时)
let heartbeatCount = 0;
const heartbeat = setInterval(() => {
heartbeatCount++;
controller.enqueue(
encoder.encode(`: heartbeat ${heartbeatCount}\n\n`)
);
}, 25000);
try {
for await (const chunk of response) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
// SSE 格式:event: type\ndata: payload\n\n
const sseMessage = `event: token\ndata: ${JSON.stringify({ content })}\n\n`;
controller.enqueue(encoder.encode(sseMessage));
}
// 工具调用
const toolCalls = chunk.choices[0]?.delta?.tool_calls;
if (toolCalls) {
const sseMessage = `event: tool_call\ndata: ${JSON.stringify({ tool_calls: toolCalls })}\n\n`;
controller.enqueue(encoder.encode(sseMessage));
}
}
// 完成事件
controller.enqueue(
encoder.encode(`event: done\ndata: {"status": "complete"}\n\n`)
);
} finally {
clearInterval(heartbeat);
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", // 禁用 Nginx 缓冲
},
});
}
### Server Implementation (Python FastAPI)

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import json

app = FastAPI()
client = AsyncOpenAI()

class ChatRequest(BaseModel):
    messages: list

async def generate_sse_stream(messages: list):
    """SSE event generator.

    Note: heartbeat comments have to be interleaved with this generator's
    output (e.g. by pumping tokens through an asyncio.Queue); a second
    generator defined on the side would simply never be consumed.
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    async for chunk in response:
        content = chunk.choices[0].delta.content
        if content:
            data = json.dumps({"content": content}, ensure_ascii=False)
            yield f"event: token\ndata: {data}\n\n"
        # Check for tool calls
        if chunk.choices[0].delta.tool_calls:
            data = json.dumps({"tool_calls": "..."})
            yield f"event: tool_call\ndata: {data}\n\n"
    yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"

@app.post("/api/chat")
async def chat(request: ChatRequest):
    return StreamingResponse(
        generate_sse_stream(request.messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
```
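Unlike the Node.js version, where a timer writes heartbeats directly to the response, a Python generator can only emit heartbeats by interleaving them with the token stream. A minimal sketch, assuming a wrapper (the name `merge_with_heartbeat` is ours) that pumps any async token iterator through a queue and yields an SSE comment whenever the model goes quiet:

```python
import asyncio
import json
from typing import AsyncIterator

async def merge_with_heartbeat(
    token_stream: AsyncIterator[str],
    interval: float = 25.0,
) -> AsyncIterator[str]:
    """Interleave SSE comment heartbeats with a token stream."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    async def pump():
        # Background task: forward tokens into the queue
        async for item in token_stream:
            await queue.put(item)
        await queue.put(done)

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                # No token for `interval` seconds: emit an SSE comment
                # (ignored by clients, keeps proxies from timing out)
                yield ": heartbeat\n\n"
                continue
            if item is done:
                break
            yield f"event: token\ndata: {json.dumps({'content': item}, ensure_ascii=False)}\n\n"
    finally:
        task.cancel()
```

The `StreamingResponse` would then wrap `merge_with_heartbeat(...)` instead of the raw generator.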
### Client Implementation

```typescript
interface Message {
  role: "system" | "user" | "assistant";
  content: string;
}

// Option 1: native EventSource (GET requests only)
const source = new EventSource("/api/stream?query=hello");
source.addEventListener("token", (event) => {
  const { content } = JSON.parse(event.data);
  appendToUI(content); // appendToUI is an app-level UI hook
});
source.addEventListener("done", () => {
  source.close();
});
source.onerror = (error) => {
  console.error("SSE error, will auto-reconnect:", error);
};

// Option 2: fetch + ReadableStream (supports POST)
async function streamChat(messages: Message[]) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Parse SSE events
    const events = buffer.split("\n\n");
    buffer = events.pop() || ""; // keep the incomplete trailing event
    for (const event of events) {
      if (event.startsWith(":")) continue; // heartbeat comment
      const lines = event.split("\n");
      let eventType = "message";
      let data = "";
      for (const line of lines) {
        if (line.startsWith("event: ")) {
          eventType = line.slice(7);
        } else if (line.startsWith("data: ")) {
          data = line.slice(6);
        }
      }
      if (data) {
        handleSSEEvent(eventType, JSON.parse(data)); // app-level dispatch hook
      }
    }
  }
}
```
## WebSocket

### Architecture

WebSocket provides full-duplex communication, suited to scenarios where the client also needs to send data in real time.

AI use cases:
- Real-time voice conversation (sending and receiving audio streams simultaneously)
- Collaborative editing with AI assistance
- Multi-turn interactive agents (users can interrupt or add input at any time)
实现示例
# FastAPI WebSocket
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active: dict[str, WebSocket] = {}
async def connect(self, ws: WebSocket, user_id: str):
await ws.accept()
self.active[user_id] = ws
def disconnect(self, user_id: str):
self.active.pop(user_id, None)
async def send_json(self, user_id: str, data: dict):
ws = self.active.get(user_id)
if ws:
await ws.send_json(data)
manager = ConnectionManager()
@app.websocket("/ws/chat/{user_id}")
async def chat_websocket(ws: WebSocket, user_id: str):
await manager.connect(ws, user_id)
try:
while True:
data = await ws.receive_json()
if data["type"] == "message":
# 流式生成回复
response = await openai_client.chat.completions.create(
model="gpt-4o",
messages=data["messages"],
stream=True,
)
async for chunk in response:
content = chunk.choices[0].delta.content
if content:
await ws.send_json({
"type": "token",
"content": content,
})
await ws.send_json({"type": "done"})
elif data["type"] == "cancel":
# 用户中断生成(WebSocket 双向优势)
# 取消正在进行的推理
pass
elif data["type"] == "ping":
await ws.send_json({"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(user_id)
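As written, the handler above cannot actually see a `cancel` frame while it is streaming, because it only reads from the socket between turns. One way to make cancellation real is to run generation as a background `asyncio.Task` and keep the receive loop free. A minimal sketch with a hypothetical `GenerationSession` helper (not part of FastAPI):

```python
import asyncio

class GenerationSession:
    """Per-connection handle for the in-flight generation task."""

    def __init__(self) -> None:
        self._task: asyncio.Task | None = None

    def start(self, coro) -> asyncio.Task:
        """Launch a generation coroutine, cancelling any previous one."""
        self.cancel()  # only one generation at a time
        self._task = asyncio.create_task(coro)
        return self._task

    def cancel(self) -> bool:
        """Cancel the running generation, if any; True if one was cancelled."""
        if self._task and not self._task.done():
            self._task.cancel()
            return True
        return False
```

The receive loop then calls `session.start(stream_reply(ws, data))` for a `message` frame and `session.cancel()` for a `cancel` frame; cancellation raises `CancelledError` inside the streaming coroutine, which stops consuming the model stream.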
### Client WebSocket

```typescript
interface Message {
  role: "system" | "user" | "assistant";
  content: string;
}

class AIWebSocket {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;

  // Callbacks supplied by the UI layer
  onToken: (content: string) => void = () => {};
  onComplete: () => void = () => {};
  onError: (message: string) => void = () => {};

  connect(userId: string) {
    this.ws = new WebSocket(`wss://api.example.com/ws/chat/${userId}`);
    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
      this.startHeartbeat();
    };
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "token":
          this.onToken(data.content);
          break;
        case "done":
          this.onComplete();
          break;
        case "error":
          this.onError(data.message);
          break;
        case "pong":
          // heartbeat response
          break;
      }
    };
    this.ws.onclose = (event) => {
      this.stopHeartbeat();
      if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
        // Exponential backoff, capped at 30 s
        const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts++;
        setTimeout(() => this.connect(userId), delay);
      }
    };
  }

  send(messages: Message[]) {
    this.ws?.send(JSON.stringify({ type: "message", messages }));
  }

  cancel() {
    this.ws?.send(JSON.stringify({ type: "cancel" }));
  }

  private startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: "ping" }));
      }
    }, 30000);
  }

  private stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}
```
## gRPC Streaming

### Architecture

gRPC runs over HTTP/2 and serializes with Protocol Buffers, making it a good fit for high-performance service-to-service communication.

Four RPC modes:
- Unary (plain request/response)
- Server streaming
- Client streaming
- Bidirectional streaming
### Proto Definition

```protobuf
syntax = "proto3";

package inference;

service InferenceService {
  // One-shot inference
  rpc Predict(PredictRequest) returns (PredictResponse);
  // Streaming inference (server streaming)
  rpc StreamPredict(PredictRequest) returns (stream PredictChunk);
  // Voice conversation (bidirectional streaming)
  rpc VoiceChat(stream AudioChunk) returns (stream ChatResponse);
}

message PredictRequest {
  string model = 1;
  repeated Message messages = 2;
  float temperature = 3;
  int32 max_tokens = 4;
}

message PredictResponse {
  // Full, non-streamed completion
  string content = 1;
}

message PredictChunk {
  string content = 1;
  bool is_final = 2;
  ToolCall tool_call = 3;
}

message Message {
  string role = 1;
  string content = 2;
}

message ToolCall {
  string name = 1;
  string arguments = 2;
}

message AudioChunk {
  bytes audio_data = 1;
  string format = 2; // pcm, opus
  int32 sample_rate = 3;
}

message ChatResponse {
  oneof response {
    string text = 1;
    bytes audio = 2;
    ToolCall tool_call = 3;
  }
}
```
### Server Implementation (Python)

```python
import asyncio
import grpc
import inference_pb2
import inference_pb2_grpc

class InferenceServicer(inference_pb2_grpc.InferenceServiceServicer):
    async def StreamPredict(self, request, context):
        """Server-streaming inference."""
        response = await openai_client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": m.role, "content": m.content}
                for m in request.messages
            ],
            stream=True,
        )
        async for chunk in response:
            content = chunk.choices[0].delta.content or ""
            is_final = chunk.choices[0].finish_reason is not None
            yield inference_pb2.PredictChunk(
                content=content,
                is_final=is_final,
            )

    async def VoiceChat(self, request_iterator, context):
        """Bidirectional streaming voice conversation."""
        async for audio_chunk in request_iterator:
            # 1. STT: audio -> text (stt_model is a placeholder)
            text = await stt_model.transcribe(audio_chunk.audio_data)
            if text:
                # 2. LLM: generate the reply (generate_reply is a placeholder)
                reply = await generate_reply(text)
                # 3. TTS: text -> audio (tts_model is a placeholder)
                audio = await tts_model.synthesize(reply)
                yield inference_pb2.ChatResponse(text=reply)
                yield inference_pb2.ChatResponse(audio=audio)

async def serve():
    server = grpc.aio.server()
    inference_pb2_grpc.add_InferenceServiceServicer_to_server(
        InferenceServicer(), server
    )
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

if __name__ == "__main__":
    asyncio.run(serve())
```
## Making the Choice

### By Scenario

| Scenario | Recommendation | Rationale |
|---|---|---|
| LLM chat (web) | SSE | One-way push is enough; best compatibility, simplest implementation |
| Agent progress notifications | SSE | One-way server push, with event types to distinguish updates |
| Real-time voice conversation | WebSocket | Needs bidirectional audio streams |
| Collaboration + AI assistance | WebSocket | Multi-user real-time collaboration needs two-way traffic |
| Service-to-service inference | gRPC | High performance, strong typing, multi-language support |
| Multi-step agent pipeline | SSE + event types | Event types distinguish progress, results, and tool calls |
### Hybrid Architectures

In production these are usually combined:

```text
Browser client
  |
  ├── SSE: streamed LLM output
  ├── WebSocket: real-time collaboration / voice
  └── REST: non-streaming APIs

API Gateway
  |
  ├── gRPC: internal inference service calls
  ├── gRPC Streaming: model-to-model pipelines
  └── HTTP/2: model download / upload
```
## Production Considerations

- Cloudflare/CDN proxy timeouts: the free plan cuts connections after 100 seconds, so heartbeats are mandatory
- Nginx buffering: set `X-Accel-Buffering: no` and `proxy_buffering off`
- Load-balancer stickiness: WebSocket requires sticky sessions
- Connection limits: HTTP/1.1 allows at most 6 SSE connections per origin; HTTP/2 multiplexing removes this in practice
- Error recovery: SSE reconnects automatically; WebSocket and gRPC need hand-rolled exponential backoff
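The Nginx points above can be sketched as a location block; the path and upstream name are illustrative, not from the original:

```nginx
location /api/chat {
    proxy_pass http://app_backend;   # illustrative upstream name
    proxy_http_version 1.1;
    proxy_set_header Connection "";  # keep the upstream connection alive
    proxy_buffering off;             # do not buffer the SSE stream
    proxy_cache off;
    proxy_read_timeout 3600s;        # allow long-lived streams
}
```

Setting `X-Accel-Buffering: no` from the application (as in the handlers above) achieves the same per-response effect as `proxy_buffering off`.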
Maurice | maurice_wen@proton.me