欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

kafka发送消息Key为null时如何选择分区

程序员文章站 2022-03-03 09:17:29
分区的原则通过分析,建议发送消息时轮询指定分区。我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值;(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调...

分区的原则

通过分析,建议发送消息时轮询指定分区。

我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
kafka发送消息Key为null时如何选择分区

(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition
数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。

源码代码:
在Kafka new producer上如果Key为null则每条消息都会选择不同的Partition:

if (record.partition() != null) {
    // they have given us a partition, use it
    if (record.partition() < 0 || record.partition() >= numPartitions)
        throw new IllegalArgumentException("Invalid partition given with record: "
                                           + record.partition()
                                           + " is not in the range [0..."
                                           + numPartitions
                                           + "].");
    return record.partition();
} else if (record.key() == null) {
    int nextValue = counter.getAndIncrement();
    List<PartitionInfo> availablePartitions = cluster
        .availablePartitionsForTopic(record.topic());
    if (availablePartitions.size() > 0) {
        int part = Utils.abs(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.abs(nextValue) % numPartitions;
    }
} else {
    // hash the key to choose a partition
    return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}

可以看出这是一种round-robin模式选择分区ID的。

参考:Key为null时Kafka如何选择分区(Partition)

本文地址:https://blog.csdn.net/yangshengwei230612/article/details/110535995

相关标签: # kafka