kafka入门到放弃
Kafka学习笔记
- broker在zookeeper目录/brokers/ids创建自己id的临时节点,并订阅该路径
- broker停机、网络分区或者长时间垃圾回收停顿,broker会断开连接,监听broker列表的kafka组件会收到此消息
- 关闭broker也会消失,但是重启后会立即加入集群
- 向kafka生产者和消费者注册拦截器,当异常发送时,把消息发送到第三方存储系统。再丢回原始的topic
控制器
-
集群第一个启动的broker通过在zk创建一个临时节点/controller让自己成为控制器
-
集群只有一个控制器
-
每个节点通过watch /controller目录,让自己成为新的控制器,每个新选出的控制器通过zk的条件递增操作获得一个全新的,数值更大的controller epoch。 其他broker在知道当前controller epoch后,当控制器发出包含较旧的epoch消息就会忽略
包含分区首领的broker离开集群,控制器收到zk的通知,遍历离开集群broker的所有分区,选举分区的新首领(一般是分区副本列表的下一个),并确实谁是跟随者。发送给包含新首领或现有跟随者的broker
broker加入集群,检查broker ID是否包含现有分区的副本。如果有,控制器把变更通知发送给新加入的broker和其他broker,新broker上的副本开始从首领那里复制消息
复制
-
首领副本:partition的leader副本
-
跟随者副本:partition的follower副本
-
首选首领:创建主题时选定的首领就是分区的首选首领。当副本的首领副本非首选首领会触发rebalance,均衡broker的压力
-
同步副本:跟随者副本在replica.lag.time.max.ms的时间范围内请求了最新数据会被认定为是同步的。首领副本挂了的时候候选者只会在同步副本产生。与zk保持心跳
消息
- broker按照请求到达的顺序来处理它们–这种顺序保证让kafka具有了消息队列的特性,同时保证保存的消息也是有序的。
消息格式
- request type
- request version(不同版本的会有不同的处理方式)
- correlation id (一个具有唯一性的数字,用于标识请求消息)
- client id–用于标识发送请求的客户端
消息的处理流程
broker会在它所监听的每个端口运行一个acceptor线程,这个线程会创建一个连接,并发它交给processor线程去处理,processor线程的数量是可配置的。processor线程负责把从客户端获取的请求放入请求队列,然后从响应队列获取响应消息,把他们发送给客户端。请求队列由io线程负责处理。
常见的请求
生产者的请求和消费者或者跟随者副本的请求
元数据请求:客户端通过请求任意broker获取元数据,所有副本首领的信息
acks:生产者发送消息需要多少个broker确认。1表示只要首领副本。 all表示需要所有
zero copy:直接从linux的文件缓存区拷贝到网卡
数据一致性:等待所有同步副本复制了消息,这些消息才能被消费者消费。
分区分配
轮询所有broker的方式,为每个broker分配首领副本,跟随者副本会分配给非首领副本的broker
为broker指定了机架,会按机架的顺序轮询分配
日志文件
一个分区会被分配到一个目录下,这个目录下可能存在多个文件,称为片段。当分区内的消息达到1gb(默认)或者一周的数据时,会新创建一个片段。
当前正在写数据的片段叫做活跃片段,是无法被删除的。
索引
每个片段具有一个索引,运行消费者从指定offset的位置消费。索引可以被删除。kafka会重新生成索引
数据一致性
unclean.leader.election.enable=true
这个参数为true,表示运行非同步的副本称为首领副本。会出现数据不一致的问题。为false,会降低可用性
min.insync.replicas
kafka认为消息被所有同步副本写入才认为已提交。当所有同步副本只有首领副本时,安全性较低。提供这个参数,只有分区的同步副本大于等于这个值时消息才能被写入。否则发生者会收到NotEnoughReplicasException
生产者
在生产者配置asks=1,只要首领副本写入就是成功。这是高性能,但是当首领副本宕机后存在消息丢失。
asks=all加上min.insync.replicas结合,就可以决定在返回确认前至少有多少个副本收到消息。 这是很保险的做法,但也是比较耗时和低性能的。
生产者
生产者发送消息流程图
消费者
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。在再均衡期间,整个群组是不可用的(不消费消息)
一个消费者加入或者离开消费群组,这个群组就会发送再均衡。
通过被指派为群组协调器的broker(不同群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。会在轮询消息或者提交偏移量时发送心跳。长时间没有心跳,协调器会认为消费者已经宕机,执行再均衡。
再均衡触发的情况如下
- 消费者长时间没有心跳(full gc或者被其他夯住了)
- 消费者主动离开群组(应用重启)
- 消费者加入群组(应用重启)
消费者的位移保存在_consumer_offset_topic,为了持久化处理
手动提交或者自动提交offset
分区分配策略
使用RangeAssignor。topic隔离。顺序分配,可能靠前的消费者会被多分配一个
分区
如何选择分区数
根据发送的tps和单条消息的处理耗时,分区数= 发送的tps * 耗时ms / 1000
日志
一个分区对应一个日志,一个日志被拆分为多个日志段,日志段是物理存储结构,日志是逻辑概念。
每个日志段对应一个log文件和两个索引文件。偏移量索引和时间索引文件
-
日志的写入是追下形式的,顺序写。
-
最后一个日志段被称为活跃日志段,不能删除。日志段文件的命名以第一条消息的偏移量 + 0补全的20位数字表示
-
一个分区的所有日志会在topic-分区编号的文件夹下。
-
最新版本的日志格式中,使用批量保存记录,减少日志头的存储占用
-
使用concurrentSkipListMap保存所有日志对象,每个日志分段的baseOffset作为key。
-
分段日志内使用二分查找
-
时间索引日志是先查找到偏移量再找到物理定位。
通过时间索引查找的步骤
步骤1:将targetTimeStamp和每个日志分段中的最大时间戳largestTimeStamp逐一对比,直到找到不小于 targetTimeStamp 的 largestTimeStamp 所对应的日志分段。日志分段中的largestTimeStamp的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则取该日志分段的最近修改时间。
步骤 2:找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于targetTimeStamp的最大索引项,即[1526384718283,28],如此便找到了一个相对偏移量28。
步骤3:在偏移量索引文件中使用二分算法查找到不大于28的最大索引项,即[26,838]。
步骤4:从步骤1中找到日志分段文件中的838的物理位置开始查找不小于targetTimeStamp的消息。
零拷贝
只需要两次拷贝,完成了磁盘的数据发送。使用DMA(direct memory access)实现
正常需要四次拷贝才能完成发送。
时间轮
使用时间轮实现延迟操作。
- 生产延迟:如果生产者配置的acks是全部副本,那么需要等待所有ISR副本同步完毕才返回,或者等待超时时间达到。
- 拉取延迟:follower副本的拉取hold一段时间,超时或者新消息达到后返回。
控制器
-
负责管理整个集群所有分区和副本的状态。
-
负责leader副本的崩溃选举
-
负责分区ISR集合变化,通过各broker更新元数据
-
负责分区数量变化的重分配,并通知各副本
控制器选举
/controller目录为临时节点,各个broker负责监听,当控制器崩溃后,自己尝试称为控制器。
每一次控制器的选举/controller_epoch目录下的数据都会加1。
每个与控制器交互的请求都会判断请求中的controller_epoch是否小于内存中的值,小于则抛弃。
分区首领选举
创建分区和分区上线会进行leader选举。
选举策略是OfflinePartitionLeaderElectionStrategy。按照创建topic时,指定的分区副本顺序。选取第一个为leader副本,并且是在ISR集合中的。
消费者协调器和组协调器
每个消费组对应一个组协调器,一个消费组有一个消费组的leader,默认为第一个加入组的消费者。
消费者加入组再均衡的过程如下**
- 消费者确定它所属的消费组对应的broker,建立与broker的连接,进入第二阶段。(消费位移负责的broker即为组协调器)
- 消费者向协调器发出加入组的消息。组协调器负责选举消费组的leader和选举分区分配策略
- 消费组的leader负责实施分区分配策略,再由组协调器将方案同步给各个消费者
- 心跳阶段。使用心跳来维持与消费组的从属关系