KafkaConsumer0.9(三) 博客分类: kafka kafkaconsumerkafka0.9
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); List<TopicPartition> list = new ArrayList<TopicPartition>(); TopicPartition tp = new TopicPartition("test_topic", 0); list.add(tp); consumer.assign(list); consumer.seek(tp, 96); // consumer.seekToBeginning(tp); // consumer.seekToEnd(tp); int commitInterval = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); if (buffer.size() >= commitInterval) { batchProcessRecords(buffer); consumer.commitSync(); buffer.clear(); } } }
1. 在上一篇中我们看了一个简单的自动提交offset的example,但很多情况下我们为了避免消息丢失,需要确保消息被处理完了之后才提交offset,这就需要手动地提交。在上面这个例子中,我们从kafka中抓取数据并缓存在List中,只有当消息达到一定的数量的时候我们才批量处理,假设我们使用自动提交,如果在我们还没来得及处理之前consumer就异常终止,那么有可能这些消息的offset已经被自动提交掉了,等我们的consumer重新连接上来了之后,上次没有处理完成的消息会被我们完全略过,造成数据丢失,这就是"at-most-once delivery"。解决的办法是,只有在批量处理完消息之后,才用consumer.commitSync()手动地提交offset,但这样的副作用的,假如我们正在批量处理消息,这时consumer异常终止,offset没有被提交但有部分消息已经被处理过了,当consumer重连上来时,这批没有被commit的消息会被重新处理一次,造成会有部分消息被重复处理,这就是"at-least-once delivery"。
2. kafka提供load balance机制来确保consumer正常工作,简单的说partitions会被分配给正在监听这个topic的多个consumers(同一个group),当其中一个consumer process异常终止,它之前所占有的partitions会被分配给其他consumer process,从而保证所有的数据都能被正常消费掉。但有时我们并不需要load balance机制,例如:
- 为了节省网络带宽,我们只希望consumer从某一个partition抓取数据,并存储在本地。这在大数据计算或存储中是很常见的行为。在这种情况下我们并不希望另一台机器的consumer来消费这台机器的partition。
- 如果程序本身带有HA机制,例如使用类似于YARN,Mesos等集群管理框架,那么当一个consumer终止了之后,它会被重启,或者是另一个consumer会被启动来替代它,在这种情况下我们不需要kafka重新分配partition。
要做到这点很简单,替换掉上个例子的consumer.subscribe(Arrays.asList("test_topic")),我们使用consumer.assign(list),在本例中,consumer只会消费partition0的数据。
3. 在之前的版本中,如果我们需要消费旧数据(已经commit offset),我们需要用SimpleConsumer。但是在0.9中,这变得更简单了。在本例中,consumer.seek(tp, 96)表示我们从partition 0的offset 96开始抓取数据,consumer.seekToBeginning(tp)表示从头开始抓取数据,consumer.seekToEnd(tp)表示从最后开始抓取数据,换句话说,只消费consumer启动之后新进来的数据。
推荐阅读
-
Kafka MirrorMaker实践 博客分类: kafka kafkamirrormaker
-
KafkaConsumer0.9(二) 博客分类: kafka kafka consumerkafka0.9
-
Kafka0.10新特性 博客分类: kafka kafka0.10
-
KafkaConsumer0.9(三) 博客分类: kafka kafkaconsumerkafka0.9
-
KafkaConsumer0.9(一) 博客分类: kafka kafka consumerkafka0.9
-
KafkaConsumer0.9(二) 博客分类: kafka kafka consumerkafka0.9
-
KafkaConsumer0.9(三) 博客分类: kafka kafkaconsumerkafka0.9
-
Kafka0.10新特性 博客分类: kafka kafka0.10
-
Kafka MirrorMaker实践 博客分类: kafka kafkamirrormaker
-
跟我一起学Mybatis之(三)----Scope and Lifecycle 博客分类: MyBatis MyBatis范围Scope and Lifecycle