Kafka API
一、Producer API
1、Message sending flow
Kafka's producer sends messages asynchronously. Two threads are involved in the send path, the main thread and the Sender thread, plus a shared buffer, the RecordAccumulator. The main thread appends messages to the RecordAccumulator, and the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers.
2、Asynchronous sending
Add the Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
API without a callback:
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
properties.put("acks", "all");
properties.put("retries", 3);
// batch size in bytes
properties.put("batch.size", 16384);
// how long to wait for a batch to fill before sending
properties.put("linger.ms", 1);
// total producer buffer size in bytes
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
    kafkaProducer.send(new ProducerRecord<>("topic", "demo -- " + i));
}
kafkaProducer.close();
API with a callback:
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
properties.put("acks", "all");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
    // ProducerRecord(topic, partition, key, value)
    kafkaProducer.send(new ProducerRecord<>("topic", 0, "demo", "callback - " + i), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());
            } else {
                e.printStackTrace();
            }
        }
    });
}
kafkaProducer.close();
You can choose the partition explicitly; if no partition is specified but a key is present, the key is hashed to pick the partition.
You can also define a custom partitioner by implementing the Partitioner interface and then setting partitioner.class in the properties to point at it.
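A minimal sketch of such a custom partitioner. The class name MyPartitioner is illustrative, and the hash here is a plain Java hashCode, not Kafka's default murmur2:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // no key: always send to partition 0 (an illustrative choice)
            return 0;
        }
        // simple hash of the key, mapped onto the partition count
        return Math.abs(key.hashCode()) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```

It would then be registered with `properties.put("partitioner.class", "com.example.MyPartitioner");` (package name assumed).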
3、Synchronous sending
send() returns a Future<RecordMetadata>; calling get() on it blocks until the send completes, which makes the send effectively synchronous.
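A sketch of a synchronous send, reusing the broker list and topic from the examples above:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            // get() blocks until the broker acknowledges this record,
            // so records go out strictly one at a time
            RecordMetadata metadata =
                    kafkaProducer.send(new ProducerRecord<>("topic", "sync - " + i)).get();
            System.out.println(metadata.partition() + "--" + metadata.offset());
        }
        kafkaProducer.close();
    }
}
```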
4、Custom interceptors
Implement the ProducerInterceptor interface, then set interceptor.classes in the properties; the value is a list, so multiple interceptors can be chained.
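An illustrative interceptor (the class name TimestampInterceptor is an assumption) that prefixes every value with a timestamp:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimestampInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // called on the main thread before the record is serialized;
        // returns a new record whose value carries a timestamp prefix
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                System.currentTimeMillis() + "," + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // called when the send is acknowledged or fails
        if (exception != null) {
            exception.printStackTrace();
        }
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
```

It would be registered with `properties.put("interceptor.classes", Arrays.asList("com.example.TimestampInterceptor"));` (package name assumed).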
二、Consumer API
1、Automatic offset commit
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
properties.put("group.id", "2");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// where to start when there is no valid committed offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("topic"));
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    System.err.println(records.count());
    for (ConsumerRecord<String, String> record : records) {
        System.err.println(record.key() + " - " + record.value() + " - " + record.partition() + " - " + record.offset());
    }
}
The offset reset only takes effect in two cases: the consumer group is new (it has no committed offset), or the committed offset no longer exists because the data has expired and been deleted.
2、Manual offset commit
Set enable.auto.commit to false, then commit explicitly with consumer.commitSync() (synchronous) or consumer.commitAsync() (asynchronous).
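A sketch of a consumer with manual commits, mirroring the auto-commit example above:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
        properties.put("group.id", "2");
        // disable auto commit so offsets are committed explicitly
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value() + " - " + record.offset());
            }
            // synchronous commit: blocks and retries until the commit succeeds
            kafkaConsumer.commitSync();
            // or asynchronously, with an optional callback (no built-in retry):
            // kafkaConsumer.commitAsync((offsets, e) -> { if (e != null) e.printStackTrace(); });
        }
    }
}
```

Committing after the processing loop gives at-least-once delivery: on a crash between processing and commit, the records are consumed again.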
3、Custom offset storage
Before Kafka 0.9, offsets were stored in ZooKeeper; since 0.9, they are stored by default in an internal Kafka topic. Beyond these options, Kafka also lets you store offsets yourself.
Maintaining offsets yourself is fairly involved, because you have to account for consumer rebalances.
When a new consumer joins the group, an existing consumer leaves, or the partitions of a subscribed topic change, the partitions are reassigned to the consumers; this reassignment process is called a rebalance.
To store offsets yourself, you need a ConsumerRebalanceListener.
A common approach is to commit the processed results and the offsets in a single transaction in a relational database, then call seek() on startup (and after a rebalance) to resume from the stored offsets.
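A minimal sketch of this pattern. The in-memory offsetStore map stands in for the database table; in real code, onPartitionsRevoked would persist it and the processing loop would update it inside the same transaction as the business write:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CustomOffsetDemo {
    // in-memory stand-in for the database table holding offsets
    private static final Map<TopicPartition, Long> offsetStore = new HashMap<>();

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.10.110:9092,192.168.10.132:9092,192.168.10.177:9092");
        properties.put("group.id", "2");
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // called before a rebalance: persist current offsets
                // (in real code, write offsetStore to the database here)
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // called after a rebalance: resume from the stored offsets
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, offsetStore.getOrDefault(partition, 0L));
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // process the record, then remember the NEXT offset to read
                System.out.println(record.value());
                offsetStore.put(new TopicPartition(record.topic(), record.partition()),
                        record.offset() + 1);
            }
        }
    }
}
```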