Kafka
1.Kafka定义
Kafka是一个分布式的基于发布订阅的消息队列,主要用应用于大数据实时处理领域
2.MQ应用场景
- 异步处理, 实现解耦
- 消峰,解决发送消息和处理消息速度不一致问题(缓冲)
3.消息队列的两种模式
-
点对点模式(一对一,消息消费者主动拉取消息,消息收到后删除消息)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。 -
发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
- 注意:此处虽然是订阅,但是实际上还是消费者主动拉去的方式,这样消费者他的消费速度是由消费者决定的
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费
- 注意:此处虽然是订阅,但是实际上还是消费者主动拉去的方式,这样消费者他的消费速度是由消费者决定的
4.Kafka架构
Kafka集群中会有多个Broker, 每一个Broker内部在存储消息时是按照不同的Topic去存储的(相当于给消息做分类), Topic消息会存在分区,也就相当于当前的消息存储分散到不同的Broker上提高了并发能力, 而每一个分区为了防止宕机出现设置了Leader和follower进行了备份
消费者是以消费者组出现,一个组里面只能一个消费者消费消费一个分区的数据; 不同组的可以消费同一个分区的数据;一个组的一个消费者能消费不同分区数据(一个主题有多个分区);
zookeeper保存kafka集群信息,并且保存一些消费者的信息(如:消费者消费到第几条数据了,如果消费者宕机了再恢复之后可以继续消费); 注意:0.9版本之前记录在zk,之后又记录会kafka了
5.Kafka快速入门
- Kafka使用Scala语言编写
5.1安装
- 下载kafka安装包官网地址
- 修改Server.properties; 如果有多台kafka直接将整个kafka解压目录复制到其他机器然后修改broker.id
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径,数据也在这个目录
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=zk102:2181,zk103:2181,zk104:2181
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
- 启动
- 启动zk集群
- 启动kafka: bin/kafka-server-start.sh -daemon config/server.properties
- 多台机器每台执行此命令即可,如果需要群起写相应脚本即可
6.kafka常用命令
6.1 kafka-topics.sh
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
-
固定部分: bin/kafka-topics.sh --zookeeper hadoop102:2181 命令脚本 + zk地址
-
命令部分:
- –list
- –create
- –topic topicName --partitions num --replication-factory num
- –describe
- –topic topicName
- –delete
-
注意事项
- replication-factory num其中的num只能小于等于当前的kafka机器数;因为kafka在存储副本的时候是以目录存储的,入托两台机器你需要三个副本,那么在一台机器中就会存在重名的副本名这样会有问题
6.2kafka-console-producer.sh
kafka-console-producer.sh \ --broker-list hadoop102:9092 --topic first
- 发送消息不是操作的元数据所以不需要链接zk集群,直接链接kafka集群,并制定要发送数据的topic名称
- 消费命令同样,不加参数从最大便宜开始读; 此处注意如果没有使用–bootstrap-server 使用的zk链接那么偏移量信息存储在zk中
kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --from-beginning --topic first
7.kafka的log目录
- 因为之前我们配置的log.dir配置到我们的日志目录所以我们看到日志和数据是在一起的,此处我们应该将此参数配置一个新建的数据目录来存储topic相关的数据
- 在该目录中可以看到server.log,在此文件中能够查看到集群启动的日志信息
- 如果创建了一个topic,如myTopic并且分区2副本数2,此目录下可以看到一个文件夹为myTopic-1/ myTopic-2的文件夹用于存储Topic的数据
7.1kafka记录消费者消费偏移量
- 默认一个分区的消费偏移量在新建的topic中用50个分区存, 下图为对first-1分区进行了消费产生的topic记录偏移量的文件;(注意kafka只会建50分区存储所有的消费者消费偏移量)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GZn08U5g-1584410693691)(en-resource://database/895:1)]
8. 存储
8.1 存储时间
- 默认一个队列的数据存储7天,七天之后就不能再用–from-begining消费了
8.2 存储结构
9. 如何删库
- 删除log下面的分区数据
- 删除zk中的除了zookeeper节点的所有数据(还可以到zk配置的数据目录下去删除数据,此处需要停了zk集群)
10.Kafka生产者
10.1生产消息分区原则
- 以下是方法的重载
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0OvUt6xu-1584410693692)(en-resource://database/901:1)]
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
10.2可靠性投递
- kafka采用了ack机制,也就是当leader收到消息之后并且完成了全部副本的同步在返回ack
- 问题:如果挂了一台follower怎么处理,此处可以使用ISR(同步副本集合)解决;此时leader只需要把数据在ISR中同步完成就可以ACK了;
- 为什么不使用半数以上同步完成返回ack
- 因为半数以上同步完成才ack,那么n台机器挂了必须有2n+1个副本才能返回ack
10.2.1 ISR(in sync replica set)
- 当Leader挂了从这个集合里面选择一个机器作为leader,加入ISR的条件就是能否快速响应leader的心跳,如果能说明leader和当前follower网络通信没问题,这样也能保证follower中的副本数据是比较多的
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
- 为什么不使用消息条数控制(ISR数据存储在ZK)
因为kafka存在批量消息发送,那么此时如果批量消息大于设置的数字,直接所有ISR被剔除,然后很快这些ISR又要被拉回来;所以在0.9版本之后剔除消息个数这个参数
10.2.2 Ack机制选择
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
- acks参数配置:
-
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
-
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-
1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
-
10.2.3 解决存储数据不一致问题-HW & LEO
- 当刚刚同步完ISR还未ACkleader挂了,这样会有重复数据
- 如果此次一共10条, follower1同步了8条, follower2七条, 如果此时选择了follower2为leader怎么处理数据不一致问题
-
LEO(log end offset)
- 代表当前副本或Leader获取到数据的最后偏移量
-
HW(high watermark)
- 代表用户在我当前副本可见的偏移量; 如果不设置这个值可能我leader已经有10条完整信息用户能读到,但是此时follower没有同步完整leader挂了,这是后数据就不一致了
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-asfRPfqP-1584410693692)(en-resource://database/903:1)]
- 具体解决
(1)follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
10.2.4 Exactly-one - 解决ack:-1重复问题
- 方案: 幂等性 + ack:-1 = Exactly-one(精准一次)
- 使用的就是用key来去重,每个消息有***,具体实现如下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZS2bgoEo-1584410693693)(en-resource://database/905:1)]
11.kafka消费者
11.1消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
11.2消费者组的分区分配策略
- 消费的偏移量是根据
假如现在一个消费者组消费了两个topic,现在两个topic有6个分区
-
轮询方式
轮询会把这两个topic看成一个整体,然后在吧所有分区排序轮询分配, -
轮询问题
加入一个组两个消费者,A消费topic1 B消费topic2,那么此时轮询把两给主题看为一个整体这样过去使用就不行了; 所以在使用轮询方式的时候应该避免一个组内不同消费者消费的不同队列 -
Range分配
Range根据一个主题的的分区多少来平均分配给消费者,如果不同组的消费者会冉钊不同组进行
kafka事务
- 解决了精准一次性不能解决跨分区夸回话的问题, 0.11+版本新增功能
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W2QbNFFp-1584410693694)(en-resource://database/907:1)]
kafka发送数据流程
- 可以自定义分区器,拦截器,序列化器
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gjW2JEpJ-1584410693694)(en-resource://database/909:1)]
- kafka pom
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
public class CustomProducer {
public static void main(String[] args) throws
ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
props.put("acks", "all");
props.put("retries", 1);//重试次数
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待时间
props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new
KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String,
String>("first", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
kafka中配置记忆
- 在ProducerConfig中包含了所有配置的定义
- 在ConsumerConfig中包含了所有配置的定义
下一篇: activemq源码笔记