💣 开篇:为什么我们决定“弃用” Seata?
在上一阶段的项目复盘中,我们发现订单中心的数据库 CPU 飙升,且大量事务处于 Lock Wait 状态。
排查发现,罪魁祸首竟然是不仅被视为“银弹”,也被广泛使用的Seata AT 模式。
Seata AT 的痛点(高并发场景):
- 全局锁(Global Lock):二阶段提交(2PC)为了保证隔离性,持锁时间 = RPC 调用时间 + 业务执行时间。长链路下,这就是性能毒药。
- 吞吐量瓶颈:TC(事务协调者)集群在高并发下容易成为单点瓶颈。
- 死锁风险:复杂链路下的全局锁与本地锁极易死锁。
架构哲学的反思:
在微服务架构中,99% 的业务场景(如下单、支付、积分)其实不需要强一致性(CP),只需要最终一致性(AP)。
于是,我们决定**“弃重从轻”**,全面切换到基于RocketMQ 事务消息的最终一致性方案。
🏗️ 架构演进:从“同步等待”到“异步确保”
我们的目标业务场景:用户下单 -> 扣减库存 -> 发放积分。
1. 核心方案:RocketMQ 事务消息机制
RocketMQ 的“事务消息”是解决分布式事务的神器。它将消息发送拆分为两个阶段,确保本地事务执行与消息发送的原子性。
交互时序图 (Sequence Diagram):
💻 落地实战:代码里的魔鬼细节
不要以为有了 RocketMQ 就万事大吉,“最终一致性”的难点在于 Consumer 端。
Step 1: 生产者端 (Producer) - 保证“发”与“做”的原子性
我们需要实现RocketMQLocalTransactionListener接口。
@Component@RocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{@AutowiredprivateOrderServiceorderService;/** * 执行本地事务:插入订单数据 */@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{// 解析消息中的订单信息OrderDTOorder=JSON.parseObject(newString((byte[])msg.getPayload()),OrderDTO.class);// 1. 核心业务:创建订单 (数据库操作)orderService.createOrder(order);// 2. 成功,通知 MQ 提交消息returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){log.error("创建订单失败",e);// 3. 失败,通知 MQ 回滚 (Consumer 永远收不到这条消息)returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 事务回查:防止网络丢包导致 MQ 不知道你是成功还是失败 */@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){OrderDTOorder=JSON.parseObject(newString((byte[])msg.getPayload()),OrderDTO.class);// 查询数据库,看订单到底有没有创建成功booleanexists=orderService.checkOrderExists(order.getOrderId());returnexists?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;}}Step 2: 消费者端 (Consumer) - 幂等性是生命线
这是最关键的一步!
MQ 保证消息至少投递一次,意味着网络抖动时 Consumer 可能收到重复消息。如果积分加了两次,就是资损!
幂等性设计流程图:
graph TD Start(收到 MQ 消息) --> CheckRedis{1. 查询 Redis/去重表} CheckRedis -- "Key 已存在" --> Ignore[直接丢弃/返回成功] CheckRedis -- "Key 不存在" --> DoBiz[2. 执行业务: 加积分] DoBiz --> DBTx{3. 数据库事务提交?} DBTx -- "成功" --> SaveKey[4. 写入去重 Key (Redis/DB)] DBTx -- "失败" --> Retry[抛出异常/等待重试] SaveKey --> End(消费完成 ACK)消费者代码示例:
@Service@RocketMQMessageListener(topic="order-topic",consumerGroup="point-group")publicclassPointConsumerimplementsRocketMQListener<String>{@AutowiredprivatePointServicepointService;@OverridepublicvoidonMessage(Stringmessage){OrderDTOorder=JSON.parseObject(message,OrderDTO.class);StringbizKey="POINT_ADD_"+order.getOrderId();// 1. 幂等性检查 (这里演示简单版,生产环境建议用去重表)if(redisTemplate.hasKey(bizKey)){log.info("重复消息,忽略: {}",bizKey);return;}// 2. 执行业务 (加积分)try{pointService.addPoints(order.getUserId(),order.getAmount());// 3. 标记为已处理redisTemplate.opsForValue().set(bizKey,"1",24,TimeUnit.HOURS);}catch(Exceptione){// 4. 抛出异常,利用 RocketMQ 的重试机制thrownewRuntimeException("消费失败,等待重试");}}}🛡️ 异常兜底:最终一致性的“最终”有多远?
如果 Consumer 一直失败怎么办?(例如积分服务挂了,或者代码有 Bug)。
- 重试机制:RocketMQ 默认重试 16 次(从 1s 到 2h 不等)。绝大多数网络抖动都能在重试中恢复。
- 死信队列 (DLQ):如果 16 次重试后依然失败,消息进入死信队列。
- 人工/定时任务干预:
- 我们需要开发一个监控服务,订阅 DLQ。
- 一旦发现死信,发送告警(钉钉/邮件)。
- 人工排查 Bug,或通过脚本重新投递消息。
💡 架构总结:何时用 Seata,何时用 MQ?
没有最好的架构,只有最合适的架构。
| 对比维度 | Seata (AT 模式) | RocketMQ 事务消息 |
|---|---|---|
| 一致性强弱 | 强一致性(刚性事务) | 最终一致性(柔性事务) |
| 并发性能 | 低 (受限于全局锁) | 极高(异步解耦) |
| 代码侵入性 | 极低 (注解即可) | 高 (需写 Listener 和 幂等逻辑) |
| 适用场景 | 后台管理系统、配置变动、对一致性要求极高的资金转账 | 电商下单、秒杀、支付回调、日志记录 |
博主建议:
在核心的高并发链路中,请大胆地**“弃笨从简”。虽然 MQ 方案写起来麻烦(要处理幂等、回查、死信),但它换来的是系统的高可用性和无限的扩展能力**。这笔交易,绝对划算。
Next Step:
您是否想了解 Consumer 端**“基于本地消息表”的 100% 可靠幂等方案(比 Redis 更稳)?或者需要一份死信队列告警**的 Python 脚本?
在评论区回复“幂等”或“脚本”,我为您安排下一期干货!