笔记 -- kafka
kafka
- 概念
- 架构组成
- 代码示例
- 配置分析
- 发送端配置
- acks
- batch.size
- linger.ms
- compression.type (Producer压缩器)
- max.request.size (请求数据的最大字节数)
- buffer.memory (producer 缓冲区大小)
- retries(失败后重试次数)
- 消费端配置
- Topic(主题)
- Partition(分区)
- Coordinator
- Rebalance(均衡)
- offset 偏移量
- LogSegment (分段)
- 日志清除/压缩
- 零拷贝
- 副本机制(Replication)
- 数据传输的事务定义
- Kafka通信协议
- kafka如何保证数据不丢失?
概念
分布式消息 发布/订阅 系统
高吞吐量、内置分区、冗余及容错性(每秒处理几十万消息)
在kafka集群中,没有中心节点的概念,集群中所有的服务器都是对等的,可以再不做任何配置更改的情况下实现服务器的添加和删除。
架构组成
Producer
将消息push到broker
1.producer将消息封装到producerRecord类实例中。
2.producerRecord类序列化后,再发送到内存缓冲区。
3.由sender线程负责将缓冲区中的消息封装到一个批次中发送给broker。
Broker
Consumer
监听broker,主动从broker pull消息消费。
Consumer.poll() 获取数据消费,但实际是通过发起fetch请求执行,并将从partition获取的数据放在本地缓存。
Consumer.poll() 需要循环调用,如果长时间不出发fetch请求,心跳连接仍在,consumer会被认为处于livelock状态,从而被broker从consumer group中剔除。
并不是每次poll都会发起fetch请求。
原因: 在满足max.partition.fetch.bytes限制的情况下,假如一次fetch请求到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么Consumer需要执行7次才能将这100个record消费完毕。
Zookeeper
管理协调Producer、Broker、Consumer的请求
代码示例
KafkaProducerDemo.class
public class KafkaProducerDemo extends Thread{
private final KafkaProducer<Integer,String> producer;
private final String topic;
private final boolean isAysnc;
public KafkaProducerDemo(String topic,boolean isAysnc){
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.235.16.28:9092"); //kafka集群地址
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaConsumerDemo2"); //这是客户端ID
properties.put(ProducerConfig.ACKS_CONFIG,"0"); //0:无需broker确认。1:m节点broker确认即可。All(-1):需要所有节点确认
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer"); //设置key序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"); //设置value序列化
producer=new KafkaProducer<Integer, String>(properties);
this.topic=topic;
this.isAysnc=isAysnc;
}
@Override
public void run() {
int num=0;
while(num<10){
String message="message_"+num;
System.out.println("begin send message:"+message);
if(isAysnc){//异步发送
producer.send(new ProducerRecord<Integer, String>(topic, message), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(recordMetadata!=null){
System.out.println("async-offset:"+recordMetadata.offset()+"->partition"+recordMetadata.partition());
}
}
});
}else{//同步发送 future/callable
try {
RecordMetadata recordMetadata = producer.send(new ProducerRecord<Integer, String>(topic,message)).get();
System.out.println("sync-offset:"+recordMetadata.offset()+ "->partition"+recordMetadata.partition());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
num++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new KafkaProducerDemo("test0815",true).start();
}
}
KafkaConsumerDemo.class
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer kafkaConsumer;
public KafkaConsumerDemo(String topic) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"49.235.16.28:9092"); //kafka集群IP
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo2"); //分组ID
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //间隔时间
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer"); //key反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer"); //value反序列
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//earliest 从头开始消费
kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic)); //订阅
}
@Override
public void run() {
while (true) {
ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll(1000);
for (ConsumerRecord record : consumerRecord) {
System.out.println("message receive:" + record.value());
kafkaConsumer.commitAsync();
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test0815").start();
}
}
配置分析
发送端配置
acks
对数据要求很高,用all(-1),其他用1。
0 :写入的消息不需要等待副本确认。
1 :写入的消息被leader副本记录则认为提交成功。
all(-1) :写入的消息需要被复制到ISR的所有副本,才认为提交成功。性能低,最安全,但如果副本只有一个,副本宕机时有可能数据丢失
batch.size
以大小为单位,Producer把发送到同一个分区的消息封装进一个batch,在不考虑linger.ms的情况下,batch满了再统一发送。
batch越小,吞吐量越低。越大,吞吐量越高。默认16KB。
linger.ms
以时间为单位,Producer将时间间隔内的所有请求进行一次聚合,再统一发送。
默认0,表示不做停留,这样会导致大量的小batch被发送出去(导致batch.size不生效),给网络IO带来极大压力。
假如设置linger.ms=5,表示producer的请求延迟5ms发送。
batch.size和linger.ms的作用是一样的,是kafka性能调优的关键参数。如果两个都配置了,只要满足其中一个,Producer就会发送请求。
compression.type (Producer压缩器)
目前支持none(不压缩)、gzip、snappy、lz4(Lz4的效果最好)。
max.request.size (请求数据的最大字节数)
默认1MB, 防止大的数据包影响吞吐量
buffer.memory (producer 缓冲区大小)
指定producer缓存消息的缓冲区大小,默认33554432字节(32M)。
Producer启动时在内存中创建一块缓冲区,用于存放消息,然后由专属线程sender负责从缓冲区拿消息,进行真正的发送。
消息持续发送过程中,当缓冲区填满后,producer立即进入阻塞状态,直到缓冲区的内存释放出来。阻塞时间一旦超过max.block.ms设置的值,producer就会抛异常:TimeoutException。这种情况下就需要调高buffer.memory的值,增大缓冲区。
retries(失败后重试次数)
max.in.flight.requests.per.connection设置如果大于1,重试可能会造成消息乱序。
0.11.1.0版本已经支持“精确到一次语义”,重试不会造成消息重复发送。
消费端配置
Consumer.poll(1000)
Consumer拿到足够多的数据就会返回一个ConsumerRecords,但如果阻塞了 1000ms,哪怕仍没有拿到足够多的数据,也依旧返回。
1000:最长阻塞时间。
bootstrap.servers
设置broker地址:host1:port1;host2:port2…
heartbeat.interval.ms (心跳间隔)
Consumer与coordinator之间的心跳,是为了确认consumer存活、加入或者退出groupd。
这个值必须小于session.timeout.ms,如果心跳间隔时间超过session.timeout.ms,coordinator会认为该consumer退出,并由group内consumer重新rebalance。通常心跳间隔时间(3S)小于session.timeout.ms的1/3.
session.timeout.ms
Consumer session 过期时间。默认10S。
这个值得设置必须在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
max.partition.fetch.bytes
fetch操作时,指定每个分区返回最大字节数,默认1M。
这个值必须比broker能够接受的最大消息的配置(max.message.size)大,否则会引起消费者无法消费数据(出口>入口)。
max.partition.fetch.bytes设置不能过大,会导致consumer消费数据时间过长,没有及时再次poll而会话过期。
groupid
一个组内可以有多个消费者,并共用一个group id
如果topic某个消息被组内某个consumer消费了,那么组内其他consumer不可再消费。
各组之间消费互不影响。
enable.auto.commit (自动提交)
默认值:true
Consumer消费后自动提交offset+1。只有提交后,该消息才不会被再次接收。
可以配合auto.commit.interval.ms 控制自动提交的频率(默认5S),当然也可以consumer.commitSync()的方式实现手动提交
自动提交与手动提交
自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
auto.offfset.reset (消费)
latest: 从topic最新的数据开始消费(默认)
earliest:从topic最早的消息开始消
none: 如果offset不存在,则抛异常
max.poll.records
每次调用poll()返回的消息数,减少poll间隔
Topic(主题)
存储消息的逻辑概念,可认为是消息集合。
每个topic可划分多个partition(分区),分区越多,吞吐量越大。
Partition(分区)
partition是一块保存具体数据的空间,本质是磁盘上存放数据的文件夹,
所以partition不能跨Broker,也不能在同一个Broker上跨磁盘。
partition中的每个消息会被分配一个offset(偏移量),它是消息在此partition的唯一编号。
offset只保证同一partition内消息是有序的。
Kafka支持动态添加partition,但不支持删减partition,
因为如果将删减的partition上的数据转移到其他partition上,会破坏其他partition上消息的有序性。
消息由key+value组成,key、value皆可为空。
根据partition规则,broker将收到的消息存储到其中一个partition,类似于将数据做分片处理。
partition分布
单节点
如果topic(firstTopic)有3个partition,那么配置dir路径下(默认:/tmp/kafka-log )有3个目录,firstTopic-0、firstTopic-1、firstTopic-2。
集群
集群中有n个broker,一个topic中的多个partition如何分布在这些broker上?
将partition排序,第i个partition放到(i mod n)个broker上
消息如何写入partition?
消息由key、value组成,key、value皆可为空,那么消息存储在哪个partition中?
方式一:producer自定义分区 (Partitioner 接口)
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPartition"); //partition类名全路径
public class MyPartition implements Partitioner {
private Random random = new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); // 获得分区列表
int partitionNum = 0;
if (key == null) {
partitionNum = random.nextInt(partitionInfos.size()); // key为空,随机分区
} else {
partitionNum = Math.abs((key.hashCode()) % partitionInfos.size()); //hash取模
}
System.out.println("key:" + key + ",value:" + value + "," + partitionNum);
return partitionNum; // 指定发送的分区值
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
方式二:默认分区算法 (Hash取模算法)
Key不为空,默认采用hash取模算法。
key为空,则在”metadata.max.age.ms”时间范围内,随机选一个partition,在默认情况下(10分钟内),数据只会发送到当前partition上。
因为broker - partition的对应关系可能会发生变化,所以10分钟刷新一次。(metadata.class存储了topic/partition和broker的映射关系)
从Partition消费消息
Consumer指定Partition
TopicPartition topicPartition=new TopicPartition(topic,0); //指定0分区
kafkaConsumer.assign(Arrays.asList(topicPartition)); // 可接收多个指定
消息分配策略
range(默认)
在同一topic中,按partition和consumer的数量分配。
缺陷:
订阅多个topic时,分配不均。
roundRobin(轮询)
整合所有topic的partition,按字典排序,最后将partition轮询给各个消费者。
缺陷:
组内consumer订阅不同分区时,分配不均。
组内一consumer宕机,会导致所有分区重新轮询分配,严重浪费资源。
stickyAssignor(粘性)
优势:
相比roundRobin,stickyAssignor更加平均。
组内一consumer宕机,将其分区分配给其他consumer,其他consumer原有的分区保持不动,
消息分配策略的触发条件
- group新增/剔除consumer
- Topic新增Patition
Coordinator
一个broker节点,负责管理consumer group。
Coordinator如何定义?
组内第一个consumer启动后,consumer向kafka集群任意一borker发送一个GroupCoordinatorRequest请求,服务端会返回负载最小的broker节点id,并将此节点定义为coordinator。之后该组内的所有consumer都会和该coordinator协调通信。
如何确定consumer group的coidinator是哪个borker?
consumer group的位移信息写入哪个consumer_offsets_*,那么其分区leader所在的borker就是coordinator。
Rebalance(均衡)
将partition均分给每个consumer的过程就叫Rebalance。
本质上是一组协议,分配策略为的就是rebalance。
Rebalance的过程
Join
consumer向coordinator发送joinGroup请求,coordinator会从consumer中选择一个担任leader角色。并把组成员信息、订阅信息发送给leader。
Sync
leader consumer负责分配消费方案,即哪个consumer消费哪些partition,一旦分配完成,leader会将方案封装进syncGroup请求发送给coordinator,非leader也会发syncGroup请求,只不过内容为null。Coordinator会把收到的分配方案response给各个consumer,这样各个consumer就知道自己该消费哪些partition。
partition分配方案是放在客户端进行的,这样有更好的灵活性。
offset 偏移量
offset存储在哪里?consumer_offsets
kafka默认提供了50个consumer_offsets_*的topic,用于存放consumer group 某一时刻提交的offset信息。
不同groupid用哪个consumer_offsets_ 呢?
计算公式: (“groupid”.hashCode())%50 ;
如果计算结果5,那么当前group的offset信息保存在consumer_offsets_5里面。
LogSegment (分段)
log.segment.bytes=1073741824 //设置分段大小,默认1G
kafka以Segment为单位,将partition进一步细分。从而避免的单个文件数据量过大而导致的操作难问题。
Segment的命名从0000开始,后续文件的命名以上一个Segment文件中最后一条消息的offset值命名。
Segment是一个逻辑概念,对应着partition目录(如log/test-0)下的.index和.log文件。如果partition被分为多个Segment,那么此目录下也会有多个.index和.log文件。
Kafka0.10.1.0之后,对于每个Segment文件新增了.timeindex文件,基于时间戳操作消息。
.timeIndex
文件映射时间戳和对应offset:
.index
文件记录了offset和对应的物理位置:
.index 与 .log 映射关系
Log文件内容分析
keysize :key大小。
compresscodec :压缩编码
payload :消息具体内容
日志清除/压缩
日志的分段存储,方便了kafka进行日志清理。Kafka启动一个后台线程,定期检查是够存在可以删除的消息。
日志清理策略
1.根据消息保留时间
配置 log.retention.hours=168 (默认7天)
2.根据topic存储大小
配置 log.retention.bytes=1073741824(默认1G,不开启)
日志压缩策略
实际场景中,key对应的value值不断变化,并且消费者只关心最新的value,所以kafka会在后台启动线程,定期将相同key合并,只保留最新value。
零拷贝
消费者从kafka服务器获取消息时,服务器先从磁盘读取数据到内存,再将内存的数据通过socket发送给消费者。看似简单的操作实际上有很多步骤。
一次交互的步骤
▪ 操作系统将数据从磁盘读入到内核空间的页缓存。
▪ 应用程序将数据从内核空间读入到用户空间缓存中。
▪ 应用程序将数据写回到内核空间到 socket 缓存中。
▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出。
整个过程4次上下文切换及4次数据复制,其中CPU复制了两次。
零拷贝的优势
将磁盘数据复制到页面缓存中,最后将页面缓存的数据发送到网络中。
发送给不同订阅者时,可以使用同一个页面的缓存,避免了大量复制操作。
如果有10个消费者,传统方式下,数据复制次数4*10=40次。但零拷贝下,数据复制次数1+10=11次。
1:从磁盘复制到页面缓存
10:10个消费者个读了一次页面缓存
副本机制(Replication)
对于单个partition而言,在集群中是单点的。一旦该partition不可用,那么partition中的消息就消费不了了,所以kafka通过副本机制备份。
副本角色
Leader副本
每个Partition有且只有一个副本可以作为leader。
负责所有Producer、Consumer的请求。
Producer提交消息后,复制消息到所有的同步副本。
Follower副本
每个Partition中,除了Leader以外的所有Replica均为Follower副本。
不处理任何来自客户端的请求,只通过Fetch Request拉取Leader replica的数据进行同步。
ISR副本
包含leader副本和所有与leader副本保持同步的follower副本。
OSR副本
由于同步落后而被剔除的副本列表。
AR副本
所有副本集合:ISR + OSR
如何判断follower副本是不是同步副本?
过去10S从M副本获取过消息,并在过去6S与ZK发送过心跳。
副本因子(replication-factor)
决定了副本的个数。
如果副本因子是3,那么包含Leader副本在内,所有副本个数是3。
副本分配策略
多个副本如何分配到不同的broker上?
Partition排序的时候,第i个partition分配到(i mod n)broker上,那么第i个partition的第j个副本分配到(i+j mod n)borker。
如何知道leader副本在哪个borker上?
在zk上查,get /brokers/topics/secondTopic/partitions/1/state,查询结果
leader_epoch:0 表示partition 1 的leader副本在broker0 上。
Isr:当前可用且消息量与leader差不多的partition的副本集,也就是说如果某副本最后一条消息的offset与leader副本最后一条消息的offset之差超过阀值(replica.lag.time.max.ms),那么该副本会被踢出isr。
绿色是leader副本
副本数据同步
首先,写请求先写入leader副本,再同步到follower副本,那么follower副本的数据略少于leader副本是可以容忍的,只要不超过阀值。
当然如果follower副本长时间没有同步数据,会被leader副本踢出,因为当Acks设置为all(-1)时,如果某个follower故障导致HW无法递增,那么消息就无法提交,也就不会有后续的数据写进来。
副本同步机制
Consumer消费数据时,只能消费到HW的位置。HW之后的数据对consumer来说是不可见的。
Acks = 1时,消息被leader副本记录后则提交成功,然后leader副本再将消息同步给follower副本(类似异步复制)。所以leader宕机后,HW~LEO之间的数据可能会丢。
Acks设置为all(-1)时,消息被ISR副本记录后则提交成功(类似同步复制)。所以all的数据安全性是最高的。
副本属性
LEO
日志末端位移(log end offset)
记录了该副本底层日志(.log)中下一条消息的offset。
如果LEO=10,那么该副本保存了10条数据,offset为[0,9]。
HW
水位值
新消息被ISR副本同步后,HW才会移到这条消息的位置。
HW的值小于等于LEO。
副本(replica)都宕机了怎么办?
1.等待ISR中任一副本活过来,选它作为leader
2.选择第一个活过来的副本作为leader(不一定是ISR中的)
数据传输的事务定义
at most once
最多一次,这个和JMS中”非持久化”消息类似.发送一次,无论成败,将不会重发。
消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后”未处理”的消息将不能被fetch到,这就是“atmost once”。
at least once
消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。
消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是“atleast once”,原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。
exactly once
消息只会发送一次。
kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的。
通常情况下“at-least-once”是我们首选。(相比 at most once而言,重复接收数据总比丢失数据要好)。
Kafka通信协议
Producer、Broker和Consumer之间采用的是一套自行设计基于TCP层的协议,根据业务需求定制。
基本数据类型
定长数据类型
int8,int16,int32和int64,对应到Java中就是byte, short, int和long。
变长数据类型
bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(表示内容的长度)和N个字节的内容。其中,N为-1表示内容为null。bytes的长度由int32表示,string的长度由int16表示。
数组
数组由两部分组成,分别是一个由int32类型的数字表示的数组长度N和N个元素。
kafka如何保证数据不丢失?
如果master副本挂了,其他follower副本都是非同步副本,那么在开启unclean.leader.election=true的情况下,非同步副本被选举为master副本,那必然会丢数据。
Producer端的处理
设置参数ACKS
request,timeout.ms:如果网络异常收不到响应,则等待,这里有个配置等待时间 request.timeout.ms:发送消息等待时间。
metadata.fetch.time.out 从kafka 获取元数据的等待时间。
max.block.ms : 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长时间。由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止。用户提供的序列化序或分区程序中的阻塞将不计入此超时。
retries :重试次数 ,
retry.backoff.ms: 重试直接的等待时间, 默认是100 ms
batch.size: 多个消息发送给同一个分区的时候,生产者会把消息打成一个批,批大小设置过大占内存,过小发送频繁,并且生产者不是必须满批发送,有个等待时间。
linger.ms设置 等待多久批不满则发送。
Consumer端的处理:
auto.offset.reset:没有偏移量可以提交的时候,系统从哪里开始消费。 有两种设置 :earliest 和latest 。
group-id:一个topic被 同一个消费组的不同消费者消费 ,相当于是队列模式。被不同消费组消费相当于是 订阅模式。 一个partition在同一个时刻只有一个consumer instance在消费。
enable.auto.commit:自动提交 ,如果开启了自动提交,那么系统会自动进行提交offset。可能会引起,并未消费掉,就提交了offset.引起数据的丢失。
auto.commit.interval.ms:默认是5秒钟提交一次。
kafka高可用的原因
1.采用操作系统层面的页面缓存来缓存数据。
2.日志顺序写入,并采用零拷贝的方式提升IO性能。
3.Topic分成多个Partition,Partition分成多个logSegment。
4.发送端和消费端都可采用并行的方式生产消费消息。
本文地址:https://blog.csdn.net/weixin_41943050/article/details/107342211
上一篇: php+redis实战留言板(todolist)与互粉功能
下一篇: 阿里云搭建go开发环境