Using Kafka: Installing a Kafka Cluster on CentOS
Installation Preparation
Versions
Kafka version: kafka_2.11-0.9.0.0
Zookeeper version: zookeeper-3.4.9
Zookeeper cluster: liuyazhuang121, liuyazhuang122, liuyazhuang123
Physical Environment
Three physical machines are used:
192.168.209.121 liuyazhuang121 (runs 3 Brokers)
192.168.209.122 liuyazhuang122 (runs 2 Brokers)
192.168.209.123 liuyazhuang123 (runs 2 Brokers)
Building the cluster proceeds in three stages: single node with a single Broker, single node with multiple Brokers, and multiple nodes with multiple Brokers.
Single Node, Single Broker
This section uses creating a single Broker on liuyazhuang121 as the example.
Download Kafka
cd /mq/
wget https://mirrors.hust.edu.cn/apache/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
copyfiles.sh kafka_2.11-0.9.0.0.tgz bjyfnbserver /mq/
tar zxvf kafka_2.11-0.9.0.0.tgz -C /mq/
ln -s /mq/kafka_2.11-0.9.0.0 /mq/kafka
mkdir /mq/kafka/logs
Configuration
Edit config/server.properties:
vi /mq/kafka/config/server.properties

broker.id=1
listeners=PLAINTEXT://:9092
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000
Start the Kafka Service
cd /mq/kafka;sh bin/kafka-server-start.sh -daemon config/server.properties
or
sh /mq/kafka/bin/kafka-server-start.sh -daemon /mq/kafka/config/server.properties
netstat -ntlp|grep -E '2181|9092'
(Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::9092        :::*        LISTEN      26903/java
tcp6       0      0 :::2181        :::*        LISTEN      24532/java
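Besides the port check, you can tail the broker's application log to confirm a clean startup. A minimal sketch, assuming the default log4j output location under the Kafka install directory (adjust the path if your layout differs):

# server.log location is an assumption based on the default kafka-run-class.sh log dir
tail -n 50 /mq/kafka/logs/server.log
# a healthy startup ends with a line similar to: [Kafka Server 1], started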
Create a Topic
sh /mq/kafka/bin/kafka-topics.sh --create --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --replication-factor 1 --partitions 1 --topic test
List Topics
sh /mq/kafka/bin/kafka-topics.sh --list --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
Send messages with the producer
$ sh /mq/kafka/bin/kafka-console-producer.sh --broker-list liuyazhuang121:9092 --topic test
first message
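The console producer reads lines from standard input, so messages can also be piped in non-interactively; a small sketch (the message text is just an example):

echo "second message" | sh /mq/kafka/bin/kafka-console-producer.sh --broker-list liuyazhuang121:9092 --topic test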
Receive messages with the consumer
$ sh bin/kafka-console-consumer.sh --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --topic test --from-beginning
first message
To read only new messages, simply run the consumer without the --from-beginning flag, for example:
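# same command as above, minus --from-beginning: only messages produced from now on are printed
sh /mq/kafka/bin/kafka-console-consumer.sh --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --topic test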
Single Node, Multiple Brokers
Make two more copies of the folder from the previous section, as kafka_2 and kafka_3:
cp -r /mq/kafka_2.11-0.9.0.0 /mq/kafka_2.11-0.9.0.0_2
cp -r /mq/kafka_2.11-0.9.0.0 /mq/kafka_2.11-0.9.0.0_3
ln -s /mq/kafka_2.11-0.9.0.0_2 /mq/kafka_2
ln -s /mq/kafka_2.11-0.9.0.0_3 /mq/kafka_3
In kafka_2/config/server.properties and kafka_3/config/server.properties, change the broker.id and port properties so that each broker's values are unique (a scripted alternative is sketched after the listings below):
vi /mq/kafka_2/config/server.properties

broker.id=2
listeners=PLAINTEXT://:9093
port=9093
host.name=liuyazhuang121
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_2/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000

vi /mq/kafka_3/config/server.properties

broker.id=3
listeners=PLAINTEXT://:9094
port=9094
host.name=liuyazhuang121
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_3/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000
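As an alternative to editing by hand, the same per-broker changes can be scripted. A minimal sketch (not from the original article), assuming the copies were made exactly as above; it rewrites broker.id, the port, and log.dirs, and appends the host.name line the hand-edited files add:

# broker 2: id 2, port 9093, its own log dir
sed -i 's/^broker.id=1/broker.id=2/; s/9092/9093/g; s#/mq/kafka/logs#/mq/kafka_2/logs#' /mq/kafka_2/config/server.properties
echo 'host.name=liuyazhuang121' >> /mq/kafka_2/config/server.properties
# broker 3: id 3, port 9094, its own log dir
sed -i 's/^broker.id=1/broker.id=3/; s/9092/9094/g; s#/mq/kafka/logs#/mq/kafka_3/logs#' /mq/kafka_3/config/server.properties
echo 'host.name=liuyazhuang121' >> /mq/kafka_3/config/server.properties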
Start
Start the other two Brokers:
sh /mq/kafka_2/bin/kafka-server-start.sh -daemon /mq/kafka_2/config/server.properties
sh /mq/kafka_3/bin/kafka-server-start.sh -daemon /mq/kafka_3/config/server.properties
Check the ports
[liuyazhuang@liuyazhuang121 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181        :::*        LISTEN      24532/java
tcp6       0      0 :::9092        :::*        LISTEN      26903/java
tcp6       0      0 :::9093        :::*        LISTEN      28672/java
tcp6       0      0 :::9094        :::*        LISTEN      28734/java
Create a topic with a replication factor of 3
sh /mq/kafka/bin/kafka-topics.sh --create --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Check the topic's status
$ sh /mq/kafka/bin/kafka-topics.sh --describe --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: my-replicated-topic   Partition: 0   Leader: 3   Replicas: 3,1,2   Isr: 3,1,2
The output shows that this topic has 1 partition and a replication factor of 3, and that broker 3 is the leader.
The fields are explained as follows:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions. "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive. "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Now look at the test topic created earlier; the output below shows that it is not replicated:
$ sh /mq/kafka/bin/kafka-topics.sh --describe --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --topic test
Topic:test   PartitionCount:1   ReplicationFactor:1   Configs:
    Topic: test   Partition: 0   Leader: 1   Replicas: 1   Isr: 1
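To see the leader/ISR mechanics in action, you can stop the current leader and describe the topic again. A hedged sketch, assuming broker 3 is the leader as shown above and listens on port 9094; the PID lookup is illustrative, adapt it to your environment:

# stop the process bound to the leader's port (broker 3 -> 9094 here)
kill -SIGTERM $(netstat -ntlp 2>/dev/null | grep ':9094 ' | awk '{print $7}' | cut -d/ -f1)
# describe again: a new leader should be elected from the remaining ISR
sh /mq/kafka/bin/kafka-topics.sh --describe --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --topic my-replicated-topic

Restart the stopped broker afterwards with the same kafka-server-start.sh command as above before continuing.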
Multiple Nodes, Multiple Brokers
On liuyazhuang122 and liuyazhuang123, extract the downloaded archive into kafka_4 and kafka_5 (on liuyazhuang122) and kafka_6 and kafka_7 (on liuyazhuang123), then copy the server.properties from liuyazhuang121 into each of these four folders and adjust it as shown below.
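Distribution can be done with scp; a rough sketch (not from the original article), assuming passwordless SSH between the hosts and that /mq already exists on the target nodes:

# from liuyazhuang121: ship the archive and the baseline config to the other nodes
scp /mq/kafka_2.11-0.9.0.0.tgz liuyazhuang122:/mq/
scp /mq/kafka_2.11-0.9.0.0.tgz liuyazhuang123:/mq/
scp /mq/kafka/config/server.properties liuyazhuang122:/tmp/
scp /mq/kafka/config/server.properties liuyazhuang123:/tmp/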
vi /mq/kafka_4/config/server.properties

broker.id=4
listeners=PLAINTEXT://:9095
port=9095
host.name=liuyazhuang122
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_4/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000

vi /mq/kafka_5/config/server.properties

broker.id=5
listeners=PLAINTEXT://:9096
port=9096
host.name=liuyazhuang122
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_5/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000

vi /mq/kafka_6/config/server.properties

broker.id=6
listeners=PLAINTEXT://:9097
port=9097
host.name=liuyazhuang123
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_6/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000

vi /mq/kafka_7/config/server.properties

broker.id=7
listeners=PLAINTEXT://:9098
port=9098
host.name=liuyazhuang123
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_7/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181
zookeeper.connection.timeout.ms=6000
Start the services
# on liuyazhuang121
sh /mq/kafka/bin/kafka-server-start.sh -daemon /mq/kafka/config/server.properties
sh /mq/kafka_2/bin/kafka-server-start.sh -daemon /mq/kafka_2/config/server.properties
sh /mq/kafka_3/bin/kafka-server-start.sh -daemon /mq/kafka_3/config/server.properties
# on liuyazhuang122
sh /mq/kafka_4/bin/kafka-server-start.sh -daemon /mq/kafka_4/config/server.properties
sh /mq/kafka_5/bin/kafka-server-start.sh -daemon /mq/kafka_5/config/server.properties
# on liuyazhuang123
sh /mq/kafka_6/bin/kafka-server-start.sh -daemon /mq/kafka_6/config/server.properties
sh /mq/kafka_7/bin/kafka-server-start.sh -daemon /mq/kafka_7/config/server.properties
Check
$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
Stop the service
sh /mq/kafka/bin/kafka-server-stop.sh
Note that stopping brokers with this script stops every Broker process on that node, not just the one you intended, so use it with caution!
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
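If you only want to stop one of the brokers on a node, one approach is to narrow the kill command above down to the properties file that broker was started with; a sketch, with kafka_2 used purely as an example:

# stop only the broker started with /mq/kafka_2/config/server.properties
ps ax | grep -i 'kafka\.Kafka' | grep '/mq/kafka_2/config/server.properties' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM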
At this point, all 7 Brokers on the three physical machines are up and running:
[liuyazhuang@liuyazhuang121 bin]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181        :::*        LISTEN      24532/java
tcp6       0      0 :::9092        :::*        LISTEN      33212/java
tcp6       0      0 :::9093        :::*        LISTEN      32997/java
tcp6       0      0 :::9094        :::*        LISTEN      33064/java

[liuyazhuang@liuyazhuang122 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181        :::*        LISTEN      6899/java
tcp6       0      0 :::9095        :::*        LISTEN      33251/java
tcp6       0      0 :::9096        :::*        LISTEN      33279/java

[liuyazhuang@liuyazhuang123 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.)
tcp        0      0 0.0.0.0:2181   0.0.0.0:*   LISTEN      14562/java
tcp        0      0 0.0.0.0:9097   0.0.0.0:*   LISTEN      23246/java
tcp        0      0 0.0.0.0:9098   0.0.0.0:*   LISTEN      23270/java
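You can also confirm that all seven brokers have registered in Zookeeper. A minimal sketch using the zookeeper-shell.sh tool shipped with Kafka (run the ls command inside the shell; the exact output formatting may vary):

sh /mq/kafka/bin/zookeeper-shell.sh liuyazhuang121:2181
ls /brokers/ids
# expect all seven broker ids to be listed, e.g. [1, 2, 3, 4, 5, 6, 7]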
Send messages with the producer
$ sh /mq/kafka/bin/kafka-console-producer.sh --broker-list liuyazhuang121:9092 --topic my-replicated-topic
Receive messages with the consumer
$ sh /mq/kafka_4/bin/kafka-console-consumer.sh --zookeeper liuyazhuang121:2181,liuyazhuang122:2181,liuyazhuang123:2181 --topic my-replicated-topic --from-beginning
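Since the topic is replicated across nodes, it is also sensible to give the producer more than one bootstrap broker so it keeps working when a single broker is down; a sketch whose ports simply follow the configuration above:

sh /mq/kafka/bin/kafka-console-producer.sh --broker-list liuyazhuang121:9092,liuyazhuang122:9095,liuyazhuang123:9097 --topic my-replicated-topic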