news 2026/4/15 18:00:03

【Python大语言模型系列】triton通过动态批处理减少调用开销提高吞吐量(案例分析)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【Python大语言模型系列】triton通过动态批处理减少调用开销提高吞吐量(案例分析)

这是我的第447篇原创文章。

一、引言

我们创建一个加法模型,它可以同时处理多个推理请求,每个请求包含两个数字。Triton 会将多个客户端请求合并成一个 batch。

吞吐量,是指在一次性能测试过程中网络上传输的数据量的总和。(承压能力)

系统的吞吐量与request对CPU的消耗、外部接口、IO等等紧密关联。单个request 对CPU消耗越高,外部系统接口、IO速度越慢,系统吞吐能力越低,反之越高。系统吞吐量几个重要参数:QPS(TPS)、并发数、响应时间。

  • 吞吐率QPS(TPS):特指 Web 服务器单位时间内处理的请求数。(描述其并发处理能力)

  • 响应时间(RT):执行一个请求从开始到最后收到响应数据所花费的总体时间,即从客户端发起请求到收到服务器响应结果的时间。响应时间RT(Response-time),是一个系统最重要的指标之一,它的数值大小直接反应了系统的快慢。

  • 并发数:是指系统同时能处理的请求数量,这个也是反应了系统的负载能力。

理解了上面三个要素的意义之后,就能推算出它们之间的关系:

QPS=并发数/响应时间


场景对比分析

响应时间

所需最小并发数

系统行为

发送TPS

有效TPS

5秒20 × 5 = 100

100个worker同时工作

精确20

约20

0.2秒20 × 0.2 = 4

仅需4个worker即可

精确20

约20

0.05秒20 × 0.05 = 1

1个worker就能跟上

精确20

约20

单从定义来看,吞吐率描述了服务器在实际运行期间单位时间内处理的请求数,然而,我们更加关心的是服务器并发处理能力的上限,也就是单位时间内服务器能够处理的最大请求数,即最大吞吐率。

所以我们普遍使用 “压力测试” 的方法,通过模拟足够多数目的并发用户,分别持续发送一定的 HTTP 请求,并统计测试持续的总时间,计算出基于这种 “压力” 下的吞吐率,即为一个平均计算值。

二、实现过程

2.1 模型配置文件config.pbtxt

代码:

name: "batch_add_model" backend: "python" max_batch_size: 4 # 最多合并4个请求 input [ { name: "INPUT_A" data_type: TYPE_FP32 dims: [1] # 每个请求包含1个数字 }, { name: "INPUT_B" data_type: TYPE_FP32 dims: [1] } ] output [ { name: "OUTPUT_SUM" data_type: TYPE_FP32 dims: [1] } ] dynamic_batching { preferred_batch_size: [2, 4] # 优先组成2或4的batch max_queue_delay_microseconds: 500 }

2.2 Python 后端模型model.py

代码:

import triton_python_backend_utils as pb_utils import numpy as np class TritonPythonModel: def initialize(self, args): print("模型初始化完成") def execute(self, requests): print(f"\n=== 收到新的 Batch,包含 {len(requests)} 个请求 ===") responses = [] # 遍历每个请求 for i, request in enumerate(requests): print(f"\n--- 处理 Batch 中的第 {i+1} 个请求 ---") # 获取输入数据 input_a = pb_utils.get_input_tensor_by_name(request, "INPUT_A") input_b = pb_utils.get_input_tensor_by_name(request, "INPUT_B") # 转换为 numpy 并查看具体内容 a_values = input_a.as_numpy() b_values = input_b.as_numpy() print(f"INPUT_A 值: {a_values} (形状: {a_values.shape})") print(f"INPUT_B 值: {b_values} (形状: {b_values.shape})") # 执行计算 sum_result = a_values + b_values print(f"计算结果: {sum_result}") # 创建输出张量 output_tensor = pb_utils.Tensor("OUTPUT_SUM", sum_result.astype(np.float32)) # 创建响应 response = pb_utils.InferenceResponse(output_tensors=[output_tensor]) responses.append(response) print(f"\n=== 返回 {len(responses)} 个响应 ===\n") return responses def finalize(self): print("模型卸载")

