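A large language model can take anywhere from a few seconds to tens of seconds to generate a reply. If the server waits for the full reply before returning it, the user stares at a blank page for ten-plus seconds, which feels no different from watching a loading spinner. Streaming shows the text as it is generated, so the perceived wait drops sharply. It is a baseline UX requirement for LLM applications.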

This article implements streaming output end to end, from the backend to the frontend.

SSE vs WebSocket

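In the LLM scenario the server only needs to push tokens to the client; the client does not need to send messages frequently. SSE (Server-Sent Events) is one-way push over plain HTTP, which makes it simpler than WebSocket. Browsers support it natively through the EventSource API, which also reconnects automatically after a dropped connection. For LLM streaming output, SSE is the better fit. One caveat: EventSource only issues GET requests, so the frontend code in this article, which sends the messages in a POST body, reads the SSE stream with fetch instead and does not get automatic reconnection.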

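WebSocket suits scenarios that need high-frequency two-way communication (real-time collaborative editing, for example), but for LLM streaming output it is overkill.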
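
To make the protocol concrete, here is a minimal sketch of how a single SSE event is framed on the wire: one or more "field: value" lines followed by a blank line. The helper name format_sse and the optional event argument are illustrative assumptions; the backend in this article only emits data lines.

```python
import json
from typing import Optional


def format_sse(data: dict, event: Optional[str] = None) -> str:
    """Serialize one SSE event (format_sse is a hypothetical helper name)."""
    lines = []
    if event is not None:
        # Optional event name; not used by the backend in this article
        lines.append(f"event: {event}")
    # json.dumps never emits a raw newline, so the payload fits on one data line
    lines.append(f"data: {json.dumps(data, ensure_ascii=False)}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"
```

The trailing blank line is what tells the client that an event is complete, which is why every yield in the backend ends with two newlines.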

Backend implementation

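import json
from typing import List

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
# Ollama's OpenAI-compatible endpoint; the api_key is only a placeholder
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="ollama")


# Request schema (assumed shape: a model name plus role/content messages)
class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str
    messages: List[Message]


@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        # The async client keeps the event loop free while waiting for tokens
        stream = await client.chat.completions.create(
            model=request.model,
            # Pydantic v2; on v1 use m.dict() instead
            messages=[m.model_dump() for m in request.messages],
            stream=True,
        )
        async for chunk in stream:
            if not chunk.choices:  # skip chunks that carry no choices
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                payload = json.dumps({"content": delta.content}, ensure_ascii=False)
                yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"  # end-of-stream sentinel for the frontend

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx response buffering
        },
    )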

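X-Accel-Buffering: no is the setting that is easiest to forget. If an Nginx proxy sits in front of the service and this header is missing (and buffering is not disabled in the Nginx config either), Nginx accumulates the response before passing it on, and streaming stops working. When debugging "streaming doesn't work", look here first, since it is by far the most common cause.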
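
One way to check whether a proxy is buffering is to record when each event arrives on the client side. The sketch below is a rough diagnostic under stated assumptions, not a definitive test: time_events, looks_buffered, and the 0.05-second window are all hypothetical choices, and a very short reply can trigger a false positive.

```python
import time
from typing import Callable, Iterable, List, Tuple


def time_events(
    lines: Iterable[bytes],
    clock: Callable[[], float] = time.monotonic,
) -> List[Tuple[float, str]]:
    """Return (seconds since start, payload) for every SSE data line."""
    start = clock()
    arrivals = []
    for raw in lines:
        line = raw.decode("utf-8").rstrip("\r\n")
        if line.startswith("data: "):
            arrivals.append((clock() - start, line[6:]))
    return arrivals


def looks_buffered(arrivals: List[Tuple[float, str]], window: float = 0.05) -> bool:
    """Heuristic: several events that all arrive within `window` seconds of each other."""
    if len(arrivals) < 2:
        return False
    times = [t for t, _ in arrivals]
    return max(times) - min(times) <= window
```

The response object returned by urllib.request.urlopen can be iterated line by line, so it can be passed in directly as lines. If every event lands in one burst after a long pause, something between the backend and the client is holding the response back.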

Frontend implementation

async function chat(messages, onChunk) {
  const resp = await fetch('/v1/chat/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({model: 'deepseek-r1:7b', messages})
  });
  // fetch only rejects on network failure, so check the HTTP status explicitly
  if (!resp.ok) throw new Error(`Request failed with status ${resp.status}`);

  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const {done, value} = await reader.read();
    if (done) break;

    // {stream: true} keeps a multi-byte character split across chunks intact
    buffer += decoder.decode(value, {stream: true});
    const lines = buffer.split('\n');
    buffer = lines.pop();  // keep the incomplete trailing line for the next read

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const payload = line.slice(6);
        if (payload === '[DONE]') return;  // end-of-stream sentinel from the backend
        const data = JSON.parse(payload);
        onChunk(data.content);
      }
    }
  }
}

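Buffer handling is the key part. A stream read does not respect line boundaries: a single SSE line can be split across two reads, for example by TCP segmentation. The buffer holds the incomplete trailing line until the next loop iteration completes it. Passing {stream: true} to TextDecoder does the same job for multi-byte UTF-8 characters that straddle two chunks.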
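
The same buffering technique can be sketched in Python, which makes it easy to test against arbitrary chunk boundaries. This is a minimal sketch assuming the data/[DONE] format produced by the backend above; iter_sse_content is a hypothetical name, and the incremental decoder plays the role of TextDecoder with {stream: true}.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_sse_content(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield the content field of each event from arbitrarily split byte chunks."""
    # Incremental decoder: holds back a partial multi-byte character
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        # Everything before the last newline is complete; keep the remainder
        *lines, buffer = buffer.split("\n")
        for line in lines:
            if not line.startswith("data: "):
                continue  # blank separator lines and other fields
            payload = line[6:]
            if payload == "[DONE]":
                return
            yield json.loads(payload)["content"]
```

Feeding the parser the same byte stream cut at different positions, even one byte at a time, should produce identical output. That property is a convenient regression test for any SSE client.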

React wrapper

import {useCallback, useState} from 'react';

function useStreamingChat() {
  const [content, setContent] = useState('');
  const [loading, setLoading] = useState(false);

  const send = useCallback(async (messages) => {
    setLoading(true);
    setContent('');
    try {
      // Reuse chat() from the previous section and append each token to state
      await chat(messages, (token) => setContent((prev) => prev + token));
    } finally {
      // Reset the flag even if the request or the parsing throws
      setLoading(false);
    }
  }, []);

  return {content, loading, send};
}

Nginx configuration

location /v1/chat/stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
}

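Response buffering must be off for this location. proxy_buffering off does that in the Nginx config, and the X-Accel-Buffering: no header sent by the backend has the same effect per response; setting both is a safe default. proxy_read_timeout limits the gap between two successive reads from the upstream, not the total response time. Its default is 60 seconds, and 300 seconds leaves room for a model that is slow to emit its first token or pauses in the middle of a long reply.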

Closing thoughts

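Streaming output is not hard to implement, but the details decide the experience. Get Nginx buffering, chunk-boundary handling, and timeout settings right, and streaming works smoothly.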