简介
本文详细介绍了LLM推理中的三种并行计算方法:数据并行通过在多设备上复制模型并并行处理不同批次数据提升速度;模型并行将模型拆分到多设备上解决单设备显存不足问题;流水线并行通过微批次调度实现GPU并行计算提高利用率。文章对比分析了三者在显存占用、吞吐量和性能上的权衡,指出需根据模型规模和硬件限制选择合适策略,组合使用能进一步释放扩展潜力。
本文将分析数据并行(Data Parallelism)、模型并行(Model Parallelism)与流水线并行(Pipeline Parallelism)在推理引擎中的实现方式,并讨论它们对显存占用、吞吐量及整体性能权衡的影响。
在数据并行(Data Parallel)范式中,我们将数据集划分到多个计算设备上,而每个设备均保留一份完整的模型副本。当模型规模能够轻松装入单个设备显存时,该方法尤为高效。通过并行处理不同批次的数据,理论上可将推理速度提升为可用设备数量的倍数。然而,当模型体积过大以至于无法完整放入单个 GPU 时,这一策略便不再可行。
Figure 1: Data Parallel Diagram(credits : eraser.io)
图 1:数据并行示意图(来源:eraser.io)
在多设备环境下,数据集需均匀划分。例如,若共有 100 条数据并使用 2 张 GPU,理想策略是按 50/50 比例切分。为保证随机性并避免偏差,通常先打乱索引,再将其平均分配到各 GPU。下面给出具体实现。
#Dataset file : dataset.pyimport torchfrom random import Randomimport torch.distributed as distfrom torch.utils.data import DataLoaderclassPartition(): #Standard pytorch dataset class(containing len and getitem methods) def__init__(self, data, index): self.data = data #Entire Dataset(list) self.index = index #Indices(list) : Different for each device def__len__(self): returnlen(self.index) #Partition represents the chunk of data(not entire data) def__getitem__(self, index): data_idx = self.index[index] returnself.data[data_idx]classDataPartitioner(): def__init__(self, data, sizes=[0.5, 0.5], seed=1234): self.data = data self.partitions = partitions rng = Random() rng.seed(seed) #Get the length of the entire dataset data_len = len(data) indices = list(range(data_len)) #Shuffle the indices rng.shuffle(indices) #Partition the indices for the devices start_idx = 0 for size in sizes: part_len = int(size * data_len) self.partitions.append(indices[start_idx:start_idx + part_len]) start_idx += part_len defuse(self, partition): return Partition(self.data, self.partitions[partition]) #Dataset, List of Indices defpartition_dataset(rank, world_size, dataset, batch_size=128, collate_fn=None): partitioned_batch_size = batch_size // world_size sizes = [1/ world_size for _ inrange(world_size)] #world_size : The number of devices partitioner = DataPartitioner(dataset, sizes=sizes) partition = partitioner.use(rank) #Wrap this in a dataloader dataloader = DataLoader( partition, batch_size=partitioned_batch_size, collate_fn=collate_fn ) return dataloader逻辑清晰,对吧?
若对world_size与rank的含义存疑,可简单理解为:
- •
world_size表示参与训练的设备总数(如 GPU 数量)。 - •
rank用于标识其中的某一具体设备。
例如,使用 4 张 GPU 时,world_size设为 4,而rank取值 0–3,分别对应每张 GPU。
大规模训练需两大核心组件:多进程并发(torch.multiprocessing)与设备间高效通信(torch.distributed)。
思考题:为何需要通信?请先行思考。
分布式数据加载器已就绪,接下来实现训练流程。
#importsimport tqdmimport torchimport dataset #The dataloader that we wroteimport numpy as npimport torch.nn as nnfrom functools import partialimport torch.distributed as distfrom torch.utils.data import DataLoaderfrom torch.multiprocessing import Processfrom transformers import AutoConfig, GPT2LMHeadModelfrom utils import get_tokenizer, collate_batch#You can write your own tokenizer, train and generate functionsdefaverage_gradients(model): world_size = dist.get_world_size() for param in model.parameters(): if param.grad isnotNone: dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM) #Communication overhead across gpus param.grad.data /= world_sizedeftrain(model, optimizer, examples, batch_size, collate_fn, desc, rank=0, average_gradients_fn=None): model.train() tokens_per_sec = [] tokens_num = [] for i, batch inenumerate(prog_bar := tqdm.tqdm(examples, desc=f'Training ({desc})')): t0 = time.time() optimizer.zero_grad() logits = model(input_ids=batch['input_ids']).logits loss = torch.nn.functional.cross_entropy( input=logits.reshape((-1, logits.shape[-1])), target=batch['labels'].reshape(-1), reduction='none') loss = (torch.sum(loss * batch['label_token_weights'].reshape(-1)) / torch.sum(batch['label_token_weights'])) loss.backward() #Calculates the gradients if average_gradients_fn isnotNone: average_gradients_fn(model) optimizer.step() #Updates the weights batch_time = time.time() - t0 tokens = np.prod(batch['input_ids'].shape) tokens_per_sec.append(tokens / batch_time) tokens_num.append(tokens) prog_bar.set_postfix( tokens_per_sec=tokens / batch_time, loss=loss.item()) return np.mean(tokens_per_sec), tokens_num defsetup(rank, world_size, backend): #Sets up the communication between multiple devices(GPUs) os.environ['MASTER_ADDRESS'] = 'localhost' os.environ['MASTER_PORT'] = '33333' dist.init_process_group(backend=backend, rank=rank, world_size=world_size)#This function will be run by each process concurrently, the id for the process is 'rank'defrun_dp(rank, world_size, backend, dataset_name, model_max_length, n_epochs, batch_size, learning_rate): setup(rank, world_size, backend) config = AutoConfig.from_pretrained('gpt2') model = GPT2LMHeadModel(config=config).to(rank) #This is great! We are loading it on each device, that's why rank!!! optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate) #Surprisingly, AdamW works great for LLMs #We will use german(deutsch) to english translation dataset dataset = { split: datasets.load_dataset(dataset_name, split=split)['translation'] for split in ['train', 'validation', 'test'] } src_key, tgt_key = 'de', 'en' dataset['train'] = dataset['train'][:5000] dataset['validation'] = dataset['validation'][:1000] dataset['test'] = dataset['test'][:100] #tokenization tokenizer = get_tokenizer(examples=dataset['train'], vocab_size=config.vocab_size, src_key=src_key, tgt_key=tgt_key) #collate function : partial pre-fills some of the arguments collate_fn = partial(collate_batch, src_key=src_key, tgt_key=tgt_key, tokenizer=tokenizer, model_max_length=model_max_length, device=rank) train_loader = partition_dataset(rank, world_size, dataset['train'], batch_size=batch_size, collate_fn=collate_fn) val_loader = DataLoader(dataset["validation"], batch_size=batch_size, shuffle=False, collate_fn=collate_fn) test_loader = DataLoader(dataset["test"], batch_size=batch_size, shuffle=False, collate_fn=collate_fn) total_time = [] total_tokens_per_sec = [] for epoch_idx inrange(n_epochs): start = time.time() avg_tokens_per_sec, _ = train( model=model, optimizer=optimizer, examples=train_loader, batch_size=batch_size, collate_fn=collate_fn, desc=desc, rank=rank, average_gradients_fn=average_gradients) end = time.time()if __name__ == '__main__': import torch.multiprocessing as mp mp.set_start_method('spawn', force=True) parser = argparse.ArgumentParser() parser.add_argument('--pytest', type=bool, default=False) parser.add_argument('--dataset', type=str) parser.add_argument('--model_max_length', type=int, default=128) parser.add_argument('--n_epochs', type=int, default=10) parser.add_argument('--batch_size', type=int, default=128) parser.add_argument('--learning_rate', type=float, default=1e-4) parser.add_argument('--world_size', type=int, default=2) args = parser.parse_args() backend = 'nccl'#for cpu choose 'gloo' for rank inrange(world_size): p = Process( target=run_dp, args=(rank, world_size, backend, args.dataset, args.model_max_length, args.n_epochs, args.batch_size, args.learning_rate) ) p.start() processes.append(p) # Wait for all processes to finish for p in processes: p.join()```在更新模型权重之前,所有设备上的梯度会被求平均,以保证各副本的一致性。这意味着在训练的每一步,每张 GPU 都拥有完全相同的模型副本。 *图 2:用于梯度平均的 All-reduce 方法*下面做一个快速实验。笔者手头有两张 H100(80 GB)GPU,对于 GPT-2 这种小模型来说实属“杀鸡用牛刀”。然而,正如俗语所言,“若手中只有火箭筒,也只能用它打蚊子”。因此,本文将测试在该配置下可获得的训练吞吐率与训练耗时。 *图 3:单卡与双卡训练期间的吞吐率对比*结果显示,双卡方案的吞吐率(训练过程中处理的 token 数)几乎达到单卡的 2 倍,但这一速度提升以硬件数量翻倍为代价。 *图 4:单卡与双卡训练耗时对比*尽管预期每个 epoch 的平均训练时间应约为单卡方案的一半,但多次实验表明情况略有不同:大多数 epoch 确实提速明显,但总有 1–2 个 epoch 出现显著波动。这种峰值可能源于多种开销,例如数据加载器取下一批数据的延迟、Python 垃圾回收的触发,或 NCCL 跨设备通信带来的同步延迟。## 模型并行(Model Parallel)模型并行(Model Parallel)的核心思想是将模型本身拆分到多个设备上。例如,当模型包含 12 层且拥有 2 张 GPU 时,可把前 6 层置于 GPU 0,后 6 层置于 GPU 1。每张设备仅负责前向与反向传播的一部分,从而允许训练单张 GPU 无法容纳的超大模型。 *图 5:模型并行流程示意图*在传统模型并行中,执行是**顺序**的:任一时刻只有一张 GPU 处于活跃计算状态。因此,即便拥有 4 张 GPU 并将模型切分,也只能让一张 GPU 参与计算,其余 GPU 处于空闲等待。这导致 GPU 利用率低下,设备数量增加时收益递减。```pythonimport mathdefget_device_map(n_layers, devices): """Returns a dictionary of layers distributed evenly across all devices.""" layers = list(range(n_layers)) n_blocks = int(math.ceil(n_layers / len(devices))) layers_list = [layers[i : i + n_blocks] for i inrange(0, n_layers, n_blocks)] returndict(zip(devices, layers_list))defparallelize(self, device_map=None): ''' Distribute the model layers across the devices based on the device_map. ''' self.device_map = ( get_device_map(len(self.h), range(torch.cuda.device_count())) if device_map isNoneelse device_map ) self.model_parallel = True self.first_device = "cpu"if"cpu"inself.device_map.keys() else"cuda:" + str(min(self.device_map.keys())) self.last_device = "cuda:" + str(max(self.device_map.keys())) self.wte = self.wte.to(self.first_device) self.wpe = self.wpe.to(self.first_device) # Load onto devices for k, v inself.device_map.items(): for block in v: cuda_device = "cuda:" + str(k) self.h[block] = self.h[block].to(cuda_device) # ln_f to last self.ln_f = self.ln_f.to(self.last_device)为简化说明,以 GPT-2 为例:
模型共 12 层,平均分布在 2 张 GPU 上。
表 1:不同层在 GPU 上的分布示意
表 1:不同层在 GPU 上的分布示意
- • wte、wpe → “cuda:0”
迭代 1:k=0, v=[0,1,2,3,4,5]
└── self.h[0].to(“cuda:0”)
└── self.h[1].to(“cuda:0”)
└── self.h[2].to(“cuda:0”)
└── self.h[3].to(“cuda:0”)
└── self.h[4].to(“cuda:0”)
└── self.h[5].to(“cuda:0”)
迭代 2:k=1, v=[6,7,8,9,10,11]
└── self.h[6].to(“cuda:1”)
└── self.h[7].to(“cuda:1”)
└── self.h[8].to(“cuda:1”)
└── self.h[9].to(“cuda:1”)
└── self.h[10].to(“cuda:1”)
└── self.h[11].to(“cuda:1”)
- • ln_f、lm_head → “cuda:1”
为进一步验证,我们使用 7B 参数的 LLaMA-2 进行实验。在 2 张与 4 张 V100(16 GB)GPU 上,以相同配置运行模型并行,评估其性能与可扩展性。
表 2:LLaMA-2–7B 模型并行参数与结果
表 2:LLaMA-2–7B 模型并行参数与结果
实验再次证明:任一时刻仅有一张 GPU 处于活跃状态。数据像水流一样顺序经过每张 GPU;增加 GPU 数量并不能加快“水流”速度,只是把管道切成更多段。然而,它确实使得原本无法单卡加载的超大模型得以训练。
流水线并行(Pipeline Parallel)
流水线并行(Pipeline Parallel)通过将深度学习模型划分为若干顺序阶段,并让不同 GPU 并行处理输入批次中的不同微批次(micro-batch),从而在多 GPU 上分布模型。与传统模型并行每次仅让一张 GPU 计算不同,流水线并行允许每张 GPU 负责模型的一部分,并在不同微批次上并行工作。
具体而言:假设模型共 32 层,可用 GPU 为 4 张,可将模型切分为每 8 层一段。当一批输入数据到达时,先将其进一步拆分为微批次。第一个微批次在 GPU 0(层 0–7)上开始处理;一旦完成,立即把中间激活传递给 GPU 1(层 8–15),同时 GPU 0 开始处理第二个微批次。依此类推:GPU 2 在收到 GPU 1 的输出后立即处理第一个微批次,GPU 3 接收 GPU 2 的结果继续计算,形成持续的数据流。
经过短暂的“预热”阶段(称为pipeline bubble)后,所有 GPU 保持活跃,每张 GPU 在任意时刻都在处理不同的微批次。与模型并行相比,这种重叠执行显著提高了 GPU 利用率,类似于流水线装配:每个阶段(GPU)专注特定任务,一旦流水线填满,系统效率大幅提升。
图 6:模型并行 vs 流水线并行
图 6:模型并行 vs 流水线并行
调度器负责在每个时钟周期决定哪张 GPU 处理哪个微批次。
def _clock_cycles(num_batches: int, num_partitions: int) -> Iterable[List[Tuple[int, int]]]: """ Key insight: batch i is at partition j when: clock = i + j """ total_clocks = num_batches + num_partitions - 1 # Fill + Run + Drain for clock in range(total_clocks): schedule = [] for i in range(num_batches): j = clock - i # Which partition is batch i at? if 0 <= j < num_partitions: # Is it a valid partition? schedule.append((i, j)) # (batch_idx, partition_idx) yield schedule示例:3 个批次,3 个分区
- • Clock 0: [(0,0)] → GPU 0 处理批次 0
- • Clock 1: [(1,0), (0,1)] → GPU 0 处理批次 1,GPU 1 处理批次 0
- • Clock 2: [(2,0), (1,1), (0,2)] → 3 张 GPU 全部忙碌!
- • Clock 3: [(2,1), (1,2)] → GPU 1 处理批次 2,GPU 2 处理批次 1
- • Clock 4: [(2,2)] → GPU 2 处理批次 2
def forward(self, x): # 1. SPLIT: Divide batch into micro-batches batches = list(x.split(self.split_size, dim=0)) # 2. SCHEDULE: Generate the clock cycle schedule schedules = _clock_cycles(num_batches, num_partitions) # 3. EXECUTE: Process each clock cycle for schedule in schedules: self.compute(batches, schedule) # ← This runs GPUs in parallel! # 4. COMBINE: Concatenate results output = torch.cat(batches, dim=0) return output.to(last_device) ``````plaintext def compute(self, batches, schedule): # PHASE 1: Submit ALL tasks in parallel (non-blocking) for batch_idx, partition_idx in schedule: batch = batches[batch_idx].to(devices[partition_idx]) defcompute_fn(): return partition(batch) # Run layers on this GPU task = Task(compute_fn) self.in_queues[partition_idx].put(task) # Send to worker thread # PHASE 2: Collect ALL results for batch_idx, partition_idx in schedule: success, result = self.out_queues[partition_idx].get() # Wait for result batches[batch_idx] = output # Store result for next stage或许有人会问:为何不能像数据并行那样使用进程?
流水线并行需要在每个时钟周期进行高频、紧密的 GPU 间协调。线程与共享内存队列的开销极小,而独立进程则会引入过多通信延迟。
数据并行、模型并行与流水线并行各自提供了在多 GPU 上扩展深度学习负载的不同思路,但它们的权衡决定了适用场景。
表 3:数据并行、模型并行与流水线并行对比总结
表 3:数据并行、模型并行与流水线并行对比总结
最终,不存在放之四海而皆准的策略。若模型可装入内存,数据并行通常最为直接;若模型规模巨大,则模型并行或流水线并行必不可少。将策略组合(如流水线并行 + 数据并行)更能进一步释放扩展潜力。关键在于理解模型规模、硬件限制,以及每种方法在内存、通信与性能上的权衡。
如何学习AI大模型?
大模型时代,火爆出圈的LLM大模型让程序员们开始重新评估自己的本领。 “AI会取代那些行业?”“谁的饭碗又将不保了?”等问题热议不断。
不如成为「掌握AI工具的技术人」,毕竟AI时代,谁先尝试,谁就能占得先机!
想正式转到一些新兴的 AI 行业,不仅需要系统的学习AI大模型。同时也要跟已有的技能结合,辅助编程提效,或上手实操应用,增加自己的职场竞争力。
但是LLM相关的内容很多,现在网上的老课程老教材关于LLM又太少。所以现在小白入门就只能靠自学,学习成本和门槛很高
那么针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓
学习路线
第一阶段: 从大模型系统设计入手,讲解大模型的主要方法;
第二阶段: 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用;
第三阶段: 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统;
第四阶段: 大模型知识库应用开发以LangChain框架为例,构建物流行业咨询智能问答系统;
第五阶段: 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型;
第六阶段: 以SD多模态大模型为主,搭建了文生图小程序案例;
第七阶段: 以大模型平台应用与开发为主,通过星火大模型,文心大模型等成熟大模型构建大模型行业应用。
👉学会后的收获:👈
• 基于大模型全栈工程实现(前端、后端、产品经理、设计、数据分析等),通过这门课可获得不同能力;
• 能够利用大模型解决相关实际项目需求: 大数据时代,越来越多的企业和机构需要处理海量数据,利用大模型技术可以更好地处理这些数据,提高数据分析和决策的准确性。因此,掌握大模型应用开发技能,可以让程序员更好地应对实际项目需求;
• 基于大模型和企业数据AI应用开发,实现大模型理论、掌握GPU算力、硬件、LangChain开发框架和项目实战技能, 学会Fine-tuning垂直训练大模型(数据准备、数据蒸馏、大模型部署)一站式掌握;
• 能够完成时下热门大模型垂直领域模型训练能力,提高程序员的编码能力: 大模型应用开发需要掌握机器学习算法、深度学习框架等技术,这些技术的掌握可以提高程序员的编码能力和分析能力,让程序员更加熟练地编写高质量的代码。
1.AI大模型学习路线图
2.100套AI大模型商业化落地方案
3.100集大模型视频教程
4.200本大模型PDF书籍
5.LLM面试题合集
6.AI产品经理资源合集
👉获取方式:
😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