Kafka 7:自定义分区器
程序员文章站
2024-01-12 12:58:22
...
1.分区
- 我们在新增 ProducerRecord 对象中可以看到,ProducerRecord 包含了目标主题,键和值,Kafka 的消息都是一个个的键值对。键可以设置为默认的 null。
- 键的主要用途有两个:一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区,二,还可以作为消息的附加消息。
- 如果键值为 null,并且使用默认的分区器,分区器使用轮询算法将消息均衡地分布到各个分区上。
- 如果键不为空,并且使用默认的分区器,Kafka 对键进行散列,然后根据散列值把消息映射到特定 的分区上。很明显,同一个键总是被映射到同一个分区。但是只有不改变主题分区数量的情况下,键和分区之间的映射才能保持不变,一旦增加了新的 分区,就无法保证了,所以如果要使用键来映射分区,那就要在创建主题的时候把分区规划好,而且永远不要增加新分区。
2.自定义分区器
- 某些情况下,数据特性决定了需要进行特殊分区,比如电商业务,北京的业务量明显比较大,占据了总业务量的 20%,我们需要对北京的订单进行 单独分区处理,默认的散列分区算法不合适了, 我们就可以自定义分区算法,对北京的订单单独处理,其他地区沿用散列分区算法。或者某些情况下, 我们用 value 来进行分区。
3.创建主题self-partition,"./kafka-topics.sh --zookeeper localhost:2181 --create --topic self-partition --replication-factor 1 --partitions 8"
4.添加常量self-partition
5.创建自定义分区器SelfPartitioner,实现Partitioner接口
package org.example.selfPartition;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class SelfPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取分区数
int num = cluster.partitionCountForTopic(topic);
//根据value的哈希值对分区数取余的方式得到分区id
int partId = ((String) value).hashCode() % num;
return partId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
6.创建生产者SelfPartitionProducer
package org.example.selfPartition;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.example.config.BusiConst;
import java.util.Properties;
import java.util.concurrent.Future;
public class SelfPartitionProducer {
public static void main(String[] args) {
//生产者必须指定3个属性(broker地址清单,key和value的序列化器)
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.111:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//使用自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.selfPartition.SelfPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
try {
ProducerRecord<String, String> record;
for (int i = 0; i < 4; i++) {
record = new ProducerRecord<String,String>(BusiConst.SELF_PARTITION,
String.valueOf(i), "Fisher"+i);
Future<RecordMetadata> future = producer.send(record);
System.out.println("do something!");
RecordMetadata recordMetadata = future.get();//阻塞在这里
if (recordMetadata != null) {
System.out.println("offset:" + recordMetadata.offset() + ";partition:" + recordMetadata.partition());
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
producer.close();
}
}
}