上周接了个数据迁移的活,要把10万条数据从老系统导入新系统。
写了个简单的批量插入,跑起来一看——5分钟。
领导说太慢了,能不能快点?
折腾了一下午,最后优化到3秒,记录一下过程。
最初的代码(5分钟)
最开始写的很简单,foreach循环插入:
// 方式1:循环单条插入(最慢) for (User user : userList) { userMapper.insert(user); }10万条数据,每条都要走一次网络请求、一次SQL解析、一次事务提交。
算一下:假设每条插入需要3ms,10万条就是300秒 = 5分钟。
这是最蠢的写法,但我见过很多项目都这么写。
第一次优化:批量SQL(30秒)
把循环插入改成批量SQL:
<!-- Mapper.xml --> <insert id="batchInsert"> INSERT INTO user (name, age, email) VALUES <foreach collection="list" item="item" separator=","> (#{item.name}, #{item.age}, #{item.email}) </foreach> </insert>// 分批插入,每批1000条 int batchSize = 1000; for (int i = 0; i < userList.size(); i += batchSize) { int end = Math.min(i + batchSize, userList.size()); List<User> batch = userList.subList(i, end); userMapper.batchInsert(batch); }从5分钟降到30秒,提升10倍。
原理:一条SQL插入多条数据,减少网络往返次数。
但还有问题:30秒还是太慢。
第二次优化:JDBC批处理(8秒)
MySQL有个参数叫rewriteBatchedStatements,开启后可以把多条INSERT合并成一条。
第一步:修改数据库连接URL
jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true第二步:使用MyBatis的批处理模式
@Autowired private SqlSessionFactory sqlSessionFactory; public void batchInsertWithExecutor(List<User> userList) { try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) { UserMapper mapper = sqlSession.getMapper(UserMapper.class); int batchSize = 1000; for (int i = 0; i < userList.size(); i++) { mapper.insert(userList.get(i)); if ((i + 1) % batchSize == 0) { sqlSession.flushStatements(); sqlSession.clearCache(); } } sqlSession.flushStatements(); sqlSession.commit(); } }从30秒降到8秒。
原理:
ExecutorType.BATCH模式下,MyBatis会缓存SQL,最后一次性发送给数据库执行。配合rewriteBatchedStatements=true,MySQL驱动会把多条INSERT合并。
第三次优化:多线程并行(3秒)
8秒还是不够快,上多线程:
public void parallelBatchInsert(List<User> userList) { int threadCount = 4; // 根据数据库连接池大小调整 int batchSize = userList.size() / threadCount; ExecutorService executor = Executors.newFixedThreadPool(threadCount); List<Future<?>> futures = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { int start = i * batchSize; int end = (i == threadCount - 1) ? userList.size() : (i + 1) * batchSize; List<User> subList = userList.subList(start, end); futures.add(executor.submit(() -> { batchInsertWithExecutor(subList); })); } // 等待所有任务完成 for (Future<?> future : futures) { try { future.get(); } catch (Exception e) { thrownew RuntimeException(e); } } executor.shutdown(); }从8秒降到3秒。
注意事项:
线程数不要超过数据库连接池大小
如果需要事务一致性,这个方案不适用
要考虑主键冲突的问题
优化效果对比
方案 | 耗时 | 提升倍数 |
|---|---|---|
循环单条插入 | 300秒 | 基准 |
批量SQL | 30秒 | 10倍 |
JDBC批处理 | 8秒 | 37倍 |
多线程并行 | 3秒 | 100倍 |
踩过的坑
坑1:foreach拼接SQL过长
<foreach collection="list" item="item" separator=",">如果一次插入太多条,SQL会非常长,可能超过max_allowed_packet限制。
解决:分批插入,每批500-1000条。
坑2:rewriteBatchedStatements不生效
检查几个点:
URL参数是否正确:
rewriteBatchedStatements=true是否使用了
ExecutorType.BATCHMySQL驱动版本是否太旧
坑3:自增主键返回问题
批量插入时想获取自增主键:
<insert id="batchInsert" useGeneratedKeys="true" keyProperty="id">注意:rewriteBatchedStatements=true时,自增主键返回可能有问题,需要升级MySQL驱动到8.0.17+。
坑4:内存溢出
10万条数据一次性加载到内存,可能OOM。
解决:分页读取 + 分批插入。
int pageSize = 10000; int total = countTotal(); for (int i = 0; i < total; i += pageSize) { List<User> page = selectByPage(i, pageSize); batchInsertWithExecutor(page); }最终方案代码
@Service publicclass BatchInsertService { @Autowired private SqlSessionFactory sqlSessionFactory; /** * 高性能批量插入 * 10万条数据约3秒 */ public void highPerformanceBatchInsert(List<User> userList) { if (userList == null || userList.isEmpty()) { return; } int threadCount = Math.min(4, Runtime.getRuntime().availableProcessors()); int batchSize = (int) Math.ceil((double) userList.size() / threadCount); ExecutorService executor = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { int start = i * batchSize; int end = Math.min((i + 1) * batchSize, userList.size()); if (start >= userList.size()) { latch.countDown(); continue; } List<User> subList = new ArrayList<>(userList.subList(start, end)); executor.submit(() -> { try { doBatchInsert(subList); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } executor.shutdown(); } private void doBatchInsert(List<User> userList) { try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) { UserMapper mapper = sqlSession.getMapper(UserMapper.class); for (int i = 0; i < userList.size(); i++) { mapper.insert(userList.get(i)); if ((i + 1) % 1000 == 0) { sqlSession.flushStatements(); sqlSession.clearCache(); } } sqlSession.flushStatements(); sqlSession.commit(); } } }总结
优化点 | 关键配置 |
|---|---|
批量SQL | foreach拼接,分批1000条 |
JDBC批处理 | rewriteBatchedStatements=true+ExecutorType.BATCH |
多线程 | 线程数 ≤ 连接池大小 |
核心原则:减少网络往返 + 减少事务次数 + 并行处理。