欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka-高性能的分布式消息队列

程序员文章站 2022-07-14 12:34:36
...


前言

本文是本人在之前的实际开发中使用kafka中的总结,相关参考的优秀文章会附在总结的最后

一 简介

Kafka是一个分布式,支持分区(partition),多副本(replica),基于zookeeper协调的高性能的分布式消息系统。 这句话基本上是涵盖了kafka全部的特性特征

二 架构

附一张kakfa的架构图(该架构图片来源于:51CTO博客
Kafka-高性能的分布式消息队列
在一套完备的kafka架构中,是存在着多个Producer,多个Broker,多个Consumer,每个Producer可以对应多个Topic,每个Consumer只能对应一个ConsumerGroup。

1 zookeeper在kafka集群中的作用

kafka需要依赖于zookeeper进行集群节点的管理,以及在consumer group 发生变化时进行rebalance。
kafka对应zookeeper的数据存储结构如下:
Kafka-高性能的分布式消息队列
上面的kafka01并不是zookeeper的节点。可以通过zookeeper的get / 进行查看
Kafka-高性能的分布式消息队列

1.1 Broker注册

一个Broker是一个kafka的部署节点,Broker是分布式部署并且相互依赖,每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点
如/brokers/ids/[0…N]。
该节点存储着当前注册broker的基本信息:
Kafka-高性能的分布式消息队列
kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker Id进行注册,其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。

1.2 Topic注册

在kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上并将其分配在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:
/borkers/topicsKafka-高性能的分布式消息队列
Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
Kafka-高性能的分布式消息队列

1.3 生产者负载均衡

由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

  • (1)四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。
  • (2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
1.4 消费者负载均衡

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

1.5 分区与消费者的关系

消费组 (Consumer Group):
consumer group 下有多个Consumer(消费者)
对于每个消费组(consumer group),kafka都会为其分配一个唯一的全局的Group ID,Group 内部的所有消费者共享该 ID订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。
在kafka中,规定每个消费分区(partition)只能被同组的一个消费者进行消费, ,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。

1.6 消息消费进度Offset 记录

在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。

1.7 消费者注册

消费者服务器在初始化启动时加入消费者分组的步骤如下

注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

消费者分组 中的 消费者 的变化注册监听。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。

2 kafka如何保证数据的可靠性和一致性

Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。
request.required.acks有三个只0,1,-1,

  • 0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据,简单的说,producer发送一次就不再发送了,不管是否发送成功
  • -1:简单地说producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功
  • 1:简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。数据丢失,如果其他的follow副本在拉取之前的follow副本的时候不行
3 kafka的数据丢失问题

常见的数据丢失的解决方案:
1,消费端的数据丢失:

  • (1)关闭掉消费段自动提交offset,设置处理完消息之后手动提交,这个还是会出现消费端的数据丢失
  • (2)自己还没有处理完,系统程序就挂掉了,保证自己程序的幂等性
    2,kafka本身弄丢了数据:
    丢失场景:这块比较常见的一个场景,就是kafka某个broker宕机,然后重新选举partiton的leader时。要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据。
  • 1,设置topic的replication.factor参数,大于1,要求每个partition的副本大于1
  • 2,在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower。
  • 3,设置ack为-1或者是all
  • 4,设置无限重试的次数,
    3,生产端的数据丢失问题,
    设置ack为all
4 kafka的ISR机制

问题如下:kafka为了保证数据的一致性使用了ISR机制,isr 的全称是:In-Sync Replicas isr 是一个副本的列表,里面存储的都是能跟leader 数据一致的副本,确定一个副本在isr列表中,有2个判断条件

  • 条件1:根据副本和leader 的交互时间差,如果大于某个时间差 就认定这个副本不行了,就把此副本从isr 中剔除,此时间差根据配置参数rerplica.lag.time.max.ms=10000 决定 单位ms
  • 条件2:根据leader 和副本的信息条数差值决定是否从isr 中剔除此副本,此信息条数差值根据配置参数rerplica.lag.max.messages=4000 决定 单位ms

三 kafka常见命令

1,查看kafka topic列表,使用 --list

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

2,查看kafka特定topic的详情,使用–topic与–describe参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe
Topic:lx_test_topic     PartitionCount:1        ReplicationFactor:1     Configs:
Topic: lx_test_topic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
列出了lx_test_topic的parition数量、replica因子以及每个partition的leader、replica信息

3,查看指定topic中的实时推送过来的数据

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic  RT.DPC.STATION.DI

4,创建指定topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
replication-factor:表示副本个数
partitions:表示分区个数

5,创建生产者

bin/kafka-console-producer.sh --broker-list localhost:9192 --topic test --producer.config config/producer.properties
producer.properties:表示对应生产者的相关参数配置

6,创建消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
from-beginning:表示从对应的分区的offset0开始消费数据
consumer.properties:消费者对应的相关参数配置

7,topic分区扩容

./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 4 --topic zhoucg_topic

8,topic副本修改

1,根据topic的分区的分区情况修改partitions-topic.json信息
{
        "partitions":
                [
                {
                        "topic": "test1",
                        "partition": 0,
                        "replicas": [1,2]
                },
                {
                        "topic": "test1",
                        "partition": 1,
                        "replicas": [0,3]
                },
                {
                        "topic": "test1",
                        "partition": 2,
                        "replicas": [4,5]
                }
                ],
        "version":1
}
2,执行副本迁移
../bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file partitions-topic.json --execute 
3,查看迁移情况
../bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file partitions-topic.json --verify
Status of partition reassignment:
Reassignment of partition [mx_prd_nginx_access,0] is still in progress
Reassignment of partition [mx_prd_nginx_access,1] completed successfully
Reassignment of partition [mx_prd_nginx_access,2] is still in progress

四 java操作kafka

kafka生产者:

/**
 * Created by zhoucg on 2019-02-22.
 * kafka服务端
 * 将消息发送到kafka的指定topic下
 */
public class KafkaProducerOps{

    public static void main(String[] args) throws Exception{


        Properties properties = new Properties();

        InputStream inputStream = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
        properties.load(inputStream);

        Producer<String,String> producer = new KafkaProducer<>(properties);
        String topic = "hadoop";
        String key = "1";
        String value = "今天是个好天气";

        /**
         * kafka同步发送
         */
        ProducerRecord<String,String> producerRecord = new ProducerRecord<>(topic,key,value);
        producer.send(producerRecord);
        producer.close();

        /**
         * kafka异步发送
         */
        producer.send(producerRecord,(metadata,exception) -> {
            System.out.println("kafka异步调用报错");
        });
    }
}

producer.properties配置:

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=10.1.156.18:9192

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

#####设置自定义的topic
producer.topic=hadoop

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

消费者:

/**
 * Created by zhoucg on 2019-02-22.
 * kafka消费端,
 */
public class KafkaConsumerOps {


    public static void main(String[] args) throws Exception{

        Properties properties = new Properties();
        InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
        properties.load(in);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        /**
         * 消费者订阅topic
         */
        Collection<String> topics = Arrays.asList("hadoop");
        consumer.subscribe(topics);
        while (true) {
            // 接下来就要从topic中拉取数据
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if(!records.isEmpty()) {
                for(ConsumerRecord<String, String> record : records) {
                    String msg = record.value();
                    String recordTopic = record.topic();
                    long offset = record.offset();
                    int partition = record.partition();
                    String key = record.key();
                    System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, msg);
                }
                consumer.commitSync();
            }
        }
    }
}

consumer.properties配置:

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect= 10.1.152.212:2181

bootstrap.servers=10.1.152.212:9192,10.1.156.17:9192

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

五 参考

https://zhuanlan.zhihu.com/p/79579389
https://www.jianshu.com/p/a036405f989c
https://blog.csdn.net/zhaoyachao123/article/details/89527233