kafka2.0-producer如何将消息分配到partition中_05
Topic:producer-0 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: producer-0 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: producer-0 Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: producer-0 Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
从上一篇文章我们可以知道,名为producer-0的topic有3的partition,分别是partition:0
,partition:1
和partition:2
,并且他们分别在不同的机器上。
在这里我们先讲讲如何将消息发送到指定的partition中,然后在讲默认的partition分配策略(即DefaultPartitioner.java
)
指定的partition发送
话不多说,直接上代码。(提示:在启动生产者之前,先启动消费者。)
生产者代码如下:
/**
* 将消息发送到指定的partition中
* @author yangyaming
*/
public class PartitionProducer {
public static final String TOPIC_NAME = "producer-0";
private static Properties props = new Properties();
static{
props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public static void main(String[] args) {
Producer<String, String> producer = new KafkaProducer<>(props);
//获取该topic的所有partition信息
List<PartitionInfo> partitions = producer.partitionsFor(TOPIC_NAME);
printAllPartition(partitions);
/**将消息发送分片列表的到第二个partition中**/
int sendPartition = partitions.get(1).partition();
System.out.println("消息将发送到partition:" + sendPartition + "中.");
for (int i = 100; i < 200; i++)
producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));
System.out.println("发送完成");
producer.close();
}
//打印所有的partition
private static void printAllPartition(List<PartitionInfo> partitions) {
if(partitions == null || partitions.size() <= 0)
return;
System.out.println("topic:" + TOPIC_NAME + ",所有的partition如下:");
partitions.forEach((partition) -> System.out.println("partition:" + partition.partition()));
}
}
执行的结果如下图,可以看到消息全部发送到了序号为1的partition中:
消费者代码如下:
/**
* kafka消费者
*/
public class PartitionConsumer {
private static Properties props = new Properties();
static{
props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
public static void main(String args[]){
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(PartitionProducer.TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("partition = %d offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
}
}
}
执行结果如下,可以看到消费的所有消息都是来自于partition-1中:
很明显,在实际应用中,我们绝对不会将所有的消息都放在一个partition中,这会导致所有的消息将只能存放在一台机器上。
producer.send(new ProducerRecord<String, String>(TOPIC_NAME,sendPartition, Integer.toString(i), Integer.toString(i)));
截取生产者的发送代码,如果我们不指定sendPartition这个入参,那么所有的消费该如何分配到各个partition中呢?
kafka2.0默认的partition分片器(DefaultPartitioner)
分析kafka客户端源码,当没有指定Partitioner.class
配置时,会使用默认的分片规则,代码实现如下:
/**
* 默认的分片策略:
* <ul>
* <li>如果你指定了一个partition,用你指定的partition,如上面例子,我指定了partition-1
* <li>如果没有指定partition,但是你指定了key,那么会根据key进行哈希,分配到对应的partition中
* <li>如果partition和key都没指定,会使用round-robin算法
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
重点分析partition()
方法:
如果你手动指定了分区(就像上面我的例子一样),根本就不会进这个方法,因为不需要进行分区。
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
如果你指定了key这个参数,那么会针对key进行hash,也就是上面这段代码,其实通过
Utils.murmur2(keyBytes)
可以看出,他是根据key的序列化之后的字节码进行hash的,所以间接地,一个消息被分到哪个partition上跟序列化方式也有关系。Utils.murmur2
是在kafka中实现的,源码如下。
/**
* Generates 32 bit murmur2 hash from byte array
* @param data byte array to hash
* @return 32 bit hash of the given array
*/
public static int murmur2(final byte[] data) {
int length = data.length;
int seed = 0x9747b28c;
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
final int m = 0x5bd1e995;
final int r = 24;
// Initialize the hash to a random value
int h = seed ^ length;
int length4 = length / 4;
for (int i = 0; i < length4; i++) {
final int i4 = i * 4;
int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
k *= m;
k ^= k >>> r;
k *= m;
h *= m;
h ^= k;
}
// Handle the last few bytes of the input array
switch (length % 4) {
case 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16;
case 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8;
case 1:
h ^= data[length & ~3] & 0xff;
h *= m;
}
h ^= h >>> 13;
h *= m;
h ^= h >>> 15;
return h;
}
上面使用的是MurmurHash2,常用的另一种hash算法是djb。MurmurHash算法的复杂度很高,所以很难理解上面这段代码,有兴趣的可以去研究下,下面是MurmurHash2一段介绍。
MurmurHash 是一种非加密型哈希函数,适用于一般的哈希检索操作。 由Austin Appleby在2008年发明,
并出现了多个变种,都已经发布到了公有领域。与其它流行的哈希函数相比,对于规律性较强的key,MurmurHash的随机分布特征表现更良好。
Redis在实现字典时用到了两种不同的哈希算法,MurmurHash便是其中一种(另一种是djb),在Redis中应用十分广泛,包括数据库、集群、哈希键、阻塞操作等功能都用到了这个算法。发明算法的作者被邀到google工作,该算法最新版本是MurmurHash3,基于MurmurHash2改进了一些小瑕疵,使得速度更快,实现了32位(低延时)、128位HashKey,尤其对大块的数据,具有较高的平衡性与低碰撞率。
无论是哪一种hash算法,我们都希望能将消息尽量快速的,均匀的分配到不同的partition中。
如果你没有指定key,那么会采用round-robin算法,其实就是轮询算法,每一个partition依次排队,一个一个的分配。
代码如下:
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
看nextValue()
的实现,很容易知道,map针对每一个topic,都有一个序号,在一开始为空的时候,就随机生成一个数,然后再自增。
然后就是Utils.toPositive(nextValue)
,这个方法在kafka中实现的,其实就是把负数转为正数,如果你测试一下你会发现Utils.toPositive(-1)
返回的是2147483647。Utils.toPositive()
的实现如下:
public static int toPositive(int number) {
return number & 0x7fffffff;
}
其实就是将number
的符号未设置为0,由于java中表示数使用的补码,所以-1的符号位是1,而剩下的31为与0x7fffffff
相同,所以Utils.toPositive(-1)=0x7fffffff
。
所以,在没有key的情况下,是采用依次轮询调度算法。以上源码是kafka2.0版本的,低版本的可能会有所不同。
下一篇: vsftpd