Kafka入门宝典(详细截图版)
1、了解 apache kafka
1.1、简介
官网:http://kafka.apache.org/
- apache kafka 是一个开源消息系统,由scala 写成。是由apache 软件基金会开发的一个开源消息系统项目。
- kafka 最初是由linkedin 开发,并于2011 年初开源。2012 年10 月从apache incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待(低延时)的平台。
- kafka 是一个分布式消息系统:具有生产者、消费者的功能。它提供了类似于jms 的特性,但是在设计实现上完全不同,此外它并不是jms 规范的实现。【重点】
1.2、kafka的基本结构
producer:消息的发送者
consumer:消息的接收者
kafka cluster:kafka的集群。
topic:就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者(消费者)。
消息的生产者将消息推送到kafka集群,消息的消费者从kafka集群中拉取消息。
1.3、kafka的完整架构
说明:
- broker:集群中的每一个kafka实例,称之为broker;
- zookeeper:kafka 利用zookeeper 保存相应元数据信息, kafka 元数据信息包括如代理节点信息、kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。
- consumergroup:在kafka 中每一个消费者都属于一个特定消费组( consumergroup ),我们可以为每个消费者指定一个消费组,以groupld 代表消费组名称,通过group.id 配置设置。如果不指定消费组,则该消费者属于默认消费组test-consumer-group 。
1.4、kafka的特性
- 消息持久化
- kafka 基于文件系统来存储和缓存消息。
- 高吞吐量
- kafka 将数据写到磁盘,充分利用磁盘的顺序读写。同时, kafka 在数据写入及数据同步采用了零拷贝( zero-copy )技术,采用sendfile()函数调用,sendfile()函数是在两个文件描述符之间直接传递数据,完全在内核中操作,从而避免了内核缓冲区与用户缓冲区之间数据的拷贝,操作效率极高。
- kafka 还支持数据压缩及批量发送,同时kafka 将每个主题划分为多个分区,这一系列的优化及实现方法使得kafka 具有很高的吞吐量。经大多数公司对kafka 应用的验证, kafka 支持每秒数百万级别的消息。
- 高扩展性
- kafka 依赖zookeeper来对集群进行协调管理,这样使得kafka 更加容易进行水平扩展,生产者、消费者和代理都为分布式,可配置多个。
- 同时在机器扩展时无需将整个集群停机,集群能够自动感知,重新进行负责均衡及数据复制。
- 多客户端支持
- kafka 核心模块用scala 语言开发,kafka 提供了多种开发语言的接入,如java 、scala、c 、c++、python 、go 、erlang 、ruby 、node. 等。
- 安全机制
- kafka 支持以下几种安全措施:
- 通过ssl 和sasl(kerberos), sasl/pla时验证机制支持生产者、消费者与broker连接时的身份认证;
- 支持代理与zookeeper 连接身份验证;
- 通信时数据加密;
- 客户端读、写权限认证;
- kafka 支持与外部其他认证授权服务的集成;
- kafka 支持以下几种安全措施:
- 数据备份
- kafka 可以为每个topic指定副本数,对数据进行持久化备份,这可以一定程度上防止数据丢失,提高可用性。
- 轻量级
- kafka 的实例是无状态的,即broker不记录消息是否被消费,消费偏移量的管理交由消费者自己或组协调器来维护。
- 同时集群本身几乎不需要生产者和消费者的状态信息,这就使得kafka非常轻量级,同时生产者和消费者客户端实现也非常轻量级。
- 消息压缩
- kafka 支持gzip, snappy 、lz4 这3 种压缩方式,通常把多条消息放在一起组成messageset,然后再把message set 放到一条消息里面去,从而提高压缩比率进而提高吞吐量。
1.5、kafka的应用场景
- 消息系统。
- kafka 作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模消息处理提供了一种很好的解决方案。
- 应用监控。
- 利用kafka 采集应用程序和服务器健康相关的指标,如cpu 占用率、io 、内存、连接数、tps 、qps 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用kafka 与elk (elastic search 、logstash 和kibana)整合构建应用服务监控系统。
- 网站用户行为追踪。
- 为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到kafka 集群上,通过hadoop 、spark 或strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
- 流处理。
- 需要将己收集的流数据提供给其他流式计算框架进行处理,用kafka 收集流数据是一个不错的选择。
- 持久性日志。
- kafka 可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份, kafka 为故障节点数据恢复提供了一种重新同步的机制。同时, kafka很方便与hdfs 和flume 进行整合,这样就方便将kafka 采集的数据持久化到其他外部系统。
2、kafka的安装与配置
准备三台虚拟机,分别是node01,node02,node03,并且修改hosts文件如下:
vim /etc/hosts #注意: 前面的ip地址改成自己的ip地址 192.168.40.133 node01 192.168.40.134 node02 192.168.40.135 node03 #3台服务器的时间要一致 #时间更新: yum install -y rdate rdate -s time-b.nist.gov
2.1、基础环境配置
2.1.1、jdk环境
由于kafka 是用scala 语言开发的,运行在jvm上,因此在安装kafka 之前需要先安装jdk 。
安装过程略过,我这里使用的是jdk1.8。
2.1.2、zookeeper环境
2.1.2.1、安装zookeeper
kafka 依赖zookeeper ,通过zookeeper 来对服务节点、消费者上下线管理、集群、分区元数据管理等,因此zookeeper 也是kafka 得以运行的基础环境之一。
#上传zookeeper-3.4.9.tar.gz到/export/software cd /export/software mkdir -p /export/servers/ tar -xvf zookeeper-3.4.9.tar.gz -c /export/servers/ #创建zookeeper的data目录 mkdir /export/data/zookeeper -p cd /export/servers/zookeeper-3.4.9/conf/ #修改配置文件 mv zoo_sample.cfg zoo.cfg vim zoo.cfg #设置data目录 datadir=/export/data/zookeeper #启动zookeeper ./zkserver.sh start #检查是否启动成功 jps
2.1.2.3、搭建zookeeper集群
#在/export/data/zookeeper目录中创建myid文件 vim /export/data/zookeeper/myid #写入对应的节点的id,如:1,2等,保存退出 #在conf下,修改zoo.cfg文件 vim zoo.cfg #添加如下内容 server.1=node01:2888:3888 server.2=node02:2888:3888 server.3=node03:2888:3888
2.1.2.3、配置环境变量
vim /etc/profile export zk_home=/export/servers/zookeeper-3.4.9 export path=${zk_home}/bin:$path #立即生效 source /etc/profile
2.1.2.4、分发到其它机器
scp /etc/profile node02:/etc/ scp /etc/profile node03:/etc/ cd /export/servers scp -r zookeeper-3.4.9 node02:/export/servers/ scp -r zookeeper-3.4.9 node03:/export/servers/
2.1.2.5、一键启动、停止脚本
mkdir -p /export/servers/onekey/zk vim slave #输入如下内容 node01 node02 node03 #保存退出 vim startzk.sh #输入如下内容 cat /export/servers/onekey/zk/slave | while read line do { echo "开始启动 --> "$line ssh $line "source /etc/profile;nohup sh ${zk_home}/bin/zkserver.sh start >/dev/null 2>&1 &" }& wait done echo "★★★启动完成★★★" #保存退出 vim stopzk.sh #输入如下内容 cat /export/servers/onekey/zk/slave | while read line do { echo "开始停止 --> "$line ssh $line "source /etc/profile;nohup sh ${zk_home}/bin/zkserver.sh stop >/dev/null 2>&1 &" }& wait done echo "★★★停止完成★★★" #保存退出 #设置可执行权限 chmod +x startzk.sh stopzk.sh #添加到环境变量中 export zk_onekey=/export/servers/onekey export path=${zk_onekey}/zk:$path
2.1.2.6、检查启动是否成功
发现三台机器都有“quorumpeermain”进程,说明机器已经启动成功了。
检查集群是否正常:
zkserver.sh status
发现,集群运行一切正常。
2.2、安装kafka
2.2.1、单机版kafka安装
第一步:上传kafka安装包并且解压
rz 上传kafka_2.11-1.1.0.tgz到 /export/software/ cd /export/software/ tar -xvf kafka_2.11-1.1.0.tgz -c /export/servers/ cd /export/servers mv kafka_2.11-1.1.0/ kafka
第二步:配置环境变量
vim /etc/profile #输入如下内容 export kafka_home=/export/servers/kafka export path=${kafka_home}/bin:$path #保存退出 source /etc/profile
第三步:修改配置文件
cd /export/servers/kafka cd config vim server.properties # the id of the broker. this must be set to a unique integer for each broker. # 必须要只要一个brokerid,并且它必须是唯一的。 broker.id=0 # a comma separated list of directories under which to store log files # 日志数据文件存储的路径 (如不存在,需要手动创建该目录, mkdir -p /export/data/kafka/) log.dirs=/export/data/kafka # zookeeper的配置,本地模式下指向到本地的zookeeper服务即可 zookeeper.connect=node01:2181 # 保存退出
第四步:启动kafka服务
# 以守护进程的方式启动kafka kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
第五步:检测kafka是否启动
如果进程中有名为kafka的进程,就说明kafka已经启动了。
2.2.2、验证kafka是否安装成功
由于kafka是将元数据保存在zookeeper中的,所以,可以通过查看zookeeper中的信息进行验证kafka是否安装成功。
2.2.3、部署kafka-manager
kafka manager 由 yahoo 公司开发,该工具可以方便查看集群 主题分布情况,同时支持对 多个集群的管理、分区平衡以及创建主题等操作。
源码托管于github:https://github.com/yahoo/kafka-manager
第一步:上传kafka-manager安装包并且解压
rz上传kafka-manager-1.3.3.17.tar.gz到 /export/software/ cd /export/software tar -xvf kafka-manager-1.3.3.17.tar.gz -c /export/servers/ cd /export/servers/kafka-manager-1.3.3.17/conf
第二步:修改配置文件
#修改配置文件 vim application.conf #新增项,http访问服务的端口 http.port=19000 #修改成自己的zk机器地址和端口 kafka-manager.zkhosts="node01:2181" #保存退出
第三步:启动服务
cd /export/servers/kafka-manager-1.3.3.17/bin #启动服务 ./kafka-manager -dconfig.file=../conf/application.conf #制作启动脚本 vim /etc/profile export kafka_manage_home=/export/servers/kafka-manager-1.3.3.17 export path=${kafka_manage_home}/bin:$path source /etc/profile cd /export/servers/onekey/ mkdir kafka-manager cd kafka-manager vim start-kafka-manager.sh nohup kafka-manager -dconfig.file=${kafka_manage_home}/conf/application.conf >/dev/null 2>&1 & chmod +x start-kafka-manager.sh vim /etc/profile export path=${zk_onekey}/kafka-manager:$path source /etc/profile
第四步:检查是否启动成功
打开浏览器,输入地址:http://node01:19000/,即可看到kafka-manage管理界面。
2.2.4、kafka-manager的使用
进入管理界面,是没有显示cluster信息的,需要添加后才能操作。
- 添加 cluster:
输入cluster name、zookeeper信息、以及kafka的版本信息(这里最高只能选择1.0.0)。
点击save按钮保存。
添加成功。
- 查看kafka的信息
- 查看broker信息
- 查看topic列表
- 查看单个topic信息以及操作
- 优化副本选举
- 查看消费者信息
2.2.5、搭建kafka集群
kafka集群的搭建是非常简单的,只需要将上面的单机版的kafka分发的其他机器,并且将zookeeper信息修改成集群的配置以及设置不同的broker值即可。
第一步:将kafka分发到node02、node03
cd /export/servers/ scp -r kafka node02:/export/servers/ scp -r kafka node03:/export/servers/ scp /etc/profile node02:/etc/ scp /etc/profile node03:/etc/ # 分别到node02、node03机器上执行 source /etc/profile
第二步:修改node01、node02、node03上的kafka配置文件
-
node01:
cd /export/servers/kafka/config vim server.properties zookeeper.connect=node01:2181,node02:2181,node03:2181
-
node02:
cd /export/servers/kafka/config vim server.properties broker.id=1 zookeeper.connect=node01:2181,node02:2181,node03:2181
-
node03:
cd /export/servers/kafka/config vim server.properties broker.id=2 zookeeper.connect=node01:2181,node02:2181,node03:2181
第三步:编写一键启动、停止脚本。注意:该脚本依赖于环境变量中的kafka_home。
mkdir -p /export/servers/onekey/kafka vim slave #输入如下内容 node01 node02 node03 #保存退出 vim start-kafka.sh #输入如下内容 cat /export/servers/onekey/kafka/slave | while read line do { echo "开始启动 --> "$line ssh $line "source /etc/profile;nohup sh ${kafka_home}/bin/kafka-server-start.sh -daemon ${kafka_home}/config/server.properties >/dev/null 2>&1 &" }& wait done echo "★★★启动完成★★★" #保存退出 chmod +x start-kafka.sh vim stop-kafka.sh #输入如下内容 cat /export/servers/onekey/kafka/slave | while read line do { echo "开始停止 --> "$line ssh $line "source /etc/profile;nohup sh ${kafka_home}/bin/kafka-server-stop.sh >/dev/null 2>&1 &" }& wait done echo "★★★停止完成★★★" #保存退出 chmod +x stop-kafka.sh #加入到环境变量中 export path=${zk_onekey}/kafka:$path source /etc/profile
第四步:通过kafka-manager管理工具查看集群信息。
由此可见,kafka集群已经启动完成。
3、kafka快速入门
对kafka的操作有2种方式,一种是通过命令行方式,一种是通过api方式。
3.1、通过命令行kafka
kafka在bin目录下提供了shell脚本文件,可以对kafka进行操作,分别是:
通过命令行的方式,我们将体验下kafka,以便我们对kafka有进一步的认知。
3.1.1、topic的操作
3.1.1.1、创建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic #执行结果: created topic "my-kafka-topic".
参数说明:
- zookeeper:参数是必传参数,用于配置 kafka 集群与 zookeeper 连接地址。至少写一个。
- partitions:参数用于设置主题分区数,该配置为必传参数。
- replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
- topic:指定topic的名称。
3.1.1.2、查看topic列表
kafka-topics.sh --list --zookeeper node01:2181 __consumer_offsets my-kafka-topic
可以查看列表。
如果需要查看topic的详细信息,需要使用describe命令。
kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic #若不指定topic,则查看所有topic的信息 kafka-topics.sh --describe --zookeeper node01:2181
3.1.1.3、删除topic
通过kafka-topics.sh执行删除动作,需要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。
否则执行该脚本并未真正删除主题 ,将该topic标记为删除状态 。
kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic # 执行如下 [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic topic my-kafka-topic is marked for deletion. note: this will have no impact if delete.topic.enable is not set to true. # 如果将delete.topic.enable=true [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2 topic my-kafka-topic2 is marked for deletion. note: this will have no impact if delete.topic.enable is not set to true. # 说明:虽然设置后,删除时依然提示没有设置为true,实际上已经删除了。
3.1.2、生产者的操作
kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic
可以看到,已经向topic发送了消息。
3.1.3、消费者的操作
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic # 通过以上命令,可以看到消费者可以接收生产者发送的消息 # 如果需要从头开始接收数据,需要添加--from-beginning参数 kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
3.2、通过java api操作kafka
除了通过命令行的方式操作kafka外,还可以通过java api的方式操作,这种方式将更加的常用。
3.2.1、创建工程
导入依赖:
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactid>itcast-bigdata</artifactid> <groupid>cn.itcast.bigdata</groupid> <version>1.0.0-snapshot</version> </parent> <modelversion>4.0.0</modelversion> <artifactid>itcast-bigdata-kafka</artifactid> <dependencies> <dependency> <groupid>org.apache.kafka</groupid> <artifactid>kafka_2.11</artifactid> <version>1.1.0</version> </dependency> <dependency> <groupid>org.apache.kafka</groupid> <artifactid>kafka-clients</artifactid> <version>1.1.0</version> </dependency> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> </project>
3.2.2、topic的操作
由于主题的元数据信息是注册在 zookeeper 相 应节点之中,所以对主题的操作实质是对 zookeeper 中记录主题元数据信息相关路径的操作。 kafka将对 zookeeper 的相关操作封装成一 个 zkutils 类 , 井封装了一个adrninutils 类调用 zkclient 类的相关方法以实现对 kafka 元数据 的操作,包括对主题、代理、消费者等相关元数据的操作。对主题操作的相关 api调用较简单, 相应操作都是通过调用 adminutils类的相应方法来完成的。
package cn.itcast.kafka; import kafka.admin.adminutils; import kafka.utils.zkutils; import org.apache.kafka.common.security.jaasutils; import org.junit.test; import java.util.properties; public class testkafkatopic { @test public void testcreatetopic() { zkutils zkutils = null; try { //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制 zkutils = zkutils.apply("node01:2181", 30000, 3000, jaasutils.iszksecurityenabled()); string topicname = "my-kafka-topic-test1"; if (!adminutils.topicexists(zkutils, topicname)) { //参数:zkutils,topic名称,partition数量,副本数量,参数,机架感知模式 adminutils.createtopic(zkutils, topicname, 1, 1, new properties(), adminutils.createtopic$default$6()); system.out.println(topicname + " 创建成功!"); } else { system.out.println(topicname + " 已存在!"); } } finally { if (null != zkutils) { zkutils.close(); } } } }
测试结果:
3.2.2.1、删除topic
@test public void testdeletetopic() { zkutils zkutils = null; try { //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制 zkutils = zkutils.apply("node01:2181", 30000, 3000, jaasutils.iszksecurityenabled()); string topicname = "my-kafka-topic-test1"; if (adminutils.topicexists(zkutils, topicname)) { //参数:zkutils,topic名称 adminutils.deletetopic(zkutils, topicname); system.out.println(topicname + " 删除成功!"); } else { system.out.println(topicname + " 不已存在!"); } } finally { if (null != zkutils) { zkutils.close(); } } }
测试结果:
3.2.3、生产者的操作
package cn.itcast.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.stringserializer; import org.junit.test; import java.util.properties; public class testproducer { @test public void testproducer() throws interruptedexception { properties config = new properties(); // 设置kafka服务列表,多个用逗号分隔 config.setproperty(producerconfig.bootstrap_servers_config, "node01:9092,node02:9092"); // 设置序列化消息 key 的类 config.setproperty(producerconfig.key_serializer_class_config, stringserializer.class.getname()); // 设置序列化消息 value 的类 config.setproperty(producerconfig.value_serializer_class_config, stringserializer.class.getname()); // 初始化 kafkaproducer<string, string> kafkaproducer = new kafkaproducer<string, string>(config); for (int i = 0; i < 100 ; i++) { producerrecord record = new producerrecord("my-kafka-topic","data-" + i); // 发送消息 kafkaproducer.send(record); system.out.println("发送消息 --> " + i); thread.sleep(100); } kafkaproducer.close(); } }
3.2.4、消费者的操作
package cn.itcast.kafka; import org.apache.kafka.clients.consumer.consumerconfig; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.clients.producer.producerconfig; import org.apache.kafka.common.serialization.stringdeserializer; import org.apache.kafka.common.serialization.stringserializer; import org.junit.test; import javax.sound.midi.soundbank; import java.util.arrays; import java.util.properties; public class testconsumer { @test public void testconsumer() { properties config = new properties(); // 设置kafka服务列表,多个用逗号分隔 config.setproperty(consumerconfig.bootstrap_servers_config, "node01:9092,node02:9092"); // 设置消费者分组id config.setproperty(consumerconfig.group_id_config, "my-group"); // 设置序反列化消息 key 的类 config.setproperty(consumerconfig.key_deserializer_class_config, stringdeserializer.class.getname()); // 设置序反列化消息 value 的类 config.setproperty(consumerconfig.value_deserializer_class_config, stringdeserializer.class.getname()); kafkaconsumer<string, string> kafkaconsumer = new kafkaconsumer<string, string>(config); // 订阅topic kafkaconsumer.subscribe(arrays.aslist("my-kafka-topic")); while (true) { // 使用死循环不断的拉取数据 consumerrecords<string, string> records = kafkaconsumer.poll(1000); for (consumerrecord<string, string> record : records) { string value = record.value(); long offset = record.offset(); system.out.println("value = " + value + ", offset = " + offset); } } } }
什么是kafka?
kafka监控工具汇总
kafka快速入门
kafka核心之consumer
kafka核心之producer
替代flume——kafka connect简介
最简单流处理引擎——kafka streams简介
更多实时计算,flink,kafka等相关技术博文,欢迎关注实时流式计算