欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka2.0-producer如何将消息分配到partition中_05

程序员文章站 2022-03-26 21:48:28
...
Topic:producer-0    PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: producer-0   Partition: 0    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: producer-0   Partition: 1    Leader: 3   Replicas: 3,2   Isr: 3,2
    Topic: producer-0   Partition: 2    Leader: 1   Replicas: 1,3   Isr: 1,3

从上一篇文章我们可以知道,名为producer-0的topic有3的partition,分别是partition:0partition:1partition:2,并且他们分别在不同的机器上。
在这里我们先讲讲如何将消息发送到指定的partition中,然后在讲默认的partition分配策略(即DefaultPartitioner.java

指定的partition发送

话不多说,直接上代码。(提示:在启动生产者之前,先启动消费者。
生产者代码如下:

/**
 * 将消息发送到指定的partition中
 * @author yangyaming
 */
public class PartitionProducer {

    public static final String TOPIC_NAME = "producer-0"; 

    private  static Properties props = new Properties();

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) {
         Producer<String, String> producer = new KafkaProducer<>(props);

         //获取该topic的所有partition信息
         List<PartitionInfo> partitions = producer.partitionsFor(TOPIC_NAME);
         printAllPartition(partitions);

         /**将消息发送分片列表的到第二个partition中**/
         int sendPartition = partitions.get(1).partition();

         System.out.println("消息将发送到partition:" + sendPartition + "中.");

         for (int i = 100; i < 200; i++)
             producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));
         System.out.println("发送完成");
         producer.close();
    }

    //打印所有的partition
    private static void printAllPartition(List<PartitionInfo> partitions) {
        if(partitions == null || partitions.size() <= 0)
            return;

        System.out.println("topic:" + TOPIC_NAME + ",所有的partition如下:");

        partitions.forEach((partition) -> System.out.println("partition:" + partition.partition()));
    }

}

示例源码:https://github.com/Mryangtaofang/sample

执行的结果如下图,可以看到消息全部发送到了序号为1的partition中:
kafka2.0-producer如何将消息分配到partition中_05
消费者代码如下:

/**
 * kafka消费者
 */
public class PartitionConsumer {

    private  static Properties props = new Properties();

    static{
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public  static void main(String args[]){
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(PartitionProducer.TOPIC_NAME));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("partition = %d offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
         }
    }
}

执行结果如下,可以看到消费的所有消息都是来自于partition-1中:
kafka2.0-producer如何将消息分配到partition中_05

很明显,在实际应用中,我们绝对不会将所有的消息都放在一个partition中,这会导致所有的消息将只能存放在一台机器上。

producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));

截取生产者的发送代码,如果我们不指定sendPartition这个入参,那么所有的消费该如何分配到各个partition中呢?

kafka2.0默认的partition分片器(DefaultPartitioner)

分析kafka客户端源码,当没有指定Partitioner.class配置时,会使用默认的分片规则,代码实现如下:

/**
 * 默认的分片策略:
 * <ul>
 * <li>如果你指定了一个partition,用你指定的partition,如上面例子,我指定了partition-1
 * <li>如果没有指定partition,但是你指定了key,那么会根据key进行哈希,分配到对应的partition中
 * <li>如果partition和key都没指定,会使用round-robin算法
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}

重点分析partition()方法:

如果你手动指定了分区(就像上面我的例子一样),根本就不会进这个方法,因为不需要进行分区。

 // hash the keyBytes to choose a partition
 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

如果你指定了key这个参数,那么会针对key进行hash,也就是上面这段代码,其实通过Utils.murmur2(keyBytes)可以看出,他是根据key的序列化之后的字节码进行hash的,所以间接地,一个消息被分到哪个partition上跟序列化方式也有关系。
Utils.murmur2是在kafka中实现的,源码如下。

 /**
  * Generates 32 bit murmur2 hash from byte array
  * @param data byte array to hash
  * @return 32 bit hash of the given array
  */
 public static int murmur2(final byte[] data) {
     int length = data.length;
     int seed = 0x9747b28c;
     // 'm' and 'r' are mixing constants generated offline.
     // They're not really 'magic', they just happen to work well.
     final int m = 0x5bd1e995;
     final int r = 24;

     // Initialize the hash to a random value
     int h = seed ^ length;
     int length4 = length / 4;

     for (int i = 0; i < length4; i++) {
         final int i4 = i * 4;
         int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
         k *= m;
         k ^= k >>> r;
         k *= m;
         h *= m;
         h ^= k;
     }

     // Handle the last few bytes of the input array
     switch (length % 4) {
         case 3:
             h ^= (data[(length & ~3) + 2] & 0xff) << 16;
         case 2:
             h ^= (data[(length & ~3) + 1] & 0xff) << 8;
         case 1:
             h ^= data[length & ~3] & 0xff;
             h *= m;
     }

     h ^= h >>> 13;
     h *= m;
     h ^= h >>> 15;

     return h;
  }

上面使用的是MurmurHash2,常用的另一种hash算法是djb。MurmurHash算法的复杂度很高,所以很难理解上面这段代码,有兴趣的可以去研究下,下面是MurmurHash2一段介绍。

MurmurHash 是一种非加密型哈希函数,适用于一般的哈希检索操作。 由Austin Appleby在2008年发明,
并出现了多个变种,都已经发布到了公有领域。与其它流行的哈希函数相比,对于规律性较强的key,MurmurHash的随机分布特征表现更良好。
Redis在实现字典时用到了两种不同的哈希算法,MurmurHash便是其中一种(另一种是djb),在Redis中应用十分广泛,包括数据库、集群、哈希键、阻塞操作等功能都用到了这个算法。发明算法的作者被邀到google工作,该算法最新版本是MurmurHash3,基于MurmurHash2改进了一些小瑕疵,使得速度更快,实现了32位(低延时)、128位HashKey,尤其对大块的数据,具有较高的平衡性与低碰撞率。

无论是哪一种hash算法,我们都希望能将消息尽量快速的,均匀的分配到不同的partition中。

如果你没有指定key,那么会采用round-robin算法,其实就是轮询算法,每一个partition依次排队,一个一个的分配。

代码如下:

int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
    int part = Utils.toPositive(nextValue) % availablePartitions.size();
    return availablePartitions.get(part).partition();
} else {
    // no partitions are available, give a non-available partition
    return Utils.toPositive(nextValue) % numPartitions;
}

nextValue()的实现,很容易知道,map针对每一个topic,都有一个序号,在一开始为空的时候,就随机生成一个数,然后再自增。

然后就是Utils.toPositive(nextValue),这个方法在kafka中实现的,其实就是把负数转为正数,如果你测试一下你会发现Utils.toPositive(-1)返回的是2147483647。Utils.toPositive()的实现如下:

  public static int toPositive(int number) {
      return number & 0x7fffffff;
  }

其实就是将number的符号未设置为0,由于java中表示数使用的补码,所以-1的符号位是1,而剩下的31为与0x7fffffff相同,所以Utils.toPositive(-1)=0x7fffffff

所以,在没有key的情况下,是采用依次轮询调度算法。以上源码是kafka2.0版本的,低版本的可能会有所不同。

相关标签: kafka