2.3 客户端并发请求代码client.py

代码:

from tritonclient.http import InferenceServerClient, InferInput import numpy as np from concurrent.futures import ThreadPoolExecutor import time def send_request(client, a, b, request_id): """发送单个推理请求""" # 准备输入数据 input_a = InferInput("INPUT_A", [1], "FP32") input_b = InferInput("INPUT_B", [1], "FP32") input_a.set_data_from_numpy(np.array([a], dtype=np.float32)) input_b.set_data_from_numpy(np.array([b], dtype=np.float32)) print(f"发送请求 {request_id}: {a} + {b}") # 发送请求 response = client.infer( model_name="batch_add_model", inputs=[input_a, input_b], request_id=str(request_id) ) # 获取结果 result = response.as_numpy("OUTPUT_SUM") print(f"收到响应 {request_id}: {a} + {b} = {result[0]}") return result def main(): # 连接到 Triton 服务器 client = InferenceServerClient(url="localhost:8000") # 测试数据:3组数字 test_data = [ (10.5, 20.3), # 请求1 (15.0, 25.0), # 请求2 (30.0, 40.0) # 请求3 ] print("=== 场景 1:串行发送(间隔大)===") for i, (a, b) in enumerate(test_data): send_request(client, a, b, f"sync_{i}") time.sleep(0.01) # 等待10ms,让Triton有时间处理 print("\n" + "="*50 + "\n") print("=== 场景 2:并发发送(会触发批处理)===") # 使用线程池同时发送3个请求 with ThreadPoolExecutor(max_workers=3) as executor: futures = [] for i, (a, b) in enumerate(test_data): future = executor.submit(send_request, client, a, b, f"async_{i}") futures.append(future) # 等待所有请求完成 for future in futures: future.result() if __name__ == "__main__": main()

2.4 启动 Triton 服务器

代码:

# 模型目录结构 model_repository/ └── batch_add_model/ ├── config.pbtxt └── 1/ └── model.py # 启动服务器 docker run --rm -p 8000:8000 -v $(pwd)/model_repository:/models \ nvcr.io/nvidia/tritonserver:25.11-py3 tritonserver --model-repository=/models

2.5 客户端输出(场景 1 - 串行发送)

结果:

=== 场景 1:串行发送(间隔大)=== 发送请求 sync_0: 10.5 + 20.3 收到响应 sync_0: 10.5 + 20.3 = 30.8 发送请求 sync_1: 15.0 + 25.0 收到响应 sync_1: 15.0 + 25.0 = 40.0 发送请求 sync_2: 30.0 + 40.0 收到响应 sync_2: 30.0 + 40.0 = 70.0

2.6 Triton 服务器日志(场景 1 - 串行发送)

结果:

=== 收到新的 Batch,包含 1 个请求 === --- 处理 Batch 中的第 1 个请求 --- INPUT_A 值: [10.5] (形状: (1,)) INPUT_B 值: [20.3] (形状: (1,)) 计算结果: [30.8] === 返回 1 个响应 === === 收到新的 Batch,包含 1 个请求 === --- 处理 Batch 中的第 1 个请求 --- INPUT_A 值: [15.] (形状: (1,)) INPUT_B 值: [25.] (形状: (1,)) 计算结果: [40.] === 返回 1 个响应 === === 收到新的 Batch,包含 1 个请求 === --- 处理 Batch 中的第 1 个请求 --- INPUT_A 值: [30.] (形状: (1,)) INPUT_B 值: [40.] (形状: (1,)) 计算结果: [70.] === 返回 1 个响应 === 注意:每个请求被单独处理,len(requests) 始终为 1

2.7 客户端输出(场景 2 - 并发)

结果:

=== 场景 2:并发发送(会触发批处理)=== 发送请求 async_0: 10.5 + 20.3 发送请求 async_1: 15.0 + 25.0 发送请求 async_2: 30.0 + 40.0 收到响应 async_0: 10.5 + 20.3 = 30.8 收到响应 async_1: 15.0 + 25.0 = 40.0 收到响应 async_2: 30.0 + 40.0 = 70.0

2.8 Triton 服务器日志(场景 2 - 并发)

结果:

