欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka教程2:Producer,生产者

程序员文章站 2022-06-14 13:46:09
...

该文章主要翻译自java类KafkaProducer的Doc文档,基于Kafka2.3版本,Kafka客户端2.3.0版本

1、官方Doc链接如下

https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html


2、Kafka的Maven依赖如下
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>


3、基于官方文档整理的内容

KafkaProducer类是Kafka客户端包中的Producer接口的实现类,用来向Kafka集群发布消息。
KafkaProducer本身线程安全,通常使用一个KafkaProducer实例(instance)要比使用多个KafkaProducer实例要快(faster),推荐以单例的形式来用。
以下是发送消息到Kafka集群的官方Doc代码示例:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

KafkaProducer包含消息缓冲区和一些I/O线程,如果用完KafkaProducer实例后不关闭的话,这些资源就泄露了。如果以单例的形式使用KafkaProducer的话,在程序退出时记得关闭KafkaProducer。
KafkaProducer的 send() 方法是异步的,当调用 send() 的时候它先把消息放在缓冲池中等待发布而它自身则立即执行完毕并返回结果,发布操作是异步操作,这样的话 send() 方法就可以尝试将多个消息攒在一块儿批量发送,提高效率。


看上述官方Doc代码示例,可以对KafkaProducer进行各种配置(Properties ),常用配置介绍如下:


  • acks:控制消息在什么情况下才被认为是 成功发送 的,可以配置的值有4个,分别为"all, -1, 0, 1",解释如下:

Tip: kafka集群存储设计为leader+flower的分区的形式,leader的作用是读/写消息,flower则是容灾用。send()发送消息的确认是由leader统一返回的,leader会根据acks的配置来决定是否等待flower的消息发送确认。

all: 只有leader和一定条件的flower都确定自己成功存储消息了,这次消息发送才被认为是成功的,消息发送比较慢,但是最稳定、最保险,消息丢失几率最小。
-1: 同all。
0: 不等待leader和flower的确认,只要发送了,就认为是成功了,发送消息速度最快,但是消息丢失的几率比较高。
1: 只等待leader的确认,只要leader表示自己收到消息了,就认为是发送成功了,消息发送速度和几率折中,是发送效率和发送稳定性的折中。


  • retries:控制万一 send() 方法消息发送失败,自动重试的次数,设置为 0 ,将禁用自动重试,设置为非 0 的值有可能导致消息重复发送,至于为什么自动重试会导致消息重复发送的原因解释清看官方文档:https://kafka.apache.org/documentation.html#semantics

  • batch.size:控制KafkaProducer缓冲池的大小,默认值:16384 Byte,增大该值会减少网络请求,但是会导致消息发送速度的降低和内存使用的增加(因为对每个活动分区,都会设置一个这么大的缓冲池),个人建议还是不要改动这个值了。

  • linger.ms:(题外:首先吐槽下csdn的这个markdown编辑器,写个linger.ms给自动识别成了链接,怎么都改不掉,智障)控制 send() 方法发送消息的延时,默认值:0,即尝试立即发送。设置为大于0的值,会导致 send() 方法等待一段时间,这段时间内如果有新的消息需要发送,则放在一块儿一起发送,可以减少网络请求数量,但会降低消息发送速度。该配置的作用类似于TCP协议中的 Nagle 算法,具体请自行百度。请注意,即使该配置设置为默认值 0 ,如果一些消息在极其接近的时间内需要发送,KafkaProducer也会将这些消息放在一个网络请求里批量发送,而不是发送多个网络请求,所以在高负载的环境下,KafkaProducer也有较高的性能。个人建议:追求消息发送速度的话,该配置就保持默认值 0 不变吧,追求网络性能的话,就适当的调大该值,比如 3 或者 5 就是一些不错的值,太大的值很导致消息发送速度的严重降低。

  • buffer.memory,max.block.ms:(题外:这里也自动识别成了链接了,智障啊)这两个配置是搭配使用的,buffer.memory 控制KafkaProducer所能使用的所有缓冲池的总的内存大小。如果KafkaProducer发送消息的速度大于Kafka集群处理消息的速度话,KafkaProducer的缓冲池就会慢慢耗尽。缓冲池耗尽之后调用的 send() 方法就会被阻塞,超时时间就是 max.block.ms 控制的时间,超时之后,那些仍被阻塞的 send() 方法就会抛出 TimeoutException 异常。

  • key.serializer,value.serializer:这俩配置不用改,就按照上述实例代码那样写就行了,作用是控制消息对象 ProducerRecord 如何转成 byte 来发送。Kafka客户端包中自带的有俩现成能用的,分别是 ByteArraySerializerStringSerializer ,其中 StringSerializer 就是上述示例代码中的那些配置。

  • enable.idempotence,transactional.id:(题外:这里也自动识别成了链接了,没话说了)这两个配置可以放在一块儿讲。从Kafka客户端包 0.11 版本开始,KafkaProducer支持额外两种模式,幂等模式和事务模式(the idempotent producer and the transactional producer)。幂等模式的作用是提供了 精确只有一次发送(exactly once delivery) 的发送语义,具体请看官方解释 发送语义 的文档:https://kafka.apache.org/documentation.html#semantics。总结来说就是官方Kafka客户端包提供了 3发送语义 ,如下:

