Since Kafka 0.9, consumer group and offset information is no longer stored in ZooKeeper; it is stored on the brokers themselves (in the internal __consumer_offsets topic).
There are three ways to deploy Kafka: single-node single-broker, single-node multi-broker, and a true cluster (multi-node multi-broker). Production environments use the third option, deploying Kafka as a cluster.
Kafka has a hard dependency on ZooKeeper: to use Kafka you must run ZK, since Kafka's cluster metadata, topic information, and (in older versions) consumer offsets are stored there. You may object that you have used Kafka without installing ZK; that is because Kafka ships with a bundled ZK, which is generally not used in practice.
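Since offsets now live on the brokers, they can be inspected with the consumer-groups tool rather than a ZK client. A minimal sketch, assuming a broker on localhost:9092 and a hypothetical consumer group named my-group:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
# prints, per partition: current offset, log-end offset, and lag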
I. Kafka Single-Node Deployment
Single-node Kafka deployments come in two flavors: single-node single-broker and single-node multi-broker. Since Kafka runs on a single node, a single-node ZK installation is sufficient.
(Note: if, after installing ZooKeeper and running the start command, jps does not show a QuorumPeerMain process, the ZK instance did not start; check its logs.)
1. Single-Node Single-Broker Deployment and Usage
Deployment architecture
Reference: the official quickstart, http://kafka.apache.org/quickstart
Configure Kafka
# Start the bundled ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /tmp/kafka-logs/zookeeper.out 2>&1 &
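To confirm the ZK instance came up (this is where the QuorumPeerMain process mentioned earlier should appear), a quick check:
jps | grep QuorumPeerMain
# expected output, something like: 19787 QuorumPeerMain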
1) Download and extract Kafka
Kafka directory layout:
- /bin: executable scripts for operating Kafka, including the Windows scripts
- /config: configuration files
- /libs: dependency libraries
- /logs: log data; Kafka splits its server-side logs into 5 types: server, request, state, log-cleaner, and controller
Configure ZooKeeper
Refer to a ZooKeeper setup guide.
Configure Kafka
1) In the Kafka installation root, edit config/server.properties.
The three most important Kafka settings are, in order: broker.id, log.dirs, and zookeeper.connect. The server-side parameters in config/server.properties are explained below.
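As a minimal sketch (the paths and ZK address are placeholders), a single-broker config touching just those three keys might look like:
broker.id=0                         # unique integer ID of this broker within the cluster
log.dirs=/tmp/kafka-logs            # where partition data (log segments) is stored
zookeeper.connect=localhost:2181    # ZK connection string, host:port[,host:port...]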
2) Start Kafka
Enter the Kafka directory and run: bin/kafka-server-start.sh config/server.properties &
nohup ./bin/kafka-server-start.sh ./config/server.properties > /tmp/kafka-logs/kafka.out 2>&1 &
netstat -tunlp|egrep "(2181|9092)"
tcp 0 0 :::2181 :::* LISTEN 19787/java
tcp 0 0 :::9092 :::* LISTEN 28094/java
Check an individual port with lsof -i:2181, or:
netstat -tunlp | grep 2181   # check the ZooKeeper port
Notes:
The Kafka process has PID 28094 and listens on port 9092.
QuorumPeerMain is the corresponding ZooKeeper instance, PID 19787, listening on port 2181.
[2019-04-12 01:25:26,175] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-04-12 01:25:26,176] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-04-12 01:25:26,176] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-04-12 01:25:26,200] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-04-12 01:25:26,211] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-04-12 01:25:26,214] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 01:25:26,214] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 01:25:26,215] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Configure the environment variables in /etc/profile:
export KAFKA_HOME=/opt/applications/kafka
export PATH=$PATH:$KAFKA_HOME/bin
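After editing /etc/profile, apply it to the current shell and verify that the scripts resolve (the path below assumes the install location used in this article):
source /etc/profile
which kafka-topics.sh   # should print /opt/applications/kafka/bin/kafka-topics.sh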
3) Single-machine connectivity test
Create a topic
Method 1:
Run the following Linux command.
Earlier Kafka versions (ZooKeeper-based tooling):
bin/kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper localhost:2181
[wls81@master kafka]$ bin/kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Created topic test.
Later Kafka versions (kafka-topics.sh supports --bootstrap-server from 2.2 onward):
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
--topic specifies the topic name;
--partitions specifies the partition count; choose it based on the number of brokers and the data volume (under normal circumstances, about two partitions per broker works well);
--replication-factor specifies the number of replicas per partition; a value of 2 is recommended.
Method 2:
Enable automatic topic creation: auto.create.topics.enable=true
Then have your program send data straight to the desired topic; if the topic does not exist, it is created with the default settings.
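A sketch of this, assuming the broker was started with auto-creation enabled (the topic name below is hypothetical):
# in config/server.properties:
# auto.create.topics.enable=true
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic brand-new-topic
# the first message produced creates brand-new-topic with the default partition/replication settings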
List topics
Earlier Kafka versions:
bin/kafka-topics.sh --list --zookeeper localhost:2181
[wls81@master kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Later Kafka versions:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Open two XSHELL sessions: one for the producer to send messages, one for the consumer to receive them.
bin/kafka-console-producer.sh --broker-list 192.168.1.15:9092 --topic test
Note: on very old Kafka versions, --broker-list 192.168.1.15:9092 had to be replaced with --zookeeper 192.168.1.15:2181.
Before Kafka 0.9:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Kafka 0.9 and later:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
The producer is given the socket 192.168.1.15:9092, meaning producer messages go to Kafka, i.e. to the broker.
The (old-style) consumer is given the socket 192.168.1.15:2181, meaning the consumer gets its messages via ZooKeeper, which handles the coordination.
Summary: with --from-beginning, consumption starts from the earliest offset, so both old and new data are read; without it, only newly produced data is consumed.
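Side by side, as a sketch against the test topic from above:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   # replays the whole topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test                    # only messages produced from now on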
So far we have run only a single broker; next we will experiment with a multi-broker cluster.
II. Setting Up a Multi-Broker Pseudo-Cluster
We previously started a single broker; now we start a cluster of three brokers, all on this same machine.
(1) Provide a configuration file for each broker
First, the contents of config/server0.properties:
broker.id=0
listeners=PLAINTEXT://:9092
port=9092
host.name=192.168.1.15
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.15:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests=500
log.cleanup.policy=delete
server1.properties (only the broker-specific keys differ; see below):
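Following the server0/server2 pattern, the keys that differ would be:
broker.id=1
listeners=PLAINTEXT://:9093
port=9093
log.dirs=/tmp/kafka-logs1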
server2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
port=9094
host.name=192.168.1.15
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs2
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.15:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests=500
log.cleanup.policy=delete
(2) Start all the brokers
The commands are as follows:
nohup ./bin/kafka-server-start.sh config/server0.properties > /tmp/kafka-logs/kafka0.out 2>&1 &   # start broker0
nohup ./bin/kafka-server-start.sh config/server1.properties > /tmp/kafka-logs/kafka1.out 2>&1 &   # start broker1
nohup ./bin/kafka-server-start.sh config/server2.properties > /tmp/kafka-logs/kafka2.out 2>&1 &   # start broker2
If you do not care about the output, a redirection such as >/dev/null 2>&1 can be used instead to discard it.
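Equivalently, as a small sketch, the three brokers can be started in one loop:
for i in 0 1 2; do
  nohup ./bin/kafka-server-start.sh config/server$i.properties > /tmp/kafka-logs/kafka$i.out 2>&1 &
done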
Check ports 2181, 9092, 9093, and 9094.
Check an individual port: lsof -i:2181
netstat -tunlp|egrep "(2181|9092|9093|9094)"
tcp 0 0 :::9093 :::* LISTEN 29725/java
tcp 0 0 :::2181 :::* LISTEN 19787/java
tcp 0 0 :::9094 :::* LISTEN 29800/java
tcp 0 0 :::9092 :::* LISTEN 29572/java
One ZooKeeper instance is listening on port 2181, and the three Kafka brokers are listening on ports 9092, 9093, and 9094 respectively.
(3) Create a topic
Creating a topic on earlier versions (--zookeeper):
bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 --zookeeper localhost:2181
Check that the topics were created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic_1
topic_2
topic_3
Delete a topic:
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_1
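One caveat: on older brokers (before Kafka 1.0, where it defaulted to false), topic deletion only takes effect if it is enabled server-side:
# in config/server.properties
delete.topic.enable=true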
Later versions (what we are using here):
Create a topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic topic_new
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic_new
[wls81@master kafka]$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic_new
Topic:topic_new PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=delete,segment.bytes=1073741824
Topic: topic_new Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
An explanation of the output: the first line gives a summary of all partitions, and each subsequent line describes one partition. Since this topic has only one partition, there is only one such line.
"Leader" is the node responsible for all reads and writes for the given partition; each node is the leader for a randomly selected share of the partitions.
"Replicas" is the list of nodes that replicate this partition's log, whether or not they are the leader, and even if they are not currently alive.
"Isr" is the set of "in-sync" replicas: the subset of the replica list that is currently alive and caught up to the leader.
Note that in this example, node 2 is the leader of the topic's only partition.
(4) Send messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_new
...
my test message 1
my test message 2
^C
Consume the messages:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic_new
...
my test message 1
my test message 2
^C
Note that the producer's topic is now replicated across three brokers, so we are starting to see genuinely distributed behavior.
(5) Test fault tolerance
Broker 2 is the leader of our partition, so let's kill it:
[wls81@master logs]$ ps aux | grep server2.properties
wls81 6910 2.1 2.5 8201816 416932 pts/2 Sl 00:42 0:15 /
...
[wls81@master logs]$ kill -9 6910
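As an aside, since broker 2 listens on port 9094, its PID can also be looked up directly by port:
lsof -t -i:9094   # -t prints only the PID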
Leadership has failed over to one of the followers, node 1, and node 2 is no longer in the in-sync replica set:
[wls81@master kafka]$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic_new
Topic:topic_new PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=delete,segment.bytes=1073741824
Topic: topic_new Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
The messages can still be consumed, even though the leader that originally took the writes is down.
Producing messages:
[wls81@master kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_new
>dsaf
>test
>Kill leader2,and then operation
>
Consuming the messages (the WARN lines in the middle are caused by killing leader node 2):
[wls81@master kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic_new
dsaf
test
[2019-04-13 00:55:11,758] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:11,921] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:12,126] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:12,632] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:13,537] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:14,446] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:15,325] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-13 00:55:16,334] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Kill leader2,and then operation
III. Importing and Exporting Data with Kafka Connect
Kafka Connect is a tool, shipped with Kafka, for importing data into and exporting data out of Kafka. It is an extensible framework that runs connectors, which implement the custom logic for interacting with external systems. In this quickstart we will run Kafka Connect with simple connectors that import data from a file into a Kafka topic and export data from a Kafka topic back out to a file.
Create some test data:
[wls81@master kafka]$ echo -e "foo\nbar" > test.txt
[wls81@master kafka]$ pwd
/opt/applications/kafka
We will start two connectors running in standalone mode, meaning they run in a single local, dedicated process.
We pass three configuration files as parameters. The first is always the configuration for the Kafka Connect process itself, containing common settings such as the Kafka brokers to connect to and the serialization format for data. Each remaining configuration file specifies a connector to create: a unique connector name, the connector class to instantiate, and any other configuration the connector requires.
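For reference, the sample connector files shipped with Kafka contain roughly the following (exact contents may vary slightly by version):
# config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
# config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test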
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Output:
[2019-04-14 23:31:33,339] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser:110)
[2019-04-14 23:31:33,341] INFO Created connector local-file-sink (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2019-04-14 23:31:33,342] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Subscribed to topic(s): connect-test (org.apache.kafka.clients.consumer.KafkaConsumer:936)
[2019-04-14 23:31:33,342] INFO WorkerSinkTask{id=local-file-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-04-14 23:31:33,346] INFO Cluster ID: zHJkKjVvQd6TaV1K97QBYQ (org.apache.kafka.clients.Metadata:365)
[2019-04-14 23:31:33,347] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Discovered group coordinator 192.168.1.15:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-04-14 23:31:33,348] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-04-14 23:31:33,348] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-04-14 23:31:33,352] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-04-14 23:31:33,356] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-04-14 23:31:33,357] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Setting newly assigned partitions: connect-test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-04-14 23:31:33,364] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Resetting offset for partition connect-test-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-04-14 23:31:43,310] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2019-04-14 23:31:43,310] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2019-04-14 23:31:43,317] INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:497)
[2019-04-14 23:31:43,341] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 1: {connect-test-0=OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:344)
These sample configuration files, included with Kafka, use the default local cluster configuration started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each one to a Kafka topic; the second is a sink connector that reads messages from a Kafka topic and writes each one as a line in an output file.
During startup you will see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should begin reading lines from test.txt and producing them to the topic connect-test, and the sink connector should begin reading messages from connect-test and writing them to the file test.sink.txt.
We can verify the data has made it through the entire pipeline by examining the contents of the output file:
[wls81@master kafka]$ more test.sink.txt
foo
bar
[wls81@master kafka]$ pwd
/opt/applications/kafka
Note that the data is stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic:
[wls81@master kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
The connectors keep processing data, so we can append to the file and watch the lines move through the pipeline:
[wls81@master kafka]$ echo Another line2 >> test.txt
[wls81@master kafka]$ echo Another line3 >> test.txt
[wls81@master kafka]$
[wls81@master kafka]$ tail -f test.sink.txt
foo
bar
fdasd
dfa
test
taet
Another line
Another line2
Another line3