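A large language model can take anywhere from a few seconds to tens of seconds to generate a reply. If the server waits for the full reply before responding, the user stares at a blank page for ten-plus seconds, which feels no different from watching a loading spinner. Streaming lets the user watch the text appear token by token, so the perceived wait is much shorter. It is a baseline UX requirement for LLM applications.
This article implements streaming output end to end, from backend to frontend.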
SSE vs WebSocket
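In an LLM chat, the server only needs to push tokens to the client; the client does not send messages frequently. SSE (Server-Sent Events) is one-way push over plain HTTP, which makes it simpler than WebSocket. Browsers support it natively through the EventSource API, which also reconnects automatically after a dropped connection. For LLM streaming, SSE is the better fit. One caveat: EventSource can only issue GET requests, so the frontend code below, which POSTs the message list, reads the SSE stream with fetch instead and does not get automatic reconnection.
WebSocket suits scenarios that need high-frequency two-way communication (real-time collaborative editing, for example), but for LLM streaming it is overkill.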
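To make the wire format concrete, here is a minimal sketch of how one SSE event is serialized. The helper name format_sse and its optional event argument are illustrative assumptions, not part of this article's backend; the "data:" prefix and the blank line that ends an event come from the SSE format itself.

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event for a text/event-stream response."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload that contains newlines must be sent as several "data:" lines.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


# One token event and the end-of-stream marker used later in this article.
token_event = format_sse(json.dumps({"content": "Hello"}, ensure_ascii=False))
done_event = format_sse("[DONE]")
```

Wrapping each token in JSON, as the backend below does, keeps newlines inside a token from being mistaken for event boundaries.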
Backend implementation
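import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
# Ollama's OpenAI-compatible endpoint; the api_key value is only a placeholder.
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="ollama")


class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str
    messages: list[Message]


@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        # The async client keeps the event loop free while waiting for tokens;
        # a synchronous client here would block every other request.
        stream = await client.chat.completions.create(
            model=request.model,
            # Pydantic v2; use m.dict() on Pydantic v1
            messages=[m.model_dump() for m in request.messages],
            stream=True,
        )
        async for chunk in stream:
            if not chunk.choices:  # skip chunks that carry no choices
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                # One SSE event: a "data:" line followed by a blank line
                yield f"data: {json.dumps({'content': delta.content}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
        },
    )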
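X-Accel-Buffering: no is the setting that is easiest to forget. If an Nginx proxy sits in front of the service and this header is missing, Nginx accumulates the response before passing it on, and streaming stops working. When debugging "streaming doesn't work", this is the cause more often than not.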
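One way to check whether something on the path is buffering is to timestamp each chunk as it reaches the client. The sketch below is a rough heuristic, not a definitive test: the function names, the injectable clock, and the 0.05 second tolerance are all assumptions made for illustration. It accepts any iterable of byte chunks, so it runs without a server.

```python
import time
from typing import Callable, Iterable, List


def arrival_offsets(chunks: Iterable[bytes],
                    clock: Callable[[], float] = time.monotonic) -> List[float]:
    """Return, for each chunk, the seconds elapsed since iteration started."""
    start = clock()
    offsets = []
    for _ in chunks:
        offsets.append(clock() - start)
    return offsets


def looks_buffered(offsets: List[float], tolerance: float = 0.05) -> bool:
    """Heuristic: True when every chunk arrived within `tolerance` seconds of the first."""
    if len(offsets) < 2:
        return False  # a single chunk says nothing about buffering
    return (offsets[-1] - offsets[0]) <= tolerance
```

Feed it the byte iterator of whatever HTTP client is at hand, once through Nginx and once directly against the backend. If the chunks are spread over time on the direct path but arrive in one burst through the proxy, buffering is the likely culprit. A very short reply can also arrive in one burst, so test with a prompt that produces a long answer.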
Frontend implementation
async function chat(messages, onChunk) {
  const resp = await fetch('/v1/chat/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({model: 'deepseek-r1:7b', messages})
  });
  if (!resp.ok) throw new Error(`Request failed: ${resp.status}`);

  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    // {stream: true} keeps a multi-byte character split across chunks intact
    buffer += decoder.decode(value, {stream: true});
    const lines = buffer.split('\n');
    buffer = lines.pop(); // keep the trailing incomplete line for the next read
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const payload = line.slice(6);
        if (payload === '[DONE]') return;
        const data = JSON.parse(payload);
        onChunk(data.content);
      }
    }
  }
}
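Handling the buffer is the key part. The chunks returned by reader.read() do not line up with SSE line boundaries: a single line can be split in transit and arrive across two reads. The buffer holds the incomplete tail, and the next loop iteration completes it.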
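The same buffering idea can be sketched in Python, which makes it easy to try outside a browser. This is a minimal illustration under this article's conventions (a "data: " prefix, a JSON payload with a content field, and a [DONE] marker); the function name iter_sse_content is an assumption, not an existing API.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_sse_content(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield the content field of each SSE data line from arbitrarily split chunks."""
    # The incremental decoder holds back a multi-byte character cut by a chunk boundary.
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        # Everything before the last "\n" is complete; the remainder waits for more data.
        *lines, buffer = buffer.split("\n")
        for line in lines:
            line = line.rstrip("\r")
            if not line.startswith("data: "):
                continue  # blank separator lines and other fields are ignored
            payload = line[len("data: "):]
            if payload == "[DONE]":
                return
            yield json.loads(payload)["content"]


# Simulate the network cutting the stream every 7 bytes, regardless of line boundaries.
raw = b'data: {"content": "Hel"}\n\ndata: {"content": "lo"}\n\ndata: [DONE]\n\n'
pieces = [raw[i:i + 7] for i in range(0, len(raw), 7)]
print("".join(iter_sse_content(pieces)))  # Hello
```

Without the buffer, the first 7-byte piece would be parsed as a line on its own and the JSON decode would fail.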
React wrapper
import {useCallback, useState} from 'react';

function useStreamingChat() {
  const [content, setContent] = useState('');
  const [loading, setLoading] = useState(false);

  const send = useCallback(async (messages) => {
    setLoading(true);
    setContent('');
    let full = '';
    try {
      const resp = await fetch('/v1/chat/stream', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({model: 'deepseek-r1:7b', messages, stream: true})
      });
      const reader = resp.body.getReader();
      const dec = new TextDecoder();
      let buf = '';
      while (true) {
        const {done, value} = await reader.read();
        if (done) break;
        buf += dec.decode(value, {stream: true});
        const parts = buf.split('\n');
        buf = parts.pop(); // keep the trailing incomplete line
        for (const p of parts) {
          if (!p.startsWith('data: ')) continue;
          const payload = p.slice(6);
          // Compare the whole payload: a token may itself contain the text "[DONE]"
          if (payload === '[DONE]') return;
          full += JSON.parse(payload).content;
          setContent(full);
        }
      }
    } finally {
      setLoading(false); // also runs on errors and on the early return above
    }
  }, []);

  return {content, loading, send};
}
Nginx configuration
location /v1/chat/stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
}
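proxy_buffering off is the essential line: it disables buffering at the proxy itself, so streaming works even if the backend forgets the X-Accel-Buffering header. proxy_read_timeout is raised to 300 seconds because the default is 60 seconds. That timeout applies to the gap between two successive reads from the backend, not to the whole response, so it matters whenever the model stays silent for a long stretch, such as a slow first token.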
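Closing thoughts
Streaming output is not hard to implement, but the details decide the experience. Nginx buffering, lines split across network chunks, and timeout configuration: get these right and streaming works smoothly.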