At most once—Messages may be lost but are never redelivered. 解释:最多发送一次,消息可能丢失,但是绝对不会重复发送。
At least once—Messages are never lost but may be redelivered. 解释:最少发送一次,消息可能重复发送,但是绝对不会丢失。
Exactly once—this is what people actually want, each message is delivered once and only once. 解释:消息严格精确只发送一次,消息的发送可以成功,也可以失败,但绝不会不会丢失,也不会重复发送。

enable.idempotence 默认值为 false 。设置为 true 的话则启用幂等模式,这时 retries 将会被自动设置为 Integer.MAX_VALUE 并且 acks 将会自动设置为 all 。启用幂等模式的时候不要设置 retries 的值,让它保持默认值就行了,因为幂等模式下会自动设置它的值。幂等模式也不是完全保险的,它只在一个session内起作用,如果KafkaProducer断开了于Kafka集群的链接,然后又重连了,这时候幂等的效果就没了,这种情况下需要自行检测消息是否发送成功以避免消息的重复发送。
事务模式的作用则是允许KafkaProducer以原子操作的方式同时向多个分区和主题(topic)发送消息。这是一种比较复杂的kafka使用方式,估计也不常用。transactional.id 的值为string,没有默认值,如果给它设置了一个值,则KafkaProducer会自动认为启用了事务模式,所以如果不需要事务模式的话这个配置就一定不要管它。如果启用了事务模式,则幂等模式也会自动启用。为了保证事务模式的正常使用,那些参与了事务模式的主题(topic)也应该配置容灾,尤其是 replication.factor 最低需要配置为 3 ,同时这些主题的 min.insync.replicas 也应最低配置为 2 。如果想要正常使用事务模式,KafkaConsumer也需要配置为只读已提交消息。所以事务模式是比较复杂的。所有事务模式相关的API都是同步的,并且在事务失败时会抛出异常,官方Doc示例代码如下:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();

从上述示例代码可以看出,每个KafkaProducer只能同时有一个开启的事务,所有 beginTransaction()commitTransaction() 方法之间的消息都将被当做事务的一部分。当启用了事务模式,也就是设置了 transactional.id 的值的时候,KafkaProducer必须开启事务才能发送消息。从上述代码中,我们可以看出KafkaProducer的事务模式也有一些缺陷,比如不能从某些特定异常中恢复和回滚,所以事务模式还是少用为妙。


原创不易,转帖请注明出处 — ShiZhongqi