欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Linux 下 Kafka Cluster 搭建

程序员文章站 2022-07-13 10:49:46
...
概述

http://kafka.apachecn.org/quickstart.html
1.高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,他的延迟最低只有几毫秒
2.每个topic可以分多个partition,consumer group 对partition进行consume操作
3,可扩展性:kafka集群支持热扩展
4.持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
5.容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
6.高并发:支持数千个客户端同时读写

Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
Topic:一类消息,消息存放的目录即主题,对消息进行划分唯一的逻辑单元
message:Kafka中最基本的传递对象
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
Segment:partition物理上有多个segment组成,每个segment存着message信息

Kafka存储策略:
1.kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成
2,每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可以直接定位到消息的存储位置,避免id到位置的额外映射


-------------------------------------------------------------------------------------------

服务器:

192.101.11.162: zookeeper0  kafka0
192.101.11.163: zookeeper1  kafka1
192.101.11.164: zookeeper2  kafka2

一.准备工作

1.下载、安装JDK1.8,Zookeeper,kafka
下载URL:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
jdk-8u251-linux-x64.tar.gz

下载URL:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
apache-zookeeper-3.6.1-bin.tar.gz

下载URL:http://kafka.apache.org/downloads
kafka_2.12-2.5.0.tgz

3.解压JDK1.8,Zookeeper,kafka
#tar -zxvf jdk-8u251-linux-x64.tar.gz
#tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#tar -zxvf kafka_2.12-2.5.0.tgz

4.创建运行目录
#mkdir /usr/local/components/
#mv jdk1.8.0_251 /usr/local/components/
#mv apache-zookeeper-3.6.1-bin /usr/local/components/
#mv kafka_2.12-2.5.0 /usr/local/components/

5.创建数据目录
#mkdir -p /data/zookeeper/
#mkdir -p /data/kafka/

二.安装JDK

1.编辑/etc/profile,增加如下环境变量
#vi /etc/profile

  export JAVA_HOME=/usr/local/components/jdk1.8.0_251
  export PATH=$JAVA_HOME/bin:$PATH

2.让/etc/profile文件修改后立即生效
#source /etc/profile

3.显示PATH环境变量,检验修改后的结果
#echo $PATH

  printenv

4.验证JDK安装完成,显示版本号
#java -version


三.安装zookeeper

1.分发到其他节点
#scp  ...............

2.复制并修改配置
#cp zoo_sample.cfg zoo.cfg

#vi zoo.cfg

修改:
dataDir=/data/zookeeper/

增加:
server.0=192.101.11.162:8880:7770
server.1=192.101.11.163:8880:7770
server.2=192.101.11.164:8880:7770

创建日志目录
#mkdir -p /data/zookeeper/logs/

#cd ./bin
#vi zkEnv.sh

增加:
ZOO_LOG_DIR=/data/zookeeper/logs/

3.分发到其他节点
#scp  ...............

4.配置实例ID,其他节点同理,(三个节点分别为:0,1,2) 
#cd /data/zookeeper/
#echo "0" >myid

5.启动运行
#cd bin
#./zkServer.sh start

四。安装kafka

1.分发到其他节点
#scp  ...............

2.修改配置
#cd config
#vi server.properties

修改broker.id,修改为不同的值(三个节点分别为:0,1,2)
broker.id=0

修改log.dirs,这是Kafka的数据目录
log.dirs=/data/kafka/

修改listeners(192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092)
listeners=PLAINTEXT://192.101.11.162:9092
    
socket.send.buffer.bytes=1048576   
socket.receive.buffer.bytes=1048576   
socket.request.max.bytes=104857600   
num.partitions=10 
zookeeper.connect=192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181

注意:内存生产建议配置 4G

3.启动运行
#cd bin
#./kafka-server-start.sh ../config/server.properties &

#./kafka-server-start.sh -daemon ../config/server.properties

4.创建Topic

# ./kafka-topics.sh --create --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --topic test --partitions 1 --replication-factor 3  ##一个分区,三个副本

注意:replication-factor 用于设置主题副本数,每个副本分布在不同节点,不能超过总节点数。

5.查看Topic列表

# ./kafka-topics.sh --list --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181

6.查看Topic状态

# ./kafka-topics.sh --describe --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181  --topic test


删除Topic
# ./kafka-topics.sh --delete --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181  --topic test


7.producer发送消息

#./kafka-console-producer.sh --broker-list 192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092 --topic test

8.consumer接收消息

#./kafka-console-consumer.sh --bootstrap-server 192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092 --topic test --from-beginning

9.修改分区数
# ./kafka-topics.sh --alter --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181  --topic test --pattitions 3

注意:修改分区数时,只能增加分区个数,不能减少分区个数,否则会报错

10、分区重新分配
vim reassign.json
{
  "topics":[{"topic":"test"}],
  "version":1
}

# ./kafka-reassign-partitions.sh --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --topics-to-move-json-file reassign.json --broker-list 192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092 --generate

## 命令输出两个json字符串,第一个json内容为当前的分区副本分配情况,第二个为重新分配的候选方案(注意:这是只是生成一份可行性的方案,并没有真正执行重分配的动作)。
## 保存生成的第二个json(Proposed partition reassignment configuration)到名为result.json的文件里。
vim result.json

