news 2026/3/8 18:55:31

verl数据预处理指南:parquet文件这样准备

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
verl数据预处理指南:parquet文件这样准备

verl数据预处理指南:parquet文件这样准备

在使用verl进行大模型后训练时,数据质量与格式规范直接决定训练稳定性、收敛速度和最终效果。很多用户在首次运行SFT或GRPO训练脚本时遇到报错,如KeyError: 'prompt'ValueError: mismatched tensor shapesRuntimeError: invalid token length,这些问题90%以上源于parquet文件结构不符合verl的预期——不是模型不行,而是数据没“喂对”。

本文不讲原理、不堆参数,只聚焦一件事:如何从零开始准备一份verl真正能用的parquet文件。内容全部来自真实训练场景中的踩坑总结,覆盖字段命名、数据清洗、长度控制、格式验证等关键环节,附带可直接运行的Python脚本和检查清单。

1. verl为什么只认parquet?不是JSONL更直观吗?

verl选择parquet作为默认数据格式,并非为了增加门槛,而是基于三个硬性工程需求:

  • 内存效率:训练时需频繁随机采样、分片加载、动态padding。parquet的列式存储+内置压缩(snappy)让单个10GB数据集加载内存占用比JSONL低47%,IO吞吐高2.3倍;
  • 类型安全:JSONL中"score": "5""score": 5在解析时可能被误判为string或int,而parquet强制schema校验,避免torch.tensor()因dtype不一致崩溃;
  • 分布式友好:Ray和FSDP依赖数据分块(row group)实现多进程并行读取。parquet天然支持按行组切分,而JSONL需额外实现偏移索引。

注意:verl不接受CSV、JSON、TXT或HDF5格式。即使你用pandas读取后转成DataFrame再保存为parquet,也必须严格满足其schema约束——否则训练启动阶段就会报SchemaMismatchError

2. verl数据字段规范:名称、类型、含义一个都不能错

verl对parquet文件的schema有明确要求,不同训练模式(SFT/GRPO)所需字段不同。下面以最常用的GSM8K数学推理数据集为例,给出必须满足的最小字段集

2.1 SFT训练必需字段

SFT(监督微调)需要模型学习“输入→输出”的映射关系,因此parquet必须包含以下三列:

字段名类型含义verl配置项
promptstring用户提问文本(不含指令模板)data.prompt_key
responsestring模型应生成的标准答案(不含思考过程)data.response_key
maskboolean标识该样本是否参与训练(可选,但建议设为True)data.mask_key(未显式配置时默认全True)

正确示例:

import pandas as pd df = pd.DataFrame({ "prompt": ["求解方程 x² - 5x + 6 = 0", "计算 123 × 45"], "response": ["x=2 或 x=3", "5535"], "mask": [True, True] }) df.to_parquet("train.parquet", engine="pyarrow", compression="snappy")