=== 收到新的 Batch,包含 3 个请求 === --- 处理 Batch 中的第 1 个请求 --- INPUT_A 值: [10.5] (形状: (1,)) INPUT_B 值: [20.3] (形状: (1,)) 计算结果: [30.8] --- 处理 Batch 中的第 2 个请求 --- INPUT_A 值: [15.] (形状: (1,)) INPUT_B 值: [25.] (形状: (1,)) 计算结果: [40.] --- 处理 Batch 中的第 3 个请求 --- INPUT_A 值: [30.] (形状: (1,)) INPUT_B 值: [40.] (形状: (1,)) 计算结果: [70.] === 返回 3 个响应 ===

关键观察:

  • len(requests)变为3,代表 3 个客户端请求被合并

  • 但每个请求的数据仍然是独立的[1]形状

  • Triton 自动完成了请求的聚合与响应的拆分

2.9 性能对比

场景

服务器调用次数

总耗时(约)

吞吐量

串行发送

3 次

30ms

并发发送

1 次

12ms

高 2.5x

这个例子清晰地展示了 Triton 动态批处理的价值:减少调用开销,提升 GPU 利用率

技术层面

开销项

串行(3次)

并发(1次)

节省

网络通信

3 次 HTTP 往返

1 次 batch HTTP

2x

Python 调用

3 次execute()

1 次execute()

2x

GPU 启动

3 次 kernel 启动

1 次 batched kernel

2x

日志打印

3 次 I/O

1 次 I/O

2x

这些固定开销的减少,直接转化为吞吐量提升。

作者简介:

读研期间发表6篇SCI数据挖掘相关论文,现在某研究院从事数据算法相关科研工作,结合自身科研实践经历不定期分享关于Python、机器学习、深度学习、人工智能系列基础知识与应用案例。致力于只做原创,以最简单的方式理解和学习,关注我一起交流成长。需要数据集和源码的小伙伴可以关注底部公众号添加作者微信。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 15:06:02

南大通用GBase 8s 内部用户创建及使用方法介绍

本文将详细介绍如何在 GBase 8s 中创建普通用户,并展示如何为这些用户赋权以及如何使用这些用户连接数据库。通过本文你将能够顺利地完成用户创建、赋权和连接数据库的全过程。探讨Gbase8S创建普通用户方法,直接执行 create user tmp_u001 with password…

作者头像 李华
网站建设 2026/4/15 13:12:12

GPUSTACK在深度学习训练中的实战应用

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 开发一个基于GPUSTACK的深度学习训练平台,支持多GPU并行训练和自动资源分配。平台应包含TensorFlow/PyTorch集成、训练进度监控和性能分析工具。实现自动扩展GPU资源功…

作者头像 李华
网站建设 2026/4/15 13:12:37

CentOS 7.9零基础入门:从安装到基本运维

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个CentOS 7.9新手学习环境,包含:1. 交互式安装引导 2. 常用命令练习场景 3. 基础服务(SSH/FTP)配置教程 4. 系统管理任务模拟 5. 实时帮助文档。要求…

作者头像 李华
网站建设 2026/4/15 12:53:40

对比传统开发:快马让STM32项目效率提升300%

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 请生成一个完整的STM32F407VG的USB HID设备项目,实现通过USB接口与PC通信,能够接收PC端发送的数据并控制开发板上的LED。要求:1) USB设备初始化…

作者头像 李华
网站建设 2026/4/15 13:15:27

AI助力Vue拖拽组件开发:vue-draggable-next实战

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 请使用vue-draggable-next库创建一个可拖拽排序的Vue3组件。要求:1. 支持水平/垂直两种拖拽模式切换 2. 包含10个默认项目 3. 每个项目显示序号和内容 4. 实现拖拽结束…

作者头像 李华
网站建设 2026/4/15 13:14:47

新质生产力政府关注度(2002-2025)

2005新质生产力政府关注度(2002-2025)数据简介本研究整理了2002至2025年间地级市政府与省级政府工作报告中关于新质生产力的文本内容,旨在为研究新质生产力的发展特征和趋势提供数据支持。通过分析相关关键词的词频,揭示地方政府在…

作者头像 李华