执行分配策略:
# ./kafka-reassign-partitions.sh --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --reassignment-json-file result.json --execute

查看分区重新分配的进度:
# ./kafka-reassign-partitions.sh --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --reassignment-json-file result.json --verify

分区分配策略:
RangeAssignor(默认策略),RoundRobinAssignor,StickyAssignor

RangeAssignor(默认策略)策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

partition中segment文件存储结构:
segment file 组成:由两大部分组成,分别为index file和data file。此两个文件一一对应,成对出现,后缀.index和.log分别表示为segment的索引文件和数据文件

数据文件的分段:
kafka解决查询效率的手段之一是将数据文件分段。
一句话:kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到高效性。

日志删除:
kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间顺序),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除
时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取曹组的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteAtrayList.
kafka消费日志删除思想:kafka把topic中一个partition大小文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
vi config/server.properties
log.cleanup.plicy=delete ##启用删除策略
##直接删除,删除后的消息不可恢复,可配置以下两个策略
log.retention.hours=16  ##清理超过指定时间message
log.retention.bytes=1073741824  ##清理指定大小后,删除旧的消息

磁盘存储优势:
kafka在设计的时候,采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已经写入的消息,
这种方式属于典型的顺序写入的操作,所以就算是kafka使用磁盘作为存储介质,所能实现的吞吐量也非常可观。

kafka大量使用页面缓存,这也是kafka实现高吞吐的重要因素之一。
kafka还使用了零拷贝技术来进一步提升性能。

五、kafka-eagle安装:

0、kafka需要开启JMX端口

    找到kafka安装路径,进入到bin文件夹,修改下面的地方。
    vi kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
...

1、下载kafka-eagle:
https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/v2.0.0

tar -zvxf kafka-eagle-bin-2.0.0.tar.gz

cd kafka-eagle-bin-2.0.0/

tar -zvxf kafka-eagle-web-2.0.0-bin.tar.gz

ln -s /opt/kong/kafka-eagle-bin-2.0.0/kafka-eagle-web-2.0.0 /usr/local/kafka-eagle

2、环境配置
vi /etc/profile
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin

3、配置修改

cd /usr/local/kafka-eagle/conf
vi system-config.properties

# multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
#如果只有一个集群的话,就写一个cluster1就行了
#kafka.eagle.zk.cluster.alias=cluster1,cluster2  
kafka.eagle.zk.cluster.alias=cluster1
#这里填上刚才上准备工作中的zookeeper.connect地址
cluster1.zk.list=192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181
#如果多个集群,继续写,如果没有注释掉
#cluster2.zk.list=192.168.18.21:2181,192.168.18.22:2181,192.168.18.23:2181

# zk limit -- Zookeeper cluster allows the number of clients to connect to
kafka.zk.limit.size=25

# kafka eagel webui port -- WebConsole port access address
kafka.eagle.webui.port=8048     ###web界面地址端口

# kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
kafka.eagle.offset.storage=kafka

# delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
kafka.eagle.topic.token=keadmin

# kafka sasl authenticate, current support SASL_PLAINTEXT
#如果kafka开启了sasl认证,需要在这个地方配置sasl认证文件
kafka.eagle.sasl.enable=false
kafka.eagle.sasl.protocol=SASL_PLAINTEXT
kafka.eagle.sasl.mechanism=PLAIN
kafka.eagle.sasl.client=/data/kafka-eagle/conf/kafka_client_jaas.conf

#下面两项是配置数据库的,默认使用sqlite,如果量大,建议使用mysql,这里我使用的是sqlit
#如果想使用mysql建议在文末查看官方文档
# Default use sqlite to store data
kafka.eagle.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must exist.
kafka.eagle.url=jdbc:sqlite:/data/app/kafka-eagle/db/ke.db   #这个地址,按照安装目录进行配置
kafka.eagle.username=root
kafka.eagle.password=smartloli

# <Optional> set mysql address
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=smartloli



如果开启了sasl认证,需要自己去修改kafka-eagle目录下的conf/kafka_client_jaas.conf

4、启动kafka-eagle

  cd ${KE_HOME}/bin
  chmod +x ke.sh
  ./ke.sh start
 
  [2020-08-27 21:49:07] INFO: Status Code[0]
[2020-08-27 21:49:07] INFO: [Job done!]
Welcome to
    __ __    ___     ____    __ __    ___            ______    ___    ______                               __     ______
   / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   /                            /    / ____/
  / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    /                            /    / __/
/ /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /                           ___ / /___
/_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /___                           __//_____/
                                                                                                          

Version 2.0.0 -- Copyright 2016-2020
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.101.11.152:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

  查看日志是否出问题
  tailf ../log/log.log
  如果没问题,则直接登录
  http://192.101.11.152:8048/ke
  默认用户名:admin
  默认密码:123456
  如果进入到一下界面,就说明你安装成功了!


ke.sh start     ##启动kafka Eagle
ke.sh stop      ##停止kafka-eagle
ke.sh restart   ##重启kafka-eagle
ke.sh status    ##查看kafka-eagle系统运行状态
ke.sh stats     ##统计kafka-eagle系统占用资源情况