news 2026/5/27 22:48:11

Kafka消息可靠性:从生产到消费的全链路不丢不重

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka消息可靠性:从生产到消费的全链路不丢不重

大家好,我是程序员小策。

先做个自测——你们项目里的 Kafka,消息可靠性是怎么保证的?

A. 生产者acks=all,消费者手动提交 offset——觉得这样就不丢了。
B. 加了个enable.idempotence=true,觉得幂等也够了。
C. 用数据库事务包裹"写业务 + 发消息",两阶段提交然后手动补偿。
D. 不知道,反正运维说 Kafka 很可靠,出问题了找运维。

如果选了 A 或 B,先别急——这两种配置在生产环境单机跑确实没问题,但一到"服务崩了重启"、“网络抖动重试”、"同一条消息被消费了两次"这些场景,就会暴露硬伤:ack 只保证 Broker 收到了,不保证消费者处理完了;幂等只保证生产者不重复发,不保证消费者不重复处理。

今天这篇文章,就是要从 GitHub 上一个生产级项目 ledgerly-saga-outbox-cqrs 的代码出发,一步步拆解如何在生产上做到全链路消息不丢 + 端到端幂等处理


问题定义:消息到底在哪丢了?

一条消息从诞生到被处理,要穿过三段链路:

生产者 → [网络] → Kafka Broker → [网络] → 消费者 → [处理逻辑]

任何一段断了,消息就丢了。大多数人只关注了其中一段。

具体来拆:

阶段怎么丢的典型场景
生产端发消息前服务崩了写数据库成功,但kafkaTemplate.send()还没来得及执行
Broker 端Leader 挂了,副本没同步完acks=1时 Leader 收到就返回成功,副本还没复制,Leader 宕机
消费端自动提交 offset 但没处理完enable.auto.commit=true,消息拉到内存就提交了 offset,还没来得及处理服务重启了

而更隐蔽的问题是——即使消息没丢,重复消费才是最常被忽视的。生产者因网络超时重试 → Broker 收到两条一模一样的消息 → 消费者处理了两遍 → 用户被扣了两次钱。

那么问题来了:怎么同时解决"不丢"和"不重"?


核心概念:用一个外卖订单类比全链路

消息不丢(At-Least-Once):每一段链路都有确认机制,没收到确认就重试,直到确认为止。

幂等处理(Idempotency):同一条消息无论被处理多少次,最终结果和执行一次完全一样。

打个比方——你在美团点了一份黄焖鸡米饭:

不丢(At-Least-Once)怎么保证?

  • 你下单 → 平台必须告诉你"下单成功"(生产端确认)
  • 平台推给商家 → 商家必须确认"收到订单"(Broker 确认)
  • 骑手取餐 → 必须扫码确认"已取餐"(消费端手动提交 offset)

任何一步没收到确认,系统就重推。

但重推带来了新问题——重复。

  • 网络抖了一下,平台没收到商家的确认,于是又推了一次。
  • 商家看到两条一模一样的订单——如果做了两份黄焖鸡,用户只付了一份钱,商家亏了。

幂等就是商家的"去重逻辑":订单号(idempotency key)已经处理过?直接返回第一次的结果,不再重复做菜。

翻译回技术语言:

  • 订单号= Kafka 消息头里的idempotency-key
  • "已经处理过"的判断= Redis SETNX + 数据库唯一约束
  • “返回第一次结果”= IdempotencyService 查到已有记录直接返回

接下来看代码怎么落地。


代码实现:拆解一个生产级 Kafka 项目

以下代码全部来自 dkrmerve/ledgerly-saga-outbox-cqrs,一个生产级 Spring Boot 项目,涵盖了Transactional Outbox、DLT 死信队列、Redis 去重、DB 幂等四个维度。

阶段一:生产端不丢 —— Transactional Outbox 模式

问题:下面这种写法,如果服务崩在send之前,数据库已经写了,消息没发出去。

// 反例:写DB和发消息不是原子的@TransactionalpublicvoidcreateOrder(Orderorder){orderRepository.save(order);// ① 成功了kafkaTemplate.send("order-topic",order);// ② 还没来得及执行 → 服务崩了}

解法:Transactional Outbox。不在业务方法里直接发 Kafka,而是先把消息和业务数据在同一个事务里写到一张outbox_event表,然后由独立的定时任务从这张表里取消息发到 Kafka。

看代码。第一步:业务操作 + 写 Outbox 表在同一个事务中OutboxService.java):

