news 2026/6/9 19:48:47

Kafka批量消费实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka批量消费实现

批量消费指的是一次性拉取一批消息,然后批量处理
依赖spring-kafka

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version></dependency>

配置消费者工厂

@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,String>consumerFactory(){Map<String,Object>props=newHashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG,"batch-consumer-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);// 批量消费配置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);// 每次最多拉取100条props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,10240);// 至少10KB才返回returnnewDefaultKafkaConsumerFactory<>(props);}@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>batchFactory(ConsumerFactory<String,String>consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setBatchListener(true);// 启用批量监听// 手动提交模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);returnfactory;}}

实现批量消费监听器

@ServicepublicclassOrderBatchConsumer{@KafkaListener(topics="order-topic",containerFactory="batchFactory")publicvoidconsumeBatch(List<ConsumerRecord<String,String>>records,Acknowledgmentack){log.info("收到批量消息: {} 条",records.size());try{// 业务处理for(ConsumerRecord<String,String>record:records){processOrder(record.value());}// 全部成功才提交offsetack.acknowledge();log.info("批次处理成功,已提交offset");}catch(Exceptione){log.error("批量处理失败",e);// 不提交offset,等待重新投递}}}

批量消费可靠性保障
不能在finally中提交offset,因为不管消费是否成功都会提交offset

@KafkaListener(topics="topic",containerFactory="batchFactory")publicvoidconsume(List<ConsumerRecord<String,String>>records,Acknowledgmentack){try{processBatch(records);}finally{ack.acknowledge();// 无论成败都提交,会丢消息!}}

不能设置自动提交,自动提交模式下,批量消费有可能会丢消息。要改为手动提交

props.put("enable.auto.commit","true");// 自动提交// 处理过程中失败,但offset已自动提交,消息丢失

需要用到线程池的时候,需要等到全部消息处理成功才能提交。使用CompletionService实现

@ServicepublicclassReliableBatchConsumer{privatefinalExecutorServiceexecutor=Executors.newFixedThreadPool(10);@KafkaListener(topics="payment-topic",containerFactory="batchFactory")publicvoidconsumeBatch(List<ConsumerRecord<String,String>>records,Acknowledgmentack){CompletionService<Boolean>completionService=newExecutorCompletionService<>(executor);List<Future<Boolean>>futures=newArrayList<>();// 1. 提交所有任务到线程池并发处理for(ConsumerRecord<String,String>record:records){Callable<Boolean>task=()->{try{processPayment(record.value());returntrue;}catch(Exceptione){log.error("支付处理失败: {}",record.value(),e);returnfalse;}};futures.add(completionService.submit(task));}// 2. 等待所有任务完成并检查结果booleanallSuccess=true;try{for(inti=0;i<records.size();i++){Future<Boolean>future=completionService.take();if(!future.get()){allSuccess=false;break;// 发现失败立即终止}}}catch(Exceptione){allSuccess=false;log.error("任务执行异常",e);}// 3. 全部成功才提交offsetif(allSuccess){ack.acknowledge();log.info("批次全部成功,已提交offset");}else{log.warn("批次中有失败消息,不提交offset,等待重新投递");// 重新投递会导致重复消费,需要业务保证幂等性}}}

当有失败消息的时候,不提交offset,等待消息重投或者重新拉取,但是会有消息重复的情况,业务上要做好幂等。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/2 21:45:00

语音合成与huggingface镜像网站结合:加速大模型权重下载

语音合成与Hugging Face镜像网站结合&#xff1a;加速大模型权重下载 在智能语音应用快速落地的今天&#xff0c;开发者常常面临一个看似简单却极其耗时的问题&#xff1a;如何高效地将一个动辄数GB的语音合成模型从云端拉到本地&#xff1f;尤其是在国内网络环境下&#xff0…

作者头像 李华
网站建设 2026/6/5 22:42:01

语音合成在智能家居中的应用:基于GLM-TTS的本地化语音提醒

语音合成在智能家居中的应用&#xff1a;基于GLM-TTS的本地化语音提醒 在现代家庭中&#xff0c;智能音箱每天清晨用机械的声音播报天气&#xff1a;“今天气温26度&#xff0c;晴。”听起来高效&#xff0c;却总少了点人情味。如果这个声音换成你母亲温柔的叮嘱——“宝贝&…

作者头像 李华
网站建设 2026/5/30 22:05:30

GLM-TTS能否用于会议纪要转语音?提升信息传达效率

GLM-TTS能否用于会议纪要转语音&#xff1f;提升信息传达效率 在远程协作日益频繁的今天&#xff0c;企业会议数量激增&#xff0c;而会后整理出的纪要却常常“沉睡”在邮箱或文档系统中。员工不愿读、没空看&#xff0c;导致关键决策和任务分配被遗漏——这几乎是每个团队都面…

作者头像 李华
网站建设 2026/6/6 3:39:26

语音合成中的节奏控制:如何调节语速快慢而不失真?

语音合成中的节奏控制&#xff1a;如何调节语速快慢而不失真&#xff1f; 在智能语音助手、有声书平台和虚拟主播日益普及的今天&#xff0c;用户早已不再满足于“能说话”的机器声音。他们期待的是自然流畅、富有情感、节奏得体的语音输出——尤其是当需要加速播放长篇内容&am…

作者头像 李华
网站建设 2026/6/7 18:42:13

【限时干货】PHP视频加密播放系统设计:防止抓包、破解与非法传播

第一章&#xff1a;PHP视频加密播放系统概述在现代Web应用开发中&#xff0c;视频内容的版权保护成为关键需求之一。PHP视频加密播放系统通过结合服务端加密技术与前端解密播放机制&#xff0c;实现对敏感视频资源的安全分发。该系统不仅防止用户直接下载原始视频文件&#xff…

作者头像 李华
网站建设 2026/5/30 21:12:55

PHP分库分表自动扩容方案曝光:支撑业务爆发式增长的技术底牌

第一章&#xff1a;PHP分库分表自动扩容方案曝光&#xff1a;支撑业务爆发式增长的技术底牌 在高并发、大数据量的业务场景下&#xff0c;传统单库单表架构已无法满足性能与扩展性需求。PHP作为广泛应用的后端语言&#xff0c;其生态中的分库分表自动扩容方案成为支撑业务爆发式…

作者头像 李华