团队里同时在用Ollama、OpenAI、Anthropic三家的模型。前端同事抱怨每次换模型都要改代码,后端同事抱怨每加一个提供商就要写一套适配代码。于是决定做一个统一的API网关——所有模型提供商对外暴露同样的接口,内部按配置自动路由;
FastAPI是做这件事的天然选择:异步支持好、自带OpenAPI文档、和Pydantic配合紧密;
项目结构
llm-gateway/
├── app/
│ ├── main.py # FastAPI入口
│ ├── config.py # 配置管理
│ ├── providers/ # 模型适配器
│ │ ├── base.py # 抽象接口
│ │ ├── ollama.py
│ │ └── openai_compat.py
│ ├── middleware/
│ │ ├── auth.py # API Key验证
│ │ └── rate_limit.py # 限流
│ └── models/
│ └── schemas.py # 数据模型
├── requirements.txt
└── Dockerfile
统一数据模型
先定义标准的请求和响应格式。这是整个网关的基础,所有提供商都围绕这个格式做适配:
from pydantic import BaseModel
from typing import Optional, List, Literal
class Message(BaseModel):
role: Literal["system", "user", "assistant"]
content: str
class ChatRequest(BaseModel):
model: str
messages: List[Message]
temperature: float = 0.7
max_tokens: int = 2048
stream: bool = False
class ChatResponse(BaseModel):
id: str
model: str
content: str
usage: dict
finish_reason: str
抽象提供商接口
所有模型提供商都实现同一个接口:
from abc import ABC, abstractmethod
from typing import AsyncIterator
class LLMProvider(ABC):
@abstractmethod
async def chat(self, request: ChatRequest) -> ChatResponse:
pass
@abstractmethod
async def chat_stream(self, request: ChatRequest) -> AsyncIterator[str]:
pass
@abstractmethod
async def list_models(self) -> list:
pass
Ollama适配器实现
import httpx, json, uuid
class OllamaProvider(LLMProvider):
def __init__(self, base_url="http://localhost:11434"):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=120)
async def chat(self, request):
payload = {
"model": request.model,
"messages": [m.model_dump() for m in request.messages],
"stream": False,
"options": {"temperature": request.temperature,
"num_predict": request.max_tokens}
}
resp = await self.client.post(f"{self.base_url}/api/chat", json=payload)
data = resp.json()
return ChatResponse(
id=str(uuid.uuid4()), model=request.model,
content=data["message"]["content"],
usage={"prompt_tokens": data.get("prompt_eval_count", 0),
"completion_tokens": data.get("eval_count", 0)},
finish_reason="stop"
)
async def chat_stream(self, request):
payload = {
"model": request.model,
"messages": [m.model_dump() for m in request.messages],
"stream": True,
"options": {"temperature": request.temperature,
"num_predict": request.max_tokens}
}
async with self.client.stream(
"POST", f"{self.base_url}/api/chat", json=payload
) as resp:
async for line in resp.aiter_lines():
if line:
data = json.loads(line)
if "message" in data:
yield f"data: {json.dumps({'content': data['message']['content']})}\n\n"
if data.get("done"):
yield "data: [DONE]\n\n"
路由和流式响应
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
app = FastAPI(title="LLM Gateway")
providers = {"ollama": OllamaProvider()}
def select_provider(model: str) -> LLMProvider:
# 根据模型名前缀或配置路由到不同提供商
if model.startswith("gpt-"):
return providers.get("openai", providers["ollama"])
return providers["ollama"]
@app.post("/v1/chat/completions")
async def chat(request: ChatRequest):
provider = select_provider(request.model)
if request.stream:
return StreamingResponse(
provider.chat_stream(request),
media_type="text/event-stream"
)
return await provider.chat(request)
流式输出:SSE协议
大模型生成需要几秒到几十秒,流式输出是基本体验要求。这里用SSE(Server-Sent Events),比WebSocket简单且够用;
后端要点:media_type必须是text/event-stream;每个chunk格式是data: {json}\n\n;最后发data: [DONE]\n\n表示结束。如果前面有Nginx代理,一定要加proxy_buffering off,否则Nginx会攒够一整块再返回,流式就废了。
前端用ReadableStream处理:
const resp = await fetch('/v1/chat/completions', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({model: 'deepseek-r1:7b', messages, stream: true})
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
const lines = decoder.decode(value, {stream: true}).split('\n');
for (const line of lines) {
if (line.startsWith('data: ') && !line.includes('[DONE]')) {
const {content} = JSON.parse(line.slice(6));
document.getElementById('output').innerHTML += content;
}
}
}
认证和限流
生产环境必须加。API Key验证从请求头里取,和数据库里的合法Key对比。限流用slowlimiter,按IP限制每分钟请求数:
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/v1/chat/completions")
@limiter.limit("60/minute")
async def chat(request: Request, body: ChatRequest):
api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
if not verify_api_key(api_key):
raise HTTPException(401, "Invalid API Key")
...
写在最后
统一API网关最大的价值不是"炫技",而是"解耦"。前端不用关心底层用的哪个模型,后端换模型提供商时前端零改动。成本优化时(比如把一部分流量从OpenAI切到本地Ollama),改一行配置就行。