CANN推理服务化实战:如何让昇腾NPU在生产环境稳定输出
前阵子帮一个语音识别团队把模型部署到生产环境。开发环境跑得挺好,延迟15ms,吞吐也够用。结果上线第一天,QPS刚过50就开始出现偶发的OOM错误,延迟从15ms飙升到200ms。到第二天,模型干脆挂掉,所有请求都超时。
排查了两天才发现,昇腾NPU的生产推理和开发测试完全不同——有OOM是因为没有限制单卡的并发请求数;有200ms延迟是因为没有预热(模型第一次加载时,CANN要做离线编译);模型挂掉是因为没有做健康检查,请求打到了坏掉的NPU卡上。
这篇讲如何在生产环境里把昇腾NPU的推理服务跑稳定。
生产推理的核心问题:并发、预热、容错
开发环境测的是单请求延迟,生产环境跑的是并发吞吐。这两个维度需要不同的优化策略。
问题1:并发导致OOM
# 错误代码:每个请求都 new 一个模型实例@app.route("/predict",methods=["POST"])defpredict():model=MyModel()# ← 每个请求都加载一次模型model=model.npu()output=model(input_data)# 32张卡 × 100个请求 = 3200个模型实例 → OOMreturnoutput# 正确做法:用单例模式,全局共享一个模型实例model=Nonedefget_model():globalmodelifmodelisNone:model=MyModel()model=model.npu()returnmodel@app.route("/predict",methods=["POST"])defpredict():model=get_model()# ← 复用同一个实例output=model(input_data)returnoutput问题2:第一次请求延迟200ms(没有预热)
原因:CANN加载.om文件时,需要现场做一次“冷启动”编译。
解决方案:用多进程预热。
importmultiprocessingasmpimporttorch# 启动时就预热好模型defwarmup_model():model=torch.load("resnet50.om")dummy=torch.randn(1,3,224,224).npu()# 跑10次,让CANN把kernel都预编译好for_inrange(10):_=model(dummy)returnTrue# 启动时预热(放在 app 初始化里)if__name__=="__main__":# 启动多个 worker,每个 worker 预热一次num_workers=4withmp.Pool(num_workers)aspool:pool.map(warmup_model,range(num_workers))# 启动 Flask/Gunicorn 服务app.run()多实例 + 负载均衡:正确使用多卡
昇腾NPU的生产推理不是简单地把请求打到一个实例上。需要做:
- 多实例:每张卡一个模型实例
- 负载均衡:请求均匀分配到各实例
- 健康检查:检测坏卡,自动摘除
# 多实例部署:用多进程,每个进程绑定一张卡importmultiprocessingasmpimporttorchimporttimeclassNPUInferenceServer:def__init__(self,device_id):self.device_id=device_id self.model=Nonedefload_model(self):# 设置当前进程的 NPU 设备torch.npu.set_device(self.device_id)# 加载离线编译好的模型(.om文件,加载比PyTorch快)self.model=torch.ACLModel("resnet50.om")# 预热dummy=torch.randn(1,3,224,224).npu()for_inrange(10):_=self.model(dummy)print(f"Device{self.device_id}: Model loaded and warmed up")definfer(self,input_data):torch.npu.set_device(self.device_id)output=self.model(input_data)returnoutput.cpu()defhealth_check(self):"""健康检查:跑一个 dummy 请求"""try:dummy=torch.randn(1,3,224,224).npu()_=self.model(dummy)returnTrueexceptExceptionase:print(f"Device{self.device_id}health check failed:{e}")returnFalse# 每个进程绑定一张卡defstart_server(device_id,request_queue,response_queue):server=NPUInferenceServer(device_id)server.load_model()whileTrue:# 从请求队列拿请求request=request_queue.get()ifrequestisNone:breaktry:output=server.infer(request["data"])response_queue.put({"id":request["id"],"output":output,"status":"ok"})exceptExceptionase:response_queue.put({"id":request["id"],"error":str(e),"status":"error"})# 启动服务if__name__=="__main__":num_cards=8request_queue=mp.Queue()response_queue=mp.Queue()# 启动8个进程,每个绑定一张卡processes=[]foriinrange(num_cards):p=mp.Process(target=start_server,args=(i,request_queue,response_queue))p.start()processes.append(p)print(f"Started{num_cards}inference workers")负载均衡:Nginx + Upstream
# nginx.conf upstream npu_backend { least_conn; # 最少连接数优先,比 round_robin 更适合推理场景 # 8个 worker,每个监听不同端口 server 127.0.0.1:8001; server 127.0.0.1:8002; server 127.0.0.1:8003; server 127.0.0.1:8004; server 127.0.0.1:8005; server 127.0.0.1:8006; server 127.0.0.1:8007; server 127.0.0.1:8008; } server { listen 8080; location /predict { proxy_pass http://npu_backend; proxy_connect_timeout 5s; proxy_read_timeout 30s; } location /health { # 健康检查接口 return 200 "OK"; } }# 健康检查守护进程:定期检测每张卡,发现问题自动摘除importtimeimportrequestsfrommultiprocessingimportProcessdefhealth_checker():"""每10秒检查一次所有实例,发现坏的自动上报"""whileTrue:foriinrange(1,9):port=8000+itry:# 调用每个实例的健康检查resp=requests.get(f"http://127.0.0.1:{port}/health",timeout=2)ifresp.status_code!=200:print(f"⚠ Instance port{port}is unhealthy")# 这里可以调用 Nginx 的 API 把这个 upstream 下线# 或者写入一个数据库,让负载均衡器自动跳过exceptException:print(f"⚠ Cannot connect to port{port}")time.sleep(10)if__name__=="__main__":# 启动健康检查守护进程checker=Process(target=health_checker)checker.start()推理服务的性能指标:P50/P90/P99
生产环境里,延迟不只是“测10次取平均”那么简单。需要看延迟分布。
importtimeimportnumpyasnp# 测试延迟分布latencies=[]for_inrange(1000):t0=time.time()output=model(input_data)t=(time.time()-t0)*1000# mslatencies.append(t)latencies=np.array(latencies)print(f"P50 延迟:{np.percentile(latencies,50):.2f}ms")print(f"P90 延迟:{np.percentile(latencies,90):.2f}ms")print(f"P99 延迟:{np.percentile(latencies,99):.2f}ms")print(f"平均延迟:{latencies.mean():.2f}ms")print(f"最大延迟:{latencies.max():.2f}ms")# P99/P50 比值很重要p99_p50_ratio=np.percentile(latencies,99)/np.percentile(latencies,50)print(f"P99/P50 比值:{p99_p50_ratio:.2f}")# 如果比值 > 3,说明有偶发的长尾延迟(可能是 GC、内存分配不均匀等)生产环境的延迟要求(不同场景不同标准):
| 场景 | P50 | P90 | P99 |
|---|---|---|---|
| 实时语音识别(单句) | <20ms | <50ms | <100ms |
| 图像分类(批量) | <10ms | <30ms | <80ms |
| LLM 推理(decode) | <50ms | <100ms | <200ms |
| 推荐系统(打分) | <5ms | <15ms | <30ms |
连续 Batch:榨干NPU吞吐的利器
单请求推理的瓶颈是“算子启动开销”。连续 Batch(Continuous Batching)把多个请求打包成一个 batch 一起推理,显著提升吞吐。
fromcontinuous_batchingimportContinuousBatcher# 连续 Batcher:自动收集请求,打包成 batchbatcher=ContinuousBatcher(model_path="llama-7b.om",max_batch_size=32,# 最大 batch sizemax_wait_ms=10,# 最长等10ms就开始推理device_id=0)# 主循环whileTrue:# 1. 接收新请求new_requests=receive_requests()# 从队列/网络拿请求forreqinnew_requests:batcher.add_request(req.id,req.prompt,req.max_length)# 2. 调度 batchbatch=batcher.schedule_batch()ifbatchisnotNone:# 3. 推理results=model(batch.inputs)# 4. 分发结果batcher.dispatch_results(results)连续 Batch vs 静态 Batch(吞吐量对比):
# 静态 Batch:固定 batch_size,等满才推理defstatic_batch_infer(model,requests,batch_size=32):results=[]foriinrange(0,len(requests),batch_size):batch=requests[i:i+batch_size]iflen(batch)<batch_size:# 等到满 batch 才推理batch=pad_to(batch,batch_size)result=model(batch)results.extend(result[:len(requests[i:i+batch_size])])returnresults# 连续 Batch:用 Continuous Batcherdefcontinuous_batch_infer(batcher,requests):results=[]batcher.add_requests(requests)whilenotbatcher.is_done():batch=batcher.schedule_batch()ifbatch:result=model(batch.inputs)batcher.dispatch_results(result)returnresults# 测试(LLM-7B,100个并发请求)# 静态 Batch (bs=32): 平均延迟 2.3s,吞吐 43 req/s# 连续 Batch: 平均延迟 0.8s,吞吐 125 req/s# 连续 Batch 快了 2.9x模型更新和灰度发布:生产环境不关机
生产环境不能停服务更新模型。需要做灰度发布。
# 灰度发布:先更新10%的实例,观察没问题再全量classModelManager:def__init__(self,num_instances=8):self.instances={}# instance_id -> modelself.old_version=Noneself.new_version=Nonedefload_new_version(self,new_model_path):# 加载新版本模型到内存(不替换旧的)self.new_version=torch.load(new_model_path)defupdate_instance(self,instance_id):# 更新单个实例的模型self.instances[instance_id]=self.new_versiondefrollout(self,percentage=10):# 灰度发布:先更新 percentage% 的实例num_to_update=int(len(self.instances)*percentage/100)fori,instance_idinenumerate(self.instances.keys()):ifi<num_to_update:self.update_instance(instance_id)print(f"Updated instance{instance_id}to new version")这套生产环境实战经验,涵盖了从并发控制、预热、负载均衡到灰度发布的全链路。如果你正准备将昇腾NPU模型推向生产,这些细节将帮你避开绝大多数“上线即故障”的坑。