大数据基础之Kafka——Kafka基本简介及基本操作命令
程序员文章站
2022-05-01 10:12:31
...
为什么使用消息中间件(MQ)
- 异步调用 (同步变异步)
- 应用解耦 (提供基于数据的接口层)
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 - 流量削峰 (缓解瞬时高流量压力)
消息中间件的术语
- Broker:消息服务器,提供核心服务
- Producer:消息生产者
- Consumer:消息消费者
- Topic:主题,发布订阅模式下的消息统一汇集地
- Queue:队列,P2P模式下的消息队列
常见的消息中间件
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
- Reids
消息中间件的模式分类
点对点
P2P点对点:使用queue作为通信载体
解析:
消息生产者生产消息发送到queue队列中,然后消息消费者从queue中消费消息
消费者消费消息后,queue中不再存储,所以消息消费者不可能消费到已经被消费过的消息,queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
发布/订阅模式
Pub/Sub发布订阅(广播):使用topic作为通信载体
解析:
消息生产者将消息发布到Topic中,同时有多个消费者订阅消费该消息。和点对点方式不同,发布到Topic的消息会被所有消费者消费
queue实现了负载均衡,将生产者生产的消息发送到消息队列中,由多个消费者消费,但一个消息只能被一个消费者消费,当没有消费者可用时,这个消息会被保存到直到有一个可用的消费者。
Topic实现了发布和订阅,当你发布一个消息,所有订阅这个Topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。
Kafka简介
- Kafka是一种高吞吐量的分布式发布-订阅消息系统,专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。
- 快速,单Broker每秒几百MB读取
- 不停机扩展集群
- 消息副本冗余
- 实时数据管道 - 使用Scala
Kafka架构
- Topic:维护一个主题中的消息,可视为消息分类
- Producer:想Kafka主题生产消息
- Consumer:消费主题并处理消息
- Broker:Kafka集群中的服务器
Kafka Topic
- Topic
- 主题是已发布消息的类别名称
- 发布和订阅数据必须指定主题
- 主题副本数量不大于Brokers个数 - Partition
- 一个主题包含多个分区,默认按Key Hash分区
- 每个Partition对应一个文件夹<topic_name>-<partition_id>
- 每个Partition被视为一个有序的日志文件(LogSegment)
- Replication策略是基于Partition,而不是Topic
- 每个Partition都有一个Leader,0或多个Followers
Kafka Message
- header:消息头,固定长度
- offset:唯一确定每条消息在分区内的位置
- CRC32:用crc32校验消息
- “magic”:表示本次发布Kafka服务程序协议版本号
- “attributes”:表示为独立版本,或标识压缩类型、或编码类型 - body:消息体
- key:表示消息键,可选,没有则为null
- value:表示实际消息数据
Kafka Producer
- 生产者将消息写入到Broker
- Producer直接发送消息到Broker上的Leader Partition
- Producer客户端自己控制着消息被推送到哪些Partition
- 随机分配、自定义分区算法等
- Batch推送提高效率
Kafka Broker
- Kafka集群中每个Broker都可以响应Producer的请求
- 每个Broker充当Leader和Follwers保持负载均衡
- Leader处理所有读写请求
- Follwers被动复制Leader
Kafka Consumer
- 消费者通过订阅消费消息
- offset的管理是基于消费组(group.id)的级别
- 每个Partition只能由同一消费组内的一个Consumer来消费
- 每个Consumer可以消费多个分区
- 消费过的数据仍会保留在Kafka中
- 消费者不能超过分区数量 - 消费模式
- 队列:所有消费者在一个消费组内
- 发布/订阅:所有消费者被分配到不同的消费组
Kafka的基本操作命令
- 创建Topic
kafka-topics.sh --create --zookeeper 你的zookeeper的IP:2181 --replication-factor 副本数 --partitions 分区数 --topic 消息队列名
- 检查是否创建成功(查看topic列表)
kafka-topics.sh --zookeeper 你的zookeeper的IP:2181 --list
- 向消息队列中生产消息
kafka-console-producer.sh --topic 消息队列名 --broker-list 你的kafka队列IP:9092
- 消费消息
kafka-console-consumer.sh --bootstrap-server 你的kafka队列IP:9092 --topic 消息队列名
- 查看Topic*有多少条数据
kafka-run-class.sh kafka.tools.GetOffsetShell --topic 消息队列名 --time -1 --broker-list 你的kafka队列IP:9092
- 将消费过的Topic偏移量重置归0
kafka-consumer-groups.sh -bootstrap-server 你的kafka队列IP:9092 --group 你的消费者组名 --reset-offsets -all-topics --to-earliest --execute
- 删除Topic
kafka-topic.sh --delete --zookeeper 你的zookeeper的IP:2181 --topic 消息队列名
上一篇: 大数据技术_Hive_DML
下一篇: php?$_REQUEST