欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka api

程序员文章站 2024-01-12 12:50:28
...

一、producer api

1、消息发送流程

    kafka的producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。

kafka api

2、异步发送

引入maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

 

不带回调的api:

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
properties.put("acks", "all");
properties.put("retries", 3);
//批次大小
properties.put("batch.size", 16384);
//等待时间
properties.put("linger.ms", 1);
//缓冲区大小
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
    kafkaProducer.send(new ProducerRecord<>("topic", "demo -- " + i));
}
kafkaProducer.close();

带回调的api:

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
properties.put("acks", "all");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
    kafkaProducer.send(new ProducerRecord<>("topic", 0,"demo","callback - " + i), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());
            }
        }
    });
}
kafkaProducer.close();

选择分区,如果没选分区有key的话,会用key进行hash分配到分区。

可以自定义分区,实现Partitioner接口,然后在properties中添加partitioner.class指定分区器。

3、同步发送

    send()返回值是Future,调用get()方法会阻塞发送。

4、自定义拦截器

    实现ProducerInterceptor接口,然后在properties中添加interceptor.classes,value是一个list

二、consumer api

1、自动提交offset

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
properties.put("group.id", "2");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//offset重置
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("topic"));
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    System.err.println(records.count());
    for (ConsumerRecord<String, String> record : records) {
        System.err.println(record.key() + " - " + record.value() + " - " + record.partition() + " - " + record.offset());

    }
}

offset重置有两个前提,一个是一个新的消费组,还有一个是数据过期。

2、手动提交offset

    将enable.auto.commit设置为false,然后consumer.commitSync同步提交和consumer.commitAsync异步提交

3、自定义存储offset

    kafka0.9版本之前,offset存储在zookeeper,0.9之后,默认将offset存储在kafka的一个内置topic中。除此之外,kafka还可以选择自定义存储offset。

    offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。

    当有新的消费者加入消费者组,已有的消费者退出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalace。 

    自定义存储offset,需要借助ConsumerRebalanceListener。

kafka api

做成一个事务放关系型数据库,然后重启时调用seek()。