KafkaConsumer0.9(二) 博客分类: kafka kafka consumerkafka0.9
程序员文章站
2024-03-26 13:46:23
...
先看一个简单的KafkaConsumer例子:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("test_topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } }
我们看到0.9的consumer最大的变化是:
- 通过consumer.subscribe(Arrays.asList("test_topic"))来声明要订阅的topic,而之前的版本是用Whitelist声明。
- 通过consumer.poll(100)直接抓取消息,而之前需要遍历KafkaStream的迭代器。(这个比之前方便太多了。。。)。
- MessageAndMetadata变成了ConsumerRecords
enable.auto.commit表示已一个固定的时间间隔自动提交offsets,时间间隔由auto.commit.interval.ms控制。
bootstrap.servers表示kafka集群的broker列表。客户端会连接到这个列表中的任意一台机器,获取到整个集群的信息,因此理论上只需要在bootstrap.servers列出一台机器就够了,但是考虑到灾备,建议在bootstrap.servers中包含所有broker。
key.deserializer和value.deserializer指定了如何解析记录的key和value,在本例中我们认定key和value都是字符串。
在本例中,client端启动了一个从属于test_group的consumer来订阅test_topic。当其中一个consumer process断开之后,kafka broker会通过心跳机制自动检测到,因此集群始终能够知道哪些consumer是活着的。只要被认为是活着,这个consumer就能够从分配给它的partition中获取数据;一旦心跳丢失超过session.timeout.ms,consumer会被认为死掉,它所占有的partition将会被分配给别的process。
推荐阅读
-
Kafka MirrorMaker实践 博客分类: kafka kafkamirrormaker
-
KafkaConsumer0.9(二) 博客分类: kafka kafka consumerkafka0.9
-
Kafka0.10新特性 博客分类: kafka kafka0.10
-
KafkaConsumer0.9(三) 博客分类: kafka kafkaconsumerkafka0.9
-
KafkaConsumer0.9(一) 博客分类: kafka kafka consumerkafka0.9
-
KafkaConsumer0.9(二) 博客分类: kafka kafka consumerkafka0.9
-
KafkaConsumer0.9(三) 博客分类: kafka kafkaconsumerkafka0.9
-
Kafka0.10新特性 博客分类: kafka kafka0.10
-
Kafka MirrorMaker实践 博客分类: kafka kafkamirrormaker
-
消息中间件选型分析:从Kafka与RabbitMQ的对比看全局 博客分类: 技术选型