【笔记】kafka权威指南-常用配置和要点记录
Kafka 的应用场景
-
消息队列
Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。
-
行为跟踪和日志收集。
敏感操作和日志,都可以写到 kafka 里进行统一,分情况的监控、和分析
-
流式处理,比如,kafka 和 storm 的对接,来实现流式处理
还有其他的需要面向业务需求和场景来具体分析,主要就是利用 kafka 应对高并发的能力,还有实时处理的快速响应,数据实时落地磁盘的可靠性。
名词定义
-
主题和分区:kafka消息通过主题进行分类。一个主题可以设置分为若干个分区,一个分区就是一个提交日志。
一个主题包含几个分区,所以无法在整个主题范围内保证消息有序,但可以保证消息在单个分区内的顺序。
-
一个独立的 kafka 服务器被称为 broker。
broker 是集群的组成部分,每个集群都有一个 broker 充当管理角色。
-
多集群:随着 kafka 集群数量的增加最好使用多集群。
- 数据类型分离
- 安全需求隔离
- 多数据中心(容灾)
- MirrorMaker:kafka提供的一个用于在集群之间进行消息复制的工具。
常用配置
- Spring @KafkaListener max.poll.records 默认 int 最大值 ,所以最好设置一下,否则积压数据的情况下,一次拉取过多,处理时间长会导致超时,连接断开、offset提交失败、重复消费。
- broker.id 整数 broker 间唯一
-
zookeeper.connect zookeeper 配置,格式: hostname:prot/path;…
最好指定 path 这是在 zookeeper 存储数据节点的根目录
-
log.dirs 消息存储位置, ‘,’ 分割多个本地FS路径
broker 在增加分区的时候会优先选择分区数少的路径,而不考虑占用的磁盘大小
num.recovery.threads.per.data.dir 处理日志片段的线程数
# 三种情况会用到
1. 启动时检查每个日志片段
2. 关闭时关闭日志片段
3. 崩溃后重启,检查和截短日志片段
# 因为只在启动和关闭时用以并行操作,所以可以调大一点
# 如果设置为8个,并且log.dirs 配置了三个路径,则会启动 3*8 = 24个线程。
- num.partitions 默认创建分区数量
- log.retention.ms 信息保留时间,默认 168 小时,以片段的最后修改时间计算。若存在 log.retention.minutes 则以最小的为准。
-
log.retention.bytes 每个分区的最大数据量 ,若设置为 1G 则三个分区的topic最多保存3G的数据量。
保存时间和数据量以先到为准,即只要达到一个条件就删除。
-
log.segment.bytes 每个日志片段的大小
如果太小则会频繁关闭文件,降低效率,一个片段的所有数据都达到超时时间才会删除整个片段,所以太大 则管理的粒度太粗,同时也会降低使用时间戳获取偏移量的精确度。
-
log.segment.ms 指定多长时间后关闭日志片段
与上面的 byte 参数依旧不冲突,先达到哪个,就先关闭。but 这个参数可能引起很多小的(达不到bytes 上限的) segment 在达到ms数时,同时关闭。会影响磁盘性能。
message.max.bytes 单个消息大小,默认 1 000 000 就是大约1M。
硬件
- 磁盘吞吐量 在broker 和 partition 数量不变的前提下,生产者客户端的性能直接受到服务端磁盘吞吐量的影响。
- 内存 磁盘性能影响生产者,而内存影响消费者。
- 网络和磁盘容量
生产者
#### 部分参数
1. bootstrap.servers 不需要指定所有的但至少两个以上,以防其中一个宕机。客户端会自己去borker上获取服务端列表。
2. key/value.serializer 序列化工具
3. acks 指定多少个分区副本收到消息才认为成功。(0,1,all) 0 则不等待任何回复。
4. buffer.memory 缓冲要发送到服务器的消息。若服务器写入速度不够,缓冲区大于改配置则send方法会别阻塞,超过 max.block.ms 则会抛出异常。
5. compression.type 压缩方式,默认不压缩
snappy,gzip,lz4 相对来说 snappy 占用cpu较小,压缩率可观。gzip 会占用更多cpu 同时也会提供更高的压缩率。
-
retries 收到临时性错误的重试次数,重试等待时间默认100ms 除非配置 retry.backoff.ms
对于找不到 leader 等错误会进行重试,比如消息太大则无法通过重试解决,需要在逻辑中手动处理。
batch.size 多条发向同一个分区的消息会合并为一个批次,该参数指定批次大小。生产者并不会等他填满才会发送,所以不用担心该参数过大会造成延迟。
- linger.ms 批次发送的等待时间,与上一条件不冲突,到达一个限制就发送,默认为 0 不延时。
- max.in.flight.requests.per.connection 指定了生产者在收到服务器响应前可以再发送多少条消息。过大会占用内存,设为1 可以在发生重试的情况下也能保证顺序写入(当前生产者下)。但会严重影响生产者性能。
- timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与ack机制匹配,指定时间内没有收到副本确认,就会返回错误。
- reuqest.timeout.ms 生产者在发送数据时等待服务器返回响应的时间。
- metadata.fetch.timeout.ms 生产者在获取元数据(比如目标分区首领是谁)时等待服务器响应的时间。
-
max.request.size 控制生产者发送的请求大小(一个批次)。
borker 对大小也有限制, message.max.bytes 单条消息超过该大小会被broker拒绝。
receive.buffer.bytes 和 send.buffer.bytes TCP socket 接受和发送数据包的缓冲区大小 -1 则使用操作系统默认值。一般跨数据中心网络延迟较高可以适当增大。
发送方式
- 只发送而不接收消息 send(record)
- 同步发送 send(record).get()
- 异步发送 send(record,new Callback(){…})
关于序列化器
不建议自己定义,建议使用通用序列化器,除了String 等还有 avro 、 JSON 。。
record 的组成
- Topic
- [Partition]
- [key]
- value
分区
- key 有两个作用
- 作为消息的附加信息
- 决定消息被写道主题的那个分区
- 如果 key 为null 则默认使用轮询算法将消息均衡分布到各个分区上
- 如果key不为空并且使用了默认分区器 则 会 hash 分布,(使用kafka自己的hash算法,即使升级java版本,并不会影响hash值)
- 如果使用 key 来映射分区,则在创建主题时就设计好分区,不要再增加新分区,因为如果增加了,则会导致hash值改变,数据写入不同的分区。
- 重写 Partitioner 自己实现分区算法。
- key 只能是字符串
消费者
- 每个 group 里的多个消费者共同消费同一主题时,会各自消费一部分
- 具体来说,每个同一个 group 内的多个消费者会分配有一个或多个分区,各自负责消费各自负责的分区内的消息
- 所以,如果组内消费者数量大于分区数量,则对于增加吞吐量是无益的,因为多出来的消费者是不会拥有分区,就会闲置,也就不会有消费的
- 当消费者的消费频次大于 session.timeout.ms 时就会认为该消费者下线,触发事件为每个消费者重新分配分区,这个行为称为-再均衡
- 再均衡 分区所有权从一个消费者转移到另一个消费者的行为,在再均衡期间,消费者无法读取消息,整个群组短时间内不可用。
- 群组协调器 每个群组会有一个指定 broker 作为群组协调器(不同的群组可以不同),消费者通过向协调器发送心跳来维持从属关系和分区所有权(消费者会在轮询消息和提交偏移量的时候来发送心跳)。
常用方法
- consumer.subscribe(**); 订阅主题,可以是list或String或正则
- poll(ms) 轮询,参数 阻塞时间,返回 ConsumerRecords ,在第一次调用 poll 方法时,他会负责查找 GroupCoordinator 然后加入群组,接受分配的分区。心跳也是在轮询里发出去的,所以要确保轮询中处理的工作要尽快完成。
参数
- fetch.min.bytes 消费者从服务器获取记录的最小字节数。broker在收到消费者数据请求时,如果可用的数据量小于 此处指定的大小,那么就会等待有足够数据后,才会返回给消费者。把该属性值设置大一点可以降低broker的工作负载。
- fetch.max.wait.ms 用于指定上一配置中数据量不够需要等待的最大时长,默认500ms 如果没有足够的数据流入则会导致500ms的延迟。
- max.partition.fetch.bytes 服务器从每个分区里返回给消费者的最大字节数 默认1M ,就是在未消费数据充足的情况下,一次 kafkaConsumer.pull() 得到的数据 = 1M * 该消费者拥有的分区数。
- session.timeout.ms 消费者可以与服务器断开连接的时间,超时没有提交心跳信息则认为死亡,出发再均衡,将他的分区分配给其他消费者,默认 3s
- heartbeat.interval.ms 指定了poll方法向协调者发送心跳的频率,必须比 session.timeout.ms 小,每次poll方法时会检查是否需要发送心跳信息
- auto.offset.reset 指定了消费者在读取一个没有偏移量的分区或偏移量无效时该如何处理,默认 latest 从最新开始读取,还可以设置 earliest 从分区起始位置开始
- enable.auto.commit 默认是true ,设为false则需要手动提交 offset
- auto.commit.interval.ms 控制自动提交 offset 频率
- partition.assignment.strategy 指定 PartitionAssignor 分配分区的策略
- org.apache.kafka.clients.consumer.RangeAssignor 默认,连续分配,假设两个消费者 三个分区,则 第一个[0,1] 第二个消费者 2
- **.RoundRobinAssignor 轮询策略 依然假设三个topic 2个消费者,消费者1[0,2] 消费者2[1]
- 也可以指定自己类来自己实现策略
- client.id 任意字符串,在日志、度量标志和配额里标记客户端
- max.poll.records 控制单次请求 能够返回的记录数量,可以控制轮询里需要处理的数据量
- receive.buffer.bytes 和 send.buffer.bytes 读写数据时 TCP 缓冲区大小。-1 则使用系统默认大小。
request.timeout.ms 必须必 session.timeout.ms 、 fetch.max.wait.ms 数值大
提交偏移量
- consumer.commitSync() 同步提交
-
consumer.commitAsync() 异步提交
- 一般在轮询中 异步提交 而 try-catch-finally{ 同步提交 }
- 无参提交是将整个批次的数据都提交,可以传入一个 Map
再均衡监听器 ConsumerRebalanceListener
可以实现接口的两个方法传入 消费者配置中,实现监听,当触发在均衡时,会会分别在在均衡开始前和重新分配后调用对应方法。
特定偏移量开始处理
- seekToBeginning(Collection) 从起始位置处理
- seekToEnd(Collection) 从分区结束为止开始处理
-
seek 特定位置处理
consumer.seek(partition,offsetOfThisPartition);
退出
- 在另一个线程中调用 consumer.wakeup() 会在下一次 poll() 的时候抛出WakeupException异常,不需要处理,只是退出的一种方式
- wakeup() 是唯一一个可以在其他线程中安全调用的方法
- 在退出(无论是正常退出还是catch到异常finally时)之前最好 调用 consumer.close() 来通知协调器 再均衡,而不是等超时。
独立消费者-没有群组的消费者
当只需要消费几个或全部分区的时候,并不需要群组和再均衡,则就可以使用简单的没有群组的消费者来直接消费,需要指定要消费的partition。
points
- 日志目录配置在 log4j.properties
上一篇: Drawable
下一篇: kafka2.3集群搭建