欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka的Consumer构建

程序员文章站 2022-03-02 17:00:31
...

3.2.1.1 消息消费的demo代码

消息消费的demo代码如下:

package com.tuozixuan.kafka.demo;

 

import java.util.Arrays;

import java.util.Properties;

 

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

 

publicclass ConsumerTest {

 

    publicstaticvoid main(String[] args) {

 

        String topicName = "test";

        String groupId = "test-group";

 

        Properties props = new Properties();

        // 必须指定的属性

        props.put("bootstrap.servers", "10.4.23.159:9092");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("group.id", groupId);

 

        // 可选属性

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("auto.offset.reset", "earliest"); // 从最早的消息开始读取

 

        // 创建consumer实例,订阅topic

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(topicName));

 

        try {

            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(1000);

                for (ConsumerRecord<String, String> record : records) {

                    System.out.printf("offset:%d key:%s value:%s%n", record.offset(), record.key(), record.value());

                }

            }

        } finally {

            consumer.close();

        }

    }

}

 

 

构造consumer需要下面6个步骤:

  • 构造一个java.util.Properties对象,至少指定bootstrap.serverskey.deserializervalue.deserializergroup.id的值。
  • 使用上一步创建的Properties实例构造KafkaConsumer对象。
  • 调用   KafkaConsumer.subscribe方法订阅consumer group感兴趣的topic列表。
  • 循环调用KafkaConsumer.poll方法获取封装在ConsumerRecordtopic消息。
  • 处理获取到的ConsumerRecord对象。
  • 关闭KafkaConsumer

3.2.1.2 构造Properties对象

在创建的Properties对象中,必须指定的参数有4个:bootstrap.serverskey.deserializervalue.deserializergroup.id的值。参数的具体含义见3.2.2 consumer主要参数

3.2.1.3 构造KafkaConsumer对象

创建KafkaConsumer实例代码如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

创建KafkaConsumer也可同时指定keyvaluedeseralizer,若采用这种方式,则不需要在Properties中指定key.deserializervalue.deserializer

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,new StringDeserializer(),new StringDeserializer());

3.2.1.4 订阅topic列表

订阅topic的代码如下:

consumer.subscribe(Arrays.asList("topic1","topic2","topic3"));

该方法还支持正则表达式。假设consumer group要消费所有以kafka开头的topic,则可以如此订阅:

consumer.subscribe(Pattern.compile("kafka.*"),new NoOpConsumerRebalanceListener());

注意:subscribe方法不是增量式的,后续的subscribe调用会完全覆盖之前的订阅语句。

3.2.1.5 获取消息

consumer使用KafkaConsumer.poll方法从订阅topic中并行地获取多个分区的消息。为了实现这一点,新版本的consumerpoll方法使用了类似linuxselect I/O机制--所有相关的事件(包括rebalance、获取消息等)都发生在一个事件循环(event loop)中。这样consumer端只使用一个线程就能够完成所有类型的I/O操作。

try {

            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(1000);

                 // 执行具体的消费逻辑

            }

        } finally {

            consumer.close();

        }

上面代码中的1000代表超时设置(timeout,通常情况下如果consumer拿到了足够多的可用数据,那么它可以立即从该方法返回;但若当前没有足够多的数据可供返回,consumer会处于阻塞状态。这个超时参数即控制阻塞的最大时间。这里的1000表示即使没有那么多数据,consumer最多也不要等待超过1秒的时间。

若用户有定时方面的需求,那么根据需求设定timeout是一个不错的选择。否则,设定一个比较大的值甚至Integer.MAX_VALUE,是不错的建议。

3.2.1.6 处理ConsumerRecord对象

poll调用返回ConsumerRecord封装的Kafka消息,拿到这些消息后consumer可以处理自己的业务逻辑。

Kafka consumer的角度而言,poll方法返回即认为consumer成功消费了消息。如果发现poll返回消息的速度过慢,那么可以调节相应的参数来提升poll方法的效率;若消息的业务级处理逻辑过慢,则应该考虑简化处理逻辑或者把处理逻辑放入单独的线程执行。

3.2.1.7 关闭consumer

consumer程序结束后一定要显式关闭consumer以释放KafkaConsumer运行过程中占用的各种系统资源(比如线程资源、内存、Socket连接等)。

KafkaConsumer.close():关闭consumer并最多等待30

KafkaConsumer.close(timeout):关闭consumer并最多等待给定的timeout秒。