news 2026/4/25 16:18:29

【L】表数据迁移

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【L】表数据迁移

document表有一个字段tag存储标签名字,多个标签名字用字段分隔。还有一个tag表,有id和名字字段。现在需要把document和标签的关系写到一个关系表,字段有document表的id和tag表的id,如何将原有数据写入到关系表。

将逗号分隔的字符串迁移到关系表是典型的数据库反范式修复场景。为保证数据一致性和生产安全,下面提供SQL 方案脚本方案,并附完整执行流程。


⚠️ 迁移前必读(防坑指南)

  1. 必须加唯一索引:关系表(document_id, tag_id)必须设联合唯一索引,防重复插入。
  2. 清理脏数据:标签名可能含前后空格、全角逗号、空字符串,迁移时需TRIM过滤。
  3. 缺失标签策略document.tag中的名字在tag表不存在时,需决定是跳过自动新建还是记录日志
  4. 务必先备份:执行前对document表和目标关系表做完整备份。

🛠️ 方案一:纯 SQL 迁移(按数据库方言)

🔹 MySQL 8.0.14+(使用JSON_TABLE拆分)
-- 1. 确保 tag.name 有索引提升 JOIN 性能 -- CREATE INDEX idx_tag_name ON tag(name); -- 2. 执行迁移(INSERT IGNORE 自动忽略已存在或无法匹配的) INSERT IGNORE INTO doc_tag (document_id, tag_id) SELECT DISTINCT d.id, t.id FROM document d CROSS JOIN JSON_TABLE( CONCAT('["', REPLACE(REPLACE(TRIM(d.tag), ', ', '","'), ',', '","'), '"]'), '$[*]' COLUMNS (tag_name VARCHAR(255) PATH '$') ) AS jt JOIN tag t ON TRIM(t.name) = TRIM(jt.tag_name) WHERE d.tag IS NOT NULL AND TRIM(d.tag) != '';
🔹 PostgreSQL(原生数组函数,最稳定)
INSERT INTO doc_tag (document_id, tag_id) SELECT DISTINCT d.id, t.id FROM document d CROSS JOIN LATERAL unnest(string_to_array(d.tag, ',')) AS raw_tag JOIN tag t ON TRIM(t.name) = TRIM(raw_tag) WHERE d.tag IS NOT NULL AND TRIM(d.tag) != '' ON CONFLICT (document_id, tag_id) DO NOTHING; -- 幂等写入
🔹 SQL Server 2022+ / Oracle 23c+

现代版本已支持STRING_SPLIT()JSON_TABLE,语法类似。若版本较低,强烈建议使用方案二脚本迁移,避免复杂递归 CTE 带来的性能灾难。


💻 方案二:应用层脚本迁移(🔥 生产环境首选)

SQL 拆分对脏数据容错率低,脚本方案可精确控制、记录日志、批量提交,更适合千万级数据。

# Python 示例 (使用 pymysql / psycopg2) import re BATCH_SIZE = 1000 cursor.execute("SELECT id, tag FROM document WHERE tag IS NOT NULL") docs = cursor.fetchmany(BATCH_SIZE) while docs: relations = [] missing_tags = set() for doc_id, raw_tags in docs: # 清洗:支持全角/半角逗号,过滤空值 tags = [t.strip() for t in re.split(r'[,\uFF0C]', raw_tags) if t.strip()] for tag_name in tags: cursor.execute("SELECT id FROM tag WHERE name = %s", (tag_name,)) res = cursor.fetchone() if res: relations.append((doc_id, res[0])) else: missing_tags.add(tag_name) # 批量插入关系表 if relations: cursor.executemany( "INSERT IGNORE INTO doc_tag (document_id, tag_id) VALUES (%s, %s)", relations ) if missing_tags: print(f"⚠️ 缺失标签: {missing_tags}") conn.commit() docs = cursor.fetchmany(BATCH_SIZE)

