欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

KafkaConsumer0.9(二) 博客分类: kafka kafka consumerkafka0.9 

程序员文章站 2024-03-26 13:46:11
...

先看一个简单的KafkaConsumer例子:

 

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
    }
}

 

我们看到0.9的consumer最大的变化是:

 

  • 通过consumer.subscribe(Arrays.asList("test_topic"))来声明要订阅的topic,而之前的版本是用Whitelist声明。
  • 通过consumer.poll(100)直接抓取消息,而之前需要遍历KafkaStream的迭代器。(这个比之前方便太多了。。。)。
  • MessageAndMetadata变成了ConsumerRecords

enable.auto.commit表示已一个固定的时间间隔自动提交offsets,时间间隔由auto.commit.interval.ms控制。

 

bootstrap.servers表示kafka集群的broker列表。客户端会连接到这个列表中的任意一台机器,获取到整个集群的信息,因此理论上只需要在bootstrap.servers列出一台机器就够了,但是考虑到灾备,建议在bootstrap.servers中包含所有broker。

 

key.deserializer和value.deserializer指定了如何解析记录的key和value,在本例中我们认定key和value都是字符串。

 

在本例中,client端启动了一个从属于test_group的consumer来订阅test_topic。当其中一个consumer process断开之后,kafka broker会通过心跳机制自动检测到,因此集群始终能够知道哪些consumer是活着的。只要被认为是活着,这个consumer就能够从分配给它的partition中获取数据;一旦心跳丢失超过session.timeout.ms,consumer会被认为死掉,它所占有的partition将会被分配给别的process。