Kafka集群部署指南
一、前言
1、kafka简介
kafka是一个开源的分布式消息引擎/消息中间件,同时kafka也是一个流处理平台。kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了kafka connect、kafka streams以支持连接其他系统的数据(elasticsearch、hadoop等)
kafka最核心的最成熟的还是他的消息引擎,所以kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,kafka也是目前性能最好的消息中间件。
2、kafka架构
在kafka集群(cluster)中,一个kafka节点就是一个broker,消息由topic来承载,可以存储在1个或多个partition中。发布消息的应用为producer、消费消息的应用为consumer,多个consumer可以促成consumer group共同消费一个topic中的消息。
概念/对象 | 简单说明 |
---|---|
broker | kafka节点 |
topic | 主题,用来承载消息 |
partition | 分区,用于主题分片存储 |
producer | 生产者,向主题发布消息的应用 |
consumer | 消费者,从主题订阅消息的应用 |
consumer group | 消费者组,由多个消费者组成 |
3、准备工作
1、kafka服务器
准备3台centos服务器,并配置好静态ip、主机名
服务器名 | ip | 说明 |
---|---|---|
kafka01 | 192.168.88.51 | kafka节点1 |
kafka02 | 192.168.88.52 | kafka节点2 |
kafka03 | 192.168.88.53 | kafka节点3 |
软件版本说明
项 | 说明 |
---|---|
linux server | centos 7 |
kafka | 2.3.0 |
2、zookeeper集群
kakfa集群需要依赖zookeeper存储broker、topic等信息,这里我们部署三台zk
服务器名 | ip | 说明 |
---|---|---|
zk01 | 192.168.88.21 | zookeeper节点 |
zk02 | 192.168.88.22 | zookeeper节点 |
zk03 | 192.168.88.23 | zookeeper节点 |
部署过程参考:
二、部署过程
1、应用&数据目录
#创建应用目录 mkdir /usr/kafka #创建kafka数据目录 mkdir /kafka mkdir /kafka/logs chmod 777 -r /kafka
2、下载&解压
kafka官方下载地址:
这次我下载的是2.3.0版本
#创建并进入下载目录 mkdir /home/downloads cd /home/downloads #下载安装包 wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz #解压到应用目录 tar -zvxf kafka_2.12-2.3.0.tgz -c /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是scala编译器的版本,2.3.0才是kafka的版本
3、kafka节点配置
#进入应用目录 cd /usr/kafka/kafka_2.12-2.3.0/ #修改配置文件 vi config/server.properties
通用配置
配置日志目录、指定zookeeper服务器
# a comma separated list of directories under which to store log files log.dirs=/kafka/logs # root directory for all kafka znodes. zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181
分节点配置
- kafka01
broker.id=0 #listeners=plaintext://:9092 listeners=plaintext://192.168.88.51:9092
- kafka02
broker.id=1 #listeners=plaintext://:9092 listeners=plaintext://192.168.88.52:9092
- kafka03
broker.id=2 #listeners=plaintext://:9092 listeners=plaintext://192.168.88.53:9092
4、防火墙配置
#开放端口 firewall-cmd --add-port=9092/tcp --permanent #重新加载防火墙配置 firewall-cmd --reload
5、启动kafka
#进入kafka根目录 cd /usr/kafka/kafka_2.12-2.3.0/ #启动 /bin/kafka-server-start.sh config/server.properties & #启动成功输出示例(最后几行) [2019-06-26 21:48:57,183] info kafka commitid: fc1aaa116b661c8a (org.apache.kafka.common.utils.appinfoparser) [2019-06-26 21:48:57,183] info kafka starttimems: 1561531737175 (org.apache.kafka.common.utils.appinfoparser) [2019-06-26 21:48:57,185] info [kafkaserver id=0] started (kafka.server.kafkaserver)
三、kafka测试
1、创建topic
在kafka01(broker)上创建测试tpoic:test-ken-io,这里我们指定了3个副本、1个分区
bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
topic在kafka01上创建后也会同步到集群中另外两个broker:kafka02、kafka03
2、查看topic
我们可以通过命令列出指定broker的
bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092
3、发送消息
这里我们向broker(id=0)的topic=test-ken-io发送消息
bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io #消息内容 > test by ken.io
4、消费消息
在kafka02上消费broker03的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
在kafka03上消费broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
然后均能收到消息
test by ken.io
这是因为这两个消费消息的命令是建立了两个不同的consumer
如果我们启动consumer指定consumer group id就可以作为一个消费组协同工,1个消息同时只会被一个consumer消费到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
四、备注
1、kafka常用配置项说明
kafka常用broker配置说明:
配置项 | 默认值/示例值 | 说明 |
---|---|---|
broker.id | 0 | broker唯一标识 |
listeners | plaintext://192.168.88.53:9092 | 监听信息,plaintext表示明文传输 |
log.dirs | kafka/logs | kafka数据存放地址,可以填写多个。用","间隔 |
message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
num.partitions | 1 | 默认分区数 |
log.flush.interval.messages | long.maxvalue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | long.maxvalue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | long.maxvalue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 24 | 控制一个log保留时间,单位:小时 |
zookeeper.connect | 192.168.88.21:2181 | zookeeper服务器地址,多台用","间隔 |
2、附录
本文首发于我的独立博客: