欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka原理及其使用

程序员文章站 2024-03-24 18:37:46
...

1.broker

kafka的集群中的实例,由多个服务组成的集群

2.topic

消息分类器,用于存储与分类不同的消息数据

3.partition

每个topic拆分为多个partition,消息在其内部有递增序列存储(0,1,2,3,4...),partition均匀分布在集群中,增加并发能力,可设置过期时间,在linux服务器上就是一个文件夹,命名规则为topic名+有序序号
 kafka原理及其使用
生产者生产消息时,通过分配策略,将消息打上标号,放入partition
消费者消费消息时,通过id(代表了offset,从哪个位置开始取数据)来取partition中的消息,消费者可以自定义id,每个消费者的id是独立的
消息可靠性,partition仍然保持消息,因此消费者可以重置offset之后重复取消息
持久化:指定时间段持久化
 

备份高可用

以partition为单位备份,有三个角色,当leader不可用时,则从ISR中选取一个升级为leader。生产者发消息给leader,leader转发给replica,当到达某个数目再回执ack(可配置的列表)给生产者
leader(单个):负责响应消费者,生产消息后进入leader
replica(多个):被动接受leader的数据,但不响应消费者
ISR集合(多个):同步备份集合,当follower中数据与leader完全一致时,则加入该集合

分配算法

将broker(n个)和待分配的partition排序,将第i个partition分配到1%n的broker上,将i的partition的第j个replica分配到(i+j)%n的broker上
 kafka原理及其使用
 

4.producer,生产者

参数:topic,往哪个topic插入消息
partition,往哪个partition生产消息,不需要制定,会将消息load balance。一般来说分区数量大于broker数量,可以提高吞吐量,同一个partition的replica分散到不同机器(高可用)
key,分区到不同的partition
message,消息
 

5.consumer,消费者

有两种模式:队列,发布订阅
当所有consumer的consumer group相同时,系统变成队列模式当每个consumer的consumer group都不相同时,系统变成发布订阅
 

6.消费者组consumer group

通过消费组(consumer group)处理这两种模式(为了解决容错性的问题),多个消费者组成消费组,共享group.id,消费组对应多个partition,但是一个partition只能被一个消费者消费
如果需要一个分区被多个消费者消费,则消费者必须处于不同的消费组
rebalance触发条件:增加或者删除消费者,broker增加或减少
 

7.传输方式

kafka将offset保存在zookeeper上,发送完则自加,消费端需要自己维护offset,当消息处理失败,就会重复处理该消息,这种一般是可容忍的(处理日志时)
 

8.消息可靠性

第一种,不保证
第二种,master发送成功即认为成功
第三章,master和slave都确认
at least once模型,消息最起码被消费者读取一次,当发生重复读取时,要不消费端当做事务解决,会写offset,要不就不在乎
partition ack,区分分区的leader和replica的写入成功的个数
message持久化,kafka顺序写入O(1),减少时间复杂度,保持极高的IO效率
message有效期,会长久保持消息,但是可配置

生产者规则

Producer端的delivery guarantee默认是At least once的。也可以设置Producer异步发送实现At most once。Producer可以用主键幂等性实现Exactly once
Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,但是可能会重复传输;(3)Exactly once每条信息肯定会被传输一次且仅传输一次,这是用户想要的。
高吞吐:写入和读取都是顺序读取O(1)
 
 

9.与zookeeper关系

 kafka原理及其使用
zookeeper相当于一个文件系统+监听通知机制,用于处理统一命名服务,状态同步服务,集群服务,分布式配置管理等问题。
kafka broker集群受到zookeeper管理,多个broker一起去注册节点,但只有一个会成功,成为kafka broker controller,其他的叫做follower,当controller宕机了,其他的则会重新选举。
当follower的某个partition挂了,controller会从ISR中选取一个成为leader,当所有的partition都挂了,则设置为-1,等待回复。这些宕机的消息传递都是通过zookeeper实现的。

调控方式

管理broker与consumer的动态增减
触发负载均衡(rebalance),设置kafka的brokercontroller和重平衡consumer group
维护partition的消费者的关系

注册细节

broker注册为broker registry,包含ip和端口以及topic,partition等信息
consumer注册为consumer registry,包含所属group 和订阅的topics
consumer group关联临时的 owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。
 
 

10.SpringBoot整合

pom配置

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

properites文件配置

#kafka配置
spring.kafka.bootstrap-servers=47.111.75.113:9092
#producer
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#consumer
spring.kafka.consumer.group-id=cs_kafka_group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.auto-commit-interval=100s
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者代码

采用定时任务发消息

@Service
public class KafkaProducer {
    private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    @Scheduled(cron = "0/3 * * * * *")
    public void send() {
        for (int i = 0; i < 5; i++) {
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString() + "---" + i);
            message.setSendTime(new Date());
            logger.info("发送消息 ----->>>>>  message = {}", gson.toJson(message));
            kafkaTemplate.send("cstopic", gson.toJson(message));
        }
    }
}

消费者类

采用KafkaListener注解监听消息

@Component
public class KafkaCustomer {
    private static Logger logger = LoggerFactory.getLogger(KafkaCustomer.class);

    @KafkaListener(topics = {"cstopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("----------------- record =" + record);
            logger.info("------------------ message =" + message);
        }

    }
}

11.Linux kafka配置

单机版本config/server.properties配置

listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://47.111.75.113:9092
#创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cstopic 

#生产者发消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic cstopic 

#消费者收消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cstopic --from-beginning  --group cs_kafka_group