团队里同时在用Ollama、OpenAI、Anthropic三家的模型。前端同事抱怨每次换模型都要改代码,后端同事抱怨每加一个提供商就要写一套适配代码。于是决定做一个统一的API网关——所有模型提供商对外暴露同样的接口,内部按配置自动路由;

FastAPI是做这件事的天然选择:异步支持好、自带OpenAPI文档、和Pydantic配合紧密;

项目结构

llm-gateway/
├── app/
│   ├── main.py           # FastAPI入口
│   ├── config.py         # 配置管理
│   ├── providers/        # 模型适配器
│   │   ├── base.py       # 抽象接口
│   │   ├── ollama.py
│   │   └── openai_compat.py
│   ├── middleware/
│   │   ├── auth.py       # API Key验证
│   │   └── rate_limit.py # 限流
│   └── models/
│       └── schemas.py    # 数据模型
├── requirements.txt
└── Dockerfile

统一数据模型

先定义标准的请求和响应格式。这是整个网关的基础,所有提供商都围绕这个格式做适配:

from pydantic import BaseModel
from typing import Optional, List, Literal

class Message(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str

class ChatRequest(BaseModel):
    model: str
    messages: List[Message]
    temperature: float = 0.7
    max_tokens: int = 2048
    stream: bool = False

class ChatResponse(BaseModel):
    id: str
    model: str
    content: str
    usage: dict
    finish_reason: str

抽象提供商接口

所有模型提供商都实现同一个接口:

from abc import ABC, abstractmethod
from typing import AsyncIterator

class LLMProvider(ABC):
    @abstractmethod
    async def chat(self, request: ChatRequest) -> ChatResponse:
        pass

    @abstractmethod
    async def chat_stream(self, request: ChatRequest) -> AsyncIterator[str]:
        pass

    @abstractmethod
    async def list_models(self) -> list:
        pass

Ollama适配器实现

import httpx, json, uuid

class OllamaProvider(LLMProvider):
    def __init__(self, base_url="http://localhost:11434"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=120)

    async def chat(self, request):
        payload = {
            "model": request.model,
            "messages": [m.model_dump() for m in request.messages],
            "stream": False,
            "options": {"temperature": request.temperature,
                        "num_predict": request.max_tokens}
        }
        resp = await self.client.post(f"{self.base_url}/api/chat", json=payload)
        data = resp.json()
        return ChatResponse(
            id=str(uuid.uuid4()), model=request.model,
            content=data["message"]["content"],
            usage={"prompt_tokens": data.get("prompt_eval_count", 0),
                   "completion_tokens": data.get("eval_count", 0)},
            finish_reason="stop"
        )

    async def chat_stream(self, request):
        payload = {
            "model": request.model,
            "messages": [m.model_dump() for m in request.messages],
            "stream": True,
            "options": {"temperature": request.temperature,
                        "num_predict": request.max_tokens}
        }
        async with self.client.stream(
            "POST", f"{self.base_url}/api/chat", json=payload
        ) as resp:
            async for line in resp.aiter_lines():
                if line:
                    data = json.loads(line)
                    if "message" in data:
                        yield f"data: {json.dumps({'content': data['message']['content']})}\n\n"
                    if data.get("done"):
                        yield "data: [DONE]\n\n"

路由和流式响应

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI(title="LLM Gateway")
providers = {"ollama": OllamaProvider()}

def select_provider(model: str) -> LLMProvider:
    # 根据模型名前缀或配置路由到不同提供商
    if model.startswith("gpt-"):
        return providers.get("openai", providers["ollama"])
    return providers["ollama"]

@app.post("/v1/chat/completions")
async def chat(request: ChatRequest):
    provider = select_provider(request.model)
    if request.stream:
        return StreamingResponse(
            provider.chat_stream(request),
            media_type="text/event-stream"
        )
    return await provider.chat(request)

流式输出:SSE协议

大模型生成需要几秒到几十秒,流式输出是基本体验要求。这里用SSE(Server-Sent Events),比WebSocket简单且够用;

后端要点:media_type必须是text/event-stream;每个chunk格式是data: {json}\n\n;最后发data: [DONE]\n\n表示结束。如果前面有Nginx代理,一定要加proxy_buffering off,否则Nginx会攒够一整块再返回,流式就废了。

前端用ReadableStream处理:

const resp = await fetch('/v1/chat/completions', {
  method: 'POST',
  headers: {'Content-Type': 'application/json'},
  body: JSON.stringify({model: 'deepseek-r1:7b', messages, stream: true})
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const {done, value} = await reader.read();
  if (done) break;
  const lines = decoder.decode(value, {stream: true}).split('\n');
  for (const line of lines) {
    if (line.startsWith('data: ') && !line.includes('[DONE]')) {
      const {content} = JSON.parse(line.slice(6));
      document.getElementById('output').innerHTML += content;
    }
  }
}

认证和限流

生产环境必须加。API Key验证从请求头里取,和数据库里的合法Key对比。限流用slowlimiter,按IP限制每分钟请求数:

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/v1/chat/completions")
@limiter.limit("60/minute")
async def chat(request: Request, body: ChatRequest):
    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not verify_api_key(api_key):
        raise HTTPException(401, "Invalid API Key")
    ...

写在最后

统一API网关最大的价值不是"炫技",而是"解耦"。前端不用关心底层用的哪个模型,后端换模型提供商时前端零改动。成本优化时(比如把一部分流量从OpenAI切到本地Ollama),改一行配置就行。