半夜被电话叫醒,消息积压了200万条,消费者根本追不上。
这种场景搞过Kafka的应该都经历过,整理一下踩过的坑和解决方案。
坑一:消息积压
现象
监控告警:topic-order的lag超过100万。
# 查看消费者lagkafka-consumer-groups.sh --bootstrap-server localhost:9092\--describe --group order-consumer GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG order-consumer topic-order0123456723456781111111order-consumer topic-order1123456823456791111111order-consumer topic-order2123456923456801111111三个分区,每个积压100多万,加起来300多万。
排查过程
1. 先看生产速度
# 查看topic的写入速度kafka-run-class.sh kafka.tools.GetOffsetShell\--broker-list localhost:9092\--topic topic-order --time -1# 隔10秒再执行一次,算差值# 发现每秒写入约5000条2. 再看消费速度
消费者日志显示处理一条消息要200ms,算下来每秒只能处理5条。
问题找到了:消费太慢。
解决方案
方案一:增加消费者实例
Kafka的分区数决定了最大并行度。3个分区最多3个消费者并行。
# 先增加分区(注意:分区只能增不能减)kafka-topics.sh --bootstrap-server localhost:9092\--alter --topic topic-order --partitions12然后部署12个消费者实例。
方案二:批量消费
// 原来:一条一条处理@KafkaListener(topics="topic-order")publicvoidconsume(Stringmessage){processOrder(message);// 200ms}// 优化后:批量处理@KafkaListener(topics="topic-order")publicvoidconsumeBatch(List<String>messages){// 攒一批再处理,减少IO次数batchProcessOrders(messages);// 批量写库}配置调整:
spring:kafka:consumer:max-poll-records:500# 一次拉取500条listener:type:batch# 批量模式方案三:异步处理
@KafkaListener(topics="topic-order")publicvoidconsume(Stringmessage){// 扔到线程池异步处理executor.submit(()->processOrder(message));}但要注意:异步处理需要手动管理offset提交,不然可能丢消息。
效果
优化后消费速度从5条/秒提升到3000条/秒,积压2小时内消化完。
坑二:消息丢失
现象
业务反馈有订单没收到,但生产端日志显示发送成功了。
排查
1. 生产端配置
props.put("acks","1");// 问题在这acks=1表示leader收到就返回成功,但如果leader挂了、follower还没同步,消息就丢了。
2. 消费端配置
props.put("enable.auto.commit","true");props.put("auto.commit.interval.ms","1000");自动提交offset,如果消费处理到一半程序挂了,offset已经提交了,这条消息就"丢"了。
解决方案
生产端
// acks=all,所有ISR副本都写入才算成功props.put("acks","all");// 重试次数props.put("retries",3);// 开启幂等性props.put("enable.idempotence","true");消费端
// 关闭自动提交props.put("enable.auto.commit","false");// 手动提交@KafkaListener(topics="topic-order")publicvoidconsume(ConsumerRecord<String,String>record,Acknowledgmentack){try{processOrder(record.value());ack.acknowledge();// 处理成功才提交}catch(Exceptione){// 处理失败不提交,会重新消费log.error("处理失败",e);}}Broker端
# 最小ISR副本数 min.insync.replicas=2 # 不允许非ISR副本选举为leader unclean.leader.election.enable=false坑三:重复消费
现象
同一条消息被处理了两次,导致订单重复扣款。
原因
消费者处理完消息,还没来得及提交offset就挂了。重启后从上次提交的offset开始消费,这条消息又被消费一次。
Kafka是at-least-once语义,不保证exactly-once。
解决方案
业务幂等
publicvoidprocessOrder(Stringmessage){Orderorder=JSON.parseObject(message,Order.class);// 先查是否已处理过if(orderService.exists(order.getOrderId())){log.info("订单已处理过,跳过: {}",order.getOrderId());return;}// 处理订单orderService.process(order);}Redis去重
publicvoidprocessOrder(Stringmessage){StringmsgId=extractMsgId(message);// Redis SETNX,已存在返回falsebooleanisNew=redis.setIfAbsent("kafka:processed:"+msgId,"1",24,TimeUnit.HOURS);if(!isNew){log.info("消息已处理过: {}",msgId);return;}// 处理业务doProcess(message);}数据库唯一约束
-- 用唯一约束兜底CREATEUNIQUEINDEXuk_order_idONorders(order_id);坑四:消费者频繁Rebalance
现象
日志里频繁出现:
Revoking previously assigned partitions Rebalance triggered消费者不停地Rebalance,效率极低。
原因
1. 心跳超时
// 默认10秒没心跳就认为消费者挂了session.timeout.ms=10000如果处理一条消息超过10秒,就会被踢出消费组。
2. poll间隔太长
// 默认5分钟内必须调用pollmax.poll.interval.ms=300000处理500条消息花了6分钟,超时了。
解决方案
// 增加session超时时间props.put("session.timeout.ms","30000");props.put("heartbeat.interval.ms","10000");// 增加poll间隔props.put("max.poll.interval.ms","600000");// 减少单次拉取数量props.put("max.poll.records","100");核心原则:确保在max.poll.interval.ms内能处理完max.poll.records条消息。
坑五:顺序消费
需求
同一个用户的操作必须按顺序处理。
问题
默认情况下,消息分散到不同分区,不同分区的消费顺序无法保证。
解决方案
指定分区key
// 用userId作为key,相同userId的消息会落到同一分区kafkaTemplate.send("topic-order",userId,message);单分区方案(不推荐,除非量很小)
// 只用一个分区,保证全局顺序kafkaTemplate.send("topic-order",0,null,message);注意事项
- 同一分区内保证顺序,但重试可能打乱顺序
- 设置
max.in.flight.requests.per.connection=1保证严格顺序
props.put("max.in.flight.requests.per.connection","1");性能调优参数
生产者
# 批量发送,攒够16K或等1ms就发 batch.size=16384 linger.ms=1 # 发送缓冲区 buffer.memory=33554432 # 压缩(推荐lz4) compression.type=lz4消费者
# 单次拉取大小 fetch.min.bytes=1 fetch.max.bytes=52428800 fetch.max.wait.ms=500 # 单次poll记录数 max.poll.records=500Broker
# 日志保留 log.retention.hours=168 log.retention.bytes=1073741824 # 分区数(根据消费者数量设置) num.partitions=12 # 副本 default.replication.factor=3 min.insync.replicas=2监控指标
这几个指标必须监控:
| 指标 | 含义 | 报警阈值 |
|---|---|---|
| ConsumerLag | 消费延迟 | 根据业务定 |
| MessagesInPerSec | 写入速度 | 突增报警 |
| BytesInPerSec | 流量 | 接近带宽报警 |
| UnderReplicatedPartitions | 副本不足的分区 | >0报警 |
| OfflinePartitionsCount | 离线分区 | >0报警 |
集群运维
我们的Kafka集群分布在两个机房,之前两边网络不通很麻烦。后来用星空组网把两个机房组到一个网络里,Kafka的跨机房复制配置简单多了。
总结
Kafka踩坑清单:
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 消息积压 | 消费慢 | 加分区、批量消费、异步处理 |
| 消息丢失 | acks配置不当 | acks=all、手动提交 |
| 重复消费 | at-least-once语义 | 业务幂等、去重 |
| 频繁Rebalance | 超时配置不当 | 调整超时参数 |
| 顺序问题 | 多分区并行 | 指定分区key |
Kafka本身很稳定,大多数问题都是配置和使用不当导致的。
有Kafka相关问题欢迎评论区讨论~