分布式锁防并发 与 事务后置动作
一、分布式锁防并发
1.1 解决什么问题
在分布式微服务环境下,同一个请求可能因为用户重复点击、MQ 重试、定时任务并发等原因被多个线程/多个实例同时执行。Java 的synchronized或ReentrantLock只能锁住单个 JVM 进程内的线程,跨实例无效。
分布式锁通过外部中间件(Redis、ZooKeeper、数据库)提供跨进程、跨机器的互斥能力。
1.2 核心概念
| 概念 | 说明 |
|---|---|
| 锁粒度 | 按业务 key 加锁(如订单号),不同订单不互斥,同一订单互斥 |
| 获取方式 | 阻塞等待(tryLock with timeout)或立即失败(tryLock 0ms) |
| 自动释放 | 设置过期时间,防止持有者崩溃后死锁 |
| 可重入 | 同一线程可重复获取同一把锁(取决于实现) |
| Redisson vs 自建 | Redisson 提供看门狗续期、可重入、公平锁等高级特性;自建一般用SET NX EX |
1.3 Redis 分布式锁原理
加锁: SET lock_key unique_value NX PX 30000 → NX: 只有 key 不存在时才设置(互斥) → PX: 30 秒后自动过期(防死锁) 释放: 用 Lua 脚本保证原子性 if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) end1.4 代码示例(通用)
importorg.springframework.data.redis.core.StringRedisTemplate;importorg.springframework.data.redis.core.script.DefaultRedisScript;importorg.springframework.stereotype.Service;importjava.util.Collections;importjava.util.UUID;importjava.util.concurrent.TimeUnit;@ServicepublicclassOrderService{privatefinalStringRedisTemplateredisTemplate;// 释放锁的 Lua 脚本:确保只有持有者能释放privatestaticfinalStringRELEASE_SCRIPT="if redis.call('get', KEYS[1]) == ARGV[1] then "+" return redis.call('del', KEYS[1]) "+"else "+" return 0 "+"end";publicOrderService(StringRedisTemplateredisTemplate){this.redisTemplate=redisTemplate;}publicvoidprocessOrder(StringorderId){StringlockKey="lock:order:"+orderId;StringlockValue=UUID.randomUUID().toString();// 唯一标识,防止误删他人锁booleanlocked=false;try{// 尝试加锁,30秒自动过期locked=Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,30,TimeUnit.SECONDS));if(!locked){thrownewRuntimeException("订单正在处理中,请勿重复提交");}// ========= 业务逻辑 =========doBusinessLogic(orderId);// ============================}finally{if(locked){// 原子释放:只释放自己加的锁DefaultRedisScript<Long>script=newDefaultRedisScript<>(RELEASE_SCRIPT,Long.class);redisTemplate.execute(script,Collections.singletonList(lockKey),lockValue);}}}privatevoiddoBusinessLogic(StringorderId){// 扣库存、生成发货单等...}}1.5 使用 try-with-resources 封装
/** * 分布式锁封装,实现 AutoCloseable 支持 try-with-resources. */publicclassRedisDistributedLockimplementsAutoCloseable{privatefinalStringRedisTemplateredisTemplate;privatefinalStringlockKey;privatefinalStringlockValue;privatebooleanacquired=false;publicRedisDistributedLock(StringRedisTemplateredisTemplate,StringlockKey){this.redisTemplate=redisTemplate;this.lockKey=lockKey;this.lockValue=UUID.randomUUID().toString();}publicbooleantryLock(longtimeout,TimeUnitunit){this.acquired=Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,timeout,unit));returnthis.acquired;}@Overridepublicvoidclose(){if(acquired){Stringscript="if redis.call('get', KEYS[1]) == ARGV[1] then "+" return redis.call('del', KEYS[1]) "+"else return 0 end";redisTemplate.execute(newDefaultRedisScript<>(script,Long.class),Collections.singletonList(lockKey),lockValue);}}}使用方式:
try(RedisDistributedLocklock=newRedisDistributedLock(redisTemplate,"lock:order:"+orderId)){if(lock.tryLock(30,TimeUnit.SECONDS)){doBusinessLogic(orderId);}else{thrownewRuntimeException("获取锁失败");}}// close() 自动释放锁1.6 注意事项
| 问题 | 解决 |
|---|---|
| 锁过期但业务未执行完 | Redisson 看门狗机制自动续期;或评估好超时时间 |
| Redis 主从切换丢锁 | RedLock 算法(多数派加锁),但复杂度高,非强一致场景一般不用 |
| 锁粒度太粗 | 用lock:order:{orderId}而非lock:order,避免全局串行 |
| 释放他人的锁 | 用 UUID 标记持有者,Lua 脚本原子校验 |
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
二、事务编排 + 事务后置动作
2.1 解决什么问题
一个典型场景:数据库写入成功后需要发 MQ 消息通知下游。如果在同一个事务方法中直接发消息:
- 消息已发,但事务回滚 → 下游收到脏消息
- 事务已提交,但消息发送失败 → 下游丢消息
事务后置动作保证:只有事务成功提交后才执行消息发送等副作用操作。
2.2 Spring 事务同步机制
Spring 提供了TransactionSynchronization接口,可以注册回调在事务的不同阶段执行:
publicinterfaceTransactionSynchronization{voidbeforeCommit(booleanreadOnly);// 提交前voidbeforeCompletion();// 完成前(无论成功失败)voidafterCommit();// 提交成功后 ★voidafterCompletion(intstatus);// 完成后(带状态码)}通过TransactionSynchronizationManager.registerSynchronization()注册。
2.3 核心知识点
| 知识点 | 说明 |
|---|---|
@Transactional | 方法级事务,Spring AOP 代理管理 begin/commit/rollback |
TransactionSynchronizationManager | 线程绑定的事务同步管理器,每个事务可注册多个回调 |
afterCommit() | 事务提交成功后触发,此时数据已持久化,适合发 MQ/调外部接口 |
afterCompletion(STATUS_ROLLED_BACK) | 事务回滚后触发,适合做补偿/告警 |
| Propagation | 嵌套事务(REQUIRES_NEW)场景下,同步回调跟随各自事务独立触发 |
2.4 代码示例(通用)
importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importorg.springframework.transaction.support.TransactionSynchronization;importorg.springframework.transaction.support.TransactionSynchronizationManager;@ServicepublicclassPaymentService{privatefinalPaymentRepositorypaymentRepository;privatefinalMessageProducermessageProducer;privatefinalNotificationServicenotificationService;publicPaymentService(PaymentRepositorypaymentRepository,MessageProducermessageProducer,NotificationServicenotificationService){this.paymentRepository=paymentRepository;this.messageProducer=messageProducer;this.notificationService=notificationService;}@Transactional(rollbackFor=Exception.class)publicvoidcompletePayment(StringpaymentId,StringuserId){// ======= 事务内操作(数据库) =======Paymentpayment=paymentRepository.findById(paymentId).orElseThrow(()->newRuntimeException("支付单不存在"));payment.setStatus("COMPLETED");paymentRepository.save(payment);// 扣减库存inventoryRepository.deductStock(payment.getProductId(),payment.getQuantity());// ======= 注册事务后置动作 =======// 只有上面的 save + deductStock 都成功提交后,才会执行以下逻辑TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){@OverridepublicvoidafterCommit(){// 发送 MQ 消息通知物流系统messageProducer.send("payment.completed",paymentId);// 发送用户通知notificationService.notifyUser(userId,"您的支付已完成");}@OverridepublicvoidafterCompletion(intstatus){if(status==STATUS_ROLLED_BACK){// 事务回滚后的补偿逻辑(如告警)log.warn("支付事务回滚, paymentId={}",paymentId);}}});}}2.5 封装为可复用的 Collector 工具类
可使用AfterTransactionActionCollector来简化注册多个后置动作的场景:
importorg.springframework.transaction.support.TransactionSynchronization;importjava.util.ArrayList;importjava.util.List;/** * 事务后置动作收集器. * 在事务方法中收集多个后置动作,事务提交后统一执行. */publicclassAfterTransactionActionCollectorimplementsTransactionSynchronization{privatefinalList<Runnable>commitActions=newArrayList<>();privatefinalList<Runnable>rollbackActions=newArrayList<>();/** 添加事务提交后执行的动作. */publicvoidaddCommitSyncAction(Runnableaction){commitActions.add(action);}/** 添加事务回滚后执行的动作. */publicvoidaddRollbackAction(Runnableaction){rollbackActions.add(action);}@OverridepublicvoidafterCommit(){for(Runnableaction:commitActions){try{action.run();}catch(Exceptione){// 后置动作失败不影响已提交的事务,仅记录日志log.error("事务后置动作执行失败",e);}}}@OverridepublicvoidafterCompletion(intstatus){if(status==STATUS_ROLLED_BACK){for(Runnableaction:rollbackActions){try{action.run();}catch(Exceptione){log.error("回滚后置动作执行失败",e);}}}}}使用方式:
@Transactional(rollbackFor=Exception.class)publicvoidcreateOrder(OrderRequestrequest){// 数据库操作Orderorder=orderRepository.save(buildOrder(request));// 收集多个后置动作AfterTransactionActionCollectorcollector=newAfterTransactionActionCollector();collector.addCommitSyncAction(()->mqProducer.send("order.created",order.getId()));collector.addCommitSyncAction(()->pushService.pushToUser(order.getUserId(),"下单成功"));collector.addRollbackAction(()->alertService.alert("订单创建事务回滚: "+request.getOrderNo()));// 注册到当前事务TransactionSynchronizationManager.registerSynchronization(collector);}2.6 与其他方案对比
| 方案 | 优点 | 缺点 |
|---|---|---|
| 事务后置动作(本方案) | 简单直接,无额外中间件 | 应用崩溃时消息可能丢失 |
| 本地消息表 | 可靠性最高,可重试 | 需要额外表 + 定时补偿任务 |
| RocketMQ 事务消息 | 中间件级保障 | 依赖特定 MQ 实现,编码复杂 |
| @TransactionalEventListener | Spring 原生注解,解耦优雅 | 事件驱动模式,需额外定义事件类 |
2.7 注意事项
| 问题 | 说明 |
|---|---|
| afterCommit 中抛异常 | 不会导致事务回滚(已提交),但会中断后续 action,需要 try-catch |
| 没有活跃事务 | 调用registerSynchronization会抛异常,需确保在@Transactional方法内 |
| 异步 vs 同步 | afterCommit 默认同步执行,长耗时操作建议投递到线程池 |
| REQUIRES_NEW | 嵌套事务各自独立,内层事务提交时触发内层的 afterCommit,不等外层 |
三、两者如何配合
在发货场景中,两者组合使用的完整时序:
1. 获取分布式锁(Redis) ↓ 成功 2. 开启数据库事务(@Transactional) ↓ 3. 业务逻辑:校验 → 扣库存 → 生成发货单 → 保存明细 ↓ 4. 注册事务后置动作(发 MQ 给物流) ↓ 5. 事务提交 ↓ 6. afterCommit 触发 → MQ 消息发出 ↓ 7. 释放分布式锁(finally / try-with-resources)锁保证同一订单不会被并发处理;事务后置保证消息不会因为事务回滚而成为脏数据。