Java kafka如何实现自定义分区类和拦截器
生产者发送到对应的分区有以下几种方式:
(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:
1、实现一个自定义分区类,custompartitioner实现partitioner
import org.apache.kafka.clients.producer.partitioner; import org.apache.kafka.common.cluster; import java.util.map; public class custompartitioner implements partitioner { /** * * @param topic 当前的发送的topic * @param key 当前的key值 * @param keybytes 当前的key的字节数组 * @param value 当前的value值 * @param valuebytes 当前的value的字节数组 * @param cluster * @return */ @override public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) { //这边根据返回值就是分区号, 这边就是固定发送到三号分区 return 3; } @override public void close() { } @override public void configure(map<string, ?> configs) { } }
2、producer配置文件指定,具体的分区类
// 具体的分区类
props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");
技巧:可以使用producerconfig中提供的配置producerconfig
kafka producer拦截器
拦截器(interceptor)是在kafka 0.10版本被引入的。
interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
所使用的类为:
org.apache.kafka.clients.producer.producerinterceptor
我们可以编码测试下:
1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)
import org.apache.kafka.clients.producer.producerinterceptor; import org.apache.kafka.clients.producer.producerrecord; import org.apache.kafka.clients.producer.recordmetadata; import java.util.map; import java.util.uuid; public class messageinterceptor implements producerinterceptor<string, string> { @override public void configure(map<string, ?> configs) { system.out.println("这是messageinterceptor的configure方法"); } /** * 这个是消息发送之前进行处理 * * @param record * @return */ @override public producerrecord<string, string> onsend(producerrecord<string, string> record) { // 创建一个新的record,把uuid入消息体的最前部 system.out.println("为消息添加uuid"); return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(), uuid.randomuuid().tostring().replace("-", "") + "," + record.value()); } /** * 这个是生产者回调函数调用之前处理 * @param metadata * @param exception */ @override public void onacknowledgement(recordmetadata metadata, exception exception) { system.out.println("messageinterceptor拦截器的onacknowledgement方法"); } @override public void close() { system.out.println("messageinterceptor close 方法"); } }
2、定义计数拦截器
import java.util.map; import org.apache.kafka.clients.producer.producerinterceptor; import org.apache.kafka.clients.producer.producerrecord; import org.apache.kafka.clients.producer.recordmetadata; public class counterinterceptor implements producerinterceptor<string, string>{ private int errorcounter = 0; private int successcounter = 0; @override public void configure(map<string, ?> configs) { system.out.println("这是counterinterceptor的configure方法"); } @override public producerrecord<string, string> onsend(producerrecord<string, string> record) { system.out.println("counterinterceptor计数过滤器不对消息做任何操作"); return record; } @override public void onacknowledgement(recordmetadata metadata, exception exception) { // 统计成功和失败的次数 system.out.println("counterinterceptor过滤器执行统计失败和成功数量"); if (exception == null) { successcounter++; } else { errorcounter++; } } @override public void close() { // 保存结果 system.out.println("successful sent: " + successcounter); system.out.println("failed sent: " + errorcounter); } }
3、producer客户端:
import org.apache.kafka.clients.producer.*; import java.util.arraylist; import java.util.list; import java.util.properties; public class producer1 { public static void main(string[] args) throws exception { properties props = new properties(); // kafka服务端的主机名和端口号 props.put("bootstrap.servers", "localhost:9092"); // 等待所有副本节点的应答 props.put("acks", "all"); // 消息发送最大尝试次数 props.put("retries", 0); // 一批消息处理大小 props.put("batch.size", 16384); // 请求延时,可能生产数据太快了 props.put("linger.ms", 1); // 发送缓存区内存大小,数据是先放到生产者的缓冲区 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); // 具体的分区类 props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner"); //定义拦截器 list<string> interceptors = new arraylist<>(); interceptors.add("kafka.messageinterceptor"); interceptors.add("kafka.counterinterceptor"); props.put(producerconfig.interceptor_classes_config, interceptors); producer<string, string> producer = new kafkaproducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new producerrecord<string, string>("test_0515", i + "", "xxx-" + i), new callback() { public void oncompletion(recordmetadata recordmetadata, exception e) { system.out.println("这是producer回调函数"); } }); } /*system.out.println("现在执行关闭producer"); producer.close();*/ producer.close(); } }
总结,我们可以知道拦截器链各个方法的执行顺序,假如有a、b拦截器,在一个拦截器链中:
(1)执行a的configure方法,执行b的configure方法
(2)执行a的onsend方法,b的onsend方法
(3)生产者发送完毕后,执行a的onacknowledgement方法,b的onacknowledgement方法。
(4)执行producer自身的callback回调函数。
(5)执行a的close方法,b的close方法。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。