Kafka 2.5.0发布——弃用对Scala2.11的支持
近日kafka发布了最新版本 2.5.0,增加了很多新功能:
下载地址:
-
对tls 1.3的支持(默认为1.2)
-
引入用于 kafka streams 的 co-groups
-
用于 kafka consumer 的增量 rebalance 机制
-
为更好的监控操作增加了新的指标
-
升级zookeeper至 3.5.7
-
取消了对scala 2.1.1的支持
下面详细说明本次更新:
一、新功能
1、kafka streams: add cogroup in the dsl
当多个流聚集在一起以形成单个较大的对象时(例如,购物网站可能具有购物车流,心愿单流和购买流。它们共同构成一个客户),将其在kafka streams dsl中使用非常困难。
通常需要您将所有流分组并聚合到ktables,然后进行多个外部联接调用,最后得到具有所需对象的ktable。这将为每个流和一长串valuejoiners创建一个状态存储,每个新记录都必须经过此连接才能到达最终对象。
创建使用单个状态存储的cogroup 方法将:
-
减少从状态存储获取的数量。对于多个联接,当新值进入任何流时,都会发生连锁反应,联接处理器将继续调用valuegetters,直到我们访问了所有状态存储。
-
性能略有提高。如上所述,所有valuegetters都被调用,还导致所有valuejoiners被调用,从而强制重新计算所有其他流的当前联接值,从而影响性能。
2、add support for tls 1.3
java 11添加了对tls 1.3的支持。添加对java 11的支持后,我们应该对此提供支持。
3、不再支持scala 2.11
为什么不再支持?
我们目前为3个scala版本构建kafka:2.11、2.12和最近发布的2.13。由于我们必须在每个受支持的版本上编译和运行测试,因此从开发和测试的角度来看,这是一笔不小的成本。
scala 2.11.0于2014年4月发布,对2.11.x的支持于2017年11月结束(到发布kafka 2.5时将超过2年)。scala 2.12.0于2016年11月发布,scala 2.13.0于2019年6月发布。基于此,现在该放弃对scala 2.11的支持了,以便我们使测试矩阵易于管理(最近的kafka-trunk-jdk8占用了将近10个小时,它将使用3个scala版本构建并运行单元测试和集成测试。此外,scala 2.12和更高版本还改进了与java 8功能接口的互操作性(scala 2.12中首次引入)。更具体地说,scala 2.12中的lambda可以与java 8代码相同的方式与java 8功能接口一起使用。
在我们的下载页面中,我们推荐自kafka 2.1.0起使用scala 2.12构建的kafka二进制文件。我们切换到scala 2.12作为kafka 2.2.0中源tarball,构建和系统测试的默认scala版本。
二、改进与修复
- 当输入 topic 事务时,kafka streams lag 不为 0
- kafka-streams 可配置内部 topics message.timestamp.type=createtime
- 将 kstream#totable 添加到 streams dsl
- 将 commit/list offsets 选项添加到 adminclient
- 将 voidserde 添加到 serdes
- 改进 sensor retrieval
[kafka-3061] 修复guava依赖问题
[kafka-4203] java生产者默认的最大消息大小不再与broker默认一致
[kafka-5868] kafka消费者reblance时间过长问题
三、其他版本升级至2.5.0指南
如果要从2.1.x之前的版本升级,请参阅以下注释,以了解用于存储偏移量的架构的更改。将inter.broker.protocol.version更改为最新版本后,将无法降级到2.1之前的版本。
在所有broker上更新server.properties并添加以下属性。current_kafka_version指的是您要升级的版本。current_message_format_version是指当前使用的消息格式版本。如果以前覆盖了消息格式版本,则应保留其当前值。或者,如果要从0.11.0.x之前的版本升级,则应将current_message_format_version设置为与current_kafka_version相匹配。
- inter.broker.protocol.version = current_kafka_version(例如0.10.0、0.11.0、1.0、2.0、2.2)。
- log.message.format.version = current_message_format_version
- 如果要从0.11.0.x或更高版本升级,并且尚未覆盖消息格式,则只需要覆盖broker间协议版本。
- inter.broker.protocol.version = current_kafka_version(0.11.0,1.0,1.1,2.0,2.1,2.2,2.3)。
- 一次升级一个broker:关闭broker,更新代码,然后重新启动。完成此操作后,broker将运行最新版本,并且您可以验证集群的行为和性能是否符合预期。如果有任何问题,此时仍可以降级。
- 验证群集的行为和性能后,通过编辑
inter.broker.protocol.version
并将其设置为2.5来提高协议版本 。 - 逐一重新启动broker,以使新协议版本生效。broker开始使用最新协议版本后,将无法再将群集降级到较旧版本。
- 如果您已按照上述说明覆盖了消息格式版本,则需要再次滚动重启以将其升级到最新版本。一旦所有(或大多数)使用者均已升级到0.11.0或更高版本,则在每个broker上将log.message.format.version更改为2.5,然后逐一重新启动它们。请注意,不再维护的较旧的scala客户端不支持0.11中引入的消息格式,因此,为避免转换成本,必须使用较新的java客户端。
2.5.0主要的变化,可能产生的升级影响
- 当
rebalanceprotocol#cooperative
使用时,consumer#poll
仍然可以返回数据,此外,consumer#commitsync
现在可以抛出rebalanceinprogressexception来通知用户此类事件,commitfailedexception
并允许用户完成正在进行的reblance,然后重新尝试为那些仍然拥有的分区提交偏移量。 - 为了提高典型网络环境中的弹性,默认值
zookeeper.session.timeout.ms
已从6s增加到18s,replica.lag.time.max.ms
从10s增加到30s。 -
cogroup()
添加了新的dsl运营商,用于一次将多个流聚合在一起。 - 添加了新的
kstream.totable()
api,可将输入事件流转换为ktable。 - 添加了新的serde类型
void
以表示输入主题中的空键或空值。 - 弃用
useprevioustimeoninvalidtimestamp
并替换为usepartitiontimeoninvalidtimestamp
。 - 通过添加挂起的偏移防护机制和更强大的事务提交一致性检查,改进了一次精确语义,这大大简化了可伸缩的一次精确应用程序的实现。
- 弃用
kafkastreams.store(string, queryablestoretype)
并替换为kafkastreams.store(storequeryparameters)
。 - 不再支持scala 2.11。
- 软件包中的所有scala类
kafka.security.auth
均已弃用。请注意,在2.4.0中已弃用kafka.security.auth.authorizer
和kafka.security.auth.simpleaclauthorizer
。 - 默认情况下,tlsv1和tlsv1.1已被禁用,因为它们具有已知的安全漏洞。现在默认情况下仅启用tlsv1.2。您可以通过在配置选项
ssl.protocol
和中明确启用它们来继续使用tlsv1和tlsv1.1ssl.enabled.protocols
。 - zookeeper已升级到3.5.7,并且如果3.4数据目录中没有快照文件,则zookeeper从3.4.x升级到3.5.7可能会失败。这通常发生在测试升级中,其中zookeeper 3.5.7尝试加载没有创建快照文件的现有3.4数据目录。有关问题请参考:https://issues.apache.org/jira/browse/zookeeper-3056
- zookeeper 3.5.7版支持有或没有客户端证书的tls加密的到zookeeper的连接,并且可以使用其他kafka配置来利用此功能。