@ServicepublicclassOutboxService{privatefinalOutboxRepositoryoutboxRepository;privatefinalObjectMapperom=newObjectMapper();@TransactionalpublicvoidenqueueSagaCommand(longorderId,StringidempotencyKey,UUIDcorrelationId,SagaEventevent){enqueue(KafkaTopics.topicSagaCommands(),orderId,idempotencyKey,correlationId,event.eventType,event);}privatevoidenqueue(Stringtopic,longorderId,StringidempotencyKey,UUIDcorrelationId,StringeventType,Objectpayload){try{OutboxEventEntitye=newOutboxEventEntity();e.setId(UUID.randomUUID());e.setTopic(topic);e.setAggregateType("ORDER");e.setAggregateId(String.valueOf(orderId));e.setEventType(eventType);e.setPayload(om.writeValueAsString(payload));e.setStatus("NEW");// ← 初始状态:待发送e.setCorrelationId(correlationId);e.setIdempotencyKey(idempotencyKey);// ← 幂等键跟着消息走e.setOccurredAt(Instant.now());e.setPublishAttempts(0);// ← 重试计数器outboxRepository.save(e);}catch(Exceptionex){thrownewRuntimeException(ex);}}}

关键设计OutboxService.enqueue()OrderService.createOrder()在同一个@Transactional下执行。PostgreSQL 的事务保证了:要么订单 + Outbox 记录一起写入,要么一起回滚。不存在"订单写了、消息没写"的情况。

第二步:独立的定时任务从 Outbox 表取消息,发到 KafkaOutboxPublisherJob.java):

@EnableScheduling@ComponentpublicclassOutboxPublisherJob{privatefinalOutboxRepositoryoutboxRepository;privatefinalOutboxKafkaPublisherpublisher;privatefinalintbatchSize;// 每次取多少条privatefinalintleaseSeconds;// 租约时间,防止多节点重复发送privatefinalStringnodeId;// 当前节点标识@Scheduled(fixedDelayString="${ledgerly.outbox.publishFixedDelayMs}")@TransactionalpublicvoidpublishLoop(){List<OutboxEventEntity>batch=outboxRepository.leaseBatch(batchSize);if(batch.isEmpty())return;InstantlockUntil=Instant.now().plusSeconds(leaseSeconds);for(OutboxEventEntitye:batch){outboxRepository.markLocked(e.getId(),nodeId,lockUntil);try{publisher.publish(e);// 发到 Kafkae.setStatus("PUBLISHED");// 标记已发送e.setPublishedAt(Instant.now());e.setPublishAttempts(e.getPublishAttempts()+1);e.setLastError(null);}catch(Exceptionex){e.setPublishAttempts(e.getPublishAttempts()+1);e.setLastError(ex.getMessage());e.setStatus("NEW");// 恢复为NEW,下次重试}outboxRepository.save(e);}}}

关键设计点

  • 租约机制(Lease):多节点部署时,leaseBatch()SELECT ... FOR UPDATE SKIP LOCKED给记录加锁,防止同一消息被多个节点重复发送。
  • 重试机制:发送失败的消息状态回退为NEW,下一轮定时任务会重新拾取。
  • 不再丢:只要消息写入了 Outbox 表,就一定会被发送到 Kafka——即使服务重启也不怕。

第三步:真正发送到 Kafka,带上幂等键和链路追踪信息(OutboxKafkaPublisher.java):

@ComponentpublicclassOutboxKafkaPublisher{privatefinalKafkaTemplate<String,String>kafkaTemplate;publicvoidpublish(OutboxEventEntitye){// key = aggregateId,确保同一订单的消息进同一分区,保证有序ProducerRecord<String,String>record=newProducerRecord<>(e.getTopic(),e.getAggregateId(),e.getPayload());// 在 Kafka Header 中注入元数据——消费端幂等和链路追踪的基础record.headers().add(KafkaHeaders.CORRELATION_ID,e.getCorrelationId().toString().getBytes(StandardCharsets.UTF_8));if(e.getIdempotencyKey()!=null){record.headers().add(KafkaHeaders.IDEMPOTENCY_KEY,e.getIdempotencyKey().getBytes(StandardCharsets.UTF_8));}record.headers().add(KafkaHeaders.EVENT_ID,e.getId().toString().getBytes(StandardCharsets.UTF_8));record.headers().add(KafkaHeaders.ORDER_ID,e.getAggregateId().getBytes(StandardCharsets.UTF_8));record.headers().add(KafkaHeaders.EVENT_TYPE,e.getEventType().getBytes(StandardCharsets.UTF_8));kafkaTemplate.send(record).completable().join();// 同步等待结果}}

阶段二:Broker 端不丢 —— 生产级配置

光靠代码不够,Kafka Broker 端必须配上正确的参数。看这个项目的application.yml

spring:kafka:bootstrap-servers:localhost:9092producer:acks:all# ① 等待所有ISR副本确认properties:enable.idempotence:true# ② 生产者幂等(PID + Sequence Number)consumer:enable-auto-commit:false# ③ 禁止自动提交offsetproperties:isolation.level:read_committed# ④ 只读已提交的事务消息listener:ack-mode:manual# ⑤ 手动确认模式

逐条解释为什么这样配:

参数为什么丢了会怎样
acksall/-1等待所有 ISR(In-Sync Replicas)副本都写入后才返回成功。Leader 挂了,任一 ISR 副本能接替acks=1时 Leader 确认后立即宕机,副本还没同步,消息永久丢失
enable.idempotencetrueBroker 给每个 Producer 分配 PID,Producer 给每条消息分配 Sequence Number。Broker 发现重复的 PID+Seq 就丢弃网络超时重试 → Broker 收到重复消息 → 消费者处理两遍
enable.auto.commitfalse必须手动提交 offset。自动提交 = 消息拉到内存就认为"消费成功",处理逻辑还没跑服务就崩了重启后从已提交的 offset 继续,中间的消息没处理但 offset 已经跳过了
isolation.levelread_committed只消费已提交事务的消息,未提交的事务消息不可见。配合事务生产者使用读到未提交的事务消息,事务回滚后这条消息实际不存在
ack-modemanual消费者处理完业务逻辑后,手动调用acknowledgment.acknowledge()record模式在 listener 返回后就自动提交,异常时消息已被标记为已消费

阶段三:消费端不重 —— 双重去重(Redis + DB)

现在消息一定会到达消费者,但可能到达多次(生产者重试、网络重试、Rebalance 重试)。

这个项目的消费端去重分为两层:

第一层:Redis 快速去重(RedisDedupService.java

@ServicepublicclassRedisDedupService{privatefinalStringRedisTemplateredis;privatefinalDurationttl;// 默认86400秒 = 24小时/** * 使用 Redis SETNX 原子操作判断是否是第一次处理 * key = "dedup:{consumer}:{eventId}" * 返回 true = 第一次处理,可以继续 * 返回 false = 已处理过,跳过 */publicbooleanfirstTime(Stringconsumer,StringeventId){Stringkey="dedup:"+consumer+":"+eventId;Booleanok=redis.opsForValue().setIfAbsent(key,"1",ttl);returnBoolean.TRUE.equals(ok);}}

第二层:DB 权威去重(InboxService.java

Redis 是快速路径——如果数据过期了或者 Redis 挂了,仍然需要数据库兜底:

@ServicepublicclassInboxService{privatefinalInboxRepositoryinboxRepository;privatefinalRedisDedupServiceredisDedupService;/** * Exactly-once 双重保障: * ① Redis SETNX:快速判断"大概率是不是重复" * ② DB Inbox 表唯一约束:(eventId, consumer) 联合主键——权威去重 * * claim() 和业务逻辑在同一个 @Transactional 中执行, * 任何一步失败都整体回滚,保证原子性。 */@Transactionalpublicbooleanclaim(Stringconsumer,UUIDeventId){booleanlikelyFirst=redisDedupService.firstTime(consumer,eventId.toString());if(!likelyFirst){// Redis 命中了,大概率重复;但最终以 DB 为准}InboxEventEntitye=newInboxEventEntity();e.setEventId(eventId);e.setConsumer(consumer);e.setProcessedAt(Instant.now());try{inboxRepository.save(e);// ← 唯一约束:重复插入抛异常returntrue;// → 第一次处理,继续执行业务逻辑}catch(Exceptionex){returnfalse;// → 重复消息,跳过}}}

双重去重的精妙之处

  • Redis SETNX:O(1) 时间复杂度,挡住 99% 的重复流量
  • DB 唯一约束:Redis 数据过期或宕机后的保底方案,在同一个事务中执行,保证去重和业务处理的原子性
  • 两层都失败 = 消息真的重复了,跳过不处理

阶段四:处理失败怎么办 —— 死信队列(DLT)

消息不丢了,也不重复了,但如果业务处理一直失败怎么办?不能无限重试。这个项目的方案是:0 次重试,直接进死信队列KafkaConfig.java):

@ConfigurationpublicclassKafkaConfig{@BeanConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String>consumerFactory,KafkaTemplate<Object,Object>kafkaTemplate){ConcurrentKafkaListenerContainerFactory<String,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);// 死信队列:处理失败的消息自动转发到 {原topic}.DLTDeadLetterPublishingRecovererrecoverer=newDeadLetterPublishingRecoverer(kafkaTemplate,(record,ex)->newTopicPartition(record.topic()+".DLT",record.partition()));// 0次重试 → 立即进入 DLT,由人工或定时任务处理DefaultErrorHandlererrorHandler=newDefaultErrorHandler(recoverer,newFixedBackOff(0L,0));errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);factory.setCommonErrorHandler(errorHandler);returnfactory;}}

这样,整个消息生命周期形成了完整的闭环:

业务操作 → Outbox表 → 定时发送 → Kafka → 消费者 ↓ Redis SETNX(快速去重) ↓ DB 唯一约束(权威去重) ↓ 执行业务逻辑 ↙ ↘ 成功 失败 手动Ack offset → {topic}.DLT(死信队列)

边界情况与陷阱:代码跑起来才会翻的车

看起来很完美了对吧?但以下几个坑在生产上真实发生过。

陷阱一:Outbox 表无限膨胀。

Outbox 表发完消息后不删记录,几个月后表里有几千万条数据,定时扫描越来越慢。解法:定期归档或删除status='PUBLISHED'published_at < NOW() - 7天的记录。

陷阱二:Redis SETNX 的 TTL 设置不当。

TTL 太短(比如 5 分钟),Consumer Rebalance 后重试 → Redis key 已过期 → 查不到 → 重复处理。上面项目里 TTL 是 86400 秒(24 小时),覆盖了绝大多数重试窗口。

陷阱三:max.poll.interval.ms太小导致死循环 Rebalance。

消费者处理慢 → 超过max.poll.interval.ms→ 被踢出消费者组 → Rebalance → 重新分配分区 → 重新处理同一条消息 → 更慢 → 又超时 → 又 Rebalance……解法:增大max.poll.interval.ms(默认 5 分钟),或减小max.poll.records每次少拉几条。

陷阱四:acks=all+min.insync.replicas=1= 白配了。

acks=all等的是所有 ISR 副本,但如果min.insync.replicas=1且 ISR 里只剩 Leader 一个副本,那就退化成了acks=1必须同时设置min.insync.replicas >= 2


高级考量:ID 生成与顺序性

消息不丢不重了,还有一个隐性需求——同一个订单的操作必须有序消费。用户先下单后取消,取消消息不能被先消费。

这个项目的做法:aggregateId(订单 ID)作为 Kafka 消息的 Key。

// OutboxKafkaPublisher.java 中的这行代码ProducerRecord<String,String>record=newProducerRecord<>(e.getTopic(),e.getAggregateId(),e.getPayload());// ↑ key = orderId,保证同一订单的消息进同一分区

Kafka 保证同一分区内的消息严格有序。把同一个订单的所有操作路由到同一分区 = 该订单的所有操作有序。

那么全局有序呢?所有消息进一个分区就行——但那样吞吐量就只有单分区的能力。实际生产上几乎不需要全局有序,分区有序足够。


对比表格:四种可靠性方案对比

方案核心思路不丢不重复杂度适用场景
纯 Kafka 参数acks=all+ 手动提交✅ Broker端❌ 消费端不防重允许少量重复的场景(如日志)
Kafka 事务executeInTransaction+send✅ 生产端幂等发消息 + 写DB需原子性但允许重复消费
Transactional Outbox + Redis去重DB事务写Outbox → 定时发Kafka → Redis SETNX去重✅ 全链路⚠️ Redis 不可靠对数据一致性要求高的业务
Outbox + Redis + DB双重去重(本项目)上述基础上加DB唯一约束兜底✅ 全链路✅ 端到端金融、交易等对重复零容忍的场景

面试追问:面试官想听的不是"配几个参数"

追问 1:enable.idempotence=true的原理是什么?它和消费者幂等有什么区别?

→ 回答方向:Kafka 生产者幂等是Broker 层面的去重——Broker 给每个 Producer 分配一个 PID(Producer ID),Producer 给每个消息分区分配一个单调递增的 Sequence Number。Broker 收到消息时检查PID + Seq是否连续,发现重复或乱序就丢弃。但这只保证生产者到 Broker 这一段不重复。消费者拿到消息后重复消费,生产者幂等管不了——必须做消费端幂等。

追问 2:为什么不用 Kafka 的事务消息(initTransactions+commitTransaction)替代 Outbox?

→ 回答方向:Kafka 事务可以保证"发消息"和"消息本身的原子性",但它不能保证"发消息"和"写 MySQL"的原子性——除非用 EOS(Exactly Once Semantics)全家桶,但那要求消费者也必须是事务消费(isolation.level=read_committed),且要求下游也是 Kafka。你写的是 PostgreSQL,Kafka 事务管不着。Outbox 模式把"写DB+写Outbox表"放在同一个本地事务中,是最简单可靠的方案。

追问 3:Outbox 定时任务的轮询间隔(700ms)怎么定的?会不会成为瓶颈?

→ 回答方向:轮询间隔是延迟和 DB 压力的权衡。700ms 意味着消息最多延迟 700ms 才能被消费。如果需要更低延迟,可以用 Debezium 之类的 CDC 工具监听 Outbox 表 binlog 实时发送。但如果业务允许秒级延迟,700ms 完全可以接受。瓶颈在leaseBatch()SKIP LOCKED——它能保证多节点并行取不同的批次,水平扩展即可增加吞吐。

追问 4:Redis 去重的 TTL 过期了怎么办?消息重试窗口比 TTL 还长。

→ 回答方向:这就是为什么需要DB 唯一约束作为兜底。Redis 是性能优化,DB 是数据一致性的保底方案。即使 Redis key 过期了,DB 的InboxEventEntity(eventId, consumer)联合主键也能保证不重复。两层去重,谁快用谁,谁稳信谁。


总结

消息不丢靠 Outbox + acks=all,消息不重靠 Redis SETNX + DB 唯一约束双重去重,处理失败靠 DLT 兜底。

读完这篇你应该能:

  • 画出 Kafka 消息从生产到消费的全链路,并标注每一段的可靠性保障措施
  • 在项目里落地 Transactional Outbox 模式,用 DB 事务替代"手动发消息"
  • 设计 Redis + DB 双重去重方案,而不是开口只说"用 Redis 做幂等"
  • 在面试时说出enable.idempotence的底层原理(PID + Sequence Number),而不只是"配个参数就行"
  • 理解 DLT 死信队列的价值——不是每条失败的消息都值得无限重试,有时候快速失败然后人工介入才是对的
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/27 22:45:51

Unity 2022 LTS实战:从零手搓一个《原神》风格的可缩放、可展开小地图(附完整C#脚本)

Unity 2022 LTS实战&#xff1a;从零构建《原神》级动态小地图系统在开放世界游戏的沉浸感塑造中&#xff0c;小地图系统远不止是简单的导航工具。当玩家在《原神》的提瓦特大陆奔跑时&#xff0c;那个会呼吸的圆形地图——随着角色移动流畅旋转的图标、展开时丝滑的动画过渡、…

作者头像 李华
网站建设 2026/5/27 22:44:44

从2的0次方到256次方:一张表看懂计算机的“二进制宇宙”

1. 二进制世界的基石&#xff1a;2的幂次方表 打开电脑时&#xff0c;你是否想过屏幕上闪烁的光点背后&#xff0c;其实藏着无数个0和1的秘密&#xff1f;这张从2的0次方到256次方的完整对照表&#xff0c;就是打开计算机世界大门的万能钥匙。我第一次接触这个表格时&#xff0…

作者头像 李华
网站建设 2026/5/27 22:43:36

Agent 面试,项目是 20 分,讲项目是 80 分

近一年我帮人改过上百份带"Agent 项目"的简历&#xff0c;模拟面试也做了不少场。一个让我自己都意外的发现&#xff1a;项目做得不错但讲得很差的人&#xff0c;几乎拿不到 offer。同样的项目讲清楚的人&#xff0c;反而能拿到手软。 项目只是入场券&#xff0c;讲…

作者头像 李华
网站建设 2026/5/27 22:41:17

AI工程师:角色、技术与职责深度剖析

引言&#xff1a;AI浪潮中的核心构建者 在人工智能技术从实验室走向产业化的浪潮中&#xff0c;AI工程师已成为连接算法研究与商业价值的桥梁。他们不仅是代码的编写者&#xff0c;更是复杂AI系统的设计者、构建者和维护者。本文将深入剖析AI工程师在业界的多重角色、必须掌握的…

作者头像 李华
网站建设 2026/5/27 22:41:06

增强现实(AR)在教育中的应用:设计框架、效果评估与理论支撑

1. 项目概述&#xff1a;增强现实如何重塑学习体验 作为一名长期关注教育技术与创新应用的研究者&#xff0c;我亲眼见证了技术浪潮如何一次次冲击传统的教学围墙。从早期的多媒体课件到后来的在线学习平台&#xff0c;每一次变革都试图解决同一个核心问题&#xff1a;如何让知…

作者头像 李华