kafka原理及其使用
程序员文章站
2024-03-24 18:37:46
...
1.broker
kafka的集群中的实例,由多个服务组成的集群
2.topic
消息分类器,用于存储与分类不同的消息数据
3.partition
每个topic拆分为多个partition,消息在其内部有递增序列存储(0,1,2,3,4...),partition均匀分布在集群中,增加并发能力,可设置过期时间,在linux服务器上就是一个文件夹,命名规则为topic名+有序序号
生产者生产消息时,通过分配策略,将消息打上标号,放入partition
消费者消费消息时,通过id(代表了offset,从哪个位置开始取数据)来取partition中的消息,消费者可以自定义id,每个消费者的id是独立的
消息可靠性,partition仍然保持消息,因此消费者可以重置offset之后重复取消息
持久化:指定时间段持久化
备份高可用
以partition为单位备份,有三个角色,当leader不可用时,则从ISR中选取一个升级为leader。生产者发消息给leader,leader转发给replica,当到达某个数目再回执ack(可配置的列表)给生产者
leader(单个):负责响应消费者,生产消息后进入leader
replica(多个):被动接受leader的数据,但不响应消费者
ISR集合(多个):同步备份集合,当follower中数据与leader完全一致时,则加入该集合
分配算法
将broker(n个)和待分配的partition排序,将第i个partition分配到1%n的broker上,将i的partition的第j个replica分配到(i+j)%n的broker上
4.producer,生产者
参数:topic,往哪个topic插入消息
partition,往哪个partition生产消息,不需要制定,会将消息load balance。一般来说分区数量大于broker数量,可以提高吞吐量,同一个partition的replica分散到不同机器(高可用)
key,分区到不同的partition
message,消息
5.consumer,消费者
有两种模式:队列,发布订阅
当所有consumer的consumer group相同时,系统变成队列模式当每个consumer的consumer group都不相同时,系统变成发布订阅
6.消费者组consumer group
通过消费组(consumer group)处理这两种模式(为了解决容错性的问题),多个消费者组成消费组,共享group.id,消费组对应多个partition,但是一个partition只能被一个消费者消费
如果需要一个分区被多个消费者消费,则消费者必须处于不同的消费组
rebalance触发条件:增加或者删除消费者,broker增加或减少
7.传输方式
kafka将offset保存在zookeeper上,发送完则自加,消费端需要自己维护offset,当消息处理失败,就会重复处理该消息,这种一般是可容忍的(处理日志时)
8.消息可靠性
第一种,不保证
第二种,master发送成功即认为成功
第三章,master和slave都确认
at least once模型,消息最起码被消费者读取一次,当发生重复读取时,要不消费端当做事务解决,会写offset,要不就不在乎
partition ack,区分分区的leader和replica的写入成功的个数
message持久化,kafka顺序写入O(1),减少时间复杂度,保持极高的IO效率
message有效期,会长久保持消息,但是可配置
生产者规则
Producer端的delivery guarantee默认是At least once的。也可以设置Producer异步发送实现At most once。Producer可以用主键幂等性实现Exactly once
Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,但是可能会重复传输;(3)Exactly once每条信息肯定会被传输一次且仅传输一次,这是用户想要的。
高吞吐:写入和读取都是顺序读取O(1)
9.与zookeeper关系
zookeeper相当于一个文件系统+监听通知机制,用于处理统一命名服务,状态同步服务,集群服务,分布式配置管理等问题。
kafka broker集群受到zookeeper管理,多个broker一起去注册节点,但只有一个会成功,成为kafka broker controller,其他的叫做follower,当controller宕机了,其他的则会重新选举。
当follower的某个partition挂了,controller会从ISR中选取一个成为leader,当所有的partition都挂了,则设置为-1,等待回复。这些宕机的消息传递都是通过zookeeper实现的。
调控方式
管理broker与consumer的动态增减
触发负载均衡(rebalance),设置kafka的brokercontroller和重平衡consumer group
维护partition的消费者的关系
注册细节
broker注册为broker registry,包含ip和端口以及topic,partition等信息
consumer注册为consumer registry,包含所属group 和订阅的topics
consumer group关联临时的 owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。
10.SpringBoot整合
pom配置
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
properites文件配置
#kafka配置
spring.kafka.bootstrap-servers=47.111.75.113:9092
#producer
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#consumer
spring.kafka.consumer.group-id=cs_kafka_group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.auto-commit-interval=100s
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
生产者代码
采用定时任务发消息
@Service
public class KafkaProducer {
private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
@Scheduled(cron = "0/3 * * * * *")
public void send() {
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString() + "---" + i);
message.setSendTime(new Date());
logger.info("发送消息 ----->>>>> message = {}", gson.toJson(message));
kafkaTemplate.send("cstopic", gson.toJson(message));
}
}
}
消费者类
采用KafkaListener注解监听消息
@Component
public class KafkaCustomer {
private static Logger logger = LoggerFactory.getLogger(KafkaCustomer.class);
@KafkaListener(topics = {"cstopic"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("----------------- record =" + record);
logger.info("------------------ message =" + message);
}
}
}
11.Linux kafka配置
单机版本config/server.properties配置
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://47.111.75.113:9092
#创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cstopic
#生产者发消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic cstopic
#消费者收消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cstopic --from-beginning --group cs_kafka_group
上一篇: CGAL文件格式的转换
下一篇: idea一些常用功能快捷键