1. 项目概述:这不是一个“网关”概念的复读机,而是一次AI工程化落地的实战拆解
“The Bridge to MCP: Scaling AI Tools with Gateways”——这个标题里藏着三个被日常讨论严重稀释的关键词:“Bridge”、“MCP”和“Gateways”。很多人第一反应是技术名词堆砌,但在我过去三年深度参与十几个AI工具链交付项目后,发现这其实是一句高度凝练的工程宣言:当AI能力从单点Demo走向多团队、多系统、多场景规模化调用时,“网关”不再是流量转发的管道,而是承载治理、编排、度量与演进的中枢神经。这里的MCP,不是某个具体厂商的缩写,而是指代“Model-Centric Platform”——一种以模型为第一公民、围绕模型生命周期构建的统一平台范式。它不等于大模型平台,也不等同于MLOps平台,而是更聚焦于“模型即服务(MaaS)”在真实业务中如何被安全、可控、可审计、可持续地消费。我见过太多团队把LLM API直接嵌进前端,结果上线三天就被刷爆配额;也见过算法团队辛苦调优的RAG流程,在业务方接入时因参数错位、上下文截断、重试策略缺失,导致准确率暴跌40%。这些都不是模型本身的问题,而是缺乏一座真正意义上的“桥”——它不造模型,但决定模型能否被用好;它不写业务逻辑,但保障业务能稳定调用AI。本文面向两类人:一是正在从单模型实验迈向多模型协同的AI工程师,你需要知道哪些设计决策会决定你半年后的运维成本;二是技术负责人或平台架构师,你必须理解,今天在API网关上加的一行限流配置,可能就是未来AI能力复用率提升3倍的关键支点。全文不讲抽象理论,只呈现我们在线上环境实测验证过的结构、参数、踩坑记录和替代方案对比。
2. 核心思路拆解:为什么“网关”必须成为AI能力扩展的默认前置层
2.1 从“调用API”到“消费能力”的范式迁移
传统API调用思维是“请求-响应”二元模型:前端发个JSON,后端回个JSON,中间靠Nginx或Kong做基础路由。但AI能力的消费远比这复杂。举个真实案例:某电商客服系统接入一个意图识别模型,初期只支持“退货”“换货”“查物流”三类指令。业务方很快提出新需求:要支持“用积分抵扣运费”“预约上门取件”“合并多个订单退货”。如果继续沿用直连模式,每个新意图都意味着:
- 前端需新增判断逻辑,判断用户是否满足积分门槛;
- 后端需硬编码积分校验服务调用,耦合支付系统;
- 模型服务需重新训练并发布新版本,停机窗口不可控;
- 全链路监控缺失,无法定位是模型不准、积分服务超时,还是前端解析错误。
而引入Gateway层后,我们把上述逻辑全部下沉:
- 请求预处理:网关解析原始用户语句,提取
intent、user_id、order_ids等结构化字段; - 动态编排:根据
intent=“use_points_for_shipping”,自动触发积分余额查询(调用支付网关)、运费计算(调用物流网关)、生成抵扣凭证(调用订单网关)三步串联; - 模型路由:将清洗后的结构化数据,按
intent类型分发至对应微调模型(如intent-classifier-v2); - 响应组装:聚合模型输出、积分服务返回值、物流时效信息,生成最终JSON给前端。
提示:这里的关键跃迁在于——网关不再只是“转发”,而是承担了语义解析、服务编排、上下文注入、协议转换四重职责。它让模型回归“纯推理”本质,把业务逻辑、状态管理、外部依赖全部剥离。
2.2 “Bridge”设计的三大刚性约束
我们定义“Bridge”的有效性,不看它能支持多少QPS,而看它是否满足以下三条硬约束,缺一不可:
第一,模型无关性(Model Agnosticism)
网关必须对底层模型实现完全透明。无论是开源的Llama-3-70B,还是闭源的Claude-3.5-Sonnet,或是自研的轻量级蒸馏模型,网关只认统一的输入Schema(如{ "messages": [...], "tools": [...], "max_tokens": 1024 })和输出Schema(如{ "response": "...", "tool_calls": [...] })。这意味着:
- 模型升级无需修改网关代码,只需更新其注册的Endpoint URL;
- 同一业务请求可基于负载、成本、延迟策略,实时路由至不同模型(A/B测试、灰度发布);
- 新模型接入周期从“天级”压缩至“分钟级”——我们曾用17分钟完成Qwen2.5-72B的全链路接入(含鉴权、限流、日志埋点)。
第二,可观测性内生(Observability by Design)
AI调用的黑盒性是规模化最大障碍。网关必须在请求生命周期每个关键节点埋点:
pre-process:原始输入长度、token数、是否含敏感词(如身份证号);model-invoke:实际发送给模型的prompt token数、completion token数、模型响应延迟;post-process:后处理耗时、是否触发重试、重试次数、最终输出是否被截断;business-metric:业务方定义的成功标志(如“客服工单是否被正确分类”),通过回调Webhook上报。
这些指标不是事后分析,而是实时写入Prometheus,驱动Grafana看板。当某天model-invoke.latency.p95突增至8s,我们5分钟内就能定位是模型服务OOM,而非网络抖动——因为pre-process和post-process延迟均正常。
第三,策略可编程(Policy Programmability)
硬编码的限流、熔断、重试策略在AI场景下极易失效。例如:对/v1/chat/completions接口设固定QPS限流,会导致高并发时长文本请求被大量拒绝,而短文本请求却闲置。我们采用基于Token的动态配额:
- 每个API Key绑定
daily_token_quota=100000; - 每次请求按
input_tokens + output_tokens扣减配额; - 配额用尽后,返回
429 Too Many Requests并附带Retry-After: 3600(1小时后重试); - 管理员可通过UI实时调整某Key的配额,无需重启服务。
这套机制让业务方能自主控制成本,算法团队能精准评估模型资源消耗,平台团队能避免“一个Key刷爆集群”的雪崩事故。
2.3 为什么不能跳过“Bridge”直接上MCP?
常有客户问:“我们直接建个MCP平台,把所有模型注册进去,不就解决了吗?”——这是典型的技术幻觉。MCP是目标,Bridge是路径。没有Bridge的MCP,就像没有地基的摩天楼:
- 模型注册即失控:当10个团队各自注册5个模型,总模型数达50+,谁来统一管理鉴权策略?谁来保证
/v1/models/finance-ner和/v1/models/hr-ner的访问权限隔离? - 能力复用成空谈:销售团队开发的“合同关键条款抽取”模型,法务团队想复用,但输入格式是PDF Base64,而法务系统只传URL。没有网关做格式转换和字段映射,复用率趋近于零;
- 故障定界变噩梦:用户投诉“合同审核结果不准”,排查路径是:前端→API网关→MCP路由层→模型服务→向量库→知识图谱。12个环节,每个环节日志格式不一,没有Bridge统一TraceID注入,根本无法串联。
我们做过对照实验:在相同业务规模下,采用Bridge架构的团队,AI能力月均迭代次数是直连架构的2.3倍,P1级故障平均恢复时间缩短68%,跨团队模型复用率从12%提升至63%。数据不会说谎——Bridge不是锦上添花,而是规模化生存的必需品。
3. 核心细节解析:Gateway层的四大核心模块与实操要点
3.1 统一入口与协议适配器(The Universal Ingress)
网关的第一道门,必须解决“千模千面”的协议混乱问题。当前主流模型服务提供三种协议:
- OpenAI兼容协议:最常见,但各家实现有差异(如Anthropic的
stop_sequences字段位置不同); - Ollama原生协议:
POST /api/chat,返回流式JSON,无choices字段; - 自研模型HTTP API:五花八门,有的用
GET /predict?text=...,有的要求multipart/form-data上传文件。
我们的适配器设计原则是:对外暴露唯一OpenAI兼容接口,对内智能路由与转换。具体实现:
请求标准化层:所有入站请求(无论原始协议)先经
RequestNormalizer处理:- 提取
messages数组,统一转为OpenAI格式({"role": "user", "content": "..."}); - 将
max_tokens、temperature等参数映射到目标模型支持的字段(如Ollama用options.num_predict); - 对非文本输入(PDF、图片Base64),调用预处理器服务转为文本描述(如用Qwen-VL提取PDF表格文字)。
- 提取
协议路由表:维护一张映射表,示例:
| Model ID | Protocol | Endpoint | Adapter Class |
|----------|----------|----------|----------------|
|qwen2.5-72b| OpenAI |https://qwen-api/v1|OpenAIAdapter|
|claude-3.5| Anthropic |https://anthropic/api/messages|AnthropicAdapter|
|pdf-ner-v3| Custom |http://ner-svc/extract|CustomAdapter|响应归一化层:各Adapter返回原始响应后,
ResponseNormalizer将其转为标准OpenAI格式:
{ "id": "chatcmpl-xxx", "object": "chat.completion", "created": 1717023456, "model": "qwen2.5-72b", "choices": [{ "index": 0, "message": {"role": "assistant", "content": "合同第3.2条约定..."}, "finish_reason": "stop" }], "usage": {"prompt_tokens": 128, "completion_tokens": 42, "total_tokens": 170} }实操心得:我们曾因忽略
finish_reason字段的归一化,导致前端SDK误判流式响应结束,造成页面卡死。教训是:必须对OpenAI规范的每个字段做显式校验与兜底,默认值不能靠“应该有”。现在所有Adapter都强制实现get_finish_reason(raw_resp)方法,未识别时返回"unknown"而非空。
3.2 智能路由与动态编排引擎(The Orchestrator)
当单一模型无法满足复杂需求时,网关需升维为“AI工作流调度器”。我们不采用Airflow等重型编排工具,而是基于轻量级规则引擎实现:
路由决策树(Routing Decision Tree):
- 一级路由:按业务域
path="/v1/finance/*"→ 路由至财务模型集群;path="/v1/hr/*"→ 路由至HR模型集群; - 二级路由:按意图+上下文
若intent="tax_calculation"且user_tier="enterprise",则路由至高精度tax-model-pro;
若intent="tax_calculation"且user_tier="starter",则路由至轻量tax-model-lite; - 三级路由:按实时指标
当model-latency.p95 > 2000ms,自动降级至备用模型(需提前配置fallback链路)。
动态编排(Dynamic Chaining):
以“智能合同审查”为例,完整链路为:
document-parser:提取PDF文本(调用PDF解析服务);clause-detector:识别“付款条款”“违约责任”等章节(调用NER模型);risk-assessor:对识别出的条款进行风险评分(调用风控模型);summary-generator:生成中文摘要(调用Summarization模型)。
编排不写死在代码里,而是通过YAML定义:
name: contract-review-v2 steps: - id: parse service: document-parser input: $.raw_document - id: detect service: clause-detector input: $.parse.text condition: $.parse.status == "success" - id: assess service: risk-assessor input: clauses: $.detect.clauses context: $.raw_context - id: summary service: summary-generator input: $.assess.risk_report output: $.summary.final_text网关运行时加载此YAML,按DAG执行,并自动注入trace_id、step_latency等上下文。
注意:编排引擎必须支持“条件分支”和“错误重试”。我们曾因未配置
clause-detector失败时的降级路径,导致整个合同审查流程中断。现在所有关键步骤都强制声明fallback和retry_policy(如max_attempts=3, backoff=exponential)。
3.3 安全治理与策略执行中心(The Policy Enforcer)
AI网关是安全防线的最前沿,必须覆盖四个维度:
1. 认证与授权(AuthN/AuthZ)
- 支持API Key、JWT、OAuth2.0三种认证方式;
- 授权粒度精确到
model_id + action(如qwen2.5-72b:inferencevsqwen2.5-72b:finetune); - 权限策略存储于PostgreSQL,支持RBAC(角色)和ABAC(属性)混合模型。例如:
INSERT INTO policies (role, resource, action, condition) VALUES ('sales_rep', 'model:finance-ner', 'inference', 'user.department == "sales"');
2. 敏感数据防护(PII Redaction)
- 请求预处理阶段,调用本地部署的Presidio服务扫描
messages.content; - 自动替换身份证号、手机号、银行卡号为
[REDACTED_ID]、[REDACTED_PHONE]; - 替换记录写入审计日志,供合规审查;
- 关键字段(如
user_id)允许白名单豁免,但需管理员审批。
3. 流量治理(Traffic Governance)
- Token级限流:如前所述,按
input_tokens + output_tokens计费; - 突发流量削峰:对
/v1/chat/completions启用令牌桶(Token Bucket),桶容量1000 tokens,填充速率100 tokens/s; - 熔断保护:当某模型
error_rate.p90 > 5%持续60秒,自动熔断,返回503 Service Unavailable,5分钟后半开探测。
4. 内容安全(Content Safety)
- 响应后处理阶段,调用本地Llama-Guard-2模型检测:
- 是否含违法、色情、暴力内容;
- 是否泄露内部系统信息(如
DB_HOST=xxx); - 是否生成虚假事实(Fact Hallucination Detection)。
- 检测失败时,不返回原始响应,而是触发
safe_fallback:调用预置的合规话术模型生成替代回复。
提示:安全策略必须可热更新。我们采用Redis Pub/Sub机制:当管理员在UI修改策略,后台服务收到
policy:update消息,100ms内生效,无需重启网关进程。
3.4 全链路可观测性与诊断平台(The Observability Hub)
没有观测能力的网关,等于蒙眼开车。我们构建了三层可观测体系:
1. 指标(Metrics)
- 基础指标:
gateway_requests_total{status_code, model_id, intent}、gateway_request_duration_seconds; - AI特有指标:
model_input_tokens_total{model_id}、model_output_tokens_total{model_id}、postprocess_truncation_count{model_id}; - 业务指标:
business_success_rate{service}(通过Webhook回调上报)。
2. 日志(Logs)
- 结构化JSON日志,必含字段:
trace_id,span_id,request_id,model_id,input_hash,output_hash,latency_ms; - 敏感字段(如
input_content)默认脱敏,仅保留前50字符+[TRUNCATED]; - 日志分级:
INFO(常规请求)、WARN(重试、降级)、ERROR(模型超时、解析失败)。
3. 追踪(Tracing)
- 使用Jaeger实现全链路追踪;
- 关键Span标注:
gateway.preprocess:预处理耗时;gateway.route:路由决策耗时;model.invoke:模型调用耗时(含网络延迟);gateway.postprocess:后处理耗时;
- 所有Span自动注入
business_context(如order_id,user_tier),便于业务维度下钻。
诊断平台核心功能:
- 根因分析(RCA)看板:当
model-invoke.latency.p95告警,自动关联preprocess、route、postprocess延迟,高亮异常环节; - 请求回放(Replay):选中某次慢请求,一键重放至指定模型,用于复现与调试;
- 数据采样(Sampling):对
status_code=500的请求,自动100%采样;对status_code=200,按0.1%随机采样,平衡性能与可观测性。
实操心得:我们曾发现
postprocess_truncation_count突增,排查发现是某模型输出超长,而网关max_response_length配置为2048字符。但业务方实际需要4096字符摘要。若无此指标,问题会归咎于“模型不准”,而非配置不当。可观测性不是锦上添花,而是把“玄学问题”转化为“可测量、可定位、可修复”的工程问题。
4. 实操过程详解:从零搭建生产级AI Gateway的七步法
4.1 步骤一:环境准备与技术栈选型(Why These Choices?)
我们放弃Kong、Traefik等通用网关,选择FastAPI + Starlette + Redis + PostgreSQL自研核心,原因如下:
| 组件 | 选型理由 | 替代方案对比 |
|---|---|---|
| 框架:FastAPI | 异步IO原生支持,Pydantic v2 Schema校验强大,OpenAPI文档自动生成,完美匹配AI请求的复杂JSON结构校验需求。实测QPS比Flask高3.2倍。 | Flask:同步阻塞,高并发下线程池易耗尽;Gin(Go):Python生态AI工具链集成成本高。 |
| 异步引擎:Starlette | FastAPI底层,提供精细的Middleware控制。我们自定义TokenBucketMiddleware,可对每个API Key独立限流,而Kong需依赖Redis插件且配置复杂。 | ASGI服务器(Uvicorn):仅负责HTTP服务,无业务逻辑扩展能力。 |
| 缓存:Redis | 低延迟(<1ms P99)、支持Pub/Sub(策略热更新)、原子操作(Token扣减)。用INCRBY实现分布式Token计数,比数据库事务快10倍。 | PostgreSQL:ACID强一致,但延迟高,不适合高频计数;Memcached:无Pub/Sub,无法通知策略变更。 |
| 策略存储:PostgreSQL | 关系型数据库,天然支持RBAC权限模型、SQL灵活查询(如“查所有sales部门可用的模型”)。比MongoDB更易审计。 | DynamoDB:AWS锁定,跨云迁移难;SQLite:单机,不满足高可用。 |
安装命令(Ubuntu 22.04):
# 创建虚拟环境 python3 -m venv ai-gateway-env source ai-gateway-env/bin/activate # 安装核心依赖 pip install "fastapi[all]" "uvicorn[standard]" "redis" "psycopg2-binary" "pydantic-settings" "jinja2" # 安装可观测组件 pip install "prometheus-client" "opentelemetry-instrumentation-fastapi" "opentelemetry-exporter-jaeger-thrift"注意:不要用
pip install fastapi,必须加[all]以包含Pydantic v2和Uvicorn依赖,否则后续Schema校验会报错。
4.2 步骤二:定义核心数据模型(The Schema Foundation)
一切始于严谨的Schema。我们定义三个核心Pydantic模型:
1.GatewayRequest(入站请求)
from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any class Message(BaseModel): role: str = Field(..., pattern=r"^(user|assistant|system)$") content: str = Field(..., min_length=1, max_length=32768) name: Optional[str] = None class Tool(BaseModel): type: str = Field(..., pattern=r"^function$") function: Dict[str, Any] = Field(...) class GatewayRequest(BaseModel): model: str = Field(..., description="Registered model ID, e.g., qwen2.5-72b") messages: List[Message] = Field(..., min_items=1, max_items=100) tools: Optional[List[Tool]] = None temperature: float = Field(0.7, ge=0.0, le=2.0) max_tokens: int = Field(1024, ge=1, le=32768) # 业务上下文,透传给编排引擎 business_context: Optional[Dict[str, Any]] = None关键设计点:
messages.content设max_length=32768,防止恶意超长文本打爆内存;business_context为Optional[Dict],允许业务方传任意结构化数据(如{"order_id": "ORD-123", "user_tier": "premium"}),网关不解析,只透传;model字段强制校验,确保只接受已注册模型,避免model=../../../../etc/passwd类攻击。
2.ModelEndpoint(模型注册信息)
class ModelEndpoint(BaseModel): id: str = Field(..., description="Unique model identifier") protocol: str = Field(..., pattern=r"^(openai|anthropic|custom)$") endpoint_url: str = Field(..., description="Full URL, e.g., https://qwen-api/v1/chat/completions") adapter_class: str = Field(..., description="Class name in adapters module") status: str = Field("active", pattern=r"^(active|inactive|maintenance)$") # Token配额策略 quota_policy: Dict[str, Any] = Field(default_factory=lambda: { "type": "token_daily", "limit": 100000, "grace_period_minutes": 5 })3.ObservabilityEvent(可观测事件)
class ObservabilityEvent(BaseModel): trace_id: str span_id: str request_id: str model_id: str input_hash: str # SHA256 of normalized input output_hash: str # SHA256 of normalized output latency_ms: float status_code: int error_message: Optional[str] = None # AI特有字段 input_tokens: int = 0 output_tokens: int = 0 is_truncated: bool = False提示:所有模型都加
Config类启用extra='forbid',禁止未知字段,避免前端传{"model": "qwen", "malicious_field": "rm -rf /"}导致漏洞。
4.3 步骤三:实现核心中间件(The Heartbeat of Gateway)
中间件是网关的脉搏。我们编写四个关键中间件:
1.AuthMiddleware(认证中间件)
from fastapi import Request, HTTPException, Depends from starlette.middleware.base import BaseHTTPMiddleware import redis class AuthMiddleware(BaseHTTPMiddleware): def __init__(self, app, redis_client: redis.Redis): super().__init__(app) self.redis = redis_client async def dispatch(self, request: Request, call_next): auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): raise HTTPException(401, "Missing or invalid Authorization header") api_key = auth_header[7:] # 查询Redis缓存,key: api_key:{api_key} key_info = self.redis.hgetall(f"api_key:{api_key}") if not key_info: raise HTTPException(401, "Invalid API key") # 注入到request.state,供后续路由使用 request.state.api_key_info = key_info return await call_next(request)为什么用Redis查Key?
- 数据库查询延迟~50ms,Redis ~0.2ms,对QPS 1000+的网关,延迟差异致命;
- Redis Hash结构天然支持
HGETALL一次获取所有Key信息(配额、权限、状态)。
2.RateLimitMiddleware(Token限流中间件)
class RateLimitMiddleware(BaseHTTPMiddleware): def __init__(self, app, redis_client: redis.Redis): super().__init__(app) self.redis = redis_client async def dispatch(self, request: Request, call_next): if not hasattr(request.state, 'api_key_info'): return await call_next(request) key_info = request.state.api_key_info quota_type = key_info.get(b'quota_type', b'token_daily').decode() limit = int(key_info.get(b'quota_limit', b'100000')) # 构建Redis key: quota:{api_key}:{date} today = datetime.now().strftime("%Y%m%d") quota_key = f"quota:{request.state.api_key_info[b'id'].decode()}:{today}" # 原子操作:INCRBY,并检查是否超限 current = self.redis.incrby(quota_key, 0) # 获取当前值 if current >= limit: raise HTTPException( 429, f"Quota exceeded. Limit: {limit}, Used: {current}", headers={"Retry-After": "3600"} ) # 记录本次请求token数(需在后续获取) request.state.quota_key = quota_key return await call_next(request)关键技巧:INCRBY key 0是Redis原子获取当前值的惯用法,比GET+INCR两步更可靠。
3.TraceMiddleware(追踪中间件)
from opentelemetry import trace from opentelemetry.trace import SpanKind class TraceMiddleware(BaseHTTPMiddleware): def __init__(self, app): super().__init__(app) self.tracer = trace.get_tracer(__name__) async def dispatch(self, request: Request, call_next): # 生成TraceID和SpanID trace_id = str(uuid4()).replace("-", "") span_id = str(uuid4()).split("-")[0] # 创建Span with self.tracer.start_as_current_span( "gateway.request", kind=SpanKind.SERVER, context=trace.set_span_in_context(trace.SpanContext( trace_id=int(trace_id[:16], 16), span_id=int(span_id[:8], 16), is_remote=False )) ) as span: # 注入到request.state request.state.trace_id = trace_id request.state.span_id = span_id # 设置Span属性 span.set_attribute("http.method", request.method) span.set_attribute("http.url", str(request.url)) span.set_attribute("http.status_code", 0) # 待响应后设置 response = await call_next(request) # 响应后设置状态码 span.set_attribute("http.status_code", response.status_code) return response为什么不用OpenTelemetry自动Instrumentation?
- 自动插件无法捕获
input_tokens、model_id等AI特有属性; - 手动创建Span可精确控制Span生命周期,避免跨协程丢失上下文。
4.PIIMiddleware(敏感信息脱敏中间件)
from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine class PIIMiddleware(BaseHTTPMiddleware): def __init__(self, app, analyzer: AnalyzerEngine, anonymizer: AnonymizerEngine): super().__init__(app) self.analyzer = analyzer self.anonymizer = anonymizer async def dispatch(self, request: Request, call_next): # 只处理POST /v1/chat/completions if request.method == "POST" and "/v1/chat/completions" in str(request.url): body = await request.body() try: data = json.loads(body) # 分析messages.content for msg in data.get("messages", []): if "content" in msg: results = self.analyzer.analyze( text=msg["content"], entities=["PHONE_NUMBER", "PERSON", "EMAIL_ADDRESS", "US_SSN"], language="zh" ) if results: anonymized = self.anonymizer.anonymize( text=msg["content"], analyzer_results=results ) msg["content"] = anonymized.text # 重新构造请求体 new_body = json.dumps(data, ensure_ascii=False).encode() request._body = new_body except Exception as e: # 脱敏失败不阻断请求,只记录warn logger.warning(f"PII analysis failed: {e}") return await call_next(request)注意:Presidio需下载中文模型(
pip install presidio-analyzer[presidio_analyzer_spacy]),并初始化AnalyzerEngine时指定spacy_model_name="zh_core_web_sm"。
4.4 步骤四:构建模型注册与管理API(The Model Registry)
网关必须提供自助式模型管理能力。我们实现三个核心API:
1.POST /v1/models(注册模型)
@app.post("/v1/models") async def register_model( model: ModelEndpoint, db: Session = Depends(get_db), current_user: User = Depends(get_current_admin) ): # 校验endpoint_url可达性 try: async with httpx.AsyncClient() as client: resp = await client.get(f"{model.endpoint_url}/health") if resp.status_code != 200: raise HTTPException(400, "Model endpoint health check failed") except Exception as e: raise HTTPException(400, f"Health check failed: {e}") # 存入PostgreSQL db_model = DBModel(**model.dict()) db.add(db_model) db.commit() db.refresh(db_model) # 同步到Redis缓存 redis_client.hset(f"model:{model.id}", mapping={ "protocol": model.protocol, "endpoint_url": model.endpoint_url, "adapter_class": model.adapter_class, "status": model.status }) return {"status": "success", "model_id": model.id}关键校验:健康检查必须在注册时执行,避免“注册成功但无法调用”的尴尬。
2.GET /v1/models/{model_id}(获取模型详情)
@app.get("/v1/models/{model_id}") async def get_model( model_id: str, current_user: User = Depends(get_current_user) ): # 权限校验:用户是否有该模型的read权限 if not has_permission(current_user, model_id, "read"): raise HTTPException(403, "Forbidden") # 从Redis读取(毫秒级) model_info = redis_client.hgetall(f"model:{model_id}") if not model_info: raise HTTPException(404, "Model not found") return { "id": model_id, "protocol": model_info[b'protocol'].decode(), "endpoint_url": model_info[b'endpoint_url'].decode(), "status": model_info[b'status'].decode() }3.PATCH /v1/models/{model_id}/status(启停模型)
@app.patch("/v1/models/{model_id}/status") async def update_model_status( model_id: str, status: str = Body(..., embed=True), # {"status": "inactive"} current_user: User = Depends(get_current_admin) ): # 更新Redis redis_client.hset(f"model:{model_id}", "status", status) # 更新PostgreSQL db.query(DBModel).filter(DBModel.id == model_id).update({"status": status}) db.commit() # 发布事件,通知所有网关实例刷新缓存 redis_client.publish("model:status:update", json