Kafka安装及测试
系统环境
Linux Ubuntu 16.04
jdk-7u75-linux-x64
相关知识
Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它因可以水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
(1)以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能
(2)高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输
(3)支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输
(4)同时支持离线数据处理和实时数据处理
(5)Scale out:支持在线水平扩展
Kafka中各个组件的功能:
(1)Broker: Kafka集群包含一个或多个服务器,这种服务器被称为broker
(2)Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据,不必关心数据存于何处)
(3)Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition
(4)Producer:负责发布消息到Kafka broker
(5)Consumer:消息消费者,向Kafka broker读取消息的客户端
(6)Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干Broker(Kafka支持水平扩展,一般Broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个ZooKeeper集群。Kafka通过ZooKeeper管理集群配置,选举Leader,以及在Consumer Group发生变化时进行rebalance。
Producer使用Push模式将消息发布到Broker,Consumer使用Pull模式从Broker订阅并消费消息。
任务内容
Kafka安装依赖Scala、ZooKeeper,所以需要先安装Scala与ZooKeeper。然后在已安装好Scala和ZooKeeper的环境基础上,安装部署Kafka。
任务步骤
1.首先在Linux本地,新建/data/kafka1目录,用于存放实验所需文件。
mkdir -p /data/kafka1
切换目录到/data/kafka1下,使用wget命令,下载所需安装包scala-2.10.4.tgz,kafka_2.10-0.8.2.2.tgz以及zookeeper-3.4.5-cdh5.4.5.tar.gz。
cd /data/kafka1
wget http://192.168.1.100:60000/allfiles/kafka1/scala-2.10.4.tgz
wget http://192.168.1.100:60000/allfiles/kafka1/kafka_2.10-0.8.2.2.tgz
wget http://192.168.1.100:60000/allfiles/kafka1/zookeeper-3.4.5-cdh5.4.5.tar.gz
2.安装Scala。
切换到/data/kafka1目录下,将Scala安装包scala-2.10.4.tgz解压到/apps目录下,并将解压后的目录,重命名为scala。
cd /data/kafka1
tar -xzvf /data/kafka1/scala-2.10.4.tgz -C /apps/
cd /apps
mv /apps/scala-2.10.4/ /apps/scala
使用vim打开用户环境变量。
sudo vim ~/.bashrc
将以下Scala的路径信息,追加到用户环境变量中。
#scala
export SCALA_HOME=/apps/scala
export PATH=$SCALA_HOME/bin:$PATH
执行source命令,使环境变量生效。
source ~/.bashrc
3.切换到/data/kafka1目录下,将kafka的压缩包kafka_2.10-0.8.2.2.tgz解压到/apps目录下,并将解压缩后的目录,重命名为kafka。
cd /data/kafka1
tar -xzvf /data/kafka1/kafka_2.10-0.8.2.2.tgz -C /apps/
cd /apps
mv /apps/kafka_2.10-0.8.2.2/ /apps/kafka
使用vim打开用户环境变量。
sudo vim ~/.bashrc
将以下Kafka的路径信息,追加到用户环境变量中。
#kafka
export KAFKA_HOME=/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH
执行source命令,使环境变量生效。
source ~/.bashrc
4.由于Kafka的部分数据需要存储到ZooKeeper中,所以必须额外安装ZooKeeper,或使用Kafka安装包自带的ZooKeeper程序。
首先来演示使用外置的ZooKeeper程序。
将/data/kafka1目录下zookeeper-3.4.5-cdh5.4.5.tar.gz,解压缩到/apps目录下,并将解压缩的目录,重命名为zookeeper。
cd /data/kafka1
tar -xzvf /data/kafka1/zookeeper-3.4.5-cdh5.4.5.tar.gz -C /apps/
cd /apps
mv /apps/zookeeper-3.4.5-cdh5.4.5/ /apps/zookeeper
使用vim打开用户环境变量。
sudo vim ~/.bashrc
将以下Zookeeper的路径信息,追加到用户环境变量中。
#zookeeper
export ZOOKEEPER_HOME=/apps/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
执行source命令,使环境变量生效。
source ~/.bashrc
修改ZooKeeper的配置文件,将ZooKeeper配置为单机模式。
切换到ZooKeeper的配置文件所在目录/apps/zookeeper/conf下,将zoo_sample.cfg重命名为zoo.cfg
cd /apps/zookeeper/conf/
mv /apps/zookeeper/conf/zoo_sample.cfg /apps/zookeeper/conf/zoo.cfg
使用vim打开zoo.cfg文件,并修改dataDir项内容
vim zoo.cfg
由:
dataDir=/tmp/zookeeper
改为:
dataDir=/data/tmp/zookeeper-outkafka/data
这里的/data/tmp/zookeeper-outkafka/data目录需要提前创建。
mkdir -p /data/tmp/zookeeper-outkafka/data
启动ZooKeeper,并查看ZooKeeper的运行状态。
cd /apps/zookeeper/bin
./zkServer.sh start
./zkServer.sh status
关闭ZooKeeper。
cd /apps/zookeeper/bin
./zkServer.sh stop
5.使用Kafka内置的ZooKeeper,切换目录到/apps/kafka/config目录下。
cd /apps/kafka/config
这里放置着与ZooKeeper的配置文件zoo.cfg功能相似的配置文件zookeeper.properties,使用vim打开zookeeper.properties配置文件。
vim zookeeper.properties
将dataDir目录修改为/data/tmp/zookeeper-inkafka/data目录。
dataDir=/data/tmp/zookeeper-inkafka/data
这里的/data/tmp/zookeeper-inkafka/data目录,须提前创建。
mkdir -p /data/tmp/zookeeper-inkafka/data
下面启动ZooKeeper服务,切换目录到/apps/kafka目录下,在kafka的bin目录下放有ZooKeeper的启动脚本,按Ctrl+c退出。
cd /apps/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &
末尾的&符号,会将zookeeper-server-start.sh放到后台执行。输入jps
jps
查看ZooKeeper的进程QuorumPeerMain
aaa@qq.com:/apps/kafka$ jps
375 Jps
293 QuorumPeerMain
aaa@qq.com:/apps/kafka$
下面关闭ZooKeeper进程
cd /apps/kafka
bin/zookeeper-server-stop.sh stop
6.以上两种ZooKeeper的使用方式,可以根据自己需要进行选择。后续课程,我们会默认使用外置的ZooKeeper,对Kafka数据进行管理。
至此Kafka已安装完毕。
接下来对Kafka进行测试,检测是否可以正常运行。
7.切换到/apps/zookeeper目录下,启动ZooKeeper服务。
cd /apps/zookeeper
bin/zkServer.sh start
8.切换到/apps/kafka/config目录下,这里放置了Kafka的相关的配置文件。使用vim打开Kafka服务的配置文件server.properties。
cd /apps/kafka/config
vim server.properties
server.properties文件中的配置项包括:服务器基本配置,socket服务设置,log日志的配置,log刷新策略,log保留策略,ZooKeeper配置。
服务器基本配置,主要包括当前节点的编号。
ZooKeeper配置中,包括ZooKeeper服务的IP和端口号等。
我们修改zookeeper.connect项的值为:
zookeeper.connect=localhost:2181
这里的IP和端口,是ZooKeeper发送接收消息使用的端口。IP必须为ZooKeeper服务的IP,我们设置为localhost,端口必须和/apps/zookeeper/conf下zoo.cfg中的clientPort端口一致。
9.切换目录到/apps/kafka目录下,启动Kafka服务。启动Kafka服务时,会读取Kafka配置文件目录下的server.properties文件。
cd /apps/kafka
bin/kafka-server-start.sh config/server.properties &
这样启动了Kafka的server,并在后端运行。
10.另外开启一个窗口,调用/apps/kafka/bin目录下kafka-topic.sh脚本创建一个topic。
cd /apps/kafka
bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--topic sayaword \
--partitions 1
kafka-topic.sh命令后,需要添加一些参数,比如ZooKeeper的配置,主题名称等。
下面查看Kafka中,都有哪些topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
11.调用/apps/kafka/bin目录下kafka-console-producer.sh,来生产一些消息,producer也就是生产者。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sayaword
这里的localhost为Kafka的IP,9092为broker节点的端口。用户可以在console界面上,输入信息,交给producer进行处理,并发给consumer。
12.再令开启一个窗口,调用bin目录下kafka-console-consumer.sh,启动consumer,consumer作为消费者,用来消费数据。
cd /apps/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sayaword --from-beginning
kafka-console-consumer.sh依然需要加一些参数,比如ZooKeeper的IP及端口、主题名称、读取数据位置等。
13.在执行kafka-console-producer.sh命令的界面中,随便输入几行文字,按回车。可以看到在consumer端,会将同样的内容,输出出来。
producer端:
consumer端:
14.退出测试。
在kafka-console-consumer.sh、kafka-console-producer.sh及kafka-server-start.sh在命令行界面,执行Ctrl + c,分别退出consumer,producer及server。
切换目录到/apps/zookeeper/bin目录下,停止ZooKeeper。
cd /apps/zookeeper/bin
./zkServer.sh stop
上一篇: maven项目打war包
下一篇: 你真的懂函数吗?
推荐阅读
-
CuteFTP怎么用 CuteFTP安装教程及使用指南详细介绍
-
超级兔子安装失败怎么办?超级兔子常见问题及解决办法介绍
-
ELK6.x_Kafka 安装配置文档
-
componentone 2017怎么破解?componentone Studio 2017安装及破解激活图文详细教程
-
MSC Apex Grizzly 2017中文安装及破解详细教程(附破解文件下载)
-
MATLAB R2013b怎么激活?matlab r2013b破解安装及激活详细图文教程
-
LoadRunner 12怎么安装?LoadRunner12.02安装及汉化教程图解
-
flutter的环境安装配置问题及解决方法
-
Kangle(Web服务器)如何安装及防盗链功能设置
-
MySql安装及登录详解