上一篇【第13篇】Kafka序列化器深度解析——自定义Serializer不再是难题
下一篇【第15篇】Kafka集群元数据源码解析——生产者如何"认识"整个集群
摘要
消息经过序列化变成byte[]之后,下一步就是决定发往哪个分区。这一决定看似简单,实则影响深远——分对了负载均衡吞吐翻倍,分错了热点分区全线崩溃。Kafka的默认分区策略用Hash+RoundRobin双剑合璧,2.4版本推出的Sticky Partitioner更是在延迟和批量之间找到了精细平衡。本文将深入源码剖析分区器的工作原理,从DefaultPartitioner到StickyPartitioner,再到手把手教你实现一个按业务Key路由的自定义分区器。读完这篇,分区不再看运气。
一、分区器在KafkaProducer中的位置
先回顾分区器在整个发送链路中的位置——它在消息序列化之后、进入RecordAccumulator之前:
KafkaProducer.send() 调用链: Interceptors.onSend() // ① 拦截器处理 │ ▼ waitOnMetadata() // ② 等待集群元数据就绪 │ ▼ Serializer.serialize() // ③ 序列化Key和Value │ ▼ Partitioner.partition() // ④ 选择目标分区 ← 本文主角 │ ▼ RecordAccumulator.append() // ⑤ 放入缓冲区从调用链可以看出,分区器需要依赖两个输入:
- 已序列化的Key(byte[]):用于计算Hash值
- 集群元数据(Cluster对象):需要知道Topic有多少个分区
二、Partitioner接口——只需要实现partition()方法
publicinterfacePartitionerextendsConfigurable,Closeable{/** * 选择目标分区 * @param topic Topic名称 * @param key 消息Key(Java对象,未序列化) * @param keyBytes 已序列化的Key(byte数组) * @param value 消息Value(未序列化) * @param valueBytes 已序列化的Value * @param cluster 集群元数据快照 * @return 分区编号 */intpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster);voidclose();}注意区分两个概念:
- key(Object):原始Key对象,还没经过序列化
- keyBytes(byte[]):已经序列化好的Key,可直接用于Hash计算
KafkaProducer调用时,如果ProducerRecord指定了partition字段(即record.partition() != null),就直接用指定的分区,不会调用Partitioner。只有没指定分区时才会走Partitioner.partition()。
三、DefaultPartitioner源码解析——经典的双模式策略
3.1 核心源码
publicclassDefaultPartitionerimplementsPartitioner{// Counter初始化为随机数,避免重启后所有消息都去同一个分区privatefinalAtomicIntegercounter=newAtomicInteger(newRandom().nextInt());// 并发安全的StickyPartition缓存privatefinalConcurrentMap<String,Integer>stickyPartitionCache=newConcurrentHashMap<>();publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){// 获取Topic的分区信息List<PartitionInfo>partitions=cluster.partitionsForTopic(topic);intnumPartitions=partitions.size();if(keyBytes==null){// 情况一:消息没有Key —— Sticky分区策略(2.4+)returnstickyPartitionCache.computeIfAbsent(topic,t->{// 先找可用分区(有Leader的分区)List<PartitionInfo>availablePartitions=cluster.availablePartitionsForTopic(t);if(availablePartitions.isEmpty()){// 没有可用分区,退化为RoundRobinintnextValue=counter.getAndIncrement();returnDefaultPartitioner.toPositive(nextValue)%numPartitions;}else{// 选择一个可用分区并"粘住"intpart=DefaultPartitioner.toPositive(counter.getAndIncrement())%availablePartitions.size();returnavailablePartitions.get(part).partition();}});}else{// 情况二:消息有Key —— Hash取模// murmur2是一种高效的、低碰撞率的哈希算法returnDefaultPartitioner.toPositive(Utils.murmur2(keyBytes))%numPartitions;}}// 将负数转为正数(取绝对值的等价操作)staticinttoPositive(intnumber){returnnumber&0x7fffffff;}}3.2 两种策略图解
【DefaultPartitioner 分区策略】 消息有Key ──► murmur2(Key) % 分区数 ──► 固定分区 (相同Key → 相同分区 → 顺序保证) 消息无Key ──► Sticky策略 ──► 同一个"批次"粘在同一个可用分区 (2.4+) 批次满后切换到新分区 ──► RoundRobin ──► counter++ % 分区数(逐条轮询) (2.3及之前) 无批量优化,可能产生大量小批次3.3 为什么counter要用AtomicInteger
KafkaProducer是线程安全的,多个业务线程可能同时调用send()。DefaultPartitioner必须也是线程安全的。这就是为什么用AtomicInteger而不是普通的int——两个线程并发调用counter.getAndIncrement()时,不会出现计数错误。
3.4 toPositive()方法:负数转正数
number & 0x7fffffff这个位掩码操作是为了把负数转成正数。murmur2()可能返回负数(因为返回类型是int,包含符号位),但分区编号必须是≥0的整数。
负数: 1xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx & 掩码: 0xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx ────────────────────────────────────────── 结果: 0xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx ← 永远是正数四、Sticky Partitioner——2.4版本的性能优化利器
4.1 问题:老版本RoundRobin的痛点
在Kafka 2.3及之前,没有Key的消息使用RoundRobin策略——每条消息随机选一个分区。这会导致什么问题?
【RoundRobin策略产生大量小批次】 Topic: orders (3个分区) msg1 → P0 msg2 → P1 RecordAccumulator中的状态: msg3 → P2 P0: [msg1] ← 只有1条消息就凑满一个批次? msg4 → P0 P1: [msg2] ← 每条消息单独开Batch? msg5 → P1 P2: [msg3] msg6 → P2 结果:每个分区的Batch都只有少量消息 → 发送许多小请求 → 网络开销大4.2 Sticky策略的优化
Sticky策略的思想是:"粘"在同一个分区上,直到当前Batch满了,再换下一个分区。
【Sticky策略批量优化效果】 msg1 → P0 msg2 → P0 ← 粘住P0 RecordAccumulator中的状态: msg3 → P0 ← 继续粘 P0: [msg1, msg2, msg3, msg4] ← 大Batch msg4 → P0 ← Batch满了! P1: [msg5, msg6, msg7] msg5 → P1 ← 切换到P1 P2: [msg8, msg9] msg6 → P1 msg7 → P1 msg8 → P2 msg9 → P2 结果:每个分区攒了更大的Batch → 减少网络请求 → 吞吐量提升StickyPartitionCache的具体实现中就一个ConcurrentHashMap<String, Integer>,Key是Topic名,Value是粘住的Partition编号。当Batch满了被Sender取走之后,下次再append新消息时,会重新选一个分区。
4.3 对比总结
| 对比维度 | RoundRobin (旧) | Sticky (新,2.4+) |
|---|---|---|
| 分区选择 | 逐条轮询 | 粘住分区,Batch满后切换 |
| Batch填充率 | 低(每个分区各攒一点) | 高(每个分区攒满再走) |
| 请求数量 | 多(小Batch多) | 少(大Batch少) |
| 网络开销 | 大 | 小 |
| 消息延迟 | 低(及时发送) | 略高(等待凑Batch) |
| 适用场景 | 低延迟要求 | 高吞吐要求 |
五、自定义分区器实战——按业务Key路由
5.1 场景:用户消息优先处理分区
假设你有一个topic叫"user-events",有6个分区。你希望VIP用户的消息发往低编号分区(P0-P1),普通用户消息发往高编号分区(P4-P5),中间分区用于系统消息。
/** * 自定义分区器:VIP用户优先分区 * VIP用户 → P0, P1 * 系统消息 → P2, P3 * 普通用户 → P4, P5 */publicclassVipAwarePartitionerimplementsPartitioner{privatestaticfinalSet<String>VIP_USERS=newHashSet<>(Arrays.asList("vip_001","vip_002","vip_003"// VIP用户白名单));privatestaticfinalStringSYSTEM_KEY="__SYSTEM__";@Overridepublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){List<PartitionInfo>partitions=cluster.partitionsForTopic(topic);intnumPartitions=partitions.size();// 把Key转成字符串StringkeyStr=(keyBytes!=null)?newString(keyBytes):"";if(SYSTEM_KEY.equals(keyStr)){// 系统消息 → P2, P3// 用简单的随机分配intbase=2;intoffset=ThreadLocalRandom.current().nextInt(2);returnbase+offset;}elseif(VIP_USERS.contains(keyStr)){// VIP用户 → P0, P1// 用Hash保证同一VIP用户消息有序inthash=Math.abs(Utils.murmur2(keyBytes));returnhash%2;// P0或P1}else{// 普通用户 → P4, P5// 也用Hash,同一用户的消息在同一分区inthash=Math.abs(Utils.murmur2(keyBytes));return4+(hash%2);// P4或P5}}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?>configs){}}5.2 配置使用
Propertiesprops=newProperties();props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.VipAwarePartitioner");// 指定自定义分区器props.put("bootstrap.servers","localhost:9092");// ... 其他配置KafkaProducer<String,String>producer=newKafkaProducer<>(props);// VIP用户消息自动路由到P0或P1producer.send(newProducerRecord<>("user-events","vip_001","VIP用户登录"));// 普通用户消息自动路由到P4或P5producer.send(newProducerRecord<>("user-events","normal_user_123","普通用户点击"));六、分区数与吞吐量的关系——数学不小了
【分区数与吞吐量的关系图】 吞吐量(TPS) ▲ │ ┌────────────────────── │ ┌─────┘ ← 达到瓶颈(磁盘/网络) │ ┌─────┘ │ ┌────┘ ← 线性增长区间 │ ┌────┘ │ ┌──┘ └─┴────┬────┬────┬────┬────┬────┬────► 分区数 1 3 6 9 12 15 18 分区太少 ──► 无法充分利用集群能力 分区太多 ──► 元数据开销大、文件句柄多、Leader选举慢经验法则:
- 分区数 =
max(总吞吐量需求 / 单分区吞吐量, 消费者实例数) - 单分区吞吐量一般:~10MB/s 写,~50MB/s 读
- 分区总数(所有Topic)建议不超过Broker数量的4000倍
七、分区器选型决策
| 场景 | 推荐策略 | 配置 |
|---|---|---|
| 需要消息顺序 | Key Hash | DefaultPartitioner+ 带Key的消息 |
| 高吞吐、不关心顺序 | Sticky | DefaultPartitioner(默认) |
| 按业务规则路由 | 自定义Partitioner | partitioner.class=xxx |
| 指定分区发送 | 直接指定分区 | ProducerRecord中指定partition |
| 均匀分布无Key消息 | RoundRobin | 需实现自定义Partitioner |
本篇小结
分区器看似简单,实则内涵丰富:
- DefaultPartitioner是双模式:有Key走murmur2哈希(保证同Key顺序),无Key走Sticky(保证批量效率)。Kafka 2.4的Sticky优化是一个典型的"用稍高延迟换更高吞吐"的trade-off案例
- 自定义分区器的关键是理解输入参数——你拿到的是已序列化的keyBytes和集群元数据,足以实现任意复杂的分区逻辑
- 分区数量不是越多越好,需要根据吞吐量需求和消费者并发数综合计算
- 尽量让分区在各个Broker上均匀分布,避免热点——下一篇我们讲集群元数据,看看Producer是怎么知道这些拓扑信息的
上一篇【第13篇】Kafka序列化器深度解析——自定义Serializer不再是难题
下一篇【第15篇】Kafka集群元数据源码解析——生产者如何"认识"整个集群