(更新)Kafka-可靠的生产者Producer(Java)。
程序员文章站
2022-06-14 14:54:59
...
更新:2017.12.15。
背景。
- kafka-version:0.9.0.0。
send消息方式:异步。
数据量:百M/s 万n/s。 - 大量数据发送到kafka出现数据丢失,记录Producer传输消息可靠性和性能保证讨论的完成。
- Kafka 0.9以高的版本,出了一组新的API,用异步方式发送消息,号称性能好过旧的API。生产环境要可靠地传输消息到kafka。
/**异步发送给定的记录,并返回一个最终包含响应信息的future对象。
* 为了实现同步的发送消息,并监控每条消息是否发送成功,需要对每次调用send方法后返回的Future对象调用get方法。
* get方法的调用会导致当前线程block,直到发送结果(不管是成功还是失败)返回。
*/
Interface Producer<K,V>Future send(ProducerRecord record)
Producer Configs。
阅读官方文件。得到 ‘retries’+’acks’传输可靠性保证解决方案。
acks。
0 | 1 | all |
---|---|---|
生产者将不会等待服务器的任何确认。记录将立即被添加到套接字缓冲区,并考虑发送。不能保证服务器在这种情况下已经收到了记录,并且重试的配置将不会生效(因为客户机通常不会知道任何故障)。每个记录返回的偏移量总是被设置为-1。 | 分区的leader将把记录写入本地日志,但不会等待所有follower的全部确认。在这种情况下,如果leader在确认了记录之后和在追随者复制之前故障,该记录将会丢失。 | leader将等待完整的同步副本,以确认记录。这保证了只要至少有一个同步副本的副本仍然存在,记录就不会丢失。这是最有力的保证。 |
retries。
设置大于0的值,producer会重新发送任何未发送完成的记录,而这些记录有可能是暂时的错误(延迟)。未设置max.in.flight.requests.per为1可能会改变记录的顺序,因为如果两个批被发送到一个单独的分区,第一个失败并且重试,但是第二个成功,那么第二个批处理中的记录可能会首先出现。
max.in.flight.requests.per.connection。
producer会在阻塞前向connection对象发送最大未确认记录数。设置大于1时,就有因先发送的记录发送未完成并Retires,造成记录重新排序的可能。
Retries-重复生产消息
解决方案:消息体中添加全局唯一标识,consumers管理“周期内已消费消息主键集合”,实现“一次且一次”。
- see this blog post。
- see kafka系列三-向kafka发消息。
【摘抄】
producer.retries(内部重试机制)能解决的消息发送未完成的异常我们描述为可恢复异常。producer.retries已经能够实现了我们大部分的需求场景。还有一些错误是需要应用进行处理的:
- 不可恢复异常(比如消息大小非法、权限认证失败);
- 消息发送到broker之前发生的异常(比如序列化异常);
- 生产者达到重试上限的异常,或者由于消息重试导致消息堆积最终内存溢出的异常。
对于这些异常,我们可以记录日志,持久化到数据库或者简单的抛弃。但如果说我们的处理方式仍然为不断重试,那么建议把这样的重试策略下沉到生产者内部重试机制。
不可恢复异常。
消息大小非法。
producer重试上限异常。
?
上一篇: springmvc图片上传、jquery 图片上传预览
下一篇: canvas游戏之贪食蛇