📋 标准执行流程(按步骤落地)

步骤操作目的
1️⃣备份document表 & 创建空doc_tag可回滚、零风险
2️⃣tag.name加唯一索引加速 JOIN,防重复标签
3️⃣doc_tag建联合唯一索引(document_id, tag_id)保证幂等性
4️⃣小批量测试LIMIT 1000执行迁移语句验证拆分逻辑、脏数据处理
5️⃣检查日志:确认缺失标签、重复数、耗时评估是否需调整策略
6️⃣全量执行(建议分批次/事务提交)控制事务大小,防锁表/回滚段爆满
7️⃣数据校验:COUNT(doc_tag) vs 预期值闭环验证

🔍 关键问题处理策略

场景推荐处理方式
标签名在tag表不存在记录日志供人工核对;或自动执行INSERT INTO tag(name) VALUES (?)获取新 ID 再关联
文档 tag 字段为空/NULLWHERE tag IS NOT NULL AND TRIM(tag) != ''直接过滤
数据量 > 500万必须用脚本分批次提交(如每次 5000 条),禁用外键检查,迁移后重建索引
需要保留原始 tag 字段迁移完成后,通过UPDATE document SET tag = NULL或添加注释标记废弃

✅ 最终建议

  1. 数据量 < 10万:直接用上方 SQL,10秒内可完成。
  2. 数据量 ≥ 10万 或含历史脏数据:务必用脚本方案,配合事务分批提交,并输出missing_tags.log
  3. 迁移完成后,在代码层彻底废弃document.tag字符串字段,改为通过doc_tagJOIN 查询,否则后续仍会持续产生数据不一致。

迁移脚本java

以下是基于 Spring Boot + MyBatis 的生产级迁移实现。代码采用游标分页 + JDBC 批量执行器 + 手动分片事务架构,兼顾性能、内存安全与可维护性。

📦 1. 扩展 Mapper 接口

假设你已有DocumentMapperTagMapperDocTagMapper,只需补充迁移所需方法:

