文章目录
- 一、整体迁移逻辑
- 1.1 架构概览
- 1.2 核心工作流程
- 阶段 1:初始化
- 阶段 2:启动工作线程
- 阶段 3:周期性执行
- 1.3 任务生成逻辑
- 1.3.1 元数据同步
- 1.3.2 DDL 任务生成
- 1.3.3 数据复制任务生成
- 1.4 任务执行流程
- 1.4.1 DDL 任务执行
- 1.4.2 数据复制任务执行
- 1.5 状态跟踪机制
- 1.5.1 任务状态
- 1.5.2 状态查询
- 二、数据传输详细流程
- 2.1 数据传输方式
- 2.2 数据传输架构
- 2.3 数据传输详细步骤
- 步骤 1:任务准备
- 步骤 2:RPC 调用
- 步骤 3:FE 处理请求
- 步骤 4:BE 节点拉取数据
- 步骤 5:事务提交
- 步骤 6:状态跟踪
- 2.4 为什么采用这种方式?
- 2.4.1 为什么不是 JDBC?
- 2.4.2 为什么不是 StreamLoad?
- 2.4.3 为什么采用 BE 到 BE 快照复制?
- 2.5 数据传输关键技术点
- 2.5.1 版本控制
- 2.5.2 快照机制
- 2.5.3 Token 认证
- 2.5.4 并发控制
- 2.6 数据传输流程图
- 2.7 数据传输性能优化
- 2.7.1 并行传输
- 2.7.2 批量处理
- 2.7.3 网络优化
- 三、总结
- 3.1 迁移逻辑总结
- 3.2 数据传输总结
- 3.3 关键设计决策
来源:反编译自
starrocks-cluster-sync-2.0-jar-with-dependencies.jar
反编译工具:CFR 0.152
主类:com.starrocks.sync.SyncJob
Starrocks-跨集群数据迁移工具
一、整体迁移逻辑
1.1 架构概览
StarRocks 跨集群数据迁移工具采用多线程协调架构,通过周期性元数据同步和任务队列管理实现自动化数据迁移。
┌─────────────────────────────────────────────────────────────┐ │ 迁移工具(客户端) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ meta-handler │ │ ddl-handler │ │replication- │ │ │ │ │ │ │ │job-handler │ │ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ │ └─────────────────┼─────────────────┘ │ │ │ │ │ ┌──────▼───────┐ │ │ │sync-reporter │ │ │ └──────────────┘ │ └───────────────────────────┬─────────────────────────────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ 源集群 FE │ │ 目标集群 FE │ │ 目标集群 BE │ │ (元数据查询)│ │ (RPC 调用) │ │ (数据接收) │ └──────┬───────┘ └──────────────┘ └──────┬───────┘ │ │ └────────────────────────────────────────┘ │ HTTP 快照传输 ▼ ┌──────────────┐ │ 源集群 BE │ │ (数据提供) │ └──────────────┘1.2 核心工作流程
阶段 1:初始化
SyncJob(){1.读取配置文件(sync.properties,hosts.properties)2.验证必要配置(source_cluster_token 等)3.初始化任务队列4.创建ClusterMetaKeeper(元数据管理器)}阶段 2:启动工作线程
start(){启动4个工作线程:1.meta-handler(元数据处理线程)-周期性更新源集群和目标集群的元数据-生成 DDL 任务-生成数据复制任务2.ddl-handler(DDL 执行线程)-从 DDL 队列取出任务-批量执行 DDL(CREATE/DROP TABLE/PARTITION 等)3.replication-job-handler(数据复制线程)-从复制任务队列取出任务-通过 RPC 发送到目标集群 FE-更新任务状态4.sync-reporter(进度报告线程)-查询事务状态-统计任务进度-输出日志报告}阶段 3:周期性执行
循环执行(直到手动停止): ├─ meta-handler: 更新元数据 → 生成任务 ├─ ddl-handler: 执行 DDL 任务 ├─ replication-job-handler: 发送复制任务 └─ sync-reporter: 报告进度1.3 任务生成逻辑
1.3.1 元数据同步
updateClusterMeta(){并行执行:1.同步源集群元数据-FE 节点信息-BE 节点信息-数据库、表、分区、索引信息-版本信息2.同步目标集群元数据-FE 节点信息-BE 节点信息-数据库、表、分区、索引信息-版本信息-事务状态(运行中/已完成)}1.3.2 DDL 任务生成
produceDDL(){比较源集群和目标集群的元数据差异:1.handleDbDDL()-源集群有,目标集群没有 → 创建数据库-目标集群有,源集群没有 → 删除数据库(可选)2.handleTableDDL()-源集群有,目标集群没有 → 创建表-目标集群有,源集群没有 → 删除表(可选)-表结构不一致 → 删除并重建(可选)3.handlePartitionDDL()-源集群有,目标集群没有 → 添加分区-目标集群有,源集群没有 → 删除分区(可选)-版本不一致 → 删除并重建(可选)4.handleMaterializedViewDDL()-同步物化视图结构(不同步数据)5.handleViewDDL()-同步视图定义}1.3.3 数据复制任务生成
produceReplicationJob(){遍历公共数据库和表:for(数据库 in 公共数据库){for(表 in 公共表){for(分区 in 公共分区){1.比较分区版本-源版本>目标版本 → 需要同步-源版本=目标版本 → 检查版本时间(存算分离)-源版本<目标版本 → 删除目标分区2.收集需要同步的分区信息-分区 ID-源版本号-索引信息-Tablet映射关系-源 BE 节点信息3.检查数据量限制-如果超过 max_replication_data_size_per_job_in_gb-停止添加分区,标记为部分复制}4.创建ReplicationJob-包含所有需要同步的分区-添加到复制任务队列}}}1.4 任务执行流程
1.4.1 DDL 任务执行
DDL 任务队列 ↓ 批量取出(ddlJobBatchSize) ↓ 执行 SQL(CREATE/DROP/ALTER) ↓ 更新元数据1.4.2 数据复制任务执行
复制任务队列 ↓ 批量取出(replicationJobBatchSize) ↓ 转换为 Thrift 请求 ↓ RPC 调用目标集群 FE ↓ 目标 FE 创建事务并协调 BE ↓ 目标 BE 从源 BE 拉取数据 ↓ 查询事务状态跟踪进度1.5 状态跟踪机制
1.5.1 任务状态
JobState{UNKNOWN,// 未知状态INIT,// 已发送到目标集群SENT,// 已发送但未开始运行SENT_FAILED,// 发送失败RUNNING,// 正在运行FAILED,// 失败FINISHED// 完成}1.5.2 状态查询
getTxnStatus(jobToken){1.解析 jobToken →{dbId}-{tableId}_{jobId}2.查询运行中的事务 SHOW PROC'/transactions/{dbId}/running'-如果找到 → RUNNING3.查询已完成的事务 SHOW PROC'/transactions/{dbId}/finished'-如果找到且状态为 VISIBLE → FINISHED-如果找到且状态为 ABORTED → FAILED4.都找不到 → UNKNOWN}二、数据传输详细流程
2.1 数据传输方式
核心结论:数据传输采用BE 到 BE 的直接快照复制机制,通过HTTP 协议传输数据。
不是 JDBC:JDBC 仅用于元数据查询(SHOW PROC、SHOW DATA 等)
不是 StreamLoad:StreamLoad 用于外部数据导入,不是集群间复制
2.2 数据传输架构
┌─────────────────────────────────────────────────────────────┐ │ 迁移工具 │ │ 生成 ReplicationJob(包含源 BE 信息) │ └──────────────────────┬──────────────────────────────────────┘ │ Thrift RPC ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 FE │ │ 1. 接收 startTableReplication 请求 │ │ 2. 验证 Token 和权限 │ │ 3. 创建事务(LoadJobSourceType = 'REPLICATION') │ │ 4. 将任务分发给目标 BE 节点 │ └──────────────────────┬──────────────────────────────────────┘ │ 任务分发 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 BE │ │ 1. 接收复制任务 │ │ 2. 解析源 BE 信息(host, httpPort) │ │ 3. 通过 HTTP 协议连接源 BE │ │ 4. 请求快照数据(包含 srcTabletId, version, schemaHash) │ │ 5. 接收快照数据并写入本地存储 │ │ 6. 报告写入完成 │ └──────────────────────┬──────────────────────────────────────┘ │ HTTP 协议(快照下载) ▼ ┌─────────────────────────────────────────────────────────────┐ │ 源集群 BE │ │ 1. 接收快照下载请求 │ │ 2. 验证 Token(从请求中获取) │ │ 3. 根据版本号生成快照 │ │ 4. 通过 HTTP 协议传输快照数据 │ └─────────────────────────────────────────────────────────────┘2.3 数据传输详细步骤
步骤 1:任务准备
// 在 produceReplicationJob() 中ReplicationJobreplicationJob=newReplicationJob(jobId,// 任务 IDtargetUserName,// 目标集群用户名targetPassword,// 目标集群密码sourceToken,// 源集群 Token(关键!)targetDbId,// 目标数据库 IDtargetTableId,// 目标表 IDdbName,// 数据库名tableName,// 表名srcTableType,// 源表类型srcTableDataSize,// 源表数据大小partitionInfos// 分区信息列表);// 分区信息包含:PartitionInfo{partitionId,// 目标分区 IDsrcVersion,// 源分区版本(关键!)indexInfos// 索引信息列表}// 索引信息包含:IndexInfo{indexId,// 目标索引 IDsrcSchemaHash,// 源 Schema HashtabletInfos// Tablet 信息列表}// Tablet 信息包含:TabletInfo{tabletId,// 目标 Tablet IDsrcTabletId,// 源 Tablet ID(关键!)replicaInfos// 副本信息列表}// 副本信息包含:ReplicaInfo{srcBackend{// 源 BE 节点信息(关键!)host,// 源 BE 主机地址bePort,// BE 服务端口httpPort// HTTP 端口(用于数据传输)}}步骤 2:RPC 调用
// 在 Utils.sendReplicationJob() 中1.随机选择目标集群 FE 节点2.将ReplicationJob转换为TTableReplicationRequest3.通过ThriftRPC 调用:FrontendServiceProxy.call(address,// 目标 FE 地址10000,// 超时时间(10秒)3,// 重试次数client->client.startTableReplication(request))步骤 3:FE 处理请求
// 目标集群 FE 接收到请求后(FE 端逻辑,不在工具代码中)1.验证请求-检查用户名和密码-验证源集群Token-检查并发限制(replication_max_parallel_table_count 等)2.创建事务-事务Label:{dbId}-{tableId}_{jobId}-LoadJobSourceType:'REPLICATION'-事务状态:PREPARE3.分发任务到目标 BE 节点-根据Tablet分布信息-将任务发送到对应的 BE 节点步骤 4:BE 节点拉取数据
// 目标 BE 节点接收到任务后(BE 端逻辑,不在工具代码中)1.解析任务信息-获取源 BE 信息(host,httpPort)-获取源TabletID-获取版本号和SchemaHash2.建立 HTTP 连接-连接到源 BE 的 httpPort(默认8040)-使用源集群Token进行认证3.请求快照数据 HTTPRequest:-URL:http://{srcBeHost}:{httpPort}/api/snapshot/download-Method:POST-Headers:*Authorization:Token{sourceToken}-Body:{"tablet_id":srcTabletId,"version":srcVersion,"schema_hash":srcSchemaHash,"target_tablet_id":tabletId}4.源 BE 处理请求-验证Token-根据版本号生成快照-如果快照为空,返回错误:"Source snapshots is empty"-否则,通过 HTTP 流式传输快照数据5.目标 BE 接收数据-接收快照文件-验证数据完整性(Checksum)-写入本地存储-更新Tablet元数据步骤 5:事务提交
// 所有副本写入完成后1.目标 BE 报告写入完成2.FE 提交事务-事务状态:COMMITTED3.FE 发布数据-事务状态:PUBLISHED-数据变为可见(VISIBLE)4.FE 完成事务-事务状态:FINISHED步骤 6:状态跟踪
// 迁移工具定期查询事务状态1.查询运行中的事务 SHOW PROC'/transactions/{dbId}/running'-如果找到对应的事务 → RUNNING2.查询已完成的事务 SHOW PROC'/transactions/{dbId}/finished'-如果找到且状态为 VISIBLE → FINISHED-如果找到且状态为 ABORTED → FAILED3.更新任务状态-更新 replicationJobState-更新 replicationTableStatus-统计进度2.4 为什么采用这种方式?
2.4.1 为什么不是 JDBC?
JDBC 的特点:
- 基于 MySQL 协议
- 用于 SQL 查询和 DDL 操作
- 需要执行查询计划
- 返回结果集,适合小数据量
为什么不适合:
性能问题:
- 需要执行 SQL 查询,解析查询计划
- 返回结果集需要序列化/反序列化
- 网络往返次数多,效率低
功能限制:
- 无法直接获取 Tablet 级别的数据
- 无法指定版本号获取特定版本的数据
- 无法获取快照数据
资源消耗:
- 占用查询资源
- 增加 FE 负载
- 不适合大数据量传输
代码证据:
// 工具中 JDBC 仅用于元数据查询Utils.execQuerySql("SHOW PROC '/dbs/'",...);Utils.execQuerySql("SHOW DATA",...);Utils.execQuerySql("SHOW PARTITIONS",...);// 没有用于数据传输的 JDBC 调用2.4.2 为什么不是 StreamLoad?
StreamLoad 的特点:
- 用于导入外部数据(文件、Kafka 等)
- 客户端准备数据并上传
- 通过 HTTP 接口上传数据流
为什么不适合:
数据来源:
- StreamLoad 需要客户端准备数据
- 迁移工具无法直接访问源集群的数据文件
- 需要先导出再导入,效率低
数据格式:
- StreamLoad 需要数据格式转换
- 需要解析和重新编码
- 可能丢失元数据信息
一致性:
- StreamLoad 是导入操作,不是复制操作
- 无法保证版本一致性
- 无法支持增量同步
代码证据:
// 请求中包含的是元数据信息,不是数据本身TTableReplicationRequest{src_tablet_id,// 源 Tablet IDsrc_version,// 源版本号src_backend,// 源 BE 信息// 没有数据内容}2.4.3 为什么采用 BE 到 BE 快照复制?
优势分析:
性能优势:
✅ 直接传输二进制数据,无需格式转换 ✅ BE 到 BE 直连,减少网络跳数 ✅ 支持并行传输,提高吞吐量 ✅ 基于版本快照,支持增量同步一致性保证:
✅ 基于版本号,保证数据版本一致性 ✅ 快照是原子操作,要么全部成功要么全部失败 ✅ 支持事务机制,保证数据完整性资源效率:
✅ 不占用 FE 查询资源 ✅ 不执行查询计划,减少 CPU 消耗 ✅ 直接传输存储格式,减少内存消耗灵活性:
✅ 支持指定版本号复制特定版本的数据 ✅ 支持增量同步(只传输版本差异) ✅ 支持断点续传(快照机制)安全性:
✅ 使用 Token 进行认证 ✅ BE 到 BE 直连,减少中间环节 ✅ 支持网络地址映射,适应复杂网络环境
代码证据:
// 1. 请求中包含源 BE 信息ReplicationJob.BackendInfo{host,// 源 BE 主机地址bePort,// BE 服务端口httpPort// HTTP 端口(用于数据传输)}// 2. 请求中包含版本信息PartitionInfo{srcVersion// 源分区版本号}// 3. 错误信息显示使用快照机制if(errorMsg.contains("Source snapshots is empty")){// 说明使用快照机制}2.5 数据传输关键技术点
2.5.1 版本控制
// 版本比较逻辑if(srcPartition.getVisibleVersion()>targetPartition.getVisibleVersion()){// 需要同步:目标 BE 请求指定版本号的快照request.version=srcPartition.getVisibleVersion();}作用:
- 保证数据版本一致性
- 支持增量同步
- 避免重复传输
2.5.2 快照机制
// 快照请求{"tablet_id":srcTabletId,// 源 Tablet ID"version":srcVersion,// 版本号"schema_hash":srcSchemaHash,// Schema Hash"target_tablet_id":tabletId// 目标 Tablet ID}特点:
- 快照对应特定版本的数据
- 快照是只读的,保证数据一致性
- 支持增量快照(只包含版本差异)
2.5.3 Token 认证
// 请求中包含源集群 TokenTTableReplicationRequest{src_token:"wwwwwwww-xxxx-yyyy-zzzz-uuuuuuuuuu"}// 目标 BE 使用 Token 访问源 BEHTTPRequestHeaders:Authorization:Token{src_token}作用:
- 验证目标集群是否有权限访问源集群
- 防止未授权访问
- 保证数据安全
2.5.4 并发控制
// FE 配置参数replication_max_parallel_table_count// 最大并发表数replication_max_parallel_replica_count// 最大并发副本数replication_max_parallel_data_size_mb// 最大并发数据量作用:
- 控制并发度,避免过载
- 平衡迁移速度和集群负载
- 防止资源耗尽
2.6 数据传输流程图
┌─────────────────────────────────────────────────────────────┐ │ 迁移工具 │ │ 1. 生成 ReplicationJob │ │ - 包含源 BE 信息(host, httpPort) │ │ - 包含 Tablet 映射(srcTabletId → tabletId) │ │ - 包含版本信息(srcVersion) │ │ - 包含 Token(srcToken) │ └──────────────────────┬──────────────────────────────────────┘ │ Thrift RPC ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 FE │ │ 1. 验证请求(Token、权限) │ │ 2. 检查并发限制 │ │ 3. 创建事务(Label: {dbId}-{tableId}_{jobId}) │ │ 4. 分发任务到目标 BE 节点 │ └──────────────────────┬──────────────────────────────────────┘ │ 任务分发 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 BE │ │ 1. 接收任务 │ │ 2. 解析源 BE 信息 │ │ 3. 建立 HTTP 连接 │ │ 4. 发送快照下载请求 │ │ POST http://{srcBeHost}:{httpPort}/api/snapshot/download │ │ Headers: Authorization: Token {srcToken} │ │ Body: {tablet_id, version, schema_hash, target_tablet_id}│ └──────────────────────┬──────────────────────────────────────┘ │ HTTP 协议 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 源集群 BE │ │ 1. 验证 Token │ │ 2. 根据版本号生成快照 │ │ 3. 通过 HTTP 流式传输快照数据 │ │ 4. 返回数据(或错误:"Source snapshots is empty") │ └──────────────────────┬──────────────────────────────────────┘ │ HTTP Response(快照数据) ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 BE │ │ 1. 接收快照数据 │ │ 2. 验证数据完整性(Checksum) │ │ 3. 写入本地存储 │ │ 4. 更新 Tablet 元数据 │ │ 5. 报告写入完成 │ └──────────────────────┬──────────────────────────────────────┘ │ 完成报告 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 FE │ │ 1. 接收所有副本的完成报告 │ │ 2. 提交事务(COMMITTED) │ │ 3. 发布数据(PUBLISHED → VISIBLE) │ │ 4. 完成事务(FINISHED) │ └──────────────────────┬──────────────────────────────────────┘ │ 状态查询 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 迁移工具 │ │ 1. 查询事务状态 │ │ SHOW PROC '/transactions/{dbId}/running' │ │ SHOW PROC '/transactions/{dbId}/finished' │ │ 2. 更新任务状态 │ │ - RUNNING / FINISHED / FAILED │ │ 3. 统计进度 │ └─────────────────────────────────────────────────────────────┘2.7 数据传输性能优化
2.7.1 并行传输
- 表级并行:多个表同时传输
- 分区级并行:多个分区同时传输
- Tablet 级并行:多个 Tablet 同时传输
2.7.2 批量处理
- 批量发送任务:
replicationJobBatchSize控制 - 批量查询状态:一次查询多个事务状态
2.7.3 网络优化
- BE 直连:减少网络跳数
- HTTP 协议:成熟稳定,支持大文件传输
- 快照机制:支持增量传输,减少数据量
三、总结
3.1 迁移逻辑总结
- 多线程协调:4 个工作线程各司其职
- 周期性同步:定期更新元数据并生成任务
- 任务队列管理:DDL 任务和复制任务分别管理
- 状态跟踪:通过查询事务状态跟踪任务进度
3.2 数据传输总结
- 传输方式:BE 到 BE 直接传输,使用 HTTP 协议
- 数据格式:基于版本快照的二进制数据
- 认证机制:使用 Token 进行认证
- 优势:高效、可靠、灵活、安全
3.3 关键设计决策
| 决策 | 原因 |
|---|---|
| BE 到 BE 直连 | 减少网络跳数,提高性能 |
| HTTP 协议 | 成熟稳定,支持大文件传输 |
| 快照机制 | 保证数据一致性,支持增量同步 |
| Token 认证 | 保证安全性,简化认证流程 |
| 版本控制 | 支持增量同步,避免重复传输 |
文档生成时间:2025-01-XX
反编译工具:CFR 0.152
JAR 版本:starrocks-cluster-sync-2.0