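Setting Up a Kafka Cluster
Setting Up the ZooKeeper Cluster
Kafka depends on ZooKeeper, so ZooKeeper must be installed first. Both require JDK 1.8.
1. Set up the ZooKeeper cluster. A ZooKeeper cluster needs at least 3 nodes.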
Server |
---|
10.10.56.16 |
10.10.56.17 |
10.10.56.19 |
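Why three nodes: ZooKeeper stays available only while a strict majority of the ensemble is running, so three is the smallest cluster that survives the loss of one node. The sketch below just works through that arithmetic; the function names are made up for illustration and are not part of ZooKeeper.

```shell
# quorum_size N: how many nodes form a strict majority of N.
quorum_size() {
    echo $(( $1 / 2 + 1 ))
}

# tolerated_failures N: how many nodes may fail while a majority survives.
tolerated_failures() {
    echo $(( ($1 - 1) / 2 ))
}

# Print the numbers for a few ensemble sizes.
for n in 1 2 3 5; do
    echo "nodes=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
```

For 3 nodes this gives a quorum of 2 and one tolerated failure, while a 2-node ensemble tolerates none.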
- Check the JDK 1.8 environment
clw-db2:/home/pgsoft/zookeeper-3.4.10/conf # java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
clw-db2:/home/pgsoft/zookeeper-3.4.10/conf #
- Configure the JDK environment variables
clw-db2:/home/pgsoft/zookeeper-3.4.10/conf # vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_161/
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
- Download the ZooKeeper and Kafka archives, then extract zookeeper-3.4.10.tar.gz and kafka_2.11-1.1.0.tgz
clw-db1:/home/pgsoft # tar -xf zookeeper-3.4.10.tar.gz
clw-db1:/home/pgsoft # tar -xf kafka_2.11-1.1.0.tgz
- Configure ZooKeeper first: copy the sample configuration file
clw-db1:~ # cd /home/super/pgsoft/zookeeper-3.4.10/conf/
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # ls
configuration.xsl log4j.properties zoo_sample.cfg
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # cp zoo_sample.cfg zoo.cfg
- Edit zoo.cfg
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # vi zoo.cfg
dataDir=/home/zookeeper/log/data
dataLogDir=/home/zookeeper/log/datalog
clientPort=2181
server.1=10.10.56.16:2888:3888
server.2=10.10.56.17:2888:3888
server.3=10.10.56.19:2888:3888
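Each IP above is one node of the cluster. The first port (2888) is the one the nodes use to communicate with each other, and the second (3888) is used for leader election.
Before using this zoo.cfg, check whether ports 2888 and 3888 are already in use; if they are, choose different ports.
By default, log output goes to the zookeeper.out file.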
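The port check described above can be scripted. This is a minimal sketch, assuming a zoo.cfg laid out like the one shown (a `clientPort=` line and `server.N=host:port:port` lines); `zk_ports` is a hypothetical helper name, not a ZooKeeper command.

```shell
# zk_ports CFG: print every port named in a zoo.cfg-style file,
# one per line, sorted and without duplicates.
zk_ports() {
    {
        # clientPort=2181  ->  2181
        sed -n 's/^clientPort=\([0-9][0-9]*\).*$/\1/p' "$1"
        # server.1=host:2888:3888  ->  2888 and 3888 on separate lines
        sed -n 's/^server\.[0-9][0-9]*=[^:]*:\([0-9][0-9]*\):\([0-9][0-9]*\).*$/\1 \2/p' "$1" | tr ' ' '\n'
    } | sort -un
}

# Example (not run here): check each port the same way the article does.
#   for p in $(zk_ports zoo.cfg); do lsof -i:"$p"; done
```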
Check whether ZooKeeper is already installed or running locally
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # ps -ef|grep 'zookeeper'
root 13012 1 0 May11 ? 00:00:50 /usr/local/jdk1.8.0_161//bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /home/super/pgsoft/zookeeper-3.4.10/bin/../build/classes:/home/super/pgsoft/zookeeper-3.4.10/bin/../build/lib/*.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/slf4j-api-1.6.1.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/netty-3.10.5.Final.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/log4j-1.2.16.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../lib/jline-0.9.94.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../zookeeper-3.4.10.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../src/java/lib/*.jar:/home/super/pgsoft/zookeeper-3.4.10/bin/../conf:.:/usr/local/jdk1.8.0_161//lib/dt.jar:/usr/local/jdk1.8.0_161//lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /home/super/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
root 20169 16827 0 10:27 pts/3 00:00:00 grep zookeeper
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # kill -9 13012
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # ps -ef|grep 'zookeeper'
root 20185 16827 0 10:27 pts/3 00:00:00 grep zookeeper
clw-db1:~/pgsoft/zookeeper-3.4.10/conf #
ZooKeeper was already running here, so I killed it first; skip this step if it is not running.
- Create the dataDir and dataLogDir directories set in the configuration file
mkdir /home/zookeeper/log/data -p
mkdir /home/zookeeper/log/datalog -p
- Create this machine's identifier in the cluster. The value 1 must match the X of the corresponding server.X entry in zoo.cfg; this machine is server.1 (10.10.56.16)
clw-db1:~/pgsoft/zookeeper-3.4.10/conf # cd /home/zookeeper/log/data/
clw-db1:/home/zookeeper/log/data # vi myid
1
- Repeat the steps above on 16, 17 and 19; only the value written to myid differs
Server | myid value |
---|---|
10.10.56.16 | 1 |
10.10.56.17 | 2 |
10.10.56.19 | 3 |
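Since the myid value is just the N of the matching `server.N` line, it can be looked up from zoo.cfg instead of being typed by hand on each machine. A small sketch, assuming the `server.N=ip:port:port` layout shown earlier; `myid_for` is a hypothetical helper, not a ZooKeeper tool.

```shell
# myid_for CFG IP: print the N of the server.N entry whose host is IP.
# Prints nothing if the IP is not listed in the file.
myid_for() {
    # Escape the dots so they match literally in the sed pattern.
    ip_re=$(printf '%s\n' "$2" | sed 's/\./\\./g')
    sed -n "s/^server\.\([0-9][0-9]*\)=${ip_re}:.*/\1/p" "$1"
}

# Example (not run here), using the paths from this article:
#   myid_for zoo.cfg 10.10.56.16 > /home/zookeeper/log/data/myid
```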
- On each server, change to the bin directory of the extracted ZooKeeper. Start the ZooKeeper server on 16
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/super/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh status
- Start the ZooKeeper server on 17
clw-db2:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
clw-db2:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
- Start the ZooKeeper server on 19
clw-db4:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
clw-db4:/home/pgsoft/zookeeper-3.4.10/bin # sh zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/pgsoft/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
- The election details can be checked in each server's log
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # cd /home/pgsoft/zookeeper-3.4.10/bin
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # tail -f zookeeper.out
2018-05-12 11:07:35,543 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.home=/root
2018-05-12 11:07:35,544 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.dir=/home/super/pgsoft/zookeeper-3.4.10/bin
2018-05-12 11:07:35,547 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:ZooKeeperServer@173] - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /home/zookeeper/log/datalog/version-2 snapdir /home/zookeeper/log/data/version-2
2018-05-12 11:07:35,549 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@64] - FOLLOWING - LEADER ELECTION TOOK - 126947
2018-05-12 11:07:35,554 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumPeer$QuorumServer@167] - Resolved hostname: 10.10.56.17 to address: /10.10.56.17
2018-05-12 11:07:35,623 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Learner@332] - Getting a diff from the leader 0x0
2018-05-12 11:08:35,056 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:53101
2018-05-12 11:08:35,058 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@883] - Processing srvr command from /127.0.0.1:53101
2018-05-12 11:08:35,064 [myid:1] - INFO [Thread-2:NIOServerCnxn@1044] - Closed socket connection for client /127.0.0.1:53101 (no session established for client)
2018-05-12 11:10:20,756 [myid:1] - INFO [/10.10.56.16:3888:[email protected]] - Received connection request /10.10.56.19:50413
2018-05-12 11:10:20,767 [myid:1] - INFO [WorkerReceiver[myid=1]:FastLeaderElection@600] - Notification: 1 (message format version), 3 (n.leader), 0x0 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x0 (n.peerEpoch) FOLLOWING (my state)
- Check the state of the communication port (2888) on 10.10.56.16
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # lsof -i:2888
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 21773 root 29u IPv6 31076324 0t0 TCP 10.10.56.16:57232->10.10.56.17:spcsdlobby (ESTABLISHED)
- Check the state of the election port (3888) on 10.10.56.16
clw-db1:~/pgsoft/zookeeper-3.4.10/bin # lsof -i:3888
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 21773 root 27u IPv6 31078845 0t0 TCP 10.10.56.16:ciphire-serv (LISTEN)
java 21773 root 28u IPv6 31076320 0t0 TCP 10.10.56.16:ciphire-serv->10.10.56.17:54097 (ESTABLISHED)
java 21773 root 30u IPv6 31074167 0t0 TCP 10.10.56.16:ciphire-serv->10.10.56.19:50413 (ESTABLISHED)
clw-db1:~/pgsoft/zookeeper-3.4.10/bin #
- The ZooKeeper cluster is now up and running
Setting Up Kafka
ZooKeeper must be installed as described above first. Then download Kafka and extract it
clw-db3:/home/pgsoft # tar -xf kafka_2.11-1.1.0.tgz
- Edit the server.properties configuration file
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # cd /home/pgsoft/kafka_2.11-1.1.0/config/
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties
broker.id=1
listeners = PLAINTEXT://10.10.56.18:9092
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.18:2181
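Here broker.id is a unique integer, listeners is the local address and port the broker listens on, and zookeeper.connect is the IP and port of the ZooKeeper server. The port is the value of clientPort in ZooKeeper's configuration file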
clw-db3:/home/pgsoft/zookeeper-3.4.10/conf # cat zoo.cfg
clientPort=2181
- Create the directory configured as log.dirs above
mkdir /home/kafka/log -p
- Start the Kafka service; the ZooKeeper service must be started first
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-server-start.sh ../config/server.properties
[2018-05-12 15:26:24,542] INFO Client environment:user.name=super (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,542] INFO Client environment:user.home=/home/super (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,543] INFO Client environment:user.dir=/home/pgsoft/kafka_2.11-1.1.0/bin (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,544] INFO Initiating client connection, connectString=10.10.56.18:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@10e31a9a (org.apache.zookeeper.ZooKeeper)
[2018-05-12 15:26:24,596] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2018-05-12 15:26:24,599] INFO Opening socket connection to server 10.10.56.18/10.10.56.18:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-05-12 15:26:24,612] INFO Socket connection established to 10.10.56.18/10.10.56.18:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-05-12 15:26:24,629] INFO Session establishment complete on server 10.10.56.18/10.10.56.18:2181, sessionid = 0x1634caa45230004, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-05-12 15:26:24,637] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2018-05-12 15:26:25,069] INFO Cluster ID = BR692_cjQyi9awIrPWxc6A (kafka.server.KafkaServer)
[2018-05-12 15:26:25,198] INFO KafkaConfig values:
The line INFO [ZooKeeperClient] Connected above means Kafka has connected to ZooKeeper successfully
[2018-05-12 15:36:27,042] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-05-12 15:38:35,365] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-05-12 15:38:35,389] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,389] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,397] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,397] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,404] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,404] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,406] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaFetcherManager)
[2018-05-12 15:38:35,407] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions test-pgsql-0 (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 15:38:35,462] INFO Log for partition test-pgsql-0 is renamed to /home/kafka/log/test-pgsql-0.d50b91ea074e41f69957143b59ead435-delete and is scheduled for deletion (kafka.log.LogManager)
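Messages like the ones above indicate that Kafka has started successfully
- Check the Kafka port and its connections; this is the port set in the listeners parameter of Kafka's server.properties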
clw-db3:/home/pgsoft/zookeeper-3.4.10/conf # lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 25015 root 153u IPv6 14857291 0t0 TCP 10.10.56.18:XmlIpcRegSvc (LISTEN)
java 25015 root 169u IPv6 14855619 0t0 TCP 10.10.56.18:40874->10.10.56.18:XmlIpcRegSvc (CLOSE_WAIT)
- Create a topic to receive messages sent from other sources
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --zookeeper 10.10.56.18:2181 --create --replication-factor 1 --partitions 1 --topic pgtopic
Created topic "pgtopic".
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin #
Run sh kafka-topics.sh --help in /home/pgsoft/kafka_2.11-1.1.0/bin to see more usage
- List the topics that have been created
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --zookeeper 10.10.56.18:2181 --list
__consumer_offsets
pgtopic
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin #
- Delete the topic test-pgsql. The ZooKeeper address here is the local one; use the address of your own ZooKeeper installation
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --list --zookeeper 10.10.56.18:2181
__consumer_offsets
test
test-pgsql
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-pgsql
Topic test-pgsql is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin #
- Created topics are stored under the directory set by the log.dirs parameter in the Kafka server's server.properties
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # cd /home/kafka/log/
clw-db3:/home/kafka/log # ls
.lock __consumer_offsets-17 __consumer_offsets-26 __consumer_offsets-35 __consumer_offsets-44 __consumer_offsets-9
__consumer_offsets-0 __consumer_offsets-18 __consumer_offsets-27 __consumer_offsets-36 __consumer_offsets-45 cleaner-offset-checkpoint
__consumer_offsets-1 __consumer_offsets-19 __consumer_offsets-28 __consumer_offsets-37 __consumer_offsets-46 log-start-offset-checkpoint
__consumer_offsets-10 __consumer_offsets-2 __consumer_offsets-29 __consumer_offsets-38 __consumer_offsets-47 meta.properties
__consumer_offsets-11 __consumer_offsets-20 __consumer_offsets-3 __consumer_offsets-39 __consumer_offsets-48 pgtopic-0
__consumer_offsets-12 __consumer_offsets-21 __consumer_offsets-30 __consumer_offsets-4 __consumer_offsets-49 recovery-point-offset-checkpoint
__consumer_offsets-13 __consumer_offsets-22 __consumer_offsets-31 __consumer_offsets-40 __consumer_offsets-5 replication-offset-checkpoint
__consumer_offsets-14 __consumer_offsets-23 __consumer_offsets-32 __consumer_offsets-41 __consumer_offsets-6
__consumer_offsets-15 __consumer_offsets-24 __consumer_offsets-33 __consumer_offsets-42 __consumer_offsets-7
__consumer_offsets-16 __consumer_offsets-25 __consumer_offsets-34 __consumer_offsets-43 __consumer_offsets-8
clw-db3:/home/kafka/log # cd pgtopic-0/
clw-db3:/home/kafka/log/pgtopic-0 # ls
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint
clw-db3:/home/kafka/log/pgtopic-0 #
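- The __consumer_offsets-x directories are the partitions of Kafka's internal topic for consumer offsets. The partition that holds a consumer group's offsets is computed from the group ID with the formula below; by default this topic has 50 partitions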
Math.abs(groupID.hashCode()) % numPartitions
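To try the formula without a JVM, here is a rough shell rendering of it. It assumes ASCII group IDs and the default of 50 partitions mentioned above; `java_hash` and `offsets_partition` are illustrative names, not Kafka tools.

```shell
# java_hash STR: Java's String.hashCode() for an ASCII string,
# i.e. h = 31*h + c over the characters, wrapped to a signed 32-bit int.
java_hash() {
    s=$1
    h=0
    while [ -n "$s" ]; do
        rest=${s#?}              # everything after the first character
        c=${s%"$rest"}           # the first character
        s=$rest
        code=$(printf '%d' "'$c")
        h=$(( (31 * h + code) & 4294967295 ))
    done
    # Reinterpret the unsigned 32-bit value as a signed one.
    if [ "$h" -ge 2147483648 ]; then
        h=$(( h - 4294967296 ))
    fi
    echo "$h"
}

# offsets_partition GROUP [NUM_PARTITIONS]: abs(hash) % partitions.
# Note: Java's Math.abs leaves Integer.MIN_VALUE negative; that corner
# case is not reproduced here.
offsets_partition() {
    h=$(java_hash "$1")
    n=${2:-50}
    if [ "$h" -lt 0 ]; then h=$(( -h )); fi
    echo $(( h % n ))
}
```

Calling `offsets_partition` with a group ID prints the index x of the __consumer_offsets-x directory that would hold that group's offsets under these assumptions.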
- List the group IDs, which are the input for computing the partition
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-consumer-groups.sh --bootstrap-server 10.10.56.18:9092 --list --new-consumer
The [new-consumer] option is deprecated and will be removed in a future major release.The new consumer is used by default if the [bootstrap-server] option is provided.
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-76845
console-consumer-85760
- Simulate an external program sending messages to Kafka locally, i.e. start a producer and send some messages
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-producer.sh --broker-list 10.10.56.18:9092 -topic pgtopic
>myte ^H^Hsdf s
>what are you doing now?
>
- Start a consumer, which receives the messages of the topic it subscribes to
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-consumer.sh --bootstrap-server 10.10.56.18:9092 --topic pgtopic --from-beginning
mytsdf s
what are you doing now?
- Check that the producer's messages reached the Kafka topic pgtopic; this directory is the one configured as Kafka's log.dirs
clw-db3:/home/kafka/log # cd pgtopic-0/
clw-db3:/home/kafka/log/pgtopic-0 # ls
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex 00000000000000000002.snapshot leader-epoch-checkpoint
clw-db3:/home/kafka/log/pgtopic-0 # tail -f 00000000000000000000.log
m▒▒cS▒5cS▒5▒▒▒▒▒▒▒▒▒▒▒▒▒▒&mytsdf s O▒▒▒!cS▒▒ucS▒▒u▒▒▒▒▒▒▒▒▒▒▒▒▒▒:.what are you doing now?A▒fc\K▒c\K▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒who am i?O▒▒▒c\LPc\LP▒▒▒▒▒▒▒▒▒▒▒▒▒▒:.please help me out hereN▒X▒▒c\L▒▒c\L▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒8,can you touch my [email protected]▒▒c\M▒c\M▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒i'm hurtA35▒{c\mm▒c\mm▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒go to bedD▒▒▒c\m▒▒c\m▒▒▒▒▒▒▒▒▒▒c\rJ c\rJ ▒▒▒▒▒▒▒▒▒▒▒▒▒▒&hello my babexterm▒▒▒8,i want to push you out E▒~▒
Problems encountered when starting the producer and consumer
[2018-05-12 16:43:32,516] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58298] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-05-12 16:43:32,574] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58298] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-05-12 16:43:32,677] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58298] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
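This happens because Kafka's server.properties sets
listeners = PLAINTEXT://10.10.56.18:9092
that is, the IP 10.10.56.18, while the producer and consumer were started with localhost as the address after --broker-list and --bootstrap-server.
Fix: replace localhost with 10.10.56.18. The address must be the same as the one in the listeners parameter of Kafka's server.properties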
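One way to avoid this mismatch is to read the address straight from server.properties rather than typing it. A minimal sketch, assuming a single `listeners = PROTOCOL://host:port` line as in the configuration above; `listener_addr` is a hypothetical helper.

```shell
# listener_addr PROPS: print host:port of the first listener configured
# in a server.properties-style file. Commented-out lines and
# advertised.listeners are ignored because the pattern is anchored.
listener_addr() {
    sed -n 's#^listeners *= *[A-Za-z_]*://\([^, ]*\).*$#\1#p' "$1" | head -n 1
}

# Example (not run here), with the flags used in this article:
#   sh kafka-console-consumer.sh \
#       --bootstrap-server "$(listener_addr ../config/server.properties)" \
#       --topic pgtopic --from-beginning
```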
View the broker IDs that Kafka has registered
clw-db3:/home/pgsoft/zookeeper-3.4.10/bin # sh zkCli.sh --help
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/
ids topics seqid
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[pgtopic, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 4]
When Kafka starts, it registers its unique broker ID in ZooKeeper
- View the information Kafka registered in the ZooKeeper cluster
[zk: localhost:2181(CONNECTED) 7] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.10.56.18:9092"],"jmx_port":-1,"host":"10.10.56.18","timestamp":"1526109986873","port":9092,"version":4}
cZxid = 0x11a
ctime = Sat May 12 15:26:26 CST 2018
mZxid = 0x11a
mtime = Sat May 12 15:26:26 CST 2018
pZxid = 0x11a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1634caa45230004
dataLength = 192
numChildren = 0
[zk: localhost:2181(CONNECTED) 8]
- A topic's registration in ZooKeeper can be viewed the same way, with get on its znode under /brokers/topics (for example /brokers/topics/pgtopic)
The single-node Kafka service is now set up
Setting Up the Kafka Cluster
- On 10.10.56.16, configure the Kafka broker's server.properties
clw-db1:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties
listeners=PLAINTEXT://10.10.56.16:9092
broker.id=1
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181
zookeeper.connection.timeout.ms=60000
- On 10.10.56.16, create the directory configured as log.dirs in server.properties
clw-db1:~ # mkdir /home/kafka/log -p
- On 10.10.56.17, configure Kafka's server.properties
clw-db3:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties
listeners=PLAINTEXT://10.10.56.17:9092
broker.id=2
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181
- On 10.10.56.17, create the directory configured as log.dirs in server.properties
clw-db3:~ # mkdir /home/kafka/log -p
- On 10.10.56.19, configure Kafka's server.properties
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/config # vi server.properties
listeners=PLAINTEXT://10.10.56.19:9092
broker.id=3
log.dirs=/home/kafka/log
zookeeper.connect=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181
zookeeper.connection.timeout.ms=60000
- On 10.10.56.19, create the directory configured as log.dirs in server.properties
clw-db4:~ # mkdir /home/kafka/log -p
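The three server.properties files differ only in broker.id and the listener address, so they can be generated from one template. This is a sketch rather than the author's procedure: `broker_props` is a made-up helper, and it assumes the `PLAINTEXT://host:port` listener form used in the single-node setup.

```shell
# ZooKeeper connection string shared by all brokers (from this article).
ZK_CONNECT=10.10.56.16:2181,10.10.56.17:2181,10.10.56.19:2181

# broker_props ID IP: print the cluster-specific server.properties
# settings for the broker with the given id and address.
broker_props() {
    cat <<EOF
broker.id=$1
listeners=PLAINTEXT://$2:9092
log.dirs=/home/kafka/log
zookeeper.connect=$ZK_CONNECT
zookeeper.connection.timeout.ms=60000
EOF
}

# Example (not run here): review the output, then merge it into
# config/server.properties on the matching host.
#   broker_props 2 10.10.56.17
```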
Start the Kafka service on 16, 17 and 19
- On server 19, create the topic rtmkafka through that server's ZooKeeper node, then list the topics
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --create --zookeeper 10.10.56.19:2181 --replication-factor 3 --partitions 1 --topic rtmkafka
Created topic "rtmkafka".
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --list --zookeeper 10.10.56.19:2181
rtmkafka
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
--partitions 1 means one partition; --replication-factor 3 means three replicas
- On server 19, start a producer and publish the message hello my kafka world·
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-producer.sh --broker-list 10.10.56.19:9092 --topic rtmkafka
>hello my kafka world·
>
- On server 16, start a consumer; hello my kafka world· is the message it receives
clw-db2:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-consumer.sh --bootstrap-server 10.10.56.19:9092 --topic rtmkafka --from-beginning
hello my kafka world·
- On server 17, start a consumer; hello my kafka world· is the message it receives
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-console-consumer.sh --bootstrap-server 10.10.56.19:9092 --topic rtmkafka --from-beginning
hello my kafka world·
- On server 19, check the topic's status
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka
Topic:rtmkafka PartitionCount:1 ReplicationFactor:3 Configs:
Topic: rtmkafka Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
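PartitionCount: the topic has 1 partition,
ReplicationFactor: the partition is kept in 3 copies,
Replicas: the copies live on brokers 3,1,2
Isr (in-sync replicas): 3,1,2
- When we kill the Kafka process on server 16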
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # ps -ef|grep 'kafka'
root 12942 1 1 20:11 pts/5 00:00:45 /usr/local/jdk1.8.0_161//bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/home/super/pgsoft/kafka_2.11-1.1.0/bin/../logs -Dlog4j.configuration=file:./../config/log4j.properties -cp .:/usr/local/jdk1.8.0_161//lib/dt.jar:/usr/local/jdk1.8.0_161//lib/tools.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/argparse4j-0.7.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/commons-lang3-3.5.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-api-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-file-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-json-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-runtime-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/connect-transforms-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/guava-20.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/hk2-api-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/hk2-locator-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/hk2-utils-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-annotations-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-core-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-databind-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-jaxrs-base-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-
jaxrs-json-provider-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jackson-module-jaxb-annotations-2.9.4.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javassist-3.20.0-GA.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javassist-3.21.0-GA.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.annotation-api-1.2.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.inject-1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.inject-2.5.0-b32.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-client-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-common-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-container-servlet-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-guava-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-media-jaxb-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jersey-server-2.25.1.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-client-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-continuation-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-http-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-io-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-security-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-server-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-servlet-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-servlets-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jetty-util-9.2.24.v20180105.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/jopt-simple-5.0.4.jar:/home/super/pgs
oft/kafka_2.11-1.1.0/bin/../libs/kafka-clients-1.1.0.jar:/home/super/pgsoft/kafka_2.11-1.1.0/bin/../libs/kafka-log4j-appender-1.1.0.
root 15503 10096 0 20:56 pts/6 00:00:00 grep kafka
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # kill -9 12942
clw-db1:~/pgsoft/kafka_2.11-1.1.0/bin # ps -ef|grep 'kafka'
root 15516 10096 0 20:56 pts/6 00:00:00 grep kafka
- Querying from 19 shows the ISR changing from 3,1,2 to 3,2
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka
Topic:rtmkafka PartitionCount:1 ReplicationFactor:3 Configs:
Topic: rtmkafka Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,2
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
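Comparing Replicas with Isr by eye gets tedious once there are many partitions. The sketch below does it with awk, assuming the --describe line format shown in this article; `lagging_replicas` is a hypothetical helper name.

```shell
# lagging_replicas: read `kafka-topics.sh --describe` output on stdin and
# print, for each partition, the replica ids that are missing from the ISR.
# Partitions whose replicas are all in sync produce no output.
lagging_replicas() {
    awk '
    /Replicas:/ && /Isr:/ {
        part = ""; reps = ""; isr = ""
        for (i = 1; i < NF; i++) {
            if ($i == "Partition:") part = $(i + 1)
            if ($i == "Replicas:")  reps = $(i + 1)
            if ($i == "Isr:")       isr  = $(i + 1)
        }
        n = split(reps, r, ",")
        missing = ""
        for (j = 1; j <= n; j++) {
            # Wrap both sides in commas so that id 1 does not match id 11.
            if (index("," isr ",", "," r[j] ",") == 0)
                missing = missing (missing == "" ? "" : ",") r[j]
        }
        if (missing != "") print "partition " part ": " missing
    }'
}

# Example (not run here):
#   sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka | lagging_replicas
```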
Restart Kafka on server 16; the Kafka log on server 19 then shows
[2018-05-12 20:57:00,028] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
[2018-05-12 20:57:00,283] INFO [ReplicaAlterLogDirsManager on broker 3] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-05-12 20:57:05,194] INFO [Partition rtmkafka-0 broker=3] Shrinking ISR from 3,1,2 to 3,2 (kafka.cluster.Partition)
[2018-05-12 21:02:06,002] INFO [Partition rtmkafka-0 broker=3] Expanding ISR from 3,2 to 3,2,1 (kafka.cluster.Partition)
- On server 19, check the topic status again: the ISR contains all three replicas once more
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin # sh kafka-topics.sh --describe --zookeeper 10.10.56.19:2181 --topic rtmkafka
Topic:rtmkafka PartitionCount:1 ReplicationFactor:3 Configs:
Topic: rtmkafka Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1
clw-db4:/home/pgsoft/kafka_2.11-1.1.0/bin #
- On the server, look at the created topic's directory and the stored messages
clw-db4:/home/kafka/log/rtmkafka-0 # ll -lh
total 16K
-rw-r--r-- 1 root root 10M May 12 21:15 00000000000000000000.index
-rw-r--r-- 1 root root 246 May 12 21:18 00000000000000000000.log
-rw-r--r-- 1 root root 10M May 12 21:15 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10 May 12 21:15 00000000000000000001.snapshot
-rw-r--r-- 1 root root 12 May 12 21:16 leader-epoch-checkpoint
clw-db4:/home/kafka/log/rtmkafka-0 #
- The Kafka cluster is now set up