1. 批量操作为什么容易写"坏"?
常见问题分析:
1.1MyBatis常见错误模式
// ❌ 错误1:循环中多次调用Mapper单条插入
@Transactional
public void badBatchInsert(List<User> users) {
    for (User user : users) {
        userMapper.insert(user); // 每次都是独立的SQL执行
    }
    // 问题:虽然加了@Transactional,但仍然是N次数据库往返
}

// ❌ 错误2:错误的foreach拼接方式
// Mapper XML中的错误写法
<insert id="batchInsert">
    INSERT INTO user (name, age) VALUES
    <foreach collection="list" item="item" separator=",">
        (#{item.name}, #{item.age})
    </foreach>
</insert>
// 问题:当list过大时,SQL超长,可能超出数据库限制

// ❌ 错误3:忘记配置批处理支持
# application.properties缺少关键配置
spring.datasource.url=jdbc:mysql://localhost:3306/test
# 缺少:rewriteBatchedStatements=true

1.2性能陷阱
N+1问题:循环调用Mapper方法
内存泄漏:未分批次处理大数据量
连接耗尽:长时间占用数据库连接
事务过大:百万级数据在一个事务中
1.3事务管理不当
// ❌ 事务范围太大或太小
@Transactional // 默认传播机制,可能不合适
public void batchProcess(List<Data> dataList) {
    // 处理逻辑...
}

2. 批量插入:真正的标准写法
2.1MyBatis配置优化
# application.yml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/db?rewriteBatchedStatements=true&useSSL=false&serverTimezone=UTC
    # MySQL必须加rewriteBatchedStatements=true
    # PostgreSQL加:reWriteBatchedInserts=true
    hikari:
      maximum-pool-size: 20
      minimum-idle: 10
      connection-timeout: 30000

mybatis:
  configuration:
    default-executor-type: batch # 全局设置为批量执行器
    # 或仅在需要时动态切换

2.2方式一:使用MyBatis的BatchExecutor(推荐)
@Service public class UserServiceImpl implements UserService { @Autowired private SqlSessionFactory sqlSessionFactory; @Autowired private UserMapper userMapper; // 常规的SimpleExecutor @Override public void batchInsert(List<User> users) { // 获取Batch模式的SqlSession SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); try { UserMapper batchMapper = sqlSession.getMapper(UserMapper.class); for (int i = 0; i < users.size(); i++) { batchMapper.insert(users.get(i)); // 每1000条提交一次,避免内存溢出 if (i % 1000 == 0 && i > 0) { sqlSession.flushStatements(); } } // 提交剩余数据 sqlSession.flushStatements(); sqlSession.commit(); } catch (Exception e) { sqlSession.rollback(); throw new RuntimeException("批量插入失败", e); } finally { sqlSession.close(); } } }2.3方式二:foreach动态SQL(适合中小批量)
<!-- UserMapper.xml -->
<insert id="batchInsert" parameterType="java.util.List">
    INSERT INTO user (name, age, email, create_time) VALUES
    <foreach collection="list" item="item" index="index" separator=",">
        ( #{item.name}, #{item.age}, #{item.email}, NOW() )
    </foreach>
</insert>

<!-- Chunk-friendly variant; upserts when the key already exists -->
<insert id="batchInsertByChunk">
    INSERT INTO user (name, age, email) VALUES
    <foreach collection="list" item="item" separator=",">
        (#{item.name}, #{item.age}, #{item.email})
    </foreach>
    ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age)
</insert>

// Service layer: chunked insert implementation
@Service
@Slf4j
public class BatchService {

    // Rows per generated INSERT statement; keeps the SQL under DB size limits
    private static final int BATCH_SIZE = 1000;

    @Autowired
    private UserMapper userMapper;

    // Splits the input into BATCH_SIZE chunks and inserts each chunk with one
    // multi-row INSERT; a failed chunk is logged and later chunks still run.
    public void safeBatchInsert(List<User> users) {
        if (CollectionUtils.isEmpty(users)) {
            return;
        }
        // Process in chunks
        List<List<User>> partitions = Lists.partition(users, BATCH_SIZE);
        for (int i = 0; i < partitions.size(); i++) {
            List<User> batch = partitions.get(i);
            try {
                userMapper.batchInsert(batch);
                log.info("第{}批次插入完成,共{}条", i + 1, batch.size());
            } catch (Exception e) {
                log.error("第{}批次插入失败", i + 1, e);
                // Optional: record the failed rows and continue with later chunks
            }
        }
    }
}
2.4方式三:使用MyBatis-Plus的批量操作
// 如果使用MyBatis-Plus @Service public class UserServicePlusImpl extends ServiceImpl<UserMapper, User> implements UserService { @Autowired private SqlSessionFactory sqlSessionFactory; /** * 使用MyBatis-Plus的saveBatch(内部优化版) */ @Transactional(rollbackFor = Exception.class) public void batchInsertWithMP(List<User> users) { // 方式1:使用MP自带方法(默认每次1000条) saveBatch(users); // 方式2:自定义批次大小 saveBatch(users, 2000); } /** * 高性能批量插入(推荐生产环境使用) */ public void highPerformanceBatchInsert(List<User> users) { // 切换为BatchExecutor SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH); try { UserMapper mapper = sqlSession.getMapper(UserMapper.class); int count = 0; for (User user : users) { mapper.insert(user); count++; // 每1000条提交一次 if (count % 1000 == 0) { sqlSession.flushStatements(); // 注意:这里不会提交事务,只是刷新语句 } } sqlSession.flushStatements(); sqlSession.commit(); } finally { sqlSession.close(); } } }2.5方式四:JDBC批处理 + MyBatis
@Component public class JdbcBatchService { @Autowired private DataSource dataSource; public void jdbcBatchInsert(List<User> users) throws SQLException { String sql = "INSERT INTO user (name, age, email) VALUES (?, ?, ?)"; try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { conn.setAutoCommit(false); for (int i = 0; i < users.size(); i++) { User user = users.get(i); ps.setString(1, user.getName()); ps.setInt(2, user.getAge()); ps.setString(3, user.getEmail()); ps.addBatch(); // 每批1000条执行一次 if (i % 1000 == 0 && i > 0) { ps.executeBatch(); conn.commit(); ps.clearBatch(); } } // 执行剩余批次 ps.executeBatch(); conn.commit(); } catch (SQLException e) { // 异常处理 throw e; } } }3. 批量更新和删除的正确写法
3.1批量更新
<!-- Option 1: per-row UPDATE statements with differing values.
     NOTE(review): the ";" separator emits multiple statements in one call —
     on MySQL this presumably requires allowMultiQueries=true; confirm. -->
<update id="batchUpdate">
    <foreach collection="list" item="item" separator=";">
        UPDATE user
        SET name = #{item.name}, age = #{item.age}, email = #{item.email}
        WHERE id = #{item.id}
    </foreach>
</update>

<!-- Option 2: single UPDATE using CASE WHEN (more efficient) -->
<update id="batchUpdateByCase">
    UPDATE user
    SET name = CASE id
        <foreach collection="list" item="item">
            WHEN #{item.id} THEN #{item.name}
        </foreach>
        ELSE name END,
    age = CASE id
        <foreach collection="list" item="item">
            WHEN #{item.id} THEN #{item.age}
        </foreach>
        ELSE age END,
    email = CASE id
        <foreach collection="list" item="item">
            WHEN #{item.id} THEN #{item.email}
        </foreach>
        ELSE email END
    WHERE id IN
    <foreach collection="list" item="item" open="(" separator="," close=")">
        #{item.id}
    </foreach>
</update>

// Service layer: batch update
@Service
@Transactional(rollbackFor = Exception.class)
public class BatchUpdateService {

    @Autowired
    private UserMapper userMapper;

    // Updates in chunks of 500 rows; single-row chunks take the plain update
    // path, larger chunks use the CASE WHEN statement above.
    public void smartBatchUpdate(List<User> users) {
        // Update in chunks, 500 rows each
        List<List<User>> batches = Lists.partition(users, 500);
        for (List<User> batch : batches) {
            if (batch.size() == 1) {
                // Single row: ordinary update
                userMapper.update(batch.get(0));
            } else {
                // Multiple rows: CASE WHEN batch update
                userMapper.batchUpdateByCase(batch);
            }
        }
    }
}
3.2批量删除
<!-- Batch delete -->
<delete id="batchDelete">
    DELETE FROM user WHERE id IN
    <foreach collection="list" item="id" open="(" separator="," close=")">
        #{id}
    </foreach>
</delete>

<!-- Chunked delete (avoids an oversized IN list) -->
<delete id="batchDeleteByChunk">
    DELETE FROM user WHERE id IN
    <foreach collection="list" item="id" open="(" separator="," close=")">
        #{id}
    </foreach>
    LIMIT 1000 -- 限制每次删除数量
</delete>

// Safe batch delete
@Service
@Slf4j
public class BatchDeleteService {

    /** Max ids per DELETE statement. */
    private static final int DELETE_BATCH_SIZE = 1000;

    @Autowired
    private UserMapper userMapper;

    /**
     * Deletes the given ids in chunks of DELETE_BATCH_SIZE.
     *
     * FIX: the original swallowed InterruptedException from Thread.sleep in
     * the generic catch block, silently losing the interrupt flag. It is now
     * caught separately: the flag is restored and the run is aborted.
     *
     * @param ids primary keys to delete; null/empty is a no-op
     */
    @Transactional(rollbackFor = Exception.class)
    public void safeBatchDelete(List<Long> ids) {
        if (CollectionUtils.isEmpty(ids)) {
            return;
        }
        // Small enough: a single statement
        if (ids.size() <= DELETE_BATCH_SIZE) {
            userMapper.batchDelete(ids);
            return;
        }
        // Large id sets: delete chunk by chunk
        List<List<Long>> partitions = Lists.partition(ids, DELETE_BATCH_SIZE);
        for (List<Long> batchIds : partitions) {
            try {
                int affected = userMapper.batchDelete(batchIds);
                log.info("删除批次完成,影响行数: {}", affected);
                // Optional pause to reduce lock contention
                Thread.sleep(50);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and abort the remaining batches
                Thread.currentThread().interrupt();
                throw new IllegalStateException("批量删除被中断", ie);
            } catch (Exception e) {
                log.error("批量删除失败,批次ID数: {}", batchIds.size(), e);
                // Business rules decide whether to continue with later batches
            }
        }
    }
}
4. 高级技巧和最佳实践
4.1动态切换ExecutorType
@Component public class BatchExecutorTemplate { @Autowired private SqlSessionFactory sqlSessionFactory; /** * 在BatchExecutor中执行操作 */ public <T> T executeInBatch(Callable<T> callable) { SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH); try { T result = callable.call(); sqlSession.commit(); return result; } catch (Exception e) { sqlSession.rollback(); throw new RuntimeException("批处理执行失败", e); } finally { sqlSession.close(); } } /** * 批量插入模板方法 */ public <T> void batchInsert(String statement, List<T> dataList, int batchSize, Consumer<T> parameterSetter) { SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH); try { for (int i = 0; i < dataList.size(); i++) { T data = dataList.get(i); // 这里简化处理,实际需要更复杂的参数设置 sqlSession.insert(statement, data); if (i % batchSize == 0 && i > 0) { sqlSession.flushStatements(); } } sqlSession.flushStatements(); sqlSession.commit(); } finally { sqlSession.close(); } } }4.2监控和调优配置
@Configuration
public class MyBatisBatchConfig {

    /**
     * SQL performance interceptor: fails statements that exceed maxTime.
     * NOTE(review): presumably MyBatis-Plus's PerformanceInterceptor — confirm
     * it exists in the MP version in use (it was removed in MP 3.2+).
     */
    @Bean
    public PerformanceInterceptor performanceInterceptor() {
        PerformanceInterceptor interceptor = new PerformanceInterceptor();
        interceptor.setMaxTime(1000); // max SQL execution time, in ms
        interceptor.setFormat(true);
        return interceptor;
    }

    @Bean
    public ConfigurationCustomizer configurationCustomizer() {
        return configuration -> {
            // Enable the second-level cache
            configuration.setCacheEnabled(true);
            // Set the default executor type
            configuration.setDefaultExecutorType(ExecutorType.SIMPLE);
            // Default fetchSize for batch reads
            configuration.setDefaultFetchSize(1000);
        };
    }
}
4.3异常处理和重试机制
@Service
@Slf4j
public class RobustBatchService {

    @Autowired
    private UserMapper userMapper;

    /**
     * Batch insert with declarative retry and duplicate handling.
     *
     * NOTE(review): @Retryable is keyed to SQLException, but Spring/MyBatis
     * normally surface failures as DataAccessException wrappers (as the catch
     * blocks below assume) — confirm the retry ever actually triggers.
     */
    @Retryable(value = SQLException.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))
    @Transactional(rollbackFor = Exception.class)
    public void batchInsertWithRetry(List<User> users) {
        try {
            // Insert in chunks of 1000
            List<List<User>> batches = Lists.partition(users, 1000);
            for (List<User> batch : batches) {
                userMapper.batchInsert(batch);
            }
        } catch (DuplicateKeyException e) {
            log.warn("重复数据插入,尝试去重后重新插入");
            // Handle duplicate rows, then re-insert
            handleDuplicateData(users);
        } catch (DataIntegrityViolationException e) {
            log.error("数据完整性错误", e);
            throw e;
        }
    }

    // De-duplication logic goes here (left unimplemented in the article)
    private void handleDuplicateData(List<User> users) {
        // 实现去重逻辑
    }
}
4.4生产环境完整示例
@Component @Slf4j public class ProductionBatchService { @Autowired private SqlSessionFactory sqlSessionFactory; @Autowired private PlatformTransactionManager transactionManager; private static final int BATCH_SIZE = 1000; private static final int MAX_RETRY = 3; /** * 生产级批量插入 * 特性: * 1. 分批次处理 * 2. 事务管理 * 3. 重试机制 * 4. 性能监控 * 5. 异常处理 */ public BatchResult batchInsertUsers(List<User> users) { if (CollectionUtils.isEmpty(users)) { return BatchResult.empty(); } BatchResult result = new BatchResult(); long startTime = System.currentTimeMillis(); // 分批次 List<List<User>> batches = Lists.partition(users, BATCH_SIZE); for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) { List<User> batch = batches.get(batchIndex); boolean success = executeWithRetry(() -> { return processBatch(batch, batchIndex); }, MAX_RETRY); if (success) { result.addSuccess(batch.size()); } else { result.addFailed(batch); // 记录失败批次,可以后续处理 log.error("批次{}处理失败,数据量: {}", batchIndex, batch.size()); } } long cost = System.currentTimeMillis() - startTime; log.info("批量插入完成,总计: {},成功: {},失败: {},耗时: {}ms", users.size(), result.getSuccessCount(), result.getFailedCount(), cost); return result; } private boolean processBatch(List<User> batch, int batchIndex) { // 使用编程式事务,更精细控制 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); def.setTimeout(30); // 30秒超时 TransactionStatus status = transactionManager.getTransaction(def); try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) { UserMapper mapper = sqlSession.getMapper(UserMapper.class); for (User user : batch) { mapper.insert(user); } sqlSession.flushStatements(); transactionManager.commit(status); log.debug("批次{}插入成功,数据量: {}", batchIndex, batch.size()); return true; } catch (Exception e) { transactionManager.rollback(status); log.error("批次{}插入失败", batchIndex, e); return false; } } private boolean executeWithRetry(Supplier<Boolean> task, 
int maxRetry) { int retryCount = 0; while (retryCount <= maxRetry) { try { return task.get(); } catch (Exception e) { retryCount++; if (retryCount > maxRetry) { throw e; } log.warn("操作失败,第{}次重试", retryCount); // 指数退避 try { Thread.sleep(1000 * (long) Math.pow(2, retryCount)); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException("重试被中断", ie); } } } return false; } @Data public static class BatchResult { private int successCount; private List<List<User>> failedBatches = new ArrayList<>(); public void addSuccess(int count) { successCount += count; } public void addFailed(List<User> batch) { failedBatches.add(batch); } public int getFailedCount() { return failedBatches.stream().mapToInt(List::size).sum(); } public static BatchResult empty() { return new BatchResult(); } } }5. 总结:
永远不要循环调用Mapper单条方法
始终配置数据库连接参数(rewriteBatchedStatements)
合理分批次(1000-5000条/批)
使用合适的事务策略
监控和限制执行时间
实现完善的错误处理和重试机制
测试不同数据量下的性能表现
生产环境要有熔断和降级策略
性能对比建议:
小批量(<1000条):使用foreach动态SQL
中批量(1000-10万条):使用BatchExecutor
大批量(>10万条):考虑使用多线程 + 分库分表策略