实时 AI 系统架构:流式推理与 SSE
原创
灵阙教研团队
S 精选 提升 |
约 8 分钟阅读
更新于 2026-02-28 AI 导读
实时 AI 系统架构:流式推理与 SSE Server-Sent Events 流式 Token 传输、WebSocket vs SSE 选型、背压处理与生产级流式架构 引言 大语言模型的自回归生成特性天然适合流式输出:模型逐个 token 生成,为什么要等全部生成完才返回给用户?流式传输可以将用户感知的首次响应延迟从数秒降低到数百毫秒,极大改善交互体验。ChatGPT...
实时 AI 系统架构:流式推理与 SSE
Server-Sent Events 流式 Token 传输、WebSocket vs SSE 选型、背压处理与生产级流式架构
引言
大语言模型的自回归生成特性天然适合流式输出:模型逐个 token 生成,为什么要等全部生成完才返回给用户?流式传输可以将用户感知的首次响应延迟从数秒降低到数百毫秒,极大改善交互体验。ChatGPT 那种逐字打印的效果,背后正是 SSE(Server-Sent Events)技术。
本文深入解析实时 AI 系统的流式架构,从协议选型到生产部署。
协议选型
WebSocket vs SSE vs Long Polling
| 特性 | SSE | WebSocket | Long Polling |
|---|---|---|---|
| 方向 | 服务器 → 客户端(单向) | 双向 | 服务器 → 客户端 |
| 协议 | HTTP/1.1+ | 独立协议 (ws://) | HTTP |
| 自动重连 | 浏览器内置 | 需手动实现 | 需手动实现 |
| 数据格式 | 文本(UTF-8) | 文本 + 二进制 | 任意 |
| 最大连接数 | HTTP/1.1: 6/域; HTTP/2: 100+ | 理论无限 | 同 HTTP |
| 代理/CDN 兼容 | 优秀 | 部分支持 | 优秀 |
| 浏览器支持 | 除 IE 外全部 | 全部 | 全部 |
| 适用场景 | LLM 流式输出 | 实时聊天/协作 | 旧系统兼容 |
为什么 LLM 场景首选 SSE
LLM 流式输出的特征:
1. 单向传输(服务器 → 客户端)✓ SSE 完美匹配
2. 文本数据为主 ✓ SSE 原生支持
3. 需要自动重连 ✓ SSE 浏览器内置
4. 经过反向代理/CDN ✓ SSE 是标准 HTTP
5. 无需客户端实时发送 ✓ SSE 足够
WebSocket 适用场景(LLM 不需要):
- 高频双向通信(游戏、协作编辑)
- 二进制数据传输(音视频流)
- 低延迟双向消息(<10ms)
SSE 协议详解
数据格式
SSE 消息格式(纯文本,以换行分隔):
event: message ← 事件类型(可选)
id: 12345 ← 消息 ID(用于断点续传)
retry: 3000 ← 重连间隔(毫秒)
data: {"token": "Hello"} ← 数据负载
event: message
data: {"token": " world"}
event: done
data: [DONE] ← 结束标记
规则:
- 每个字段以 "field: value\n" 格式
- 消息之间用空行 "\n\n" 分隔
- data 可以多行(自动连接)
- 以 ":" 开头的行是注释(可用于心跳)
心跳机制
: heartbeat 1 ← 注释行,保持连接活跃
← 空行表示消息结束(但注释不算消息)
: heartbeat 2
event: message
data: {"token": "Hi"}
: heartbeat 3
服务端实现
Next.js SSE 路由
// app/api/chat/route.ts
import { NextRequest } from "next/server";
export async function POST(request: NextRequest) {
const { messages, model } = await request.json();
// Validate request
if (!messages?.length) {
return new Response(JSON.stringify({ error: "No messages" }), {
status: 400,
});
}
// Create SSE stream
const encoder = new TextEncoder();
let heartbeatInterval: NodeJS.Timeout;
const stream = new ReadableStream({
async start(controller) {
// Start heartbeat to keep connection alive
let heartbeatCount = 0;
heartbeatInterval = setInterval(() => {
try {
controller.enqueue(
encoder.encode(`: heartbeat ${++heartbeatCount}\n\n`)
);
} catch {
// Stream may be closed
clearInterval(heartbeatInterval);
}
}, 15_000); // Every 15 seconds
try {
// Call LLM with streaming
const response = await callLLMStream(messages, model);
for await (const chunk of response) {
const data = JSON.stringify({
type: "token",
content: chunk.text,
model: chunk.model,
});
controller.enqueue(encoder.encode(`event: message\ndata: ${data}\n\n`));
}
// Send completion event with usage stats
const doneData = JSON.stringify({
type: "done",
usage: {
inputTokens: response.usage.inputTokens,
outputTokens: response.usage.outputTokens,
},
});
controller.enqueue(encoder.encode(`event: done\ndata: ${doneData}\n\n`));
} catch (error) {
const errorData = JSON.stringify({
type: "error",
message: error instanceof Error ? error.message : "Unknown error",
});
controller.enqueue(encoder.encode(`event: error\ndata: ${errorData}\n\n`));
} finally {
clearInterval(heartbeatInterval);
controller.close();
}
},
cancel() {
clearInterval(heartbeatInterval);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", // Disable Nginx buffering
},
});
}
Express SSE 路由
// src/routes/chat-sse.ts
import { Router, Request, Response } from "express";
const router = Router();
router.post("/api/chat/stream", async (req: Request, res: Response) => {
const { messages, model } = req.body;
// Set SSE headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache, no-transform");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
// Flush headers immediately
res.flushHeaders();
// Heartbeat timer
const heartbeat = setInterval(() => {
res.write(`: heartbeat ${Date.now()}\n\n`);
}, 15_000);
// Handle client disconnect
req.on("close", () => {
clearInterval(heartbeat);
// Cancel LLM generation if possible
abortController?.abort();
});
const abortController = new AbortController();
try {
const stream = await callLLMStream(messages, model, abortController.signal);
for await (const chunk of stream) {
// Check if client still connected
if (req.destroyed) break;
const data = JSON.stringify({ type: "token", content: chunk.text });
res.write(`event: message\ndata: ${data}\n\n`);
}
// Send completion
res.write(`event: done\ndata: ${JSON.stringify({ type: "done" })}\n\n`);
} catch (error) {
if (!req.destroyed) {
const msg = error instanceof Error ? error.message : "Unknown error";
res.write(`event: error\ndata: ${JSON.stringify({ type: "error", message: msg })}\n\n`);
}
} finally {
clearInterval(heartbeat);
res.end();
}
});
export default router;
客户端实现
React 流式消费
// src/hooks/useStreamingChat.ts
import { useState, useCallback, useRef } from "react";
interface StreamingState {
content: string;
isStreaming: boolean;
error: string | null;
usage: { inputTokens: number; outputTokens: number } | null;
}
export function useStreamingChat() {
const [state, setState] = useState<StreamingState>({
content: "",
isStreaming: false,
error: null,
usage: null,
});
const abortRef = useRef<AbortController | null>(null);
const sendMessage = useCallback(async (messages: Message[]) => {
// Cancel any ongoing stream
abortRef.current?.abort();
abortRef.current = new AbortController();
setState({ content: "", isStreaming: true, error: null, usage: null });
try {
const response = await fetch("/api/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages }),
signal: abortRef.current.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
let accumulated = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE events from buffer
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // Keep incomplete line in buffer
let eventType = "";
let eventData = "";
for (const line of lines) {
if (line.startsWith("event: ")) {
eventType = line.slice(7);
} else if (line.startsWith("data: ")) {
eventData = line.slice(6);
} else if (line === "" && eventData) {
// End of event, process it
try {
const parsed = JSON.parse(eventData);
if (parsed.type === "token") {
accumulated += parsed.content;
setState(prev => ({ ...prev, content: accumulated }));
} else if (parsed.type === "done") {
setState(prev => ({
...prev,
isStreaming: false,
usage: parsed.usage,
}));
} else if (parsed.type === "error") {
setState(prev => ({
...prev,
isStreaming: false,
error: parsed.message,
}));
}
} catch {
// Skip malformed events
}
eventType = "";
eventData = "";
}
}
}
} catch (error) {
if ((error as Error).name !== "AbortError") {
setState(prev => ({
...prev,
isStreaming: false,
error: (error as Error).message,
}));
}
}
}, []);
const cancel = useCallback(() => {
abortRef.current?.abort();
setState(prev => ({ ...prev, isStreaming: false }));
}, []);
return { ...state, sendMessage, cancel };
}
React 组件
// src/components/ChatMessage.tsx
function StreamingMessage({ content, isStreaming }: {
content: string;
isStreaming: boolean;
}) {
return (
<div className="message assistant">
<div className="message-content">
{content}
{isStreaming && <span className="cursor-blink">|</span>}
</div>
</div>
);
}
function ChatWindow() {
const { content, isStreaming, error, sendMessage, cancel } = useStreamingChat();
return (
<div className="chat-window">
<StreamingMessage content={content} isStreaming={isStreaming} />
{error && <div className="error-banner">{error}</div>}
{isStreaming ? (
<button onClick={cancel}>Stop generating</button>
) : (
<ChatInput onSend={(msg) => sendMessage([{ role: "user", content: msg }])} />
)}
</div>
);
}
背压处理
问题场景
背压问题:
LLM 生成速度 (100 tok/s) >> 网络传输速度 (慢网络)
│
▼
服务端缓冲区积压 → 内存溢出风险
│
▼
解决方案: 背压感知的流控制
背压实现
// src/streaming/backpressure.ts
async function streamWithBackpressure(
llmStream: AsyncIterable<Chunk>,
writer: WritableStreamDefaultWriter,
options: { highWaterMark: number; onPressure?: () => void },
) {
const encoder = new TextEncoder();
let buffered = 0;
for await (const chunk of llmStream) {
const data = encoder.encode(
`event: message\ndata: ${JSON.stringify(chunk)}\n\n`
);
// Check if writer is ready (backpressure signal)
if (writer.desiredSize !== null && writer.desiredSize <= 0) {
options.onPressure?.();
// Wait for the writer to drain
await writer.ready;
}
await writer.write(data);
buffered += data.byteLength;
}
return buffered;
}
代理与 CDN 配置
Nginx 配置
# nginx.conf - SSE proxy configuration
location /api/chat/stream {
proxy_pass http://backend;
# Disable buffering for SSE
proxy_buffering off;
proxy_cache off;
# SSE specific headers
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
# Timeouts (long-lived connections)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
# Disable gzip for SSE (causes buffering)
gzip off;
}
Cloudflare 注意事项
Cloudflare 对 SSE 的限制:
1. 免费版 100 秒超时
→ 解决: 心跳保活 (每 15-30 秒)
2. 响应缓冲
→ 解决: X-Accel-Buffering: no
3. 大响应截断
→ 解决: 控制单次流长度 (Enterprise 无限制)
建议 SSE 路由配置:
- 心跳间隔: 15 秒
- 最大流时长: 90 秒 (留 10 秒余量)
- 超长生成: 分段流 (前段结束后启动新流)
监控指标
| 指标 | 采集点 | 目标值 | 告警阈值 |
|---|---|---|---|
| TTFT (首 Token 延迟) | 第一个 event | <500ms | >2s |
| Token 间隔 | 相邻 token 时间差 | <100ms | >500ms |
| 流完成率 | 正常结束 / 总数 | >99% | <95% |
| 客户端取消率 | cancel / 总数 | <10% | >25% |
| 心跳超时率 | 心跳丢失 | <0.1% | >1% |
| 背压触发率 | 背压事件 / 总流 | <5% | >20% |
总结
- SSE 是 LLM 流式输出的最佳协议:单向、文本、自动重连、CDN 友好,完美匹配 LLM 场景。
- 心跳是必需的:Cloudflare 和 Nginx 都有连接超时,15-30 秒的心跳注释保持连接活跃。
- 客户端取消要传播到服务端:用户点击"停止生成"时,必须通过 AbortController 取消 LLM 调用,避免白白消耗 Token。
- 背压要提前设计:慢网络下不做流控会导致服务端内存溢出。
- TTFT 是最重要的延迟指标:用户感知的是"多快开始有字出现",而不是总生成时间。
Maurice | maurice_wen@proton.me