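After three model migrations, my biggest lesson is this: if openai.ChatCompletion.create(...) is scattered all over your code, switching models becomes a nightmare. Changing one call site is easy; changing dozens means some get missed, and with incomplete test coverage that leads to incidents.

This article walks through a complete migration approach, from architecture design to gradual rollout.

Core idea: an abstraction layer

Put an abstract interface between business code and the concrete model. Business code depends only on the interface, never on a specific implementation. Switching models then means swapping the implementation, with no changes to business code.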

import json
from abc import ABC, abstractmethod
from typing import AsyncIterator, List

import httpx
from openai import AsyncOpenAI

class LLMBackend(ABC):
    # **kwargs (temperature, max_tokens, ...) are accepted but ignored in this
    # sketch; map them to each provider's own parameter names as needed.
    @abstractmethod
    async def chat(self, messages: List[dict], **kwargs) -> str:
        """Return the full reply as a single string."""

    @abstractmethod
    def chat_stream(self, messages: List[dict], **kwargs) -> AsyncIterator[str]:
        """Yield the reply piece by piece; implement as an async generator."""

class OllamaBackend(LLMBackend):
    def __init__(self, base_url: str, model: str):
        self.client = httpx.AsyncClient(base_url=base_url, timeout=120)
        self.model = model

    async def chat(self, messages, **kwargs):
        resp = await self.client.post("/api/chat", json={
            "model": self.model, "messages": messages, "stream": False
        })
        resp.raise_for_status()  # surface HTTP errors instead of a confusing KeyError
        return resp.json()["message"]["content"]

    async def chat_stream(self, messages, **kwargs):
        async with self.client.stream("POST", "/api/chat", json={
            "model": self.model, "messages": messages, "stream": True
        }) as resp:
            resp.raise_for_status()
            # Ollama streams one JSON object per line
            async for line in resp.aiter_lines():
                if line:
                    data = json.loads(line)
                    if "message" in data:
                        yield data["message"]["content"]

class OpenAIBackend(LLMBackend):
    def __init__(self, api_key: str, model: str):
        # AsyncOpenAI, so the calls below do not block the event loop
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def chat(self, messages, **kwargs):
        resp = await self.client.chat.completions.create(
            model=self.model, messages=messages
        )
        return resp.choices[0].message.content or ""

    async def chat_stream(self, messages, **kwargs):
        stream = await self.client.chat.completions.create(
            model=self.model, messages=messages, stream=True
        )
        async for chunk in stream:
            # some chunks carry no choices or an empty delta
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
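
One practical payoff of the interface is that business code can be tested without any model running. The sketch below is only an illustration: FakeBackend and summarize are hypothetical names that do not come from the original project, and the interface is repeated in condensed form so the snippet runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import AsyncIterator, List


class LLMBackend(ABC):
    """Condensed copy of the article's interface, so this sketch is self-contained."""

    @abstractmethod
    async def chat(self, messages: List[dict], **kwargs) -> str: ...

    @abstractmethod
    def chat_stream(self, messages: List[dict], **kwargs) -> AsyncIterator[str]: ...


class FakeBackend(LLMBackend):
    """Hypothetical test double: returns a canned reply and records every call."""

    def __init__(self, reply: str):
        self.reply = reply
        self.calls: List[List[dict]] = []

    async def chat(self, messages, **kwargs):
        self.calls.append(messages)
        return self.reply

    async def chat_stream(self, messages, **kwargs):
        self.calls.append(messages)
        for word in self.reply.split():
            yield word


async def summarize(llm: LLMBackend, text: str) -> str:
    """Hypothetical business function: it only sees the LLMBackend interface."""
    return await llm.chat([{"role": "user", "content": f"Summarize: {text}"}])


fake = FakeBackend("a short summary")
result = asyncio.run(summarize(fake, "a long document"))
```

Because summarize takes any LLMBackend, the same function works unchanged with OllamaBackend or OpenAIBackend in production.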

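Factory pattern

Use a factory function to create the backend from configuration, so switching models means changing config, not code: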

import os
from typing import Optional

def create_llm_backend(provider: Optional[str] = None) -> LLMBackend:
    # Fall back to the LLM_PROVIDER environment variable, then to "ollama"
    provider = provider or os.getenv("LLM_PROVIDER", "ollama")

    if provider == "ollama":
        return OllamaBackend(
            base_url=os.getenv("OLLAMA_URL", "http://localhost:11434"),
            model=os.getenv("OLLAMA_MODEL", "deepseek-r1:7b")
        )
    elif provider == "openai":
        return OpenAIBackend(
            api_key=os.environ["OPENAI_API_KEY"],  # fail fast if the key is missing
            model=os.getenv("OPENAI_MODEL", "gpt-4o")
        )
    raise ValueError(f"Unknown provider: {provider}")

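Business code:

import asyncio

async def main():
    llm = create_llm_backend()  # reads the provider from the environment
    response = await llm.chat([{"role": "user", "content": "Hello"}])
    print(response)

asyncio.run(main())

To switch from Ollama to OpenAI, change a single environment variable (with OPENAI_API_KEY also set): LLM_PROVIDER=openai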
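
If the list of providers keeps growing, one possible variation is a small registry, so adding a provider does not require editing the if/elif chain. This is a sketch under assumptions, not the article's implementation: register_backend, the _BACKENDS dict, and the "echo" provider are hypothetical names, and the echo constructor returns a plain dict as a stand-in for a real backend object.

```python
import os
from typing import Callable, Dict, Optional

# Hypothetical registry: provider name -> zero-argument constructor.
_BACKENDS: Dict[str, Callable[[], object]] = {}


def register_backend(name: str):
    """Decorator that records a constructor under a provider name."""
    def decorator(factory: Callable[[], object]):
        _BACKENDS[name] = factory
        return factory
    return decorator


@register_backend("echo")
def _make_echo():
    # Stand-in for a real constructor such as OllamaBackend(...).
    return {"provider": "echo", "model": os.getenv("ECHO_MODEL", "echo-1")}


def create_llm_backend(provider: Optional[str] = None):
    """Look the provider up in the registry; list the known names on failure."""
    provider = provider or os.getenv("LLM_PROVIDER", "echo")
    try:
        return _BACKENDS[provider]()
    except KeyError:
        known = ", ".join(sorted(_BACKENDS))
        raise ValueError(f"Unknown provider: {provider!r} (known: {known})") from None
```

The error message lists the registered names, which helps when LLM_PROVIDER is mistyped in a deployment config.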

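Prompt adaptation

Different models may expect different prompt formats: Qwen uses the ChatML format, while Llama 2 uses [INST] tags. Handle the format conversion inside the backend implementation and keep a uniform messages format on the outside: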

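class OllamaBackend(LLMBackend):
    # Ollama already converts messages to the model's format, so pass them through as-is
    # (fragment: chat and chat_stream are the ones shown earlier)
    pass

class VLLMBackend(LLMBackend):
    # (fragment: chat and chat_stream omitted)
    def _adapt_messages(self, messages):
        # vLLM may need a specific format; here only the role and content fields are kept
        return [{"role": m["role"], "content": m["content"]} for m in messages]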
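
For a backend that takes a raw prompt string instead of messages, the conversion could look roughly like the sketch below. The function names to_chatml and to_llama2_inst are hypothetical; the Llama 2 version only handles an optional system message plus one user message, and it leaves out the BOS token on the assumption that the tokenizer adds it.

```python
from typing import Dict, List

Message = Dict[str, str]


def to_chatml(messages: List[Message]) -> str:
    """Render messages in ChatML, ending with an open assistant turn."""
    parts = [f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>\n" for m in messages]
    parts.append("<|im_start|>assistant\n")
    return "".join(parts)


def to_llama2_inst(messages: List[Message]) -> str:
    """Render a single-turn prompt with Llama 2 style [INST] tags."""
    system = next((m["content"] for m in messages if m["role"] == "system"), None)
    user = next(m["content"] for m in messages if m["role"] == "user")
    if system:
        # The system prompt is wrapped in <<SYS>> and placed inside the first [INST]
        user = f"<<SYS>>\n{system}\n<</SYS>>\n\n{user}"
    return f"[INST] {user} [/INST]"


msgs = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Hi"},
]
chatml_prompt = to_chatml(msgs)
llama2_prompt = to_llama2_inst(msgs)
```

Callers still pass the same messages list; only the backend knows which of the two renderers applies.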

Gradual rollout

Don't cut over all at once. Use configuration to control the share of traffic:

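import asyncio
import logging
import random

logger = logging.getLogger(__name__)

def get_backend_with_shadow(config):
    primary = create_llm_backend(config["primary_provider"])
    shadow_ratio = config.get("shadow_ratio", 0.0)

    # Mirror a random share of requests to the shadow backend
    if config.get("shadow_provider") and random.random() < shadow_ratio:
        shadow = create_llm_backend(config["shadow_provider"])
        return ShadowBackend(primary, shadow)
    return primary

class ShadowBackend(LLMBackend):
    """Call both backends, return only the primary result, and log a comparison."""
    _tasks = set()  # strong references so pending shadow tasks are not garbage-collected

    def __init__(self, primary, shadow):
        self.primary = primary
        self.shadow = shadow

    async def chat(self, messages, **kwargs):
        primary_result = await self.primary.chat(messages, **kwargs)
        # Run the shadow call in the background so it adds no latency for the user
        task = asyncio.create_task(self._compare(messages, primary_result, **kwargs))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return primary_result

    async def _compare(self, messages, primary_result, **kwargs):
        try:
            shadow_result = await asyncio.wait_for(
                self.shadow.chat(messages, **kwargs), timeout=30
            )
            logger.info("Shadow comparison: primary_len=%d, shadow_len=%d",
                        len(primary_result), len(shadow_result))
        except Exception as e:
            logger.warning("Shadow failed: %r", e)

    def chat_stream(self, messages, **kwargs):
        # Streaming is not shadowed; delegate to the primary backend
        return self.primary.chat_stream(messages, **kwargs)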

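Raise shadow_ratio step by step: 10% → 30% → 50% → 100%. At each step, watch the error rate and output quality, and only move on once both look fine. Shadow traffic never reaches users, so once the comparison holds up at 100%, the actual switch is changing primary_provider.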
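
Shadow mode only compares outputs. For the step where real users start receiving the new model's replies, one option is a sticky percentage split, so a given user always lands on the same backend while the ratio is unchanged. The sketch below assumes hypothetical config keys canary_provider and canary_ratio, which are not part of the original config, and hashes a user ID into 10,000 buckets.

```python
import hashlib


def bucket_for(user_id: str) -> float:
    """Map a user ID to a stable value in [0, 1) with 0.0001 granularity."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return (int.from_bytes(digest[:8], "big") % 10_000) / 10_000


def pick_provider(user_id: str, config: dict) -> str:
    """Return the canary provider for a fixed share of users, else the primary."""
    canary = config.get("canary_provider")
    if canary and bucket_for(user_id) < config.get("canary_ratio", 0.0):
        return canary
    return config["primary_provider"]


config = {
    "primary_provider": "ollama",
    "canary_provider": "openai",
    "canary_ratio": 0.3,
}
provider = pick_provider("user-42", config)
```

The returned name can be passed straight to create_llm_backend(provider). Hashing instead of random.random() keeps a conversation from bouncing between two models mid-session.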

Data migration

If the migration also involves a vector database, export and import in batches:

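import logging

logger = logging.getLogger(__name__)

async def migrate_vectors(source_db, target_db, batch_size=1000):
    # source_db.query and target_db.upsert stand in for your vector store's client API
    offset = 0
    while True:
        batch = await source_db.query(offset=offset, limit=batch_size)
        if not batch:
            break
        await target_db.upsert(batch)
        offset += len(batch)  # count what was actually copied
        logger.info("Migrated %d vectors", offset)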

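Check that the vector dimensions match. If you also changed the embedding model, the dimensions may differ, and the data has to be re-embedded rather than copied.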
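
A dimension check before each write could look like the sketch below. It assumes each batch is available as a list of plain float sequences; check_dimensions and assert_compatible are hypothetical helper names, not part of any vector database client.

```python
from typing import Iterable, List, Sequence


def check_dimensions(vectors: Iterable[Sequence[float]], expected_dim: int) -> List[int]:
    """Return the indexes of vectors whose length differs from expected_dim."""
    return [i for i, vec in enumerate(vectors) if len(vec) != expected_dim]


def assert_compatible(batch: Sequence[Sequence[float]], target_dim: int) -> None:
    """Raise before writing anything if the batch cannot fit the target collection."""
    bad = check_dimensions(batch, target_dim)
    if bad:
        raise ValueError(
            f"{len(bad)} of {len(batch)} vectors do not have dimension {target_dim}; "
            "re-embed the source texts with the new embedding model instead of copying"
        )


mismatched = check_dimensions([[0.1, 0.2], [0.1, 0.2, 0.3]], expected_dim=2)
```

Running the check on the first batch alone is usually enough to catch an embedding model mismatch before any data is written.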

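Closing thoughts

The key to a painless migration is designing the abstraction layer up front. If model calls sit behind an interface from day one, a later migration only means swapping the implementation, with no changes to business code. Good architecture keeps you from being locked into any single model, and that is real technical freedom.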