@Mapper public interface DocumentMapper { // 游标分页查询,避免 OFFSET 深分页性能衰减 @Select("SELECT id, tag FROM document WHERE id > #{lastId} ORDER BY id ASC LIMIT #{limit}") List<Document> selectBatchForMigration(@Param("lastId") Long lastId, @Param("limit") int limit); } @Mapper public interface TagMapper { // 全量加载标签(标签表通常较小,内存缓存收益极高) @Select("SELECT id, name FROM tag") List<Tag> selectAllForCache(); } @Mapper public interface DocTagMapper { // 注意:配合 ExecutorType.BATCH 使用,此处只需单条插入 @Insert("INSERT IGNORE INTO doc_tag (document_id, tag_id) VALUES (#{documentId}, #{tagId})") int insert(DocTag docTag); }

💡为什么不用<foreach>批量插入?
ExecutorType.BATCH模式下,循环调用单条insert会被 MyBatis 底层转为 JDBCaddBatch(),性能远高于拼接长 SQL,且不会触发max_allowed_packet限制。


🚀 2. 核心迁移 Service

import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.executor.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.stereotype.Service; import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @Slf4j @Service @RequiredArgsConstructor public class TagMigrationService { private final DocumentMapper documentMapper; private final TagMapper tagMapper; private final DocTagMapper docTagMapper; private final SqlSessionFactory sqlSessionFactory; // 匹配半角逗号、全角逗号、空格 private static final Pattern SPLIT_PATTERN = Pattern.compile("[,,\\s]+"); private static final int DOC_FETCH_BATCH = 2000; private static final int COMMIT_BATCH_SIZE = 2000; private static final int MISSING_TAG_CACHE_LIMIT = 10000; public void executeMigration() { log.info("🚀 开始执行标签关系迁移..."); // 1. 加载标签缓存 (name -> id) Map<String, Long> tagMap = loadTagCache(); log.info("✅ 标签缓存加载完成,共 {} 个", tagMap.size()); // 2. 创建 BATCH 执行器 Session(非自动提交) try (SqlSession batchSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) { DocTagMapper batchMapper = batchSession.getMapper(DocTagMapper.class); long lastId = 0; int insertCounter = 0; int totalRelations = 0; Set<String> missingTags = new LinkedHashSet<>(); while (true) { List<Document> docs = documentMapper.selectBatchForMigration(lastId, DOC_FETCH_BATCH); if (docs.isEmpty()) break; for (Document doc : docs) { if (doc.getTag() == null || doc.getTag().trim().isEmpty()) { lastId = doc.getId(); continue; } // 拆分并清洗 String[] parts = SPLIT_PATTERN.split(doc.getTag()); for (String part : parts) { String cleanName = part.trim(); if (cleanName.isEmpty()) continue; Long tagId = tagMap.get(cleanName); if (tagId != null) { batchMapper.insert(new DocTag(doc.getId(), tagId)); insertCounter++; totalRelations++; } else { if (missingTags.size() < MISSING_TAG_CACHE_LIMIT) { missingTags.add(cleanName); } } } lastId = doc.getId(); } // 达到批次阈值,提交事务并清理一级缓存 if (insertCounter >= COMMIT_BATCH_SIZE) { batchSession.commit(); batchSession.clearCache(); insertCounter = 0; log.info("📦 已提交批次,累计生成关系: {}", totalRelations); } } // 提交剩余数据 if (insertCounter > 0) { batchSession.commit(); } // 处理缺失标签 if (!missingTags.isEmpty()) { log.warn("⚠️ 发现 {} 个未在 tag 表定义的标签名,已记录日志", missingTags.size()); logMissingTags(missingTags); } log.info("🎉 迁移完成!总计写入关系数据: {}", totalRelations); } catch (Exception e) { log.error("❌ 迁移过程中发生异常", e); throw new RuntimeException("标签关系迁移失败", e); } } private Map<String, Long> loadTagCache() { return tagMapper.selectAllForCache().stream() .collect(Collectors.toMap( t -> t.getName().trim(), Tag::getId, (v1, v2) -> v1 // 防重名冲突 )); } private void logMissingTags(Set<String> missingTags) { try (java.io.PrintWriter pw = new java.io.PrintWriter("migration_missing_tags.log", "UTF-8")) { pw.println("# 未匹配到 tag 表的原始标签名(共 " + missingTags.size() + " 个)"); missingTags.forEach(pw::println); } catch (Exception e) { log.error("写入缺失标签日志失败", e); } } }

🔧 3. 实体类示例(按需调整)

@Data public class Document { private Long id; private String tag; } @Data public class Tag { private Long id; private String name; } @Data @NoArgsConstructor @AllArgsConstructor public class DocTag { private Long documentId; private Long tagId; }

📋 4. 触发方式

// 方式1:命令行启动后自动执行 @Component @RequiredArgsConstructor public class MigrationRunner implements CommandLineRunner { private final TagMigrationService migrationService; @Override public void run(String... args) { migrationService.executeMigration(); } } // 方式2:暴露 REST 接口(推荐用于测试/灰度) @RestController @RequestMapping("/api/migration") @RequiredArgsConstructor public class MigrationController { private final TagMigrationService migrationService; @PostMapping("/tags") public ResponseEntity<String> migrate() { migrationService.executeMigration(); return ResponseEntity.ok("迁移任务已提交至后台执行"); } }

⚠️ 5. 生产环境必查清单

检查项操作
唯一索引ALTER TABLE doc_tag ADD UNIQUE KEY uk_doc_tag (document_id, tag_id);
标签索引ALTER TABLE tag ADD INDEX idx_name (name);加速缓存加载
事务日志调整innodb_log_file_size(MySQL)或wal_level(PG),防频繁提交刷盘阻塞
连接池配置迁移期间独占 1 个连接即可,无需高并发连接池
监控指标观察 DBThreads_runningInnoDB Row LockGC 频率

💡 6. 常见问题与优化策略

问题解决方案
标签表超大(>50万)改为Map<String, Long>分片加载,或改用SELECT id FROM tag WHERE name = ?按需查询(可加本地 Caffeine 缓存)
需要自动补全缺失标签else分支调用tagMapper.insert(name)并回填 ID,注意加分布式锁防并发重复插入
MySQL 8.0+ 性能瓶颈application.yml添加mybatis.configuration.default-executor-type=BATCH(全局生效)
断点续传记录lastId到 Redis/DB,启动时读取;或直接用WHERE id > #{lastId}天然支持重试

✅ 验证脚本(迁移后必跑)

-- 1. 验证数量一致性 SELECT COUNT(*) FROM doc_tag; -- 应与日志输出 totalRelations 一致 -- 2. 随机抽样验证 SELECT d.id, d.tag, GROUP_CONCAT(t.name) AS matched_tags FROM document d JOIN doc_tag dt ON d.id = dt.document_id JOIN tag t ON dt.tag_id = t.id WHERE d.id BETWEEN 1000 AND 1010 GROUP BY d.id, d.tag; -- 3. 确认无孤立关系 SELECT COUNT(*) FROM doc_tag dt LEFT JOIN document d ON dt.document_id = d.id WHERE d.id IS NULL; -- 应为 0
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/25 16:18:23

碧蓝航线自动脚本:解放双手的全能游戏助手

碧蓝航线自动脚本&#xff1a;解放双手的全能游戏助手 【免费下载链接】AzurLaneAutoScript Azur Lane bot (CN/EN/JP/TW) 碧蓝航线脚本 | 无缝委托科研&#xff0c;全自动大世界 项目地址: https://gitcode.com/gh_mirrors/az/AzurLaneAutoScript AzurLaneAutoScript&a…

作者头像 李华
网站建设 2026/4/25 16:16:22

7-Zip:重新定义免费压缩工具的终极指南

7-Zip&#xff1a;重新定义免费压缩工具的终极指南 【免费下载链接】7z 7-Zip Official Chinese Simplified Repository (Homepage and 7z Extra package) 项目地址: https://gitcode.com/gh_mirrors/7z1/7z 你是否曾因文件太大无法通过邮件发送而烦恼&#xff1f;是否担…

作者头像 李华
网站建设 2026/4/25 16:13:07

智能手表测血氧准不准?拆解红光与红外光PPG的底层逻辑与校准难题

智能手表血氧监测精度揭秘&#xff1a;从PPG原理到日常使用的科学指南 清晨六点&#xff0c;你的智能手表在监测到血氧饱和度降至92%时发出警报——这个数据是否值得担忧&#xff1f;当我们把健康监测的主动权交给腕上这个小设备时&#xff0c;很少有人真正了解那些闪烁的红外光…

作者头像 李华
网站建设 2026/4/25 16:11:58

如何构建本地AI写作助手:KoboldAI的完整实践指南

如何构建本地AI写作助手&#xff1a;KoboldAI的完整实践指南 【免费下载链接】KoboldAI-Client For GGUF support, see KoboldCPP: https://github.com/LostRuins/koboldcpp 项目地址: https://gitcode.com/gh_mirrors/ko/KoboldAI-Client 你是否曾面对空白文档&#xf…

作者头像 李华
网站建设 2026/4/25 16:09:35

开源音乐格式转换工具实战:5步解锁网易云音乐加密文件

开源音乐格式转换工具实战&#xff1a;5步解锁网易云音乐加密文件 【免费下载链接】ncmdump 转换网易云音乐 ncm 到 mp3 / flac. Convert Netease Cloud Music ncm files to mp3/flac files. 项目地址: https://gitcode.com/gh_mirrors/nc/ncmdump 还在为网易云音乐下载…

作者头像 李华