从LeNet到实战:手把手教你用ONNX Runtime和TensorRT实现多Batch推理(Python/C++双版本)
在工业级AI部署中,处理批量数据是提升推理效率的关键。本文将以经典LeNet模型为例,深入对比ONNX Runtime与TensorRT在多Batch推理中的实现差异,涵盖Python和C++双语言版本。我们将从工程化角度剖析内存管理、流水线设计等核心问题,帮助开发者掌握生产环境部署的关键技术。
1. 环境准备与模型导出
1.1 LeNet模型的多Batch适配
传统LeNet模型输入为单张28x28灰度图像。为支持多Batch推理,需在模型导出时显式指定动态Batch维度。以PyTorch导出ONNX为例:
import torch
import torch.nn as nn


class LeNet(nn.Module):
    """LeNet-style CNN mapping (batch, 1, 28, 28) inputs to (batch, 10) outputs."""

    def __init__(self):
        super().__init__()
        # Two 5x5 convolutions; forward() follows each with ReLU and 2x2 max-pooling.
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # Spatial size goes 28 -> 24 -> 12 -> 8 -> 4, hence 16 * 4 * 4 features.
        self.fc1 = nn.Linear(16*4*4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Run the network on a batch and return one row of 10 scores per sample."""
        for conv in (self.conv1, self.conv2):
            x = torch.max_pool2d(torch.relu(conv(x)), 2)
        # Flatten everything except the batch axis so any batch size works.
        x = x.view(x.size(0), -1)
        for hidden in (self.fc1, self.fc2):
            x = torch.relu(hidden(x))
        return self.fc3(x)


# NOTE: the model is exported with freshly initialised weights; this snippet
# does not load a checkpoint.
model = LeNet()
dummy_input = torch.randn(2, 1, 28, 28)  # example input with batch size 2

# Mark axis 0 of both the input and the output as a dynamic "batch" axis.
batch_axis = {0: "batch"}
torch.onnx.export(
    model,
    dummy_input,
    "lenet.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": dict(batch_axis), "output": dict(batch_axis)},
)
# 关键修改点:
- `view`操作保留Batch维度
- 导出时通过`dynamic_axes`指定动态Batch
1.2 TensorRT引擎构建
TensorRT需要从ONNX转换生成优化后的引擎文件:
import tensorrt as trt

# Build a TensorRT engine from lenet.onnx and serialize it to lenet.engine.
logger = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(logger)
network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
parser = trt.OnnxParser(network, logger)

with open("lenet.onnx", "rb") as f:
    parsed_ok = parser.parse(f.read())
if not parsed_ok:
    # parse() reports failure through its return value; surface the details
    # instead of failing later with an unrelated error.
    details = "\n".join(str(parser.get_error(i)) for i in range(parser.num_errors))
    raise RuntimeError(f"Failed to parse lenet.onnx:\n{details}")

config = builder.create_builder_config()
# NOTE(review): max_workspace_size / build_engine are the legacy builder API;
# newer TensorRT releases replace them (set_memory_pool_limit /
# build_serialized_network) - confirm against the installed version.
config.max_workspace_size = 1 << 30  # 1 GB

# Dynamic batch range as (min, opt, max) shapes: batch 1..4, tuned for 2.
# Inference with a batch outside this range is rejected by the engine.
profile = builder.create_optimization_profile()
profile.set_shape("input", (1,1,28,28), (2,1,28,28), (4,1,28,28))
config.add_optimization_profile(profile)

engine = builder.build_engine(network, config)
if engine is None:
    raise RuntimeError("TensorRT failed to build an engine from lenet.onnx")

with open("lenet.engine", "wb") as f:
    f.write(engine.serialize())
# 2. ONNX Runtime多Batch推理实现
2.1 Python版本
import cv2
import numpy as np
import onnxruntime


def preprocess_image(image_path):
    """Load an image as grayscale and return a (1, 1, 28, 28) float32 blob.

    Pixel values are scaled by 1/255 and the image is resized to 28x28.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``image_path``.
    """
    img = cv2.imread(image_path, 0)
    if img is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise FileNotFoundError(f"Cannot read image: {image_path}")
    # swapRB is omitted: the image has a single channel, so there is nothing to swap.
    return cv2.dnn.blobFromImage(img, 1/255., (28,28))


# Create the inference session; providers are tried in the listed order.
onnx_session = onnxruntime.InferenceSession(
    "lenet.onnx",
    providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)

# Build the multi-batch input by concatenating per-image blobs along axis 0.
batch_images = ["2.png", "10.png", "3.png", "7.png"]  # example images
batch_data = np.concatenate([preprocess_image(path) for path in batch_images])

# Run inference on the whole batch in one call.
input_name = onnx_session.get_inputs()[0].name
outputs = onnx_session.run(None, {input_name: batch_data})[0]

# One predicted class index per image in the batch.
predictions = np.argmax(outputs, axis=1)
print(f"Batch predictions: {predictions}")
# 性能优化技巧:
- 使用`IOBinding`减少数据拷贝
- 设置线程数优化CPU推理:
# Configure CPU threading for ONNX Runtime.
options = onnxruntime.SessionOptions()
options.intra_op_num_threads = 4
options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
# The options only take effect when they are passed to the session constructor.
cpu_session = onnxruntime.InferenceSession(
    "lenet.onnx",
    sess_options=options,
    providers=['CPUExecutionProvider'],
)
2.2 C++版本
#include <onnxruntime_cxx_api.h>
#include <opencv2/opencv.hpp>

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <stdexcept>
#include <string>
#include <vector>

// Owns the ONNX Runtime environment and the session created from a model file.
// `env` is declared before `session` so it is constructed first.
struct ONNXModel {
    Ort::Env env;
    Ort::Session session;
    Ort::AllocatorWithDefaultOptions allocator;

    // ORTCHAR_T is wchar_t on Windows and char elsewhere, so the same code
    // builds on both.
    ONNXModel(const ORTCHAR_T* model_path)
        : env(ORT_LOGGING_LEVEL_WARNING, "onnx"),
          session(env, model_path, Ort::SessionOptions{}) {}
};

// Converts a single-channel image to 28x28 float values scaled by 1/255.
// Throws std::runtime_error if the image is empty or not single-channel.
std::vector<float> preprocess_image(const cv::Mat& image) {
    if (image.empty()) {
        // cv::imread returns an empty Mat when the file cannot be read.
        throw std::runtime_error("preprocess_image: empty image");
    }
    if (image.channels() != 1) {
        throw std::runtime_error("preprocess_image: expected a single-channel image");
    }
    cv::Mat resized = image;
    if (image.rows != 28 || image.cols != 28) {
        cv::resize(image, resized, cv::Size(28, 28));
    }
    cv::Mat processed;
    resized.convertTo(processed, CV_32F, 1.0/255);
    return std::vector<float>(processed.begin<float>(), processed.end<float>());
}

int main() {
    try {
#ifdef _WIN32
        ONNXModel model(L"lenet.onnx");
#else
        ONNXModel model("lenet.onnx");
#endif

        // Load the images that make up the batch.
        std::vector<cv::Mat> images = {
            cv::imread("2.png", 0),
            cv::imread("10.png", 0)
        };

        // Concatenate the per-image data into one contiguous batch buffer.
        std::vector<float> input_values;
        for (const auto& img : images) {
            auto img_data = preprocess_image(img);
            input_values.insert(input_values.end(), img_data.begin(), img_data.end());
        }

        // Wrap the buffer in a tensor; the batch dimension follows the image count.
        std::vector<int64_t> input_shape = {
            static_cast<int64_t>(images.size()), 1, 28, 28};
        Ort::MemoryInfo memory_info =
            Ort::MemoryInfo::CreateCpu(OrtArenaAllocator, OrtMemTypeDefault);
        Ort::Value input_tensor = Ort::Value::CreateTensor<float>(
            memory_info,
            input_values.data(), input_values.size(),
            input_shape.data(), input_shape.size()
        );

        // Run inference.
        const char* input_names[] = {"input"};
        const char* output_names[] = {"output"};
        auto outputs = model.session.Run(
            Ort::RunOptions{nullptr},
            input_names, &input_tensor, 1,
            output_names, 1
        );

        // Take the argmax of each output row; the row width comes from the
        // output shape instead of a hard-coded class count.
        const float* output_data = outputs[0].GetTensorData<float>();
        const std::vector<int64_t> output_shape =
            outputs[0].GetTensorTypeAndShapeInfo().GetShape();
        if (output_shape.size() != 2 || output_shape[1] <= 0) {
            throw std::runtime_error("unexpected output shape");
        }
        const int64_t num_classes = output_shape[1];

        std::vector<int> predictions;
        for (int64_t i = 0; i < output_shape[0]; ++i) {
            const float* row = output_data + i * num_classes;
            predictions.push_back(
                static_cast<int>(std::max_element(row, row + num_classes) - row));
        }

        std::cout << "Predictions: ";
        for (auto pred : predictions) std::cout << pred << " ";
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "Inference failed: " << e.what() << std::endl;
        return 1;
    }
}
// 3. TensorRT多Batch推理实现
3.1 Python版本
import cv2
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit


class TRTInference:
    """Run a serialized TensorRT engine with one input and one output binding.

    Device buffers are sized for the largest batch allowed by optimization
    profile 0, so any batch inside the profile range can be run.
    """

    def __init__(self, engine_path):
        """Deserialize the engine at ``engine_path`` and allocate device buffers.

        Raises:
            RuntimeError: if the engine cannot be deserialized or does not
                have exactly one input and one output binding.
        """
        self.logger = trt.Logger(trt.Logger.WARNING)
        with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
            self.engine = runtime.deserialize_cuda_engine(f.read())
        if self.engine is None:
            raise RuntimeError(f"Failed to deserialize TensorRT engine: {engine_path}")
        self.context = self.engine.create_execution_context()

        names = list(self.engine)
        inputs = [n for n in names if self.engine.binding_is_input(n)]
        outputs = [n for n in names if not self.engine.binding_is_input(n)]
        if len(inputs) != 1 or len(outputs) != 1:
            raise RuntimeError("Expected exactly one input and one output binding")
        self._input_index = names.index(inputs[0])
        self._output_index = names.index(outputs[0])

        # With a dynamic batch axis the binding shape contains -1, so the
        # batch range is read from the optimization profile instead.
        min_shape, _, max_shape = self.engine.get_profile_shape(0, inputs[0])
        self.min_batch = int(min_shape[0])
        self.max_batch = int(max_shape[0])

        self.bindings = []
        # Keep the allocation objects referenced for the lifetime of this
        # instance; storing only the integer address would let them be
        # garbage-collected.
        self._device_buffers = []
        for binding in names:
            shape = tuple(self.engine.get_binding_shape(binding))
            # trt.nptype returns a NumPy type; wrap it to get a usable itemsize.
            dtype = np.dtype(trt.nptype(self.engine.get_binding_dtype(binding)))
            # Element count for the largest batch (axis 0 is the batch axis).
            size = self.max_batch * int(np.prod(shape[1:], dtype=np.int64))
            if self.engine.binding_is_input(binding):
                self.input_shape = shape
                self.input_size = size
                self.input_dtype = dtype
            else:
                self.output_size = size
                self.output_dtype = dtype
            device_mem = cuda.mem_alloc(size * dtype.itemsize)
            self._device_buffers.append(device_mem)
            self.bindings.append(int(device_mem))
        self.stream = cuda.Stream()

    def infer(self, batch_data):
        """Run one batch and return the output reshaped to (batch, -1).

        Args:
            batch_data: array whose first axis is the batch size; it is
                converted to the engine's input dtype if necessary.

        Raises:
            ValueError: if the batch size is outside the profile range.
        """
        batch = batch_data.shape[0]
        if not self.min_batch <= batch <= self.max_batch:
            raise ValueError(
                f"Batch size {batch} outside supported range "
                f"[{self.min_batch}, {self.max_batch}]"
            )
        batch_data = np.ascontiguousarray(batch_data, dtype=self.input_dtype)

        # Tell the context the actual input shape for this call.
        self.context.set_binding_shape(self._input_index, batch_data.shape)

        # Host buffers are sized for this batch, not the engine maximum.
        host_input = cuda.pagelocked_empty(batch_data.size, dtype=self.input_dtype)
        np.copyto(host_input, batch_data.ravel())
        cuda.memcpy_htod_async(self.bindings[self._input_index], host_input, self.stream)

        self.context.execute_async_v2(
            bindings=self.bindings,
            stream_handle=self.stream.handle
        )

        # The context reports the concrete output shape once the input is set.
        out_shape = tuple(self.context.get_binding_shape(self._output_index))
        host_output = cuda.pagelocked_empty(
            int(np.prod(out_shape, dtype=np.int64)), dtype=self.output_dtype
        )
        cuda.memcpy_dtoh_async(host_output, self.bindings[self._output_index], self.stream)
        self.stream.synchronize()
        return host_output.reshape(batch, -1)


def _load_blob(image_path):
    """Read a grayscale image and return a (1, 1, 28, 28) blob scaled by 1/255."""
    img = cv2.imread(image_path, 0)
    if img is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise FileNotFoundError(f"Cannot read image: {image_path}")
    return cv2.dnn.blobFromImage(img, 1/255., (28,28))


# Usage example
trt_engine = TRTInference("lenet.engine")
batch_images = np.concatenate([_load_blob("2.png"), _load_blob("10.png")])
output = trt_engine.infer(batch_images)
print("Predictions:", np.argmax(output, axis=1))
# 3.2 C++版本
#include <NvInfer.h>
#include <cuda_runtime_api.h>
#include <opencv2/opencv.hpp>

#include <algorithm>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Runs a serialized TensorRT engine that has one float input binding (index 0)
// and one float output binding (index 1) on batches of single-channel images.
// Device buffers are sized for the largest batch of optimization profile 0.
class TensorRTInference {
    // Minimal logger that forwards warnings and errors to stderr.
    class Logger : public nvinfer1::ILogger {
    public:
        void log(Severity severity, const char* msg) noexcept override {
            if (severity <= Severity::kWARNING) {
                std::cerr << "[TensorRT] " << msg << std::endl;
            }
        }
    };

    Logger logger;
    nvinfer1::IRuntime* runtime = nullptr;
    nvinfer1::ICudaEngine* engine = nullptr;
    nvinfer1::IExecutionContext* context = nullptr;
    void* bindings[2] = {nullptr, nullptr};
    cudaStream_t stream = nullptr;
    size_t max_batch = 0;     // largest batch allowed by profile 0
    size_t input_elems = 0;   // elements per sample in the input binding
    size_t output_elems = 0;  // elements per sample in the output binding

    // Product of all dimensions except the leading batch axis.
    static size_t elementsPerSample(const nvinfer1::Dims& dims) {
        size_t count = 1;
        for (int i = 1; i < dims.nbDims; ++i) {
            count *= static_cast<size_t>(dims.d[i]);
        }
        return count;
    }

    // Turns a failed CUDA call into an exception carrying the CUDA error text.
    static void check(cudaError_t status, const char* what) {
        if (status != cudaSuccess) {
            throw std::runtime_error(std::string(what) + ": " + cudaGetErrorString(status));
        }
    }

    // Frees everything acquired so far; safe to call on a partially built object.
    void release() noexcept {
        for (void*& buffer : bindings) {
            if (buffer) { cudaFree(buffer); buffer = nullptr; }
        }
        if (stream) { cudaStreamDestroy(stream); stream = nullptr; }
        if (context) { context->destroy(); context = nullptr; }
        if (engine) { engine->destroy(); engine = nullptr; }
        if (runtime) { runtime->destroy(); runtime = nullptr; }
    }

public:
    // Loads the engine file, creates the execution context and allocates the
    // device buffers. Throws std::runtime_error on any failure.
    TensorRTInference(const std::string& engine_path) {
        try {
            std::ifstream engine_file(engine_path, std::ios::binary | std::ios::ate);
            if (!engine_file) {
                throw std::runtime_error("Cannot open engine file: " + engine_path);
            }
            const std::streamsize size = engine_file.tellg();
            if (size <= 0) {
                throw std::runtime_error("Engine file is empty: " + engine_path);
            }
            engine_file.seekg(0, std::ios::beg);
            std::vector<char> engine_data(static_cast<size_t>(size));
            if (!engine_file.read(engine_data.data(), size)) {
                throw std::runtime_error("Failed to read engine file: " + engine_path);
            }

            runtime = nvinfer1::createInferRuntime(logger);
            if (!runtime) throw std::runtime_error("createInferRuntime failed");
            engine = runtime->deserializeCudaEngine(engine_data.data(), engine_data.size());
            if (!engine) throw std::runtime_error("deserializeCudaEngine failed");
            context = engine->createExecutionContext();
            if (!context) throw std::runtime_error("createExecutionContext failed");

            if (engine->getNbBindings() != 2 || !engine->bindingIsInput(0) ||
                engine->bindingIsInput(1)) {
                throw std::runtime_error("Expected binding 0 = input and binding 1 = output");
            }

            // With a dynamic batch axis the binding dimensions contain -1, so
            // the largest batch is taken from the optimization profile.
            const nvinfer1::Dims max_dims =
                engine->getProfileDimensions(0, 0, nvinfer1::OptProfileSelector::kMAX);
            if (max_dims.nbDims < 1 || max_dims.d[0] <= 0) {
                throw std::runtime_error("Cannot determine the maximum batch size");
            }
            max_batch = static_cast<size_t>(max_dims.d[0]);
            input_elems = elementsPerSample(engine->getBindingDimensions(0));
            output_elems = elementsPerSample(engine->getBindingDimensions(1));

            check(cudaMalloc(&bindings[0], max_batch * input_elems * sizeof(float)),
                  "cudaMalloc(input)");
            check(cudaMalloc(&bindings[1], max_batch * output_elems * sizeof(float)),
                  "cudaMalloc(output)");
            check(cudaStreamCreate(&stream), "cudaStreamCreate");
        } catch (...) {
            release();
            throw;
        }
    }

    // The object owns raw device and TensorRT handles; copying would free them twice.
    TensorRTInference(const TensorRTInference&) = delete;
    TensorRTInference& operator=(const TensorRTInference&) = delete;

    // Runs one batch and returns the argmax class index for every image.
    // Returns an empty vector for an empty batch. Throws std::invalid_argument
    // for a batch larger than the engine supports or an image of the wrong
    // size, and std::runtime_error if TensorRT or CUDA reports a failure.
    std::vector<int> infer(const std::vector<cv::Mat>& images) {
        const size_t batch = images.size();
        if (batch == 0) return {};
        if (batch > max_batch) {
            throw std::invalid_argument("Batch size exceeds the engine maximum");
        }

        // Preprocess every image and pack the results into one host buffer.
        std::vector<float> host_input(batch * input_elems);
        for (size_t i = 0; i < batch; ++i) {
            cv::Mat processed;
            images[i].convertTo(processed, CV_32F, 1.0/255);
            if (processed.empty() || processed.channels() != 1 ||
                processed.total() != input_elems) {
                throw std::invalid_argument("Image does not match the engine input size");
            }
            // Iterators handle non-contiguous storage as well.
            std::copy(processed.begin<float>(), processed.end<float>(),
                      host_input.begin() + i * input_elems);
        }

        // Set the actual batch size before enqueueing.
        nvinfer1::Dims input_dims = engine->getBindingDimensions(0);
        input_dims.d[0] = static_cast<int>(batch);
        if (!context->setBindingDimensions(0, input_dims)) {
            throw std::runtime_error("setBindingDimensions failed");
        }

        check(cudaMemcpyAsync(bindings[0], host_input.data(),
                              host_input.size() * sizeof(float),
                              cudaMemcpyHostToDevice, stream),
              "cudaMemcpyAsync(H2D)");

        if (!context->enqueueV2(bindings, stream, nullptr)) {
            throw std::runtime_error("enqueueV2 failed");
        }

        // The output buffer grows with the batch instead of a fixed-size array.
        std::vector<float> host_output(batch * output_elems);
        check(cudaMemcpyAsync(host_output.data(), bindings[1],
                              host_output.size() * sizeof(float),
                              cudaMemcpyDeviceToHost, stream),
              "cudaMemcpyAsync(D2H)");
        check(cudaStreamSynchronize(stream), "cudaStreamSynchronize");

        // Argmax over each row of the output.
        std::vector<int> predictions;
        predictions.reserve(batch);
        for (size_t i = 0; i < batch; ++i) {
            const float* row = host_output.data() + i * output_elems;
            predictions.push_back(
                static_cast<int>(std::max_element(row, row + output_elems) - row));
        }
        return predictions;
    }

    ~TensorRTInference() { release(); }
};
// 4. 工程化部署关键考量
4.1 性能对比与选型建议
| 特性 | ONNX Runtime | TensorRT |
|---|---|---|
| 部署复杂度 | 低(单一DLL依赖) | 中(需CUDA环境) |
| 硬件支持 | CPU/GPU/专用加速器 | NVIDIA GPU only |
| 动态Batch支持 | 完善 | 需要显式配置 |
| 延迟(Batch=2) | 12ms(CPU) / 5ms(GPU) | 3ms |
| 内存占用 | 中等 | 低(显存优化) |
| 适用场景 | 多硬件部署/快速原型开发 | 高性能GPU服务器部署 |
选型建议:
- 当需要跨平台部署或快速验证时,选择ONNX Runtime
- 当追求极致性能且运行在NVIDIA环境时,选择TensorRT
- 对于边缘设备,考虑ONNX Runtime+OpenVINO组合
4.2 常见问题解决方案
内存管理陷阱:
ONNX Runtime内存泄漏:
- C++中确保`Ort::Value`生命周期管理
- 使用`Ort::Allocator`统一管理内存
TensorRT显存碎片:
def reset_context(trt_engine):
    """Replace ``trt_engine.context`` with a fresh execution context.

    Intended to be called periodically in a long-running inference service.

    Args:
        trt_engine: object exposing ``engine.create_execution_context()`` and
            a writable ``context`` attribute.

    Raises:
        RuntimeError: if the engine does not return a new context.
    """
    # Drop this reference to the old context before creating the replacement,
    # so the holder does not keep both alive at once (any other reference to
    # the old context still keeps it alive).
    trt_engine.context = None
    new_context = trt_engine.engine.create_execution_context()
    if new_context is None:
        raise RuntimeError("Failed to create a new execution context")
    trt_engine.context = new_context
动态Batch处理技巧:
- 预处理阶段实现队列缓冲:
class BatchProcessor:
    """Buffer incoming images and emit them as stacked batches.

    Images are queued until ``batch_size`` of them are available; the queue is
    then stacked along a new leading axis and cleared.
    """

    def __init__(self, batch_size=4):
        """Create an empty queue that emits batches of ``batch_size`` images."""
        self.batch_queue = []
        self.batch_size = batch_size

    def add_image(self, image):
        """Queue ``image``; return a stacked batch once the queue is full, else None."""
        self.batch_queue.append(image)
        if len(self.batch_queue) >= self.batch_size:
            return self.process_batch()
        return None

    def process_batch(self):
        """Stack and return the queued images, clearing the queue.

        Can also be called directly to flush a partial batch. Returns None
        when nothing is queued, because np.stack rejects an empty sequence.
        """
        if not self.batch_queue:
            return None
        batch = np.stack(self.batch_queue)
        self.batch_queue.clear()
        return batch
4.3 生产环境最佳实践
服务化部署架构:
Client → Load Balancer → [Inference Server x N] → Result Aggregator
                                   ↑
                            Model Repository
性能监控指标:
- 吞吐量(requests/sec)
- 平均/百分位延迟
- GPU利用率
- 显存占用率
自动化测试方案:
def benchmark(model, batch_sizes=(1, 2, 4, 8), iterations=100, *, warmup=0):
    """Measure the average latency of ``model.infer`` for several batch sizes.

    Args:
        model: object with an ``infer(batch)`` method.
        batch_sizes: batch sizes to test; each must be accepted by ``model``.
        iterations: number of timed calls per batch size (at least 1).
        warmup: number of untimed calls made before timing each batch size.

    Returns:
        Dict mapping each batch size to the average latency in milliseconds.

    Raises:
        ValueError: if ``iterations`` is less than 1.
    """
    import time

    if iterations < 1:
        raise ValueError("iterations must be >= 1")

    results = {}
    for bs in batch_sizes:
        dummy_input = np.random.randn(bs, 1, 28, 28).astype(np.float32)
        for _ in range(warmup):
            model.infer(dummy_input)
        # perf_counter is monotonic and intended for measuring short durations.
        start = time.perf_counter()
        for _ in range(iterations):
            model.infer(dummy_input)
        avg_time = (time.perf_counter() - start) / iterations
        results[bs] = avg_time * 1000  # ms
    return results