Kafka的接口回调 +自定义分区、拦截器
程序员文章站
2022-04-06 12:25:43
一、接口回调+自定义分区 1.接口回调:在使用消费者的send方法时添加Callback回调 注意:在自定义分区后,你的消费者会收不到消息,因为消费者默认接收的分区为0。 二、拦截器 1)创建生产者类; 2)创建自定义拦截器类实现ProducerInterceptor接口,重写抽象方法; 3)在业务 ......
一、接口回调+自定义分区
1.接口回调:在使用消费者的send方法时添加callback回调
producer.send(new producerrecord<string, string>("xinnian", "20" + i + "年新年好!"), new callback() {
public void oncompletion(recordmetadata recordmetadata, exception e) {
if (recordmetadata!=null){
system.out.println(recordmetadata.topic()+"-----"+recordmetadata.offset()+"-----"+recordmetadata.partition());
}
}
2.自定义分区:定义类实现patitioner接口,实现接口的方法:
设置configure、分区逻辑partition(return 1;)、释放资源close、在生产者的配置过程中添加入分区属性。
在定义生产者属性时添加分区的属性即可
/**
* @author: princesshug
* @date: 2019/2/28, 16:24
* @blog: https://www.cnblogs.com/hellobigtable/
*/
public class partitiondemo implements partitioner {
public int partition(string s, object o, byte[] bytes, object o1, byte[] bytes1, cluster cluster) {
return 1;
}
public void close() {
}
public void configure(map<string, ?> map) {
}
}
public class producerdemo {
public static void main(string[] args) {
properties prop = new properties();
//参数配置
//kafka节点的地址
prop.put("bootstrap.servers", "192.168.126.128:9092");
//发送消息是否等待应答
prop.put("acks", "all");
//配置发送消息失败重试
prop.put("retries", "0");
//配置批量处理消息大小
prop.put("batch.size", "10241");
//配置批量处理数据延迟
prop.put("linger.ms","5");
//配置内存缓冲大小
prop.put("buffer.memory", "12341235");
//消息在发送前必须序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
prop.put("partitioner.class", "partitiondemo");
kafkaproducer<string, string> producer = new kafkaproducer<string, string>(prop);
for (int i=10;i<100;i++){
producer.send(new producerrecord<string, string>("xinnian", "20" + i + "年新年好!"), new callback() {
public void oncompletion(recordmetadata recordmetadata, exception e) {
if (recordmetadata!=null){
system.out.println(recordmetadata.topic()+"-----"+recordmetadata.offset()+"-----"+recordmetadata.partition());
}
}
});
}
producer.close();
}
}
注意:在自定义分区后,你的消费者会收不到消息,因为消费者默认接收的分区为0。
二、拦截器
1)创建生产者类;
2)创建自定义拦截器类实现producerinterceptor接口,重写抽象方法;
3)在业务逻辑方法producerrecord方法中,修改返回值,
return new producerrecord<string,string>(
record.topic(),
record.partiiton(),
record.key(),
system.currenttimemillis() + "-" + record.value() + "-" + record.topic());
4)在生产者类中将自定义拦截器生效
prop.put(producerconfig.interceptor_classea_config,"com.wyh.com.wyh.kafka.interceptor.timeinterceptor");
5)运行生产者main方法,或者在linux端用shell测试。
/** * @author: princesshug * @date: 2019/2/28, 20:59 * @blog: https://www.cnblogs.com/hellobigtable/ */ public class timeinterceptor implements producerinterceptor<string, string> { //业务逻辑 public producerrecord<string, string> onsend(producerrecord<string, string> producerrecord) { return new producerrecord<string,string>( producerrecord.topic(), producerrecord.partition(), producerrecord.key(), system.currenttimemillis()+"--"+producerrecord.value() ); } //发送失败调用 public void onacknowledgement(recordmetadata recordmetadata, exception e) { } //释放资源 public void close() { } //获取配置信息 public void configure(map<string, ?> map) { } } public class itctorproducer { public static void main(string[] args) { //配置生产者属性 properties prop = new properties(); //kafka节点的地址 prop.put("bootstrap.servers", "192.168.126.128:9092"); //发送消息是否等待应答 prop.put("acks", "all"); //配置发送消息失败重试 prop.put("retries", "0"); //配置批量处理消息大小 prop.put("batch.size", "1024"); //配置批量处理数据延迟 prop.put("linger.ms","5"); //配置内存缓冲大小 prop.put("buffer.memory", "12341235"); //消息在发送前必须序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); //添加拦截器 arraylist<string> inlist = new arraylist<string>(); inlist.add("interceptor.timeinterceptor"); prop.put(producerconfig.interceptor_classes_config,inlist); //实例化producer kafkaproducer<string, string> producer = new kafkaproducer<string, string>(prop); //发送消息 for (int i=0;i<99;i++){ producer.send(new producerrecord<string, string>("xinnian","you are genius!"+i)); } //释放资源 producer.close(); } }