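After three model migrations, my biggest lesson is this: if openai.ChatCompletion.create(...) is scattered all over your code, switching models becomes a nightmare. Changing one call site is easy. With dozens of call sites it is easy to miss some, and with incomplete test coverage after the change it is easy to cause an incident.
This article walks through a complete migration approach, from architecture design to gradual rollout.
Core idea: an abstraction layer
Put an abstract interface between the business code and the concrete model. Business code depends only on the interface, never on a specific implementation. Switching models then means swapping the implementation, with no changes to business code.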

import json
from abc import ABC, abstractmethod
from typing import AsyncIterator, List

import httpx
from openai import AsyncOpenAI


class LLMBackend(ABC):
    """Interface that business code depends on."""

    @abstractmethod
    async def chat(self, messages: List[dict], **kwargs) -> str:
        """Return the full reply as one string."""

    @abstractmethod
    def chat_stream(self, messages: List[dict], **kwargs) -> AsyncIterator[str]:
        """Yield the reply piece by piece; implement as an async generator."""


class OllamaBackend(LLMBackend):
    def __init__(self, base_url: str, model: str):
        self.client = httpx.AsyncClient(base_url=base_url, timeout=120)
        self.model = model

    async def chat(self, messages, **kwargs):
        # kwargs are not forwarded: Ollama takes generation parameters under
        # "options" with its own names, so the mapping is left out here.
        resp = await self.client.post("/api/chat", json={
            "model": self.model, "messages": messages, "stream": False
        })
        resp.raise_for_status()  # surface HTTP errors instead of a KeyError below
        return resp.json()["message"]["content"]

    async def chat_stream(self, messages, **kwargs):
        async with self.client.stream("POST", "/api/chat", json={
            "model": self.model, "messages": messages, "stream": True
        }) as resp:
            resp.raise_for_status()
            # Ollama streams one JSON object per line.
            async for line in resp.aiter_lines():
                if line:
                    data = json.loads(line)
                    if "message" in data:
                        yield data["message"]["content"]


class OpenAIBackend(LLMBackend):
    def __init__(self, api_key: str, model: str):
        # Use the async client so calls do not block the event loop.
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def chat(self, messages, **kwargs):
        resp = await self.client.chat.completions.create(
            model=self.model, messages=messages, **kwargs
        )
        return resp.choices[0].message.content

    async def chat_stream(self, messages, **kwargs):
        stream = await self.client.chat.completions.create(
            model=self.model, messages=messages, stream=True, **kwargs
        )
        async for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip them.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

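Factory pattern
Use a factory function to create the backend from configuration, so that switching models means changing configuration, not code: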

import os
from typing import Optional


def create_llm_backend(provider: Optional[str] = None) -> LLMBackend:
    # An explicit argument wins; otherwise fall back to the environment.
    provider = provider or os.getenv("LLM_PROVIDER", "ollama")
    if provider == "ollama":
        return OllamaBackend(
            base_url=os.getenv("OLLAMA_URL", "http://localhost:11434"),
            model=os.getenv("OLLAMA_MODEL", "deepseek-r1:7b"),
        )
    elif provider == "openai":
        return OpenAIBackend(
            api_key=os.getenv("OPENAI_API_KEY"),
            model=os.getenv("OPENAI_MODEL", "gpt-4o"),
        )
    raise ValueError(f"Unknown provider: {provider}")

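Business code (inside an async function):

llm = create_llm_backend()  # the provider is read from the environment
response = await llm.chat([{"role": "user", "content": "Hello"}])

To switch from Ollama to OpenAI, change a single environment variable: LLM_PROVIDER=openai.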
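
A side benefit of coding against the interface is that tests can swap in a stub instead of a real model. The sketch below is only an illustration: `FakeBackend` and `summarize` are hypothetical names that do not appear in the original setup, and the `LLMBackend` interface is redeclared in trimmed form (chat only) so the snippet runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List


class LLMBackend(ABC):
    """Trimmed copy of the interface (chat only) so this snippet is self-contained."""

    @abstractmethod
    async def chat(self, messages: List[dict], **kwargs) -> str:
        ...


class FakeBackend(LLMBackend):
    """Hypothetical test double: returns a canned reply and records every call."""

    def __init__(self, reply: str):
        self.reply = reply
        self.calls: List[List[dict]] = []

    async def chat(self, messages: List[dict], **kwargs) -> str:
        self.calls.append(messages)
        return self.reply


async def summarize(llm: LLMBackend, text: str) -> str:
    """Hypothetical business function: it only knows about the interface."""
    return await llm.chat([{"role": "user", "content": f"Summarize: {text}"}])


fake = FakeBackend(reply="a short summary")
result = asyncio.run(summarize(fake, "a long document"))
```

Because `summarize` receives the backend as an argument, the same function would work unchanged with any backend that implements the interface.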
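Prompt adaptation
Different models may expect different prompt formats: Qwen uses the ChatML format, while Llama 2 uses [INST] tags. Handle the format conversion inside each backend implementation, and keep a single, uniform messages format on the outside: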
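
class OllamaBackend(LLMBackend):
    # Ollama already does the format conversion, so messages pass through as-is.
    # (chat and chat_stream as defined earlier)
    ...


class VLLMBackend(LLMBackend):
    # (chat and chat_stream omitted)
    def _adapt_messages(self, messages):
        # vLLM may need a specific format; here we keep only role and content.
        return [{"role": m["role"], "content": m["content"]} for m in messages]
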
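
To make the format difference concrete, here is a minimal sketch of rendering the same messages list as ChatML and as a Llama 2 style [INST] prompt. The function names `to_chatml` and `to_llama2_inst` are hypothetical, the Llama variant only handles one optional system message plus one user message, and special tokens such as BOS are left to the tokenizer. In practice a serving stack usually applies the model's own chat template, so treat this as an illustration of what a backend would hide, not as a reference implementation.

```python
from typing import Dict, List

Message = Dict[str, str]


def to_chatml(messages: List[Message]) -> str:
    """Render messages as ChatML and leave the assistant turn open."""
    parts = [f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>\n" for m in messages]
    return "".join(parts) + "<|im_start|>assistant\n"


def to_llama2_inst(messages: List[Message]) -> str:
    """Render one optional system message plus one user message in [INST] style."""
    system = next((m["content"] for m in messages if m["role"] == "system"), None)
    user = next(m["content"] for m in messages if m["role"] == "user")
    if system:
        # Llama 2 folds the system prompt into the first user turn.
        user = f"<<SYS>>\n{system}\n<</SYS>>\n\n{user}"
    return f"[INST] {user} [/INST]"


messages = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Hello"},
]
chatml_prompt = to_chatml(messages)
llama2_prompt = to_llama2_inst(messages)
```

Business code keeps building the same `messages` list either way; only the backend decides which renderer, if any, to apply.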
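Gradual rollout
Do not switch everything at once. Use configuration to control what share of traffic is also sent to the new backend: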
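
import asyncio
import logging
import random

logger = logging.getLogger(__name__)
_background_tasks = set()  # keep references so pending tasks are not garbage-collected


def get_backend_with_shadow(config):
    primary = create_llm_backend(config["primary_provider"])
    shadow_ratio = config.get("shadow_ratio", 0.0)
    if random.random() < shadow_ratio and config.get("shadow_provider"):
        shadow = create_llm_backend(config["shadow_provider"])
        return ShadowBackend(primary, shadow)
    return primary


class ShadowBackend(LLMBackend):
    """Call both backends, return only the primary result, and log a comparison.

    Streaming is passed through to the primary backend only.
    """

    def __init__(self, primary, shadow):
        self.primary = primary
        self.shadow = shadow

    async def chat(self, messages, **kwargs):
        primary_result = await self.primary.chat(messages, **kwargs)
        # Compare in the background so the shadow call never delays the caller.
        task = asyncio.create_task(self._compare(messages, primary_result, kwargs))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        return primary_result

    async def chat_stream(self, messages, **kwargs):
        async for piece in self.primary.chat_stream(messages, **kwargs):
            yield piece

    async def _compare(self, messages, primary_result, kwargs):
        try:
            shadow_result = await asyncio.wait_for(
                self.shadow.chat(messages, **kwargs), timeout=30
            )
            logger.info(f"Shadow comparison: primary_len={len(primary_result)}, shadow_len={len(shadow_result)}")
        except Exception as e:
            logger.warning(f"Shadow failed: {e}")
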
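Raise shadow_ratio step by step: 10% → 30% → 50% → 100%. At each step, watch the error rate and output quality, and only move on once both look fine. Because users only ever see the primary backend's output, the actual cutover is changing primary_provider once the shadow results hold up.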
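
One thing to be aware of: sampling with random.random() decides per call, so the same user can be shadowed on one request and not on the next. If stable assignment matters, a common alternative is to hash a stable key into a bucket. The sketch below assumes such a key is available; `in_rollout` and the key "user-42" are hypothetical and not part of the code above.

```python
import hashlib


def in_rollout(key: str, ratio: float) -> bool:
    """Return True if `key` falls inside the first `ratio` share of 10,000 buckets."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # The bucket depends only on the key, so the decision is repeatable.
    bucket = int.from_bytes(digest[:8], "big") % 10_000
    return bucket < ratio * 10_000


# Walk the same key through the rollout stages.
stages = [0.1, 0.3, 0.5, 1.0]
decisions = [in_rollout("user-42", r) for r in stages]
```

With this scheme a key that is included at one stage stays included at every later stage, which makes before-and-after comparisons easier to interpret.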
Data migration
If the migration also involves moving a vector database, export and import the data in batches:
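
async def migrate_vectors(source_db, target_db, batch_size=1000):
    # source_db.query and target_db.upsert stand in for your vector store's client API.
    offset = 0
    while True:
        batch = await source_db.query(offset=offset, limit=batch_size)
        if not batch:
            break
        await target_db.upsert(batch)
        offset += len(batch)  # count what was actually copied; the last batch may be short
        logger.info(f"Migrated {offset} vectors")
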
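Check that the vector dimensions match. If you changed the embedding model, the dimensions may differ, and the data has to be re-embedded rather than copied.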
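
A dimension check can run on each batch before it is written. The following is a minimal sketch that assumes vectors are available as plain sequences of floats; `find_dimension_mismatches` is a hypothetical helper, and a real vector store may expose the dimension through its own schema or collection metadata instead.

```python
from typing import Iterable, List, Sequence


def find_dimension_mismatches(
    vectors: Iterable[Sequence[float]], expected_dim: int
) -> List[int]:
    """Return the positions of vectors whose length differs from expected_dim."""
    return [i for i, v in enumerate(vectors) if len(v) != expected_dim]


# The second vector has 2 components instead of 3.
batch = [[0.1, 0.2, 0.3], [0.4, 0.5], [0.6, 0.7, 0.8]]
bad = find_dimension_mismatches(batch, expected_dim=3)
```

A non-empty result is a signal to stop and re-embed the source data instead of continuing the copy.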
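Closing thoughts
The key to a smooth migration is designing the abstraction layer up front. If model calls are wrapped behind an interface from the start, a later migration only requires swapping the implementation, with no changes to business code. Good architecture keeps you from being locked in to any single model, and that is what real technical freedom looks like.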