Real-Time AI System Architecture: Streaming Inference and SSE

Server-Sent Events token streaming, WebSocket vs SSE selection, backpressure handling, and production-grade streaming architecture

Introduction

The autoregressive nature of large language models is a natural fit for streaming output: the model produces tokens one at a time, so why wait for the entire completion before returning anything to the user? Streaming cuts the user-perceived time to first response from several seconds to a few hundred milliseconds, dramatically improving the interactive experience. The character-by-character typing effect in ChatGPT is built on SSE (Server-Sent Events).

This article takes a deep dive into the streaming architecture of real-time AI systems, from protocol selection to production deployment.

Protocol Selection

WebSocket vs SSE vs Long Polling

Feature             | SSE                              | WebSocket                  | Long Polling
Direction           | Server → client (one-way)        | Bidirectional              | Server → client
Protocol            | HTTP/1.1+                        | Separate protocol (ws://)  | HTTP
Auto reconnect      | Built into the browser           | Manual                     | Manual
Data format         | Text (UTF-8)                     | Text + binary              | Any
Max connections     | HTTP/1.1: 6/domain; HTTP/2: 100+ | Effectively unlimited      | Same as HTTP
Proxy/CDN support   | Excellent                        | Partial                    | Excellent
Browser support     | All except IE                    | All                        | All
Best fit            | LLM streaming output             | Real-time chat/collab      | Legacy compatibility

Why SSE Is the First Choice for LLM Workloads

Characteristics of LLM streaming output:
  1. One-way transfer (server → client)      ✓ SSE matches exactly
  2. Mostly text data                         ✓ Natively supported by SSE
  3. Automatic reconnection needed            ✓ Built into browsers via EventSource (see the sketch after this list)
  4. Must pass through reverse proxies/CDNs   ✓ SSE is plain HTTP
  5. No real-time client-to-server traffic    ✓ SSE is sufficient

Scenarios where WebSocket is the right tool (and LLM streaming is not one of them):
  - High-frequency bidirectional communication (games, collaborative editing)
  - Binary data transfer (audio/video streams)
  - Low-latency two-way messaging (<10 ms)
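
The auto-reconnect in point 3 comes from the browser's native EventSource API. Here is a minimal consumer sketch, assuming a hypothetical GET endpoint /api/stream that emits the message/done events described in the next section; note that EventSource only supports GET requests without a body, which is why the React client later in this article consumes the stream with fetch instead.

// Minimal EventSource consumer (sketch; /api/stream is a hypothetical endpoint)
const source = new EventSource("/api/stream");

// Fired for "event: message" frames
source.addEventListener("message", (e: MessageEvent) => {
  const { token } = JSON.parse(e.data);
  console.log("token:", token);
});

// Fired for "event: done" frames; close explicitly so the browser
// does not auto-reconnect after a finished generation
source.addEventListener("done", () => source.close());

// Reconnection (with the Last-Event-ID header) is handled by the browser
source.onerror = () => console.warn("connection lost, browser will retry");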

SSE Protocol Details

Data Format

SSE message format (plain text, events separated by newlines):

event: message            ← event type (optional)
id: 12345                 ← message ID (used for resumption)
retry: 3000               ← reconnection interval (milliseconds)
data: {"token": "Hello"}  ← payload

event: message
data: {"token": " world"}

event: done
data: [DONE]              ← end-of-stream marker

Rules:
  - Each field is written as "field: value\n"
  - Messages are separated by a blank line ("\n\n")
  - data may span multiple lines (the lines are joined automatically)
  - Lines starting with ":" are comments (useful for heartbeats)

Heartbeat Mechanism

: heartbeat 1           ← comment line; keeps the connection alive
                        ← a blank line ends the message (comments do not count as messages)

: heartbeat 2

event: message
data: {"token": "Hi"}

: heartbeat 3

Server-Side Implementation

Next.js SSE Route

// app/api/chat/route.ts
// callLLMStream is assumed to be a project helper that wraps the LLM provider's
// streaming API and yields { text, model } chunks (a sketch follows this route).
import { NextRequest } from "next/server";

export async function POST(request: NextRequest) {
  const { messages, model } = await request.json();

  // Validate request
  if (!messages?.length) {
    return new Response(JSON.stringify({ error: "No messages" }), {
      status: 400,
    });
  }

  // Create SSE stream
  const encoder = new TextEncoder();
  let heartbeatInterval: NodeJS.Timeout;

  const stream = new ReadableStream({
    async start(controller) {
      // Start heartbeat to keep connection alive
      let heartbeatCount = 0;
      heartbeatInterval = setInterval(() => {
        try {
          controller.enqueue(
            encoder.encode(`: heartbeat ${++heartbeatCount}\n\n`)
          );
        } catch {
          // Stream may be closed
          clearInterval(heartbeatInterval);
        }
      }, 15_000); // Every 15 seconds

      try {
        // Call LLM with streaming
        const response = await callLLMStream(messages, model);

        for await (const chunk of response) {
          const data = JSON.stringify({
            type: "token",
            content: chunk.text,
            model: chunk.model,
          });
          controller.enqueue(encoder.encode(`event: message\ndata: ${data}\n\n`));
        }

        // Send completion event with usage stats
        const doneData = JSON.stringify({
          type: "done",
          usage: {
            inputTokens: response.usage.inputTokens,
            outputTokens: response.usage.outputTokens,
          },
        });
        controller.enqueue(encoder.encode(`event: done\ndata: ${doneData}\n\n`));

      } catch (error) {
        const errorData = JSON.stringify({
          type: "error",
          message: error instanceof Error ? error.message : "Unknown error",
        });
        controller.enqueue(encoder.encode(`event: error\ndata: ${errorData}\n\n`));
      } finally {
        clearInterval(heartbeatInterval);
        controller.close();
      }
    },

    cancel() {
      clearInterval(heartbeatInterval);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      "Connection": "keep-alive",
      "X-Accel-Buffering": "no",     // Disable Nginx buffering
    },
  });
}
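
Both routes delegate to a callLLMStream helper that this article does not define. Below is a minimal sketch (hypothetical path src/llm/call-llm-stream.ts) built on the official openai Node SDK's streaming API; the { text, model } chunk shape is this article's assumption, and the usage totals read by the route above (response.usage) are omitted here, since collecting them would require the provider's usage reporting and attaching the result to the returned stream.

// src/llm/call-llm-stream.ts (sketch)
import OpenAI from "openai";
import type { ChatCompletionMessageParam } from "openai/resources/chat/completions";

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

export interface LLMChunk {
  text: string;
  model: string;
}

export async function* callLLMStream(
  messages: ChatCompletionMessageParam[],
  model = "gpt-4o-mini",
  signal?: AbortSignal,
): AsyncGenerator<LLMChunk> {
  // Request a streamed completion; the SDK returns an async-iterable of chunks
  const stream = await openai.chat.completions.create(
    { model, messages, stream: true },
    { signal }, // lets the caller cancel generation on client disconnect
  );

  for await (const chunk of stream) {
    const text = chunk.choices[0]?.delta?.content;
    if (text) yield { text, model: chunk.model };
  }
}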

Express SSE Route

// src/routes/chat-sse.ts
// Relies on the same callLLMStream helper assumed by the Next.js route above.
import { Router, Request, Response } from "express";

const router = Router();

router.post("/api/chat/stream", async (req: Request, res: Response) => {
  const { messages, model } = req.body;

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache, no-transform");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");

  // Flush headers immediately
  res.flushHeaders();

  // Heartbeat timer
  const heartbeat = setInterval(() => {
    res.write(`: heartbeat ${Date.now()}\n\n`);
  }, 15_000);

  // Abort controller so the LLM call can be cancelled on client disconnect
  const abortController = new AbortController();

  // Handle client disconnect
  req.on("close", () => {
    clearInterval(heartbeat);
    // Cancel LLM generation if possible
    abortController.abort();
  });

  try {
    const stream = await callLLMStream(messages, model, abortController.signal);

    for await (const chunk of stream) {
      // Check if client still connected
      if (req.destroyed) break;

      const data = JSON.stringify({ type: "token", content: chunk.text });
      res.write(`event: message\ndata: ${data}\n\n`);
    }

    // Send completion
    res.write(`event: done\ndata: ${JSON.stringify({ type: "done" })}\n\n`);
  } catch (error) {
    if (!req.destroyed) {
      const msg = error instanceof Error ? error.message : "Unknown error";
      res.write(`event: error\ndata: ${JSON.stringify({ type: "error", message: msg })}\n\n`);
    }
  } finally {
    clearInterval(heartbeat);
    res.end();
  }
});

export default router;
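
The route reads req.body, so the JSON body parser has to be mounted before it; a minimal wiring sketch (assumed file src/server.ts):

// src/server.ts (sketch): mounting the SSE router
import express from "express";
import chatSseRouter from "./routes/chat-sse";

const app = express();
app.use(express.json()); // required so req.body is populated for the POST route
app.use(chatSseRouter);

app.listen(3000, () => console.log("listening on :3000"));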

Client-Side Implementation

React Streaming Consumption

// src/hooks/useStreamingChat.ts
import { useState, useCallback, useRef } from "react";

// Minimal message shape sent to the chat API (adjust to your contract)
interface Message {
  role: "system" | "user" | "assistant";
  content: string;
}

interface StreamingState {
  content: string;
  isStreaming: boolean;
  error: string | null;
  usage: { inputTokens: number; outputTokens: number } | null;
}

export function useStreamingChat() {
  const [state, setState] = useState<StreamingState>({
    content: "",
    isStreaming: false,
    error: null,
    usage: null,
  });
  const abortRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (messages: Message[]) => {
    // Cancel any ongoing stream
    abortRef.current?.abort();
    abortRef.current = new AbortController();

    setState({ content: "", isStreaming: true, error: null, usage: null });

    try {
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ messages }),
        signal: abortRef.current.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      let accumulated = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // Parse SSE events from buffer
        const lines = buffer.split("\n");
        buffer = lines.pop() || ""; // Keep incomplete line in buffer

        let eventType = "";
        let eventData = "";

        for (const line of lines) {
          if (line.startsWith("event: ")) {
            eventType = line.slice(7);
          } else if (line.startsWith("data: ")) {
            eventData = line.slice(6);
          } else if (line === "" && eventData) {
            // End of event, process it
            try {
              const parsed = JSON.parse(eventData);

              if (parsed.type === "token") {
                accumulated += parsed.content;
                setState(prev => ({ ...prev, content: accumulated }));
              } else if (parsed.type === "done") {
                setState(prev => ({
                  ...prev,
                  isStreaming: false,
                  usage: parsed.usage,
                }));
              } else if (parsed.type === "error") {
                setState(prev => ({
                  ...prev,
                  isStreaming: false,
                  error: parsed.message,
                }));
              }
            } catch {
              // Skip malformed events
            }
            eventType = "";
            eventData = "";
          }
        }
      }

      // If the stream ended without an explicit "done" event, stop the
      // streaming indicator anyway
      setState(prev => ({ ...prev, isStreaming: false }));
    } catch (error) {
      if ((error as Error).name !== "AbortError") {
        setState(prev => ({
          ...prev,
          isStreaming: false,
          error: (error as Error).message,
        }));
      }
    }
  }, []);

  const cancel = useCallback(() => {
    abortRef.current?.abort();
    setState(prev => ({ ...prev, isStreaming: false }));
  }, []);

  return { ...state, sendMessage, cancel };
}

React Component

// src/components/ChatMessage.tsx
import { useStreamingChat } from "../hooks/useStreamingChat";
// ChatInput is assumed to be an existing input component elsewhere in the project
function StreamingMessage({ content, isStreaming }: {
  content: string;
  isStreaming: boolean;
}) {
  return (
    <div className="message assistant">
      <div className="message-content">
        {content}
        {isStreaming && <span className="cursor-blink">|</span>}
      </div>
    </div>
  );
}

function ChatWindow() {
  const { content, isStreaming, error, sendMessage, cancel } = useStreamingChat();

  return (
    <div className="chat-window">
      <StreamingMessage content={content} isStreaming={isStreaming} />

      {error && <div className="error-banner">{error}</div>}

      {isStreaming ? (
        <button onClick={cancel}>Stop generating</button>
      ) : (
        <ChatInput onSend={(msg) => sendMessage([{ role: "user", content: msg }])} />
      )}
    </div>
  );
}

Backpressure Handling

Problem Scenario

The backpressure problem:

LLM generation speed (100 tok/s) >> network transfer speed (slow client connection)
  │
  ▼
Server-side buffers pile up → risk of memory exhaustion
  │
  ▼
Solution: backpressure-aware flow control

Backpressure Implementation

// src/streaming/backpressure.ts

// Assumed shape of the chunks produced by the LLM stream
type Chunk = { text: string };

async function streamWithBackpressure(
  llmStream: AsyncIterable<Chunk>,
  writer: WritableStreamDefaultWriter<Uint8Array>,
  // highWaterMark documents the intended writable queue size; the effective
  // limit comes from the stream's queuing strategy (see the wiring sketch below)
  options: { highWaterMark: number; onPressure?: () => void },
) {
  const encoder = new TextEncoder();
  let buffered = 0;

  for await (const chunk of llmStream) {
    const data = encoder.encode(
      `event: message\ndata: ${JSON.stringify(chunk)}\n\n`
    );

    // Check if writer is ready (backpressure signal)
    if (writer.desiredSize !== null && writer.desiredSize <= 0) {
      options.onPressure?.();
      // Wait for the writer to drain
      await writer.ready;
    }

    await writer.write(data);
    buffered += data.byteLength;
  }

  return buffered;
}
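
A sketch of how this could be wired into an SSE response (assumed names; the chunks come from the callLLMStream helper above). An identity TransformStream works as the bridge because its writer's desiredSize and ready promise only advance as fast as the client drains the readable side, which is exactly the signal streamWithBackpressure checks:

// Inside a POST route handler, after parsing messages/model (sketch)
const llmStream = callLLMStream(messages, model);

// Identity transform: the writable side backs up when the client reads slowly
const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>(
  {},
  new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 }), // writable queue limit
);
const writer = writable.getWriter();

streamWithBackpressure(llmStream, writer, {
  highWaterMark: 64 * 1024,
  onPressure: () => console.warn("slow client: pausing relay until the socket drains"),
})
  .catch((err) => console.error("stream failed", err))
  .finally(() => writer.close().catch(() => {}));

return new Response(readable, {
  headers: {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache, no-transform",
  },
});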

Proxy and CDN Configuration

Nginx Configuration

# nginx.conf - SSE proxy configuration
location /api/chat/stream {
    proxy_pass http://backend;

    # Disable buffering for SSE
    proxy_buffering off;
    proxy_cache off;

    # SSE specific headers
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding off;

    # Timeouts (long-lived connections)
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;

    # Disable gzip for SSE (causes buffering)
    gzip off;
}

Cloudflare Considerations

Cloudflare constraints that affect SSE:

1. 100-second timeout on the free plan
   → Mitigation: heartbeat keep-alives (every 15-30 seconds)

2. Response buffering
   → Mitigation: X-Accel-Buffering: no

3. Very large responses may be truncated
   → Mitigation: cap the length of a single stream (no limit on Enterprise)

Recommended SSE route configuration:
  - Heartbeat interval: 15 seconds
  - Maximum stream duration: 90 seconds (leaving a 10-second margin)
  - Extra-long generations: split into segments (start a new stream after the previous one ends)

Monitoring Metrics

Metric                       | Collection point               | Target  | Alert threshold
TTFT (time to first token)   | First event received           | <500ms  | >2s
Inter-token gap              | Time between adjacent tokens   | <100ms  | >500ms
Stream completion rate       | Normal completions / total     | >99%    | <95%
Client cancellation rate     | Cancels / total                | <10%    | >25%
Heartbeat timeout rate       | Missed heartbeats              | <0.1%   | >1%
Backpressure trigger rate    | Backpressure events / streams  | <5%     | >20%
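
TTFT is easiest to measure on the client, where it reflects what the user actually experiences. A minimal sketch (the /metrics endpoint and metric name are hypothetical; call onTokenEvent() from the "token" branch of the SSE parser in useStreamingChat above):

// Client-side TTFT measurement (sketch)
const requestStart = performance.now();
let firstTokenAt: number | null = null;

function onTokenEvent() {
  if (firstTokenAt === null) {
    firstTokenAt = performance.now();
    const ttftMs = firstTokenAt - requestStart;
    // Fire-and-forget report; replace with your real metrics pipeline
    navigator.sendBeacon(
      "/metrics",
      JSON.stringify({ metric: "ttft_ms", value: Math.round(ttftMs) }),
    );
  }
}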

Summary

  1. SSE is the best protocol for LLM streaming output: one-way, text-based, auto-reconnecting, and CDN-friendly, a perfect match for this workload.
  2. Heartbeats are mandatory: Cloudflare and Nginx both enforce connection timeouts, and a comment-line heartbeat every 15-30 seconds keeps the connection alive.
  3. Client cancellation must propagate to the server: when the user clicks "Stop generating", the LLM call must be aborted via AbortController so tokens are not generated and billed for nothing.
  4. Design for backpressure up front: on slow networks, streaming without flow control can exhaust server memory.
  5. TTFT is the most important latency metric: what users perceive is how quickly text starts to appear, not the total generation time.

Maurice | maurice_wen@proton.me