
Setting Up a Kafka Cluster

程序员文章站 2022-06-14 09:11:06

Setting Up the ZooKeeper Cluster

Kafka depends on ZooKeeper, so ZooKeeper must be installed first. Both require JDK 1.8.

1. Build the ZooKeeper cluster; a ZooKeeper ensemble requires at least 3 nodes

Servers
10.10.56.16
10.10.56.17
10.10.56.19

- Check the JDK 1.8 environment

clw-db2:/home/pgsoft/zookeeper-3.4.10/conf # java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
clw-db2:/home/pgsoft/zookeeper-3.4.10/conf #
  • Configure the JDK environment variables
clw-db2:/home/pgsoft/zookeeper-3.4.10/conf # vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_161/
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
  • Download the ZooKeeper and Kafka archives, then unpack zookeeper-3.4.10.tar.gz and kafka_2.11-1.1.0.tgz
clw-db1:/home/pgsoft # tar -xf zookeeper-3.4.10.tar.gz
clw-db1:/home/pgsoft # tar -xf kafka_2.11-1.1.0.tgz 
  • Configure ZooKeeper first: copy the sample configuration file
clw-db1:~ # cd /home/super/pgsoft/zookeeper-3.4.10/conf/
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # ls
configuration.xsl  log4j.properties  zoo_sample.cfg
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # cp zoo_sample.cfg zoo.cfg
  • Edit zoo.cfg
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # vi zoo.cfg
dataDir=/home/zookeeper/log/data
dataLogDir=/home/zookeeper/log/datalog
clientPort=2181
server.1=10.10.56.16:2888:3888
server.2=10.10.56.17:2888:3888
server.3=10.10.56.19:2888:3888

Each ip is a node in the cluster. The first port (2888) is used for communication between nodes; the second port (3888) is used for leader election.

When editing the zoo.cfg above, check whether ports 2888 and 3888 are already occupied; if they are, configure different ports.
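A minimal sketch of that check, to be run on each node before editing zoo.cfg: a port is free if we can still bind it locally.

```python
import socket

# Ports from the zoo.cfg above: client port, quorum communication, leader election.
ZK_PORTS = (2181, 2888, 3888)

def port_free(port, host="0.0.0.0"):
    """Return True if the port can be bound locally, i.e. nothing is using it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
            return True
        except OSError:
            return False

for port in ZK_PORTS:
    print(port, "free" if port_free(port) else "in use")
```

The same information can of course be obtained with lsof or netstat, as shown later in this article.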

  • By default, logs are written to the zookeeper.out file

  • Check whether ZooKeeper is already installed or running locally

clw-db1:~/pgsoft/zookeeper-3.4.10/conf # ps -ef|grep 'zookeeper'
root     13012     1  0 May11 ?        00:00:50 /usr/local/jdk1.8.0_161//bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /home/super/pgsoft/zookeeper-3.4.10/bin/../build/classes:/home/super/pgsoft/zookeeper-3.4.10/bin/../build/lib/*.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/slf4j-api-1.6.1.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/netty-3.10.5.Final.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/log4j-1.2.16.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/jline-0.9.94.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../zookeeper-3.4.10.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../src/java/lib/*.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../conf:.:/usr/local/jdk1.8.0_161//lib/dt.jar:/usr/local/jdk1.8.0_161//lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /home/super/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
root     20169 16827  0 10:27 pts/3    00:00:00 grep zookeeper
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # kill -9 13012
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # ps -ef|grep 'zookeeper'
root     20185 16827  0 10:27 pts/3    00:00:00 grep zookeeper
clw-db1:~/pgsoft/zookeeper-3.4.10/conf #

A ZooKeeper instance was already running here, so it was killed first; if none is running, this step is unnecessary.

  • Create the dataDir and dataLogDir directories from the configuration file
mkdir /home/zookeeper/log/data -p
mkdir /home/zookeeper/log/datalog -p
  • Create this machine's cluster identifier; the value must match the X in the corresponding server.X entry in zoo.cfg. This machine is server.1 (10.10.56.16).
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # cd /home/zookeeper/log/data/
clw-db1:/home/zookeeper/log/data # vi myid
1
  • Perform the same steps on 16, 17, and 19; only the value written to myid differs
Server        myid
10.10.56.16   1
10.10.56.17   2
10.10.56.19   3
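The table above can be expressed as a small helper (a sketch; the IP-to-myid mapping mirrors the server.X lines in zoo.cfg):

```python
# Mapping taken from the server.X entries in zoo.cfg (server.1=10.10.56.16, ...).
MYIDS = {"10.10.56.16": 1, "10.10.56.17": 2, "10.10.56.19": 3}

def myid_for(ip):
    """Return the myid this node must write into <dataDir>/myid."""
    return MYIDS[ip]

def write_myid(ip, data_dir="/home/zookeeper/log/data"):
    """Write the myid file for this node (run on the node itself)."""
    with open(f"{data_dir}/myid", "w") as f:
        f.write(f"{myid_for(ip)}\n")
```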

Go into the bin directory of the unpacked ZooKeeper on each node. Start the ZooKeeper server on 16:

clw-db1:~/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/super/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh status
  • Start the ZooKeeper server on 17
clw-db2:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
clw-db2:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
  • Start the ZooKeeper server on 19
clw-db4:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
clw-db4:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
  • The leader-election details can be inspected in each node's log
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # cd /home/pgsoft/zookeeper-3.4.10/bin  
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # tail -f zookeeper.out
2018-05-12 11:07:35,543 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.home=/root
2018-05-12 11:07:35,544 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.dir=/home/super/pgsoft/zookeeper-3.4.10/bin
2018-05-12 11:07:35,547 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:ZooKeeperServer@173] - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /home/zookeeper/log/datalog/version-2 snapdir /home/zookeeper/log/data/version-2
2018-05-12 11:07:35,549 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@64] - FOLLOWING - LEADER ELECTION TOOK - 126947
2018-05-12 11:07:35,554 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumPeer$QuorumServer@167] - Resolved hostname: 10.10.56.17 to address: /10.10.56.17
2018-05-12 11:07:35,623 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Learner@332] - Getting a diff from the leader 0x0
2018-05-12 11:08:35,056 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:53101
2018-05-12 11:08:35,058 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@883] - Processing srvr command from /127.0.0.1:53101
2018-05-12 11:08:35,064 [myid:1] - INFO  [Thread-2:NIOServerCnxn@1044] - Closed socket connection for client /127.0.0.1:53101 (no session established for client)
2018-05-12 11:10:20,756 [myid:1] - INFO  [/10.10.56.16:3888:[email protected]] - Received connection request /10.10.56.19:50413
2018-05-12 11:10:20,767 [myid:1] - INFO  [WorkerReceiver[myid=1]:FastLeaderElection@600] - Notification: 1 (message format version), 3 (n.leader), 0x0 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x0 (n.peerEpoch) FOLLOWING (my state)
  • Check the communication port status on 10.10.56.16
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # lsof -i:2888
COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    21773 root   29u  IPv6 31076324      0t0  TCP 10.10.56.16:57232->10.10.56.17:spcsdlobby (ESTABLISHED)
  • Check the election port status on 10.10.56.16
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # lsof -i:3888
COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    21773 root   27u  IPv6 31078845      0t0  TCP 10.10.56.16:ciphire-serv (LISTEN)
java    21773 root   28u  IPv6 31076320      0t0  TCP 10.10.56.16:ciphire-serv->10.10.56.17:54097 (ESTABLISHED)
java    21773 root   30u  IPv6 31074167      0t0  TCP 10.10.56.16:ciphire-serv->10.10.56.19:50413 (ESTABLISHED)
clw-db1:~/pgsoft/zookeeper-3.4.10/bin #
  • The ZooKeeper cluster is now set up

Setting Up Kafka

  • First complete the ZooKeeper installation described above

  • Download and unpack Kafka

clw-db3:/home/pgsoft # tar -xf kafka_2.11-1.1.0.tgz
  • Edit the server.properties configuration file
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # cd  /home/pgsoft/kafka_2.11-1.1.0/config/
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties

broker.id=1
listeners = PLAINTEXT://10.10.56.18:9092
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.18:2181

broker.id must be a unique integer; listeners is the local address the broker listens on; zookeeper.connect is the ip and port of the ZooKeeper server, which can be read from the clientPort parameter in the ZooKeeper configuration file:

clw-db3:/home/pgsoft/zookeeper-3.4.10/conf # cat zoo.cfg
clientPort=2181
  • Create the directory configured by the log.dirs parameter above
mkdir /home/kafka/log -p
  • Start the Kafka service; ZooKeeper must be started first
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-server-start.sh ../config/server.properties


[2018-05-12 15:26:24,542] INFO Client environment:user.name=super (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,542] INFO Client environment:user.home=/home/super (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,543] INFO Client environment:user.dir=/home/pgsoft/kafka_2.11-1.1.0/bin (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,544] INFO Initiating client connection, connectString=10.10.56.18:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@10e31a9a (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,596] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2018-05-12 15:26:24,599] INFO Opening socket connection to server 10.10.56.18/10.10.56.18:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-05-12 15:26:24,612] INFO Socket connection established to 10.10.56.18/10.10.56.18:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-05-12 15:26:24,629] INFO Session establishment complete on server 10.10.56.18/10.10.56.18:2181, sessionid = 0x1634caa45230004, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-05-12 15:26:24,637] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2018-05-12 15:26:25,069] INFO Cluster ID = BR692_cjQyi9awIrPWxc6A (kafka.server.KafkaServer)
[2018-05-12 15:26:25,198] INFO KafkaConfig values:

The line INFO [ZooKeeperClient] Connected above shows that Kafka connected to ZooKeeper successfully.

[2018-05-12 15:36:27,042] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-05-12 15:38:35,365] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-05-12 15:38:35,389] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,389] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions  (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,397] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,397] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,404] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,404] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions  (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,406] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,407] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,462] INFO Log for partition test-pgsql-0 is renamed to /home/kafka/log/test-pgsql-0.d50b91ea074e41f69957143b59ead435-delete and is scheduled for deletion (kafka.log.LogManager)

Messages like the above indicate that Kafka started successfully.

  • Check the Kafka port and its connections; this is the port set by the listeners parameter in Kafka's server.properties
clw-db3:/home/pgsoft/zookeeper-3.4.10/conf # lsof -i:9092
COMMAND   PID USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    25015 root  153u  IPv6 14857291      0t0  TCP 10.10.56.18:XmlIpcRegSvc (LISTEN)
java    25015 root  169u  IPv6 14855619      0t0  TCP 10.10.56.18:40874->10.10.56.18:XmlIpcRegSvc (CLOSE_WAIT)
  • Create a topic to receive messages sent from other sources
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --zookeeper 10.10.56.18:2181  --create --replication-factor 1 --partitions 1 --topic pgtopic
Created topic "pgtopic".
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin #

More usage can be displayed by running sh kafka-topics.sh --help from /home/pgsoft/kafka_2.11-1.1.0/bin.

  • List the topics that have been created
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --zookeeper 10.10.56.18:2181  --list
__consumer_offsets
pgtopic
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin #
  • Delete the topic test-pgsql; the ZooKeeper address used here is the local one, so substitute the address of your own ZooKeeper installation
 clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --list --zookeeper 10.10.56.18:2181
__consumer_offsets
test
test-pgsql
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh  --zookeeper localhost:2181 --delete  --topic test-pgsql
Topic test-pgsql is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin #   
  • Created topics are stored under the directory configured by the log.dirs parameter in the broker's server.properties
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # cd /home/kafka/log/
clw-db3:/home/kafka/log # ls
.lock                  __consumer_offsets-17  __consumer_offsets-26  __consumer_offsets-35  __consumer_offsets-44  __consumer_offsets-9
__consumer_offsets-0   __consumer_offsets-18  __consumer_offsets-27  __consumer_offsets-36  __consumer_offsets-45  cleaner-offset-checkpoint
__consumer_offsets-1   __consumer_offsets-19  __consumer_offsets-28  __consumer_offsets-37  __consumer_offsets-46  log-start-offset-checkpoint
__consumer_offsets-10  __consumer_offsets-2   __consumer_offsets-29  __consumer_offsets-38  __consumer_offsets-47  meta.properties
__consumer_offsets-11  __consumer_offsets-20  __consumer_offsets-3   __consumer_offsets-39  __consumer_offsets-48  pgtopic-0
__consumer_offsets-12  __consumer_offsets-21  __consumer_offsets-30  __consumer_offsets-4   __consumer_offsets-49  recovery-point-offset-checkpoint
__consumer_offsets-13  __consumer_offsets-22  __consumer_offsets-31  __consumer_offsets-40  __consumer_offsets-5   replication-offset-checkpoint
__consumer_offsets-14  __consumer_offsets-23  __consumer_offsets-32  __consumer_offsets-41  __consumer_offsets-6
__consumer_offsets-15  __consumer_offsets-24  __consumer_offsets-33  __consumer_offsets-42  __consumer_offsets-7
__consumer_offsets-16  __consumer_offsets-25  __consumer_offsets-34  __consumer_offsets-43  __consumer_offsets-8
clw-db3:/home/kafka/log # cd pgtopic-0/
clw-db3:/home/kafka/log/pgtopic-0 # ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint
clw-db3:/home/kafka/log/pgtopic-0 #
  • The __consumer_offsets-x directories belong to Kafka's internal offsets topic (50 partitions by default). The partition that stores a given consumer group's offsets is computed with the formula:

    Math.abs(groupID.hashCode()) % numPartitions
    • Query the group IDs; they are the input for computing which partition the offsets go to
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-consumer-groups.sh --bootstrap-server 10.10.56.18:9092 --list --new-consumer
The [new-consumer] option is deprecated and will be removed in a future major release.The new consumer is used by default if the [bootstrap-server] option is provided.
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-76845
console-consumer-85760
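The formula can be checked directly. This sketch (an illustration, not Kafka's own code) reimplements Java's String.hashCode in Python, since the group id is hashed as a Java string:

```python
def java_string_hash(s):
    """Java's String.hashCode: h = 31*h + c, with 32-bit signed overflow."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def offsets_partition(group_id, num_partitions=50):
    """__consumer_offsets partition that stores this group's offsets."""
    return abs(java_string_hash(group_id)) % num_partitions

# Example with one of the group ids listed above:
print(offsets_partition("console-consumer-76845"))
```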
  • Simulate an external program sending messages to Kafka: start a producer and send some messages
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-producer.sh --broker-list 10.10.56.18:9092 -topic pgtopic
>myte ^H^Hsdf s
>what are you doing now?
>
  • Start a consumer to receive the subscribed messages
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-consumer.sh --bootstrap-server 10.10.56.18:9092 --topic pgtopic --from-beginning
mytsdf s
what are you doing now?
  • Verify that the producer's messages reached the pgtopic topic; this is the directory configured by log.dirs
clw-db3:/home/kafka/log # cd pgtopic-0/
clw-db3:/home/kafka/log/pgtopic-0 # ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  00000000000000000002.snapshot  leader-epoch-checkpoint
clw-db3:/home/kafka/log/pgtopic-0 # tail -f 00000000000000000000.log
m▒▒cS▒5cS▒5▒▒▒▒▒▒▒▒▒▒▒▒▒▒&mytsdf s O▒▒▒!cS▒▒ucS▒▒u▒▒▒▒▒▒▒▒▒▒▒▒▒▒:.what are you doing now?A▒fc\K▒c\K▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒who am i?O▒▒▒c\LPc\LP▒▒▒▒▒▒▒▒▒▒▒▒▒▒:.please help me out hereN▒X▒▒c\L▒▒c\L▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒8,can you touch my [email protected]▒▒c\M▒c\M▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒i'm hurtA35▒{c\mm▒c\mm▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒go to bedD▒▒▒c\m▒▒c\m▒▒▒▒▒▒▒▒▒▒c\rJ c\rJ ▒▒▒▒▒▒▒▒▒▒▒▒▒▒&hello my babexterm▒▒▒8,i want to push you out  E▒~

Problems encountered when starting the producer and consumer

[2018-05-12 16:43:32,516] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58298] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-05-12 16:43:32,574] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58298] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-05-12 16:43:32,677] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58298] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  • The cause: the listeners parameter in Kafka's server.properties is listeners = PLAINTEXT://10.10.56.18:9092, i.e. bound to 10.10.56.18, but the producer and consumer were started with localhost after --broker-list / --bootstrap-server, so the connection failed

  • Solution: change localhost to 10.10.56.18; the address must match the listeners parameter in Kafka's server.properties
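A minimal reachability sketch to confirm that the address you pass to --bootstrap-server is actually accepting connections:

```python
import socket

def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# The broker accepts connections on the address from `listeners`, so
# can_reach("10.10.56.18", 9092) should succeed while "localhost" may not.
```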

  • View the broker id that Kafka registered

clw-db3:/home/pgsoft/zookeeper-3.4.10/bin # sh zkCli.sh --help
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/

ids      topics   seqid
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[pgtopic, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 4]

On startup, Kafka registers its unique broker id in ZooKeeper.
- View the information Kafka registered in the ZooKeeper cluster

[zk: localhost:2181(CONNECTED) 7] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.10.56.18:9092"],"jmx_port":-1,"host":"10.10.56.18","timestamp":"1526109986873","port":9092,"version":4}
cZxid = 0x11a
ctime = Sat May 12 15:26:26 CST 2018
mZxid = 0x11a
mtime = Sat May 12 15:26:26 CST 2018
pZxid = 0x11a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1634caa45230004
dataLength = 192
numChildren = 0
[zk: localhost:2181(CONNECTED) 8]
  • The topics Kafka registered can likewise be inspected under /brokers/topics, as shown by the ls /brokers/topics output above

This completes the single-node Kafka setup.

Setting Up the Kafka Cluster

  • Configure the broker's server.properties on 10.10.56.16
clw-db1:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties

listeners=PLAINTEXT://10.10.56.16:9092
broker.id=1
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181
zookeeper.connection.timeout.ms=60000
  • On 10.10.56.16, create the directory configured by log.dirs in server.properties
clw-db1:~ # mkdir /home/kafka/log -p
  • Configure Kafka's server.properties on 10.10.56.17
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties

listeners=PLAINTEXT://10.10.56.17:9092
broker.id=2
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181
  • On 10.10.56.17, create the directory configured by log.dirs in server.properties
clw-db3:~ # mkdir /home/kafka/log -p
  • Configure Kafka's server.properties on 10.10.56.19
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties

listeners=PLAINTEXT://10.10.56.19:9092
broker.id=3
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181
zookeeper.connection.timeout.ms=60000
  • On 10.10.56.19, create the directory configured by log.dirs in server.properties
clw-db4:~ # mkdir /home/kafka/log -p
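The three server.properties files differ only in broker.id and listeners. A sketch that renders all three from one template, using the standard PLAINTEXT://host:port listener form and the IPs and paths from this article:

```python
# Nodes and broker ids as configured above.
NODES = [("10.10.56.16", 1), ("10.10.56.17", 2), ("10.10.56.19", 3)]
ZK_CONNECT = ",".join(f"{ip}:2181" for ip, _ in NODES)

def render_properties(ip, broker_id):
    """Render the per-broker server.properties fragment."""
    return (
        f"broker.id={broker_id}\n"
        f"listeners=PLAINTEXT://{ip}:9092\n"
        "log.dirs=/home/kafka/log\n"
        f"zookeeper.connect={ZK_CONNECT}\n"
        "zookeeper.connection.timeout.ms=60000\n"
    )

for ip, bid in NODES:
    print(f"# {ip}\n{render_properties(ip, bid)}")
```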

Start the Kafka service on 16, 17, and 19.

  • On ZooKeeper node 19, create the topic rtmkafka and list it
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --create --zookeeper 10.10.56.19:2181 --replication-factor 3 --partitions 1 --topic rtmkafka
Created topic "rtmkafka".
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --list --zookeeper 10.10.56.19:2181
rtmkafka
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #

--partitions 1 means one partition; --replication-factor 3 means three replicas.

  • Create a producer on server 19 and publish the message: hello my kafka world
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-producer.sh --broker-list 10.10.56.19:9092 --topic rtmkafka
>hello my kafka world·
>
  • Create a consumer on server 16; hello my kafka world is the message received
clw-db2:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-consumer.sh --bootstrap-server 10.10.56.19:9092 --topic rtmkafka --from-beginning

hello my kafka world·
  • Create a consumer on server 17; hello my kafka world is the message received
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-consumer.sh --bootstrap-server 10.10.56.19:9092 --topic rtmkafka --from-beginning

hello my kafka world·
  • Check the topic status on server 19
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka
Topic:rtmkafka  PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: rtmkafka Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #

PartitionCount: 1 partition,
ReplicationFactor: 3 replicas,
Replicas: brokers 3,1,2 hold a replica,
Isr (in-sync replicas): 3,1,2

  • When we kill the Kafka process on server 16
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # ps -ef|grep 'kafka'
root     12942     1  1 20:11 pts/5    00:00:45 /usr/local/jdk1.8.0_161//bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/home/super/pgsoft/kafka_2.11-1.1.0/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp .:/usr/local/jdk1.8.0_161//lib/dt.jar:/usr/local/jdk1.8.0_161//lib/tools.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/argparse4j-0.7.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/commons-lang3-3.5.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-api-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-file-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-json-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-runtime-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-transforms-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/guava-20.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/hk2-api-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/hk2-locator-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/hk2-utils-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-annotations-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-core-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-databind-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-jaxrs-base-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../l
ibs/jackson-jaxrs-json-provider-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-module-jaxb-annotations-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javassist-3.20.0-GA.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javassist-3.21.0-GA.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.annotation-api-1.2.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.inject-1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.inject-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-client-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-common-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-container-servlet-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-guava-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-media-jaxb-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-server-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-client-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-continuation-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-http-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-io-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-security-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-server-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-servlet-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-servlets-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-util-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jopt-simple-5.0.4.jar:/ho
me/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/kafka-clients-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/kafka-log4j-appender-1.1.0.
root     15503 10096  0 20:56 pts/6    00:00:00 grep kafka
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # kill -9 12942
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # ps -ef|grep 'kafka'
root     15516 10096  0 20:56 pts/6    00:00:00 grep kafka
  • Querying on 19, the ISR shrinks from 3,1,2 to 3,2
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka
Topic:rtmkafka  PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: rtmkafka Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,2
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #

Restart Kafka on server 16; the Kafka log on server 19 then shows:

[2018-05-12 20:57:00,028] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
[2018-05-12 20:57:00,283] INFO [ReplicaAlterLogDirsManager on broker 3] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 20:57:05,194] INFO [Partition rtmkafka-0 broker=3] Shrinking ISR from 3,1,2 to 3,2 (kafka.cluster.Partition)
[2018-05-12 21:02:06,002] INFO [Partition rtmkafka-0 broker=3] Expanding ISR from 3,2 to 3,2,1 (kafka.cluster.Partition)
  • Checking the topic status on server 19, the Isr returns to its original membership
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka
Topic:rtmkafka  PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: rtmkafka Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,2,1
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
  • Inspect the topic directory and stored messages on the server
clw-db4:/home/kafka/log/rtmkafka-0 # ll -lh
total 16K
-rw-r--r-- 1 root root 10M May 12 21:15 00000000000000000000.index
-rw-r--r-- 1 root root 246 May 12 21:18 00000000000000000000.log
-rw-r--r-- 1 root root 10M May 12 21:15 00000000000000000000.timeindex
-rw-r--r-- 1 root root  10 May 12 21:15 00000000000000000001.snapshot
-rw-r--r-- 1 root root  12 May 12 21:16 leader-epoch-checkpoint
clw-db4:/home/kafka/log/rtmkafka-0 #
  • The Kafka cluster setup is complete