欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka 7:自定义分区器

程序员文章站 2024-01-12 12:58:22
...

1.分区

  • 我们在新增 ProducerRecord 对象中可以看到,ProducerRecord 包含了目标主题,键和值,Kafka 的消息都是一个个的键值对。键可以设置为默认的 null。
  • 键的主要用途有两个:一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区,二,还可以作为消息的附加消息。
  • 如果键值为 null,并且使用默认的分区器,分区器使用轮询算法将消息均衡地分布到各个分区上。
  • 如果键不为空,并且使用默认的分区器,Kafka 对键进行散列,然后根据散列值把消息映射到特定 的分区上。很明显,同一个键总是被映射到同一个分区。但是只有不改变主题分区数量的情况下,键和分区之间的映射才能保持不变,一旦增加了新的 分区,就无法保证了,所以如果要使用键来映射分区,那就要在创建主题的时候把分区规划好,而且永远不要增加新分区。

2.自定义分区器

  • 某些情况下,数据特性决定了需要进行特殊分区,比如电商业务,北京的业务量明显比较大,占据了总业务量的 20%,我们需要对北京的订单进行 单独分区处理,默认的散列分区算法不合适了, 我们就可以自定义分区算法,对北京的订单单独处理,其他地区沿用散列分区算法。或者某些情况下, 我们用 value 来进行分区。

3.创建主题self-partition,"./kafka-topics.sh --zookeeper localhost:2181 --create --topic self-partition --replication-factor 1 --partitions 8"

Kafka 7:自定义分区器

4.添加常量self-partition

Kafka 7:自定义分区器

5.创建自定义分区器SelfPartitioner,实现Partitioner接口

Kafka 7:自定义分区器

package org.example.selfPartition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class SelfPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取分区数
        int num = cluster.partitionCountForTopic(topic);
        //根据value的哈希值对分区数取余的方式得到分区id
        int partId = ((String) value).hashCode() % num;
        return partId;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

6.创建生产者SelfPartitionProducer

package org.example.selfPartition;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.example.config.BusiConst;

import java.util.Properties;
import java.util.concurrent.Future;

public class SelfPartitionProducer {
    public static void main(String[] args) {
        //生产者必须指定3个属性(broker地址清单,key和value的序列化器)
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.111:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //使用自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.selfPartition.SelfPartitioner");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        try {
            ProducerRecord<String, String> record;
            for (int i = 0; i < 4; i++) {
                record = new ProducerRecord<String,String>(BusiConst.SELF_PARTITION,
                        String.valueOf(i), "Fisher"+i);
                Future<RecordMetadata> future = producer.send(record);
                System.out.println("do something!");
                RecordMetadata recordMetadata = future.get();//阻塞在这里
                if (recordMetadata != null) {
                    System.out.println("offset:" + recordMetadata.offset() + ";partition:" + recordMetadata.partition());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            producer.close();
        }
    }
}

7.查看使用自定义分区器之后的打印

Kafka 7:自定义分区器

8.注释掉自定义分区器之后的打印

Kafka 7:自定义分区器

相关标签: Kafka kafka