欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java kafka如何实现自定义分区类和拦截器

程序员文章站 2023-09-07 15:01:57
生产者发送到对应的分区有以下几种方式:(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)(2)未指定patition但指定key,通过对key的value进行h...

生产者发送到对应的分区有以下几种方式:

(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

1、实现一个自定义分区类,custompartitioner实现partitioner

import org.apache.kafka.clients.producer.partitioner;
import org.apache.kafka.common.cluster;

import java.util.map;

public class custompartitioner implements partitioner {

  /**
   *
   * @param topic 当前的发送的topic
   * @param key  当前的key值
   * @param keybytes 当前的key的字节数组
   * @param value 当前的value值
   * @param valuebytes 当前的value的字节数组
   * @param cluster
   * @return
   */
  @override
  public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {
    //这边根据返回值就是分区号, 这边就是固定发送到三号分区
    return 3;
  }

  @override
  public void close() {

  }
  @override
  public void configure(map<string, ?> configs) {

  }

}

2、producer配置文件指定,具体的分区类

// 具体的分区类
props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");

技巧:可以使用producerconfig中提供的配置producerconfig

kafka producer拦截器

拦截器(interceptor)是在kafka 0.10版本被引入的。

interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

所使用的类为:

org.apache.kafka.clients.producer.producerinterceptor

我们可以编码测试下:

1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

import org.apache.kafka.clients.producer.producerinterceptor;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;

import java.util.map;
import java.util.uuid;

public class messageinterceptor implements producerinterceptor<string, string> {

  @override
  public void configure(map<string, ?> configs) {
    system.out.println("这是messageinterceptor的configure方法");
  }

  /**
   * 这个是消息发送之前进行处理
   *
   * @param record
   * @return
   */
  @override
  public producerrecord<string, string> onsend(producerrecord<string, string> record) {
    // 创建一个新的record,把uuid入消息体的最前部
    system.out.println("为消息添加uuid");
    return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        uuid.randomuuid().tostring().replace("-", "") + "," + record.value());
  }

  /**
   * 这个是生产者回调函数调用之前处理
   * @param metadata
   * @param exception
   */
  @override
  public void onacknowledgement(recordmetadata metadata, exception exception) {
    system.out.println("messageinterceptor拦截器的onacknowledgement方法");
  }

  @override
  public void close() {
    system.out.println("messageinterceptor close 方法");
  }
}

2、定义计数拦截器

import java.util.map;
import org.apache.kafka.clients.producer.producerinterceptor;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;

public class counterinterceptor implements producerinterceptor<string, string>{
  private int errorcounter = 0;
  private int successcounter = 0;

  @override
  public void configure(map<string, ?> configs) {
    system.out.println("这是counterinterceptor的configure方法");
  }

  @override
  public producerrecord<string, string> onsend(producerrecord<string, string> record) {
    system.out.println("counterinterceptor计数过滤器不对消息做任何操作");
    return record;
  }

  @override
  public void onacknowledgement(recordmetadata metadata, exception exception) {
    // 统计成功和失败的次数
    system.out.println("counterinterceptor过滤器执行统计失败和成功数量");
    if (exception == null) {
      successcounter++;
    } else {
      errorcounter++;
    }
  }

  @override
  public void close() {
    // 保存结果
    system.out.println("successful sent: " + successcounter);
    system.out.println("failed sent: " + errorcounter);
  }
}

3、producer客户端:

import org.apache.kafka.clients.producer.*;

import java.util.arraylist;
import java.util.list;
import java.util.properties;

public class producer1 {
  public static void main(string[] args) throws exception {
    properties props = new properties();
    // kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "localhost:9092");
    // 等待所有副本节点的应答
    props.put("acks", "all");
    // 消息发送最大尝试次数
    props.put("retries", 0);
    // 一批消息处理大小
    props.put("batch.size", 16384);
    // 请求延时,可能生产数据太快了
    props.put("linger.ms", 1);
    // 发送缓存区内存大小,数据是先放到生产者的缓冲区
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
    // 具体的分区类
    props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");
    //定义拦截器
    list<string> interceptors = new arraylist<>();
    interceptors.add("kafka.messageinterceptor");
    interceptors.add("kafka.counterinterceptor");
    props.put(producerconfig.interceptor_classes_config, interceptors);

    producer<string, string> producer = new kafkaproducer<>(props);
    for (int i = 0; i < 1; i++) {
      producer.send(new producerrecord<string, string>("test_0515", i + "", "xxx-" + i), new callback() {
        public void oncompletion(recordmetadata recordmetadata, exception e) {
          system.out.println("这是producer回调函数");
        }
      });
    }
    /*system.out.println("现在执行关闭producer");
    producer.close();*/
    producer.close();
  }
}

总结,我们可以知道拦截器链各个方法的执行顺序,假如有a、b拦截器,在一个拦截器链中:

(1)执行a的configure方法,执行b的configure方法

(2)执行a的onsend方法,b的onsend方法

(3)生产者发送完毕后,执行a的onacknowledgement方法,b的onacknowledgement方法。

(4)执行producer自身的callback回调函数。

(5)执行a的close方法,b的close方法。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。