kafka发送消息Key为null时如何选择分区
程序员文章站
2022-03-03 09:17:29
分区的原则通过分析,建议发送消息时轮询指定分区。我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值;(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调...
分区的原则
通过分析,建议发送消息时轮询指定分区。
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition
数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。
源码代码:
在Kafka new producer上如果Key为null则每条消息都会选择不同的Partition:
if (record.partition() != null) {
// they have given us a partition, use it
if (record.partition() < 0 || record.partition() >= numPartitions)
throw new IllegalArgumentException("Invalid partition given with record: "
+ record.partition()
+ " is not in the range [0..."
+ numPartitions
+ "].");
return record.partition();
} else if (record.key() == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster
.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
int part = Utils.abs(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.abs(nextValue) % numPartitions;
}
} else {
// hash the key to choose a partition
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}
可以看出这是一种round-robin模式选择分区ID的。
参考:Key为null时Kafka如何选择分区(Partition)
本文地址:https://blog.csdn.net/yangshengwei230612/article/details/110535995