Streaming Architectures: Choosing Between SSE, WebSocket, and gRPC for AI Applications

Why AI Applications Need Streaming

Large language models generate output token by token, and a complete reply can take 5-30 seconds. Returning nothing until generation finishes makes for a terrible user experience. Streaming lets the user start reading as soon as the first token arrives, cutting perceived latency from tens of seconds to a few hundred milliseconds.

Beyond text generation, AI applications involve several other streaming scenarios:

  • LLM chat (token-by-token output)
  • Agent tool-call progress notifications
  • Model inference progress (image generation, video processing)
  • Real-time speech transcription (streaming STT)
  • Multi-step pipeline status updates

Comparing the Three Streaming Technologies

| Dimension | SSE | WebSocket | gRPC Streaming |
| --- | --- | --- | --- |
| Protocol | HTTP/1.1 | HTTP upgraded to WS | HTTP/2 |
| Direction | One-way (server push) | Bidirectional | Unary / server streaming / client streaming / bidirectional |
| Data format | Text (UTF-8) | Text or binary | Protocol Buffers |
| Auto-reconnect | Native in browsers | Manual | Manual |
| Proxy/CDN compatibility | Excellent | Moderate | Poor |
| Browser support | Native EventSource | Native WebSocket | Requires grpc-web |
| Connection overhead | Low (plain HTTP) | Medium (long-lived connection) | Low (HTTP/2 multiplexing) |
| Best for | One-way server push | Real-time bidirectional communication | Service-to-service communication |

SSE (Server-Sent Events)

Architecture

SSE is the simplest streaming option: a plain HTTP response delivered with the text/event-stream content type.
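
On the wire, the response is just a long-lived HTTP body in which each event is a block of field: value lines terminated by a blank line; lines starting with a colon are comments that clients ignore (handy for heartbeats). Using the event names from the examples below, a response looks like:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

event: token
data: {"content": "Hello"}

event: token
data: {"content": " world"}

: heartbeat 1

event: done
data: {"status": "complete"}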

Strengths:

  • Native browser support via the EventSource API
  • Plain HTTP, so firewalls, proxies, and CDNs handle it well
  • Built-in auto-reconnect (with Last-Event-ID)
  • Trivial to implement: the server just keeps writing to the response stream

Limitations:

  • One-way only (server -> client)
  • UTF-8 text only
  • Some proxies buffer responses (buffering must be disabled)
  • Browser concurrent-connection limit (six per origin over HTTP/1.1)

Server implementation (Node.js)

// Next.js Route Handler
import OpenAI from "openai";

const openai = new OpenAI();

export async function POST(request: Request) {
  const { messages } = await request.json();

  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      // Heartbeat timer (prevents Cloudflare and other proxies from
      // closing the connection as idle)
      let heartbeatCount = 0;
      const heartbeat = setInterval(() => {
        heartbeatCount++;
        controller.enqueue(
          encoder.encode(`: heartbeat ${heartbeatCount}\n\n`)
        );
      }, 25000);

      try {
        // Connect to OpenAI
        const response = await openai.chat.completions.create({
          model: "gpt-4o",
          messages,
          stream: true,
        });

        for await (const chunk of response) {
          const content = chunk.choices[0]?.delta?.content;
          if (content) {
            // SSE format: event: type\ndata: payload\n\n
            const sseMessage = `event: token\ndata: ${JSON.stringify({ content })}\n\n`;
            controller.enqueue(encoder.encode(sseMessage));
          }

          // Tool calls
          const toolCalls = chunk.choices[0]?.delta?.tool_calls;
          if (toolCalls) {
            const sseMessage = `event: tool_call\ndata: ${JSON.stringify({ tool_calls: toolCalls })}\n\n`;
            controller.enqueue(encoder.encode(sseMessage));
          }
        }

        // Completion event
        controller.enqueue(
          encoder.encode(`event: done\ndata: {"status": "complete"}\n\n`)
        );
      } catch (err) {
        // Surface upstream failures as an SSE event instead of silently
        // dropping the stream
        controller.enqueue(
          encoder.encode(`event: error\ndata: ${JSON.stringify({ message: String(err) })}\n\n`)
        );
      } finally {
        clearInterval(heartbeat);
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      "Connection": "keep-alive",
      "X-Accel-Buffering": "no",  // disable Nginx buffering
    },
  });
}

Server implementation (Python FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import json
import asyncio

app = FastAPI()
client = AsyncOpenAI()

class ChatRequest(BaseModel):
    messages: list[dict]

async def generate_sse_stream(messages: list):
    """SSE event generator; emits comment-line heartbeats while the model is idle."""
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

    iterator = response.__aiter__()
    heartbeat_count = 0
    next_chunk = asyncio.ensure_future(anext(iterator))

    try:
        while True:
            # Wait up to 25 s for the next chunk; on timeout, emit a heartbeat
            # comment so proxies do not close the idle connection. asyncio.wait
            # leaves the pending task running, unlike asyncio.wait_for.
            done, _ = await asyncio.wait({next_chunk}, timeout=25)
            if not done:
                heartbeat_count += 1
                yield f": heartbeat {heartbeat_count}\n\n"
                continue

            try:
                chunk = next_chunk.result()
            except StopAsyncIteration:
                break
            next_chunk = asyncio.ensure_future(anext(iterator))

            content = chunk.choices[0].delta.content
            if content:
                data = json.dumps({"content": content}, ensure_ascii=False)
                yield f"event: token\ndata: {data}\n\n"

            # Check for tool calls
            tool_calls = chunk.choices[0].delta.tool_calls
            if tool_calls:
                data = json.dumps({"tool_calls": [tc.model_dump() for tc in tool_calls]})
                yield f"event: tool_call\ndata: {data}\n\n"
    finally:
        next_chunk.cancel()

    yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"

@app.post("/api/chat")
async def chat(request: ChatRequest):
    return StreamingResponse(
        generate_sse_stream(request.messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )

Client implementation

// Approach 1: native EventSource (GET only)
const source = new EventSource("/api/stream?query=hello");

source.addEventListener("token", (event) => {
  const { content } = JSON.parse(event.data);
  appendToUI(content);  // app-level UI callback
});

source.addEventListener("done", () => {
  source.close();
});

source.onerror = (error) => {
  console.error("SSE error, will auto-reconnect:", error);
};

// Approach 2: fetch + ReadableStream (supports POST)
type Message = { role: string; content: string };

async function streamChat(messages: Message[]) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Parse SSE events (blocks separated by a blank line)
    const events = buffer.split("\n\n");
    buffer = events.pop() || "";  // keep the incomplete trailing event

    for (const event of events) {
      if (event.startsWith(":")) continue;  // heartbeat comment

      const lines = event.split("\n");
      let eventType = "message";
      let data = "";

      for (const line of lines) {
        if (line.startsWith("event: ")) {
          eventType = line.slice(7);
        } else if (line.startsWith("data: ")) {
          data = line.slice(6);
        }
      }

      if (data) {
        handleSSEEvent(eventType, JSON.parse(data));  // app-level dispatcher
      }
    }
  }
}
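
Outside the browser (scripts, or service-to-service consumers), the same endpoint can be read with any HTTP client that exposes the body as a stream. A minimal Python sketch, assuming the httpx package and the FastAPI /api/chat endpoint above; the parser is simplified and assumes every event block carries an explicit event: line, which the server above guarantees:

import asyncio
import json

import httpx

async def stream_chat(messages: list):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/api/chat", json={"messages": messages}
        ) as response:
            event_type = "message"
            async for line in response.aiter_lines():
                if line.startswith(":"):  # heartbeat comment, ignore
                    continue
                if line.startswith("event: "):
                    event_type = line[7:]
                elif line.startswith("data: "):
                    payload = json.loads(line[6:])
                    if event_type == "token":
                        print(payload["content"], end="", flush=True)
                    elif event_type == "done":
                        return

asyncio.run(stream_chat([{"role": "user", "content": "hello"}]))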

WebSocket

Architecture

WebSocket provides full-duplex communication, suited to scenarios where the client also needs to push data in real time.

AI use cases:

  • Real-time voice conversation (sending and receiving audio simultaneously)
  • Collaborative editing with AI assistance
  • Interactive multi-turn agents (the user can interrupt or add context at any time)

Implementation example

# FastAPI WebSocket
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
openai_client = AsyncOpenAI()

class ConnectionManager:
    def __init__(self):
        self.active: dict[str, WebSocket] = {}

    async def connect(self, ws: WebSocket, user_id: str):
        await ws.accept()
        self.active[user_id] = ws

    def disconnect(self, user_id: str):
        self.active.pop(user_id, None)

    async def send_json(self, user_id: str, data: dict):
        ws = self.active.get(user_id)
        if ws:
            await ws.send_json(data)

manager = ConnectionManager()

async def stream_reply(ws: WebSocket, messages: list):
    """Stream one model reply to the client token by token."""
    response = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

    async for chunk in response:
        content = chunk.choices[0].delta.content
        if content:
            await ws.send_json({"type": "token", "content": content})

    await ws.send_json({"type": "done"})

@app.websocket("/ws/chat/{user_id}")
async def chat_websocket(ws: WebSocket, user_id: str):
    await manager.connect(ws, user_id)
    generation: asyncio.Task | None = None

    try:
        while True:
            data = await ws.receive_json()

            if data["type"] == "message":
                # Run generation as a background task so this receive loop
                # stays responsive to "cancel" while tokens are streaming
                generation = asyncio.create_task(stream_reply(ws, data["messages"]))

            elif data["type"] == "cancel":
                # User interrupt mid-generation (the bidirectional advantage
                # of WebSocket): cancel the in-flight inference task
                if generation and not generation.done():
                    generation.cancel()
                    await ws.send_json({"type": "done"})

            elif data["type"] == "ping":
                await ws.send_json({"type": "pong"})

    except WebSocketDisconnect:
        if generation and not generation.done():
            generation.cancel()
        manager.disconnect(user_id)

Client-side WebSocket

class AIWebSocket {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;

  // App-level callbacks, assigned by the caller
  onToken: (content: string) => void = () => {};
  onComplete: () => void = () => {};
  onError: (message: string) => void = () => {};

  connect(userId: string) {
    this.ws = new WebSocket(`wss://api.example.com/ws/chat/${userId}`);

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
      this.startHeartbeat();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case "token":
          this.onToken(data.content);
          break;
        case "done":
          this.onComplete();
          break;
        case "error":
          this.onError(data.message);
          break;
        case "pong":
          // heartbeat response
          break;
      }
    };

    this.ws.onclose = (event) => {
      this.stopHeartbeat();
      if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
        // Exponential backoff, capped at 30 s
        const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts++;
        setTimeout(() => this.connect(userId), delay);
      }
    };
  }

  send(messages: Message[]) {
    this.ws?.send(JSON.stringify({ type: "message", messages }));
  }

  cancel() {
    this.ws?.send(JSON.stringify({ type: "cancel" }));
  }

  private startHeartbeat() {
    // Clear any timer left over from a previous connection,
    // so reconnects do not stack intervals
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: "ping" }));
      }
    }, 30000);
  }

  private stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}
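
The cancel path is easiest to see end-to-end from a script. A minimal counterpart sketch, assuming the websockets package and the /ws/chat/{user_id} endpoint above; cutting off after five tokens is arbitrary, purely to exercise the interrupt:

import asyncio
import json

import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/chat/demo-user") as ws:
        await ws.send(json.dumps({
            "type": "message",
            "messages": [{"role": "user", "content": "Tell me a long story"}],
        }))

        tokens = 0
        async for raw in ws:
            data = json.loads(raw)
            if data["type"] == "token":
                tokens += 1
                print(data["content"], end="", flush=True)
                if tokens == 5:
                    # Interrupt generation mid-stream; the server can react
                    # because its receive loop is not blocked by generation
                    await ws.send(json.dumps({"type": "cancel"}))
            elif data["type"] == "done":
                break

asyncio.run(main())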

gRPC Streaming

Architecture

gRPC runs over HTTP/2 and serializes with Protocol Buffers, making it a strong fit for high-performance service-to-service communication.

Four streaming modes:

  • Unary (plain request/response)
  • Server streaming
  • Client streaming
  • Bidirectional streaming

Proto definition

syntax = "proto3";

package inference;

service InferenceService {
  // 单次推理
  rpc Predict(PredictRequest) returns (PredictResponse);

  // 流式推理(Server streaming)
  rpc StreamPredict(PredictRequest) returns (stream PredictChunk);

  // 语音对话(Bidirectional streaming)
  rpc VoiceChat(stream AudioChunk) returns (stream ChatResponse);
}

message PredictRequest {
  string model = 1;
  repeated Message messages = 2;
  float temperature = 3;
  int32 max_tokens = 4;
}

message PredictChunk {
  string content = 1;
  bool is_final = 2;
  ToolCall tool_call = 3;
}

message Message {
  string role = 1;
  string content = 2;
}

message ToolCall {
  string name = 1;
  string arguments = 2;
}

message AudioChunk {
  bytes audio_data = 1;
  string format = 2;  // pcm, opus
  int32 sample_rate = 3;
}

message ChatResponse {
  oneof response {
    string text = 1;
    bytes audio = 2;
    ToolCall tool_call = 3;
  }
}

Server implementation (Python)

import asyncio

import grpc
import inference_pb2
import inference_pb2_grpc
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()

class InferenceServicer(inference_pb2_grpc.InferenceServiceServicer):

    async def StreamPredict(self, request, context):
        """Server-streaming inference"""
        response = await openai_client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": m.role, "content": m.content}
                for m in request.messages
            ],
            stream=True,
        )

        async for chunk in response:
            content = chunk.choices[0].delta.content or ""
            is_final = chunk.choices[0].finish_reason is not None

            yield inference_pb2.PredictChunk(
                content=content,
                is_final=is_final,
            )

    async def VoiceChat(self, request_iterator, context):
        """Bidirectional streaming voice chat.

        stt_model / tts_model / generate_reply are application-level
        placeholders, not concrete libraries.
        """
        async for audio_chunk in request_iterator:
            # 1. STT: audio -> text
            text = await stt_model.transcribe(audio_chunk.audio_data)

            if text:
                # 2. LLM: generate a reply
                reply = await generate_reply(text)

                # 3. TTS: text -> audio
                audio = await tts_model.synthesize(reply)

                yield inference_pb2.ChatResponse(text=reply)
                yield inference_pb2.ChatResponse(audio=audio)

async def serve():
    # grpc.aio runs async servicers on the event loop directly;
    # no thread pool is needed
    server = grpc.aio.server()
    inference_pb2_grpc.add_InferenceServiceServicer_to_server(
        InferenceServicer(), server
    )
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

if __name__ == "__main__":
    asyncio.run(serve())
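
For completeness, here is what consuming the server-streaming RPC from another service looks like; a minimal sketch assuming the generated inference_pb2 / inference_pb2_grpc modules and a server listening on localhost:50051:

import asyncio

import grpc
import inference_pb2
import inference_pb2_grpc

async def main():
    async with grpc.aio.insecure_channel("localhost:50051") as channel:
        stub = inference_pb2_grpc.InferenceServiceStub(channel)
        request = inference_pb2.PredictRequest(
            model="gpt-4o",
            messages=[inference_pb2.Message(role="user", content="Hello")],
            temperature=0.7,
            max_tokens=512,
        )

        # Server streaming: iterate response chunks as they arrive
        async for chunk in stub.StreamPredict(request):
            print(chunk.content, end="", flush=True)
            if chunk.is_final:
                break

asyncio.run(main())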

Choosing a Technology

By scenario

| Scenario | Recommendation | Rationale |
| --- | --- | --- |
| LLM chat (web) | SSE | One-way push suffices; best compatibility, simplest to implement |
| Agent progress notifications | SSE | One-way server push, with event types to distinguish messages |
| Real-time voice conversation | WebSocket | Needs bidirectional audio streams |
| Collaboration + AI assistance | WebSocket | Multi-user real-time collaboration requires two-way traffic |
| Service-to-service inference | gRPC | High performance, strongly typed, multi-language support |
| Multi-step agent pipeline | SSE + event types | Distinct event types separate progress, results, and tool calls |

Hybrid architecture

In real production systems you usually end up combining them:

Browser clients
    |
    ├── SSE: streaming LLM output
    ├── WebSocket: real-time collaboration / voice
    └── REST: non-streaming APIs

API Gateway
    |
    ├── gRPC: internal calls to inference services
    ├── gRPC Streaming: model-to-model pipelines
    └── HTTP/2: model download/upload

Production Considerations

  1. Cloudflare/CDN proxy timeouts: the free tier cuts idle connections at 100 seconds, so heartbeat keep-alives are mandatory
  2. Nginx buffering: set X-Accel-Buffering: no or proxy_buffering off
  3. Load balancer affinity: WebSocket requires sticky sessions
  4. Connection limits: HTTP/1.1 caps SSE at six connections per origin; HTTP/2 removes the limit
  5. Error recovery: SSE reconnects automatically; WebSocket and gRPC need hand-rolled exponential backoff
Maurice | maurice_wen@proton.me