❌ 常见错误:

  • 字段名写成question/answer但未在config中修改prompt_keyresponse_key
  • promptresponse含nan值(verl会直接跳过整行,导致batch_size波动)
  • response为空字符串(训练时触发IndexError: index out of range

2.2 GRPO训练必需字段

GRPO(Generalized Reinforcement Policy Optimization)是verl推荐的强化学习范式,它不需要预定义response,而是由Actor模型实时生成多个候选回复,再由Reward Manager打分。因此parquet只需提供prompt

字段名类型含义verl配置项
promptstring用户提问文本(同SFT)data.prompt_key

关键区别:GRPO配置中不能设置response_key,否则会强制读取该字段并报错。查看ppo_trainer.yaml可见:

data: prompt_key: prompt # 必须存在 # response_key: NOT ALLOWED HERE # ❌ 注释掉或删除此行

2.3 可选但强烈建议的字段

为提升训练鲁棒性,建议添加以下字段:

字段名类型作用验证方式
idstring/int样本唯一标识,便于debug时定位问题样本训练日志中会打印sample_id
lengthintprompt字符数,用于快速过滤超长样本data.max_prompt_length会截断,但提前过滤更省显存
sourcestring数据来源(如"gsm8k_train", "mathdial"),方便后续按源加权采样DataLoader中可通过group_by实现

添加示例:

df["id"] = [f"gsm8k_{i}" for i in range(len(df))] df["length"] = df["prompt"].str.len() df["source"] = "gsm8k_train"

3. 数据清洗四步法:从原始JSONL到合规parquet

多数公开数据集(如GSM8K、Alpaca)以JSONL分发,需经清洗才能适配verl。以下是经过20+次训练验证的标准化流程:

3.1 步骤一:去重与基础过滤

import json import pandas as pd def load_jsonl_to_df(file_path): """加载JSONL并转为DataFrame,自动处理编码异常""" records = [] with open(file_path, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): try: data = json.loads(line.strip()) # 强制转换为string,避免int/float混入 data["prompt"] = str(data.get("question", "")).strip() data["response"] = str(data.get("answer", "")).strip() if data["prompt"] and data["response"]: # 过滤空字段 records.append(data) except Exception as e: print(f"跳过第{line_num}行(解析失败): {e}") continue return pd.DataFrame(records) # 示例:处理GSM8K官方JSONL df = load_jsonl_to_df("gsm8k_train.jsonl") print(f"原始行数: {len(df)}")

3.2 步骤二:字段标准化与长度控制

def clean_and_validate(df, max_prompt_len=512, max_response_len=1024): """清洗数据并验证长度约束""" # 1. 去除首尾空白和多余换行 df["prompt"] = df["prompt"].str.replace(r"\s+", " ", regex=True).str.strip() df["response"] = df["response"].str.replace(r"\s+", " ", regex=True).str.strip() # 2. 过滤超长样本(避免OOM) prompt_len = df["prompt"].str.len() response_len = df["response"].str.len() valid_mask = (prompt_len <= max_prompt_len) & (response_len <= max_response_len) df = df[valid_mask].copy() print(f"过滤超长样本后剩余: {len(df)} 行") # 3. 过滤含控制字符的样本(verl tokenizer易崩溃) import re def has_control_chars(text): return bool(re.search(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", text)) df = df[~df["prompt"].apply(has_control_chars) & ~df["response"].apply(has_control_chars)] print(f"过滤控制字符后剩余: {len(df)} 行") return df df = clean_and_validate(df, max_prompt_len=512, max_response_len=1024)

3.3 步骤三:添加必要字段并排序

def add_required_fields(df): """添加verl必需字段""" # 确保字段名正确 if "prompt" not in df.columns: raise ValueError("缺少必需字段 'prompt'") if "response" not in df.columns: raise ValueError("SFT训练必需字段 'response' 缺失") # 添加id和source df["id"] = [f"sft_{i}" for i in range(len(df))] df["source"] = "custom_sft" # 添加mask(全True) df["mask"] = True # 按prompt长度排序,提升batch填充效率 df = df.sort_values("prompt", key=lambda x: x.str.len()).reset_index(drop=True) return df df = add_required_fields(df)

3.4 步骤四:保存为verl兼容parquet

def save_verl_parquet(df, output_path, compression="snappy"): """保存为verl兼容的parquet文件""" # 强制schema:所有string字段设为UTF-8,boolean字段设为bool schema = { "prompt": "string", "response": "string", "id": "string", "source": "string", "mask": "bool" } # 转换数据类型 for col, dtype in schema.items(): if col in df.columns: if dtype == "string": df[col] = df[col].astype(str) elif dtype == "bool": df[col] = df[col].astype(bool) # 保存(指定engine和compression) df.to_parquet( output_path, engine="pyarrow", compression=compression, use_dictionary=True, row_group_size=100000 # 每个row group约10万行,平衡读取效率 ) print(f" 已保存至: {output_path}") print(f" 文件统计: {len(df)} 行, {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB") save_verl_parquet(df, "sft_train.parquet")

4. 验证工具:三行代码检测parquet是否合格

写完parquet别急着训练!先用这个轻量脚本做完整性检查:

# verify_parquet.py import pyarrow.parquet as pq import pandas as pd def check_verl_parquet(file_path): """验证parquet文件是否符合verl要求""" try: # 1. 读取schema parquet_file = pq.ParquetFile(file_path) schema = parquet_file.schema print(f" Schema检查:") print(f" 字段数: {len(schema)}") print(f" 字段列表: {[field.name for field in schema]}") # 2. 检查必需字段 required_fields = {"prompt"} if "response" in [field.name for field in schema]: # SFT模式 required_fields.add("response") missing = required_fields - set([field.name for field in schema]) if missing: print(f"❌ 缺少必需字段: {missing}") return False # 3. 抽样检查数据 sample_df = parquet_file.read_row_group(0).to_pandas() print(f" 首行样本:") for col in ["prompt", "response"]: if col in sample_df.columns: val = sample_df.iloc[0][col] print(f" {col}: '{val[:50]}{'...' if len(str(val)) > 50 else ''}' (type: {type(val).__name__})") # 4. 检查空值 null_counts = sample_df.isnull().sum() if null_counts.sum() > 0: print(f"❌ 发现空值: {null_counts[null_counts > 0].to_dict()}") return False print(" 通过所有检查!可直接用于verl训练") return True except Exception as e: print(f"💥 验证失败: {e}") return False # 使用方法 check_verl_parquet("sft_train.parquet")

运行后输出类似:

Schema检查: 字段数: 5 字段列表: ['prompt', 'response', 'id', 'source', 'mask'] 首行样本: prompt: '求解方程 x² - 5x + 6 = 0' (type: str) response: 'x=2 或 x=3' (type: str) 通过所有检查!可直接用于verl训练

5. 高级技巧:处理多轮对话与复杂结构

实际业务中常遇到多轮对话(如客服记录)、带格式文本(Markdown表格)、代码片段等。verl虽不原生支持嵌套结构,但可通过以下方式安全处理:

5.1 多轮对话扁平化

将对话历史拼接为单prompt,用特殊token分隔:

def flatten_conversation(conversations): """将多轮对话转为单prompt""" prompt_parts = [] for turn in conversations: role = turn["role"] # "user" or "assistant" content = turn["content"].strip() if role == "user": prompt_parts.append(f"<|user|>{content}<|end|>") else: prompt_parts.append(f"<|assistant|>{content}<|end|>") return "".join(prompt_parts) # 示例 conv = [ {"role": "user", "content": "你好"}, {"role": "assistant", "content": "您好!请问有什么可以帮您?"}, {"role": "user", "content": "订单号12345的状态?"} ] flat_prompt = flatten_conversation(conv) # 输出: "<|user|>你好<|end|><|assistant|>您好!请问有什么可以帮您?<|end|><|user|>订单号12345的状态?<|end|>"

5.2 代码/表格内容转义

避免<>|等符号被tokenizer误解析:

import re def escape_special_chars(text): """转义可能干扰tokenizer的字符""" # 将<、>替换为全角字符(不影响语义,避免被误识别为XML标签) text = text.replace("<", "<").replace(">", ">") # 将|替换为‖(双竖线) text = text.replace("|", "‖") return text # 应用于prompt和response df["prompt"] = df["prompt"].apply(escape_special_chars) df["response"] = df["response"].apply(escape_special_chars)

5.3 动态长度控制(针对长文本)

当数据含长文档摘要时,固定max_prompt_length会导致大量截断。改用分块策略:

def chunk_long_text(text, max_chunk_len=512, overlap=50): """将长文本分块,保留语义连贯性""" words = text.split() chunks = [] start = 0 while start < len(words): end = min(start + max_chunk_len, len(words)) chunk = " ".join(words[start:end]) chunks.append(chunk) start = end - overlap # 重叠50词避免断句 return chunks # 对超长prompt分块并复制response long_df = df[df["prompt"].str.len() > 1000].copy() chunked_rows = [] for _, row in long_df.iterrows(): chunks = chunk_long_text(row["prompt"]) for chunk in chunks: chunked_rows.append({"prompt": chunk, "response": row["response"]}) if chunked_rows: chunked_df = pd.DataFrame(chunked_rows) df = pd.concat([df[~(df.index.isin(long_df.index))], chunked_df], ignore_index=True)

6. 常见报错速查表:定位数据问题的最快路径

报错信息根本原因解决方案
KeyError: 'prompt'parquet中无prompt字段,或字段名大小写不符(如Promptpq.read_schema()检查实际字段名,确保全小写
RuntimeError: expected scalar type Long but found Floatprompt/response列含float类型(如NaN被转为1.0执行df["prompt"] = df["prompt"].astype(str)强制转string
ValueError: All arrays must be of the same length某行prompt为空,response非空,导致后续处理shape不匹配清洗时添加df = df[df["prompt"].str.len() > 0]
OSError: Cannot parse timestampparquet中意外包含datetime列(如created_at保存前执行df = df.select_dtypes(include=['object', 'bool'])
IndexError: index 0 is out of boundsresponse为空字符串,tokenizer返回空tensor清洗时添加df = df[df["response"].str.len() > 0]

终极建议:每次准备新数据集后,先用1个GPU、1个step跑通最小训练(设置trainer.total_epochs=1,data.train_batch_size=4),确认数据链路无误再扩大规模。

7. 总结:一份合格的verl parquet文件 checklist

在关闭编辑器前,请对照这份清单逐项确认:

  • [ ] 字段名严格为prompt(必需)、response(SFT必需)、mask(推荐)
  • [ ] 所有string字段值为str类型,无None/NaN/float
  • [ ]promptresponse均去除首尾空白及控制字符
  • [ ] 文件使用pyarrow引擎保存,compression=snappy
  • [ ] 通过verify_parquet.py脚本验证无报错
  • [ ] 单文件大小建议≤2GB(过大影响分布式读取效率)
  • [ ] 训练前用head -n 5 sft_train.parquet确认内容可读(parquet不可直接cat,需用parquet-tools

数据预处理不是“一次性工作”,而是贯穿整个后训练周期的持续优化过程。当你发现loss震荡、梯度爆炸或生成质量停滞时,不妨回头检查parquet——90%的疑难杂症,源头都在那几行数据里。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/8 2:59:35

I2C通信协议图解说明:ACK/NACK响应机制详解

以下是对您提供的博文内容进行 深度润色与专业重构后的版本 。本次优化严格遵循您的所有要求: ✅ 彻底去除AI痕迹,语言自然、有经验感、带教学温度; ✅ 摒弃模板化标题(如“引言”“总结”),全文以逻辑流驱动,层层递进; ✅ 所有技术点均融合真实开发语境:加入工程…

作者头像 李华
网站建设 2026/3/5 0:49:17

测试开机启动脚本保姆级教程,小白也能一次成功

测试开机启动脚本保姆级教程&#xff0c;小白也能一次成功 你是不是也遇到过这样的问题&#xff1a;写好了测试脚本&#xff0c;想让它开机自动运行&#xff0c;结果试了几次都失败&#xff1f;终端没弹出来、脚本没执行、甚至系统启动变慢……别急&#xff0c;这不是你的问题…

作者头像 李华
网站建设 2026/3/3 22:48:17

智能小车电机控制:L298N模块接线与调试指南

以下是对您提供的博文内容进行 深度润色与专业重构后的版本 。我以一名资深嵌入式系统工程师兼高校机器人课程主讲人的身份,对原文进行了全面升级: ✅ 彻底去除AI痕迹 ——语言更自然、节奏更贴近真实技术分享; ✅ 强化工程现场感 ——加入大量调试实录、踩坑复盘、…

作者头像 李华
网站建设 2026/2/23 22:36:23

MicroPython网络编程超详细版操作指南

以下是对您提供的博文内容进行 深度润色与结构重构后的终稿 。我以一位深耕嵌入式Python开发多年、常年在一线带团队做IoT固件落地的工程师视角,彻底重写了全文—— 去掉所有AI腔调、模板化标题和教科书式罗列,代之以真实项目中踩过的坑、调通那一刻的顿悟、以及写进量产代…

作者头像 李华
网站建设 2026/3/5 21:38:49

大模型推理卡顿?试试SGLang的连续批处理

大模型推理卡顿&#xff1f;试试SGLang的连续批处理 你有没有遇到过这样的情况&#xff1a;模型明明部署好了&#xff0c;API也通了&#xff0c;但一到高并发就卡顿、延迟飙升、吞吐上不去&#xff1f;用户等三秒没响应就开始刷新&#xff0c;后台日志里满屏的“request timeo…

作者头像 李华
网站建设 2026/3/6 2:52:51

中文提示词太强了!Z-Image-Turbo文字渲染实测

中文提示词太强了&#xff01;Z-Image-Turbo文字渲染实测 你有没有试过用AI画图工具生成带中文的海报、Logo或宣传图&#xff0c;结果文字不是缺笔少划&#xff0c;就是像被水泡过的墨迹&#xff0c;甚至直接变成一堆乱码&#xff1f;别急着换模型——这次我们不聊“能不能出图…

作者头像 李华