欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

(更新)Kafka-可靠的生产者Producer(Java)。

程序员文章站 2022-06-14 14:54:59
...
更新:2017.12.15。

背景。

  • kafka-version:0.9.0.0。
    send消息方式:异步。
    数据量:百M/s 万n/s。
  • 大量数据发送到kafka出现数据丢失,记录Producer传输消息可靠性和性能保证讨论的完成。
  • Kafka 0.9以高的版本,出了一组新的API,用异步方式发送消息,号称性能好过旧的API。生产环境要可靠地传输消息到kafka。
/**异步发送给定的记录,并返回一个最终包含响应信息的future对象。
*   为了实现同步的发送消息,并监控每条消息是否发送成功,需要对每次调用send方法后返回的Future对象调用get方法。
*   get方法的调用会导致当前线程block,直到发送结果(不管是成功还是失败)返回。
*/
Interface Producer<K,V>Future send(ProducerRecord record)

Producer Configs。

阅读官方文件。得到 ‘retries’+’acks’传输可靠性保证解决方案。

acks。

0 1 all
生产者将不会等待服务器的任何确认。记录将立即被添加到套接字缓冲区,并考虑发送。不能保证服务器在这种情况下已经收到了记录,并且重试的配置将不会生效(因为客户机通常不会知道任何故障)。每个记录返回的偏移量总是被设置为-1。 分区的leader将把记录写入本地日志,但不会等待所有follower的全部确认。在这种情况下,如果leader在确认了记录之后和在追随者复制之前故障,该记录将会丢失。 leader将等待完整的同步副本,以确认记录。这保证了只要至少有一个同步副本的副本仍然存在,记录就不会丢失。这是最有力的保证。

retries。

设置大于0的值,producer会重新发送任何未发送完成的记录,而这些记录有可能是暂时的错误(延迟)。未设置max.in.flight.requests.per为1可能会改变记录的顺序,因为如果两个批被发送到一个单独的分区,第一个失败并且重试,但是第二个成功,那么第二个批处理中的记录可能会首先出现。

max.in.flight.requests.per.connection。

producer会在阻塞前向connection对象发送最大未确认记录数。设置大于1时,就有因先发送的记录发送未完成并Retires,造成记录重新排序的可能。

Retries-重复生产消息

解决方案:消息体中添加全局唯一标识,consumers管理“周期内已消费消息主键集合”,实现“一次且一次”

【摘抄】
producer.retries(内部重试机制)能解决的消息发送未完成的异常我们描述为可恢复异常。producer.retries已经能够实现了我们大部分的需求场景。还有一些错误是需要应用进行处理的:

  • 不可恢复异常(比如消息大小非法、权限认证失败);
  • 消息发送到broker之前发生的异常(比如序列化异常);
  • 生产者达到重试上限的异常,或者由于消息重试导致消息堆积最终内存溢出的异常。
    对于这些异常,我们可以记录日志,持久化到数据库或者简单的抛弃。但如果说我们的处理方式仍然为不断重试,那么建议把这样的重试策略下沉到生产者内部重试机制。

不可恢复异常。

消息大小非法。

kafka单条消息过大,生产消息未完成

producer重试上限异常。