document表有一个字段tag存储标签名字,多个标签名字用字段分隔。还有一个tag表,有id和名字字段。现在需要把document和标签的关系写到一个关系表,字段有document表的id和tag表的id,如何将原有数据写入到关系表。
将逗号分隔的字符串迁移到关系表是典型的数据库反范式修复场景。为保证数据一致性和生产安全,下面提供SQL 方案与脚本方案,并附完整执行流程。
⚠️ 迁移前必读(防坑指南)
- 必须加唯一索引:关系表
(document_id, tag_id)必须设联合唯一索引,防重复插入。 - 清理脏数据:标签名可能含前后空格、全角逗号、空字符串,迁移时需
TRIM过滤。 - 缺失标签策略:
document.tag中的名字在tag表不存在时,需决定是跳过、自动新建还是记录日志。 - 务必先备份:执行前对
document表和目标关系表做完整备份。
🛠️ 方案一:纯 SQL 迁移(按数据库方言)
🔹 MySQL 8.0.14+(使用JSON_TABLE拆分)
-- 1. 确保 tag.name 有索引提升 JOIN 性能 -- CREATE INDEX idx_tag_name ON tag(name); -- 2. 执行迁移(INSERT IGNORE 自动忽略已存在或无法匹配的) INSERT IGNORE INTO doc_tag (document_id, tag_id) SELECT DISTINCT d.id, t.id FROM document d CROSS JOIN JSON_TABLE( CONCAT('["', REPLACE(REPLACE(TRIM(d.tag), ', ', '","'), ',', '","'), '"]'), '$[*]' COLUMNS (tag_name VARCHAR(255) PATH '$') ) AS jt JOIN tag t ON TRIM(t.name) = TRIM(jt.tag_name) WHERE d.tag IS NOT NULL AND TRIM(d.tag) != '';🔹 PostgreSQL(原生数组函数,最稳定)
INSERT INTO doc_tag (document_id, tag_id) SELECT DISTINCT d.id, t.id FROM document d CROSS JOIN LATERAL unnest(string_to_array(d.tag, ',')) AS raw_tag JOIN tag t ON TRIM(t.name) = TRIM(raw_tag) WHERE d.tag IS NOT NULL AND TRIM(d.tag) != '' ON CONFLICT (document_id, tag_id) DO NOTHING; -- 幂等写入🔹 SQL Server 2022+ / Oracle 23c+
现代版本已支持STRING_SPLIT()或JSON_TABLE,语法类似。若版本较低,强烈建议使用方案二脚本迁移,避免复杂递归 CTE 带来的性能灾难。
💻 方案二:应用层脚本迁移(🔥 生产环境首选)
SQL 拆分对脏数据容错率低,脚本方案可精确控制、记录日志、批量提交,更适合千万级数据。
# Python 示例 (使用 pymysql / psycopg2) import re BATCH_SIZE = 1000 cursor.execute("SELECT id, tag FROM document WHERE tag IS NOT NULL") docs = cursor.fetchmany(BATCH_SIZE) while docs: relations = [] missing_tags = set() for doc_id, raw_tags in docs: # 清洗:支持全角/半角逗号,过滤空值 tags = [t.strip() for t in re.split(r'[,\uFF0C]', raw_tags) if t.strip()] for tag_name in tags: cursor.execute("SELECT id FROM tag WHERE name = %s", (tag_name,)) res = cursor.fetchone() if res: relations.append((doc_id, res[0])) else: missing_tags.add(tag_name) # 批量插入关系表 if relations: cursor.executemany( "INSERT IGNORE INTO doc_tag (document_id, tag_id) VALUES (%s, %s)", relations ) if missing_tags: print(f"⚠️ 缺失标签: {missing_tags}") conn.commit() docs = cursor.fetchmany(BATCH_SIZE)📋 标准执行流程(按步骤落地)
| 步骤 | 操作 | 目的 |
|---|---|---|
| 1️⃣ | 备份document表 & 创建空doc_tag表 | 可回滚、零风险 |
| 2️⃣ | 在tag.name加唯一索引 | 加速 JOIN,防重复标签 |
| 3️⃣ | 在doc_tag建联合唯一索引(document_id, tag_id) | 保证幂等性 |
| 4️⃣ | 小批量测试:LIMIT 1000执行迁移语句 | 验证拆分逻辑、脏数据处理 |
| 5️⃣ | 检查日志:确认缺失标签、重复数、耗时 | 评估是否需调整策略 |
| 6️⃣ | 全量执行(建议分批次/事务提交) | 控制事务大小,防锁表/回滚段爆满 |
| 7️⃣ | 数据校验:COUNT(doc_tag) vs 预期值 | 闭环验证 |
🔍 关键问题处理策略
| 场景 | 推荐处理方式 |
|---|---|
标签名在tag表不存在 | 记录日志供人工核对;或自动执行INSERT INTO tag(name) VALUES (?)获取新 ID 再关联 |
| 文档 tag 字段为空/NULL | WHERE tag IS NOT NULL AND TRIM(tag) != ''直接过滤 |
| 数据量 > 500万 | 必须用脚本分批次提交(如每次 5000 条),禁用外键检查,迁移后重建索引 |
| 需要保留原始 tag 字段 | 迁移完成后,通过UPDATE document SET tag = NULL或添加注释标记废弃 |
✅ 最终建议
- 数据量 < 10万:直接用上方 SQL,10秒内可完成。
- 数据量 ≥ 10万 或含历史脏数据:务必用脚本方案,配合事务分批提交,并输出
missing_tags.log。 - 迁移完成后,在代码层彻底废弃
document.tag字符串字段,改为通过doc_tagJOIN 查询,否则后续仍会持续产生数据不一致。
迁移脚本java
以下是基于 Spring Boot + MyBatis 的生产级迁移实现。代码采用游标分页 + JDBC 批量执行器 + 手动分片事务架构,兼顾性能、内存安全与可维护性。
📦 1. 扩展 Mapper 接口
假设你已有DocumentMapper、TagMapper、DocTagMapper,只需补充迁移所需方法:
@Mapper public interface DocumentMapper { // 游标分页查询,避免 OFFSET 深分页性能衰减 @Select("SELECT id, tag FROM document WHERE id > #{lastId} ORDER BY id ASC LIMIT #{limit}") List<Document> selectBatchForMigration(@Param("lastId") Long lastId, @Param("limit") int limit); } @Mapper public interface TagMapper { // 全量加载标签(标签表通常较小,内存缓存收益极高) @Select("SELECT id, name FROM tag") List<Tag> selectAllForCache(); } @Mapper public interface DocTagMapper { // 注意:配合 ExecutorType.BATCH 使用,此处只需单条插入 @Insert("INSERT IGNORE INTO doc_tag (document_id, tag_id) VALUES (#{documentId}, #{tagId})") int insert(DocTag docTag); }💡为什么不用
<foreach>批量插入?ExecutorType.BATCH模式下,循环调用单条insert会被 MyBatis 底层转为 JDBCaddBatch(),性能远高于拼接长 SQL,且不会触发max_allowed_packet限制。
🚀 2. 核心迁移 Service
import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.executor.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.stereotype.Service; import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @Slf4j @Service @RequiredArgsConstructor public class TagMigrationService { private final DocumentMapper documentMapper; private final TagMapper tagMapper; private final DocTagMapper docTagMapper; private final SqlSessionFactory sqlSessionFactory; // 匹配半角逗号、全角逗号、空格 private static final Pattern SPLIT_PATTERN = Pattern.compile("[,,\\s]+"); private static final int DOC_FETCH_BATCH = 2000; private static final int COMMIT_BATCH_SIZE = 2000; private static final int MISSING_TAG_CACHE_LIMIT = 10000; public void executeMigration() { log.info("🚀 开始执行标签关系迁移..."); // 1. 加载标签缓存 (name -> id) Map<String, Long> tagMap = loadTagCache(); log.info("✅ 标签缓存加载完成,共 {} 个", tagMap.size()); // 2. 创建 BATCH 执行器 Session(非自动提交) try (SqlSession batchSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) { DocTagMapper batchMapper = batchSession.getMapper(DocTagMapper.class); long lastId = 0; int insertCounter = 0; int totalRelations = 0; Set<String> missingTags = new LinkedHashSet<>(); while (true) { List<Document> docs = documentMapper.selectBatchForMigration(lastId, DOC_FETCH_BATCH); if (docs.isEmpty()) break; for (Document doc : docs) { if (doc.getTag() == null || doc.getTag().trim().isEmpty()) { lastId = doc.getId(); continue; } // 拆分并清洗 String[] parts = SPLIT_PATTERN.split(doc.getTag()); for (String part : parts) { String cleanName = part.trim(); if (cleanName.isEmpty()) continue; Long tagId = tagMap.get(cleanName); if (tagId != null) { batchMapper.insert(new DocTag(doc.getId(), tagId)); insertCounter++; totalRelations++; } else { if (missingTags.size() < MISSING_TAG_CACHE_LIMIT) { missingTags.add(cleanName); } } } lastId = doc.getId(); } // 达到批次阈值,提交事务并清理一级缓存 if (insertCounter >= COMMIT_BATCH_SIZE) { batchSession.commit(); batchSession.clearCache(); insertCounter = 0; log.info("📦 已提交批次,累计生成关系: {}", totalRelations); } } // 提交剩余数据 if (insertCounter > 0) { batchSession.commit(); } // 处理缺失标签 if (!missingTags.isEmpty()) { log.warn("⚠️ 发现 {} 个未在 tag 表定义的标签名,已记录日志", missingTags.size()); logMissingTags(missingTags); } log.info("🎉 迁移完成!总计写入关系数据: {}", totalRelations); } catch (Exception e) { log.error("❌ 迁移过程中发生异常", e); throw new RuntimeException("标签关系迁移失败", e); } } private Map<String, Long> loadTagCache() { return tagMapper.selectAllForCache().stream() .collect(Collectors.toMap( t -> t.getName().trim(), Tag::getId, (v1, v2) -> v1 // 防重名冲突 )); } private void logMissingTags(Set<String> missingTags) { try (java.io.PrintWriter pw = new java.io.PrintWriter("migration_missing_tags.log", "UTF-8")) { pw.println("# 未匹配到 tag 表的原始标签名(共 " + missingTags.size() + " 个)"); missingTags.forEach(pw::println); } catch (Exception e) { log.error("写入缺失标签日志失败", e); } } }🔧 3. 实体类示例(按需调整)
@Data public class Document { private Long id; private String tag; } @Data public class Tag { private Long id; private String name; } @Data @NoArgsConstructor @AllArgsConstructor public class DocTag { private Long documentId; private Long tagId; }📋 4. 触发方式
// 方式1:命令行启动后自动执行 @Component @RequiredArgsConstructor public class MigrationRunner implements CommandLineRunner { private final TagMigrationService migrationService; @Override public void run(String... args) { migrationService.executeMigration(); } } // 方式2:暴露 REST 接口(推荐用于测试/灰度) @RestController @RequestMapping("/api/migration") @RequiredArgsConstructor public class MigrationController { private final TagMigrationService migrationService; @PostMapping("/tags") public ResponseEntity<String> migrate() { migrationService.executeMigration(); return ResponseEntity.ok("迁移任务已提交至后台执行"); } }⚠️ 5. 生产环境必查清单
| 检查项 | 操作 |
|---|---|
| 唯一索引 | ALTER TABLE doc_tag ADD UNIQUE KEY uk_doc_tag (document_id, tag_id); |
| 标签索引 | ALTER TABLE tag ADD INDEX idx_name (name);加速缓存加载 |
| 事务日志 | 调整innodb_log_file_size(MySQL)或wal_level(PG),防频繁提交刷盘阻塞 |
| 连接池配置 | 迁移期间独占 1 个连接即可,无需高并发连接池 |
| 监控指标 | 观察 DBThreads_running、InnoDB Row Lock、GC 频率 |
💡 6. 常见问题与优化策略
| 问题 | 解决方案 |
|---|---|
| 标签表超大(>50万) | 改为Map<String, Long>分片加载,或改用SELECT id FROM tag WHERE name = ?按需查询(可加本地 Caffeine 缓存) |
| 需要自动补全缺失标签 | 在else分支调用tagMapper.insert(name)并回填 ID,注意加分布式锁防并发重复插入 |
| MySQL 8.0+ 性能瓶颈 | 在application.yml添加mybatis.configuration.default-executor-type=BATCH(全局生效) |
| 断点续传 | 记录lastId到 Redis/DB,启动时读取;或直接用WHERE id > #{lastId}天然支持重试 |
✅ 验证脚本(迁移后必跑)
-- 1. 验证数量一致性 SELECT COUNT(*) FROM doc_tag; -- 应与日志输出 totalRelations 一致 -- 2. 随机抽样验证 SELECT d.id, d.tag, GROUP_CONCAT(t.name) AS matched_tags FROM document d JOIN doc_tag dt ON d.id = dt.document_id JOIN tag t ON dt.tag_id = t.id WHERE d.id BETWEEN 1000 AND 1010 GROUP BY d.id, d.tag; -- 3. 确认无孤立关系 SELECT COUNT(*) FROM doc_tag dt LEFT JOIN document d ON dt.document_id = d.id WHERE d.id IS NULL; -- 应为 0