欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka的三种消费模式

程序员文章站 2022-05-11 19:20:49
...
  • 自动提交offset
     以下实例代码展示了如何自动提交topic的offset:
public void autoOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("group.id""test");
    props.put("enable.auto.commit""true");
    props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<StringString> consumer = new KafkaConsumer<StringString>(props);
    consumer.subscribe(Arrays.asList("foo""bar"));
    while (true) {
        ConsumerRecords<StringString> records = consumer.poll(100);
        for (ConsumerRecord<StringString> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n"record.offset()record.key()record.value());
        }
    }
}
     Properties的实例props中存放的key意义:
     1)bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;
     2)enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;
     3)其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;
     4)key.deserializer和value.deserializer表示指定将字节序列化为对象。
  • 手动提交offset
      生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。
      以下实例代码展示了如何手动提交topic的offset:
     
public void manualOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("group.id""test");
    props.put("enable.auto.commit""false");
    props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer""org.apache.kafka.common.serialization.Stirng.Deserializer");

    KafkaConsumer<StringString> consumer = new KafkaConsumer<StringString>(props);
    consumer.subscribe(Arrays.asList("foo""bar"));
    final int minBatchSize = 200;
    List<ConsumerRecord<StringString>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<StringString> records = consumer.poll(100);
        for (ConsumerRecord<StringString> record : records) {
            buffer.add(record);
        }

        if (buffer.size() >= minBatchSize) {
            // operation to handle data
            consumer.commitSync();
            buffer.clear();
        }
    }
}
 
      本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。
  • 手动提交partition的offset
     以下实例代码展示了如何手动提交topic中每一partition的offset:
     
public void manualOffsetCommitOfPartition() {
    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("group.id""test");
    props.put("enable.auto.commit""false");
    props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer""org.apache.kafka.common.serialization.Stirng.Deserializer");

    KafkaConsumer<StringString> consumer = new KafkaConsumer<StringString>(props);
    consumer.subscribe(Arrays.asList("foo""bar"));

    boolean running = true;
    try {
        while (running) {
            ConsumerRecords<StringString> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<StringString>> partitionRecords = records.records(partition);
                for (ConsumerRecord<StringString> record : partitionRecords) {
                    System.out.println(record.offset() + " : " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
}

 

 
相关标签: kafka consumer