如何用verl实现GRPO算法?完整代码示例
1. 背景与目标:理解GRPO及其在LLM后训练中的价值
大型语言模型(LLMs)在预训练阶段获得了广泛的语言能力,但要使其行为更符合人类偏好,通常需要进行强化学习(Reinforcement Learning, RL)驱动的后训练。近年来,组相对策略优化(Group Relative Policy Optimization, GRPO)作为一种高效的RLHF替代方案,因其稳定性强、无需显式奖励建模而受到关注。
GRPO的核心思想是:不依赖外部奖励模型,而是通过组内比较机制来生成相对优势信号。具体而言,在同一提示下生成多个响应,根据它们的输出质量(如长度、连贯性、信息量等)进行排序,并基于排名差值构建损失函数,从而引导策略向更优方向更新。
然而,GRPO的实现涉及复杂的多模型协作流程——包括Actor模型生成样本、Reference模型提供KL散度约束、Rollout管理、优势计算与策略更新等环节。传统框架往往难以高效组织这些组件。此时,verl框架的价值凸显出来。
verl 是由字节跳动火山引擎团队开源的强化学习训练框架,专为大模型后训练设计,其核心创新在于HybridFlow 编程模型,将控制流与计算流解耦,既保证了算法逻辑清晰,又实现了高性能分布式执行。本文将详细介绍如何使用 verl 实现 GRPO 算法,并提供可运行的完整代码结构。
2. 技术原理:GRPO与verl架构的融合设计
2.1 GRPO算法核心机制解析
GRPO 的关键在于利用“组内相对排序”代替绝对奖励打分。假设对于一个输入提示 $x$,Actor 模型生成 $K$ 个响应 ${y_1, y_2, ..., y_K}$,并定义每个响应的质量得分 $s(y_k)$(例如基于token数或人工评分),则其相对优势可表示为:
$$ A_k = s(y_k) - \frac{1}{K} \sum_{j=1}^{K} s(y_j) $$
最终的策略梯度损失为:
$$ \mathcal{L}{\text{policy}} = -\mathbb{E}{(x,y)\sim\pi_\theta} \left[ A_k \cdot \log \frac{\pi_\theta(y|x)}{\pi_{\text{ref}}(y|x)} \right] $$
此外还需加入价值函数损失和KL正则项以稳定训练:
$$ \mathcal{L}{\text{total}} = \mathcal{L}{\text{policy}} + \lambda_v \mathcal{L}{\text{value}} + \lambda{\text{kl}} D_{\text{KL}}(\pi_\theta || \pi_{\text{ref}}) $$
该机制避免了对Reward Model的依赖,降低了系统复杂性和标注成本。
2.2 verl的HybridFlow架构如何支持GRPO
verl 采用两层抽象来管理RL训练流程:
- 控制流(Control Flow):由单控制器(Single Controller)统一调度各角色(Actor、Critic、Reference等)之间的交互顺序。
- 计算流(Computation Flow):每个角色内部使用多控制器模式(Multi-Controller),利用Ray实现高并发、低延迟的前向/反向传播。
这种混合架构非常适合GRPO这类需要同步rollout、批量处理、跨样本比较的任务。以下是verl支持GRPO的关键能力:
| 功能 | 说明 |
|---|---|
| 多角色协同 | 支持Actor、Critic、Reference模型并行部署 |
| 异步Rollout | 利用Ray异步任务重叠生成与训练过程 |
| 分布式数据收集 | 可配置多个Rollout Worker并行采样 |
| 灵活优势计算 | 用户自定义compute_advantage接口实现组内排序逻辑 |
3. 实践应用:基于verl实现GRPO的完整代码示例
3.1 环境准备与依赖安装
首先确保已安装 verl 及相关依赖。推荐使用 Python 3.10+ 和 PyTorch 2.0+ 环境。
# 安装 verl(需提前配置好CUDA环境) pip install ray torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 pip install vllm transformers datasets accelerate # 克隆 verl 仓库并安装 git clone https://github.com/volcengine/verl.git cd verl pip install -e .验证安装是否成功:
import verl print(verl.__version__) # 应输出版本号,如 '0.1.0'3.2 构建GRPO训练流程的核心组件
下面是一个简化的 GRPO 训练脚本框架,展示如何使用 verl 实现完整的训练循环。
import torch import ray from verl import DataParallelEngine, RolloutWorker, CentralizedController from verl.utils.policy_helper import compute_logprob from typing import Dict, List import numpy as np @ray.remote class GRPORolloutWorker(RolloutWorker): def __init__(self, config): super().__init__(config) self.actor = self.init_actor() # 加载Actor模型 self.ref_model = self.init_ref_model() # 加载Reference模型 self.tokenizer = self.init_tokenizer() def generate(self, batch: Dict) -> Dict: """生成多个候选响应""" input_ids = batch['input_ids'].to(self.device) attention_mask = batch['attention_mask'].to(self.device) with torch.no_grad(): outputs = self.actor.generate( input_ids=input_ids, attention_mask=attention_mask, do_sample=True, num_return_sequences=self.config.n_samples_per_prompt, # K个样本 max_new_tokens=128 ) # 解码生成结果 responses = self.tokenizer.batch_decode(outputs, skip_special_tokens=True) return {'responses': responses, 'input_ids': input_ids.cpu(), 'outputs': outputs} def compute_kl_ref(self, batch): """计算相对于参考模型的KL散度""" input_ids = batch['input_ids'].to(self.device) with torch.no_grad(): log_probs = compute_logprob(self.actor, input_ids) ref_log_probs = compute_logprob(self.ref_model, input_idss) kl_div = (log_probs - ref_log_probs).mean().item() return {'kl_div': kl_div} @ray.remote class GRPOTrainer: def __init__(self, config): self.config = config self.actor_critic = self.init_actor_critic() # 共享主干的Actor-Critic self.optimizer = torch.optim.AdamW(self.actor_critic.parameters(), lr=config.lr) def compute_advantages(self, responses: List[str]) -> np.ndarray: """基于响应长度作为代理得分进行组内排序""" scores = np.array([len(r) for r in responses]) # 使用长度作为proxy score mean_score = np.mean(scores) advantages = scores - mean_score return advantages def train_step(self, data: Dict) -> Dict: """ GRPO单步训练 data 包含: responses, old_logprobs, values, returns, masks """ responses = data['responses'] input_ids = data['input_ids'].to(self.actor_critic.device) advantages = self.compute_advantages(responses) advantages = torch.tensor(advantages, dtype=torch.float32).to(input_ids.device) # 前向传播 output = self.actor_critic(input_ids) log_probs = compute_logprob_from_output(output, input_ids) values = output.value # 计算策略损失 policy_loss = -(advantages * (log_probs - data['old_logprobs'])).mean() # 价值函数损失 value_loss = torch.nn.functional.mse_loss(values.squeeze(), data['returns']) # KL 正则 kl_loss = ((log_probs - data['old_logprobs']) ** 2).mean() total_loss = policy_loss + 0.1 * value_loss + 0.01 * kl_loss self.optimizer.zero_grad() total_loss.backward() self.optimizer.step() return { 'policy_loss': policy_loss.item(), 'value_loss': value_loss.item(), 'kl_loss': kl_loss.item(), 'total_loss': total_loss.item() } def main(): # 初始化Ray ray.init() # 配置参数 config = { 'n_samples_per_prompt': 4, 'batch_size': 32, 'lr': 1e-6, 'device': 'cuda' if torch.cuda.is_available() else 'cpu' } # 创建Rollout Workers rollout_workers = [GRPORolloutWorker.remote(config) for _ in range(4)] # 创建训练器 trainer = GRPOTrainer.remote(config) # 模拟训练循环 for step in range(100): prompts = ["请写一首关于春天的诗"] * config['batch_size'] inputs = {'input_ids': ..., 'attention_mask': ...} # 实际应从dataloader加载 # 并行生成响应 futures = [worker.generate.remote(inputs) for worker in rollout_workers] results = ray.get(futures) # 整理成组 all_responses = [] for res in results: all_responses.extend(res['responses']) # 按prompt分组(此处简化) grouped_responses = [all_responses[i:i+4] for i in range(0, len(all_responses), 4)] # 每组计算优势并训练 for responses in grouped_responses: advantages = trainer.compute_advantages.remote(responses) # 实际中还需收集logprob、value等信息 train_result = ray.get(trainer.train_step.remote({ 'responses': responses, 'old_logprobs': ..., 'returns': ..., 'input_ids': ... })) print(f"Step {step}, Loss: {train_result['total_loss']:.4f}") ray.shutdown() if __name__ == "__main__": main()3.3 关键实现要点说明
- Rollout并行化:通过 Ray 的
@ray.remote装饰器将GRPORolloutWorker分布式部署,实现高吞吐生成。 - 优势函数定制:
compute_advantages方法可根据实际需求替换为基于RM打分、BLEU、ROUGE等指标。 - KL正则保障稳定性:引入参考模型防止策略偏离过大。
- 灵活扩展性:可通过继承
RolloutWorker和Trainer快速适配新算法。
3.4 性能优化建议
- 启用vLLM加速推理:在Rollout Worker中集成vLLM以提升生成吞吐。
- 使用FSDP或Megatron-LM进行训练:对于百亿级以上模型,启用张量并行与流水线并行。
- 异步Pipeline设计:让Actor更新时,Generator继续生成下一batch数据,减少空闲时间。
- 序列并行支持长文本:开启SP(Sequence Parallelism)以处理超长上下文任务。
4. 总结
本文详细介绍了如何使用verl框架实现GRPO(组相对策略优化)算法,涵盖理论基础、架构设计与工程实践。通过 verl 提供的 HybridFlow 模型,我们能够以模块化方式构建复杂的强化学习流程,在保持代码简洁的同时获得高性能的分布式执行能力。
GRPO 作为一种无需奖励模型的强化学习方法,特别适合资源有限或缺乏标注数据的场景。结合 verl 的灵活性与效率,开发者可以快速搭建适用于大语言模型后训练的GRPO系统,并根据业务需求调整评分策略与训练参数。
未来,随着更多轻量化RL算法的发展,verl 这类通用、可扩展的框架将成为连接算法创新与工程落地的重要桥梁。
5. 参考资料
- verl GitHub 仓库: https://github.com/volcengine/verl
- verl 文档: https://verl.readthedocs.io/en/latest/
- HybridFlow 论文: https://arxiv.org/pdf/2409.19256
- HuggingFace Transformers: https://huggingface.co/docs/transformers
- Ray 官方文档: https://docs.ray.io/en/latest/
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。