安装Kafka与flume联合使用
安装Kafka与flume联合使用
Kafka的作用:消峰(消息队列,先进先出)
scala写的消息队列,常用于日志。消息队列简单理解就是生产者把实时性不强的数据丢入队列,消费者从队列中取出并处理。比如秒杀时可以把非常多的请求写入队列,再依次取出。很多程序都需要写日志,可以先写入kafka,再依次写入数据库,可以提高主程序性能
和解耦
Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务
使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。
安装kafka步骤:
//启动步骤
[aaa@qq.com bin]# kafka-server-start.sh /opt/bigdata/kafka010/config/server.properties
官网下载:kafka — 消息中间界
版本型号根据自己的zookeeper版本酌定。
把下载的文件托到虚拟机中
//解压
[aaa@qq.com ~]# tar -zxvf kafka_2.11-0.10.0.1
//移动到新目录
[aaa@qq.com ~]# mv kafka_2.11-0.10.0.1 bigdata/kafka010
[aaa@qq.com ~]# cd /opt/bigdata/kafka010/config/
[aaa@qq.com config]# ls
server.properties
//配置config/server.properties服务器
[aaa@qq.com config]# vi server.properties
//在此下面配置当前虚拟机的名字bigdata
############################# Socket Server Settings #############################
advertised.listeners=PLAINTEXT://bigdata:9092
//在此下面更改logs路径为opt目录下面,统一方便管理,防止丢失。
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/opt/kafka-logs
//在此下面配置zookeeper的IP地址
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
//如果将来有多个,则在后面依次添加用逗号分割。
zookeeper.connect=192.168.56.101:2181
//配置kafka的环境变量
[aaa@qq.com config]# cd ..
[aaa@qq.com kafka010]# ls
bin config libs LICENSE logs NOTICE site-docs
//复制kafka路径
[aaa@qq.com kafka010]# pwd
/opt/bigdata/kafka010
[aaa@qq.com kafka010]# vi /etc/profile
//CTRL+G到最底端。
export KAFKA_HOME=/opt/bigdata/kafka010
export PATH=$PATH:$KAFKA_HOME/bin
//**一下,每一次修改环境变量的时候都需要**一下
[aaa@qq.com kafka010]# source /etc/profile
//启动zeekeeper
[aaa@qq.com kafka010]# zkServer.sh start
//启动 26761 QuorumPeerMain
[aaa@qq.com kafka010]# jps
31637 Jps
8966 SecondaryNameNode
8809 DataNode
26761 QuorumPeerMain
9210 NodeManager
8684 NameNode
9116 ResourceManager
27502 Kafka
//要学会写脚本自动启动和无密登录
//新建一个kafka-log文件夹
[aaa@qq.com bin]# mkdir -p /opt/kafka-log
//启动步骤
[aaa@qq.com bin]# kafka-server-start.sh /opt/bigdata/kafka010/config/server.properties
//双击一个新窗口 ls 查看kafka的命令
[aaa@qq.com ~]# cd /opt/bigdata/kafka010/bin/
[aaa@qq.com bin]# ls
//环境测试--创建消息队列
例如:1.
kafka-topics.sh --create\
--zookeeper 你的zookeeper的IP:2181
--replication-factor 副本数\
--partitions 分区数
--topic 消息队列名
检查队列是否创建成功
2.
kafka-topics.sh --zooper 你的zookeeper的IP:2181 --list
//先查看队列是否创建成功,现在没事数据,但是还可以查看随时监控情况
[aaa@qq.com bin]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
//创建消息队列依照上面的例子,
[aaa@qq.com bin]# kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic mydemo --partitions 1 --replication-factor 1
//在此查看,下面创建成功 mydemo
[aaa@qq.com bin]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
mydemo
//下面往消息队列中塞数据
例如:3.向你的消息队列中生产消息
kafka-console-producer.sh --队列名\
--broker-list 你的kafka队列的机器IP:9092\
//--broker-list 表示kafka有多少个节点,有多个时,就写上所有集群的所有IP地址,用逗号隔开。
[aaa@qq.com bin]# kafka-console-producer.sh --topic mydemo --broker-list 127.0.0.1:9092
//自己输入一些简单的字符
hello,world
cm,world
nihao,world
//双击一个新的窗口
4.消费者消息
kafka-console-consumer.sh
--bootstrsp-server 你的kafka的IP:9092\
--topic 队列名
[aaa@qq.com ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --zookeeper 127.0.0.1:2181 --topic mydemo --from-beginning
hello,world
cm,world
nihao,world
小结:
如果停止右面的窗口,再打开,发现左面的进程还会再走一遍。
因为每一次走,都相当于新组。(在不同的组里面可以多次执行使用,如果是同一个组组中则不会出现重复读取。)
问题:怎么样将flume与kafka连接呢?
//启动flume
[aaa@qq.com bin]# ./flume-ng agent -c conf -f /opt/bigdata/flume160/conf/file.conf -n cm -Dflume.root.logger=DEBUG,console
进入flume:
//进入flume
[aaa@qq.com flume160]# cd conf/
[aaa@qq.com conf]# ls
control.conf
flume-conf.properties.template
flume-env.ps1.template
flume-env.sh
flume-env.sh.template
log4j.properties
//复制一份新的文件传输全路径
[aaa@qq.com conf]# cp control.conf file.conf
//进行修改,需要根据官网的Source进行修改,以便读取更大的文件。
[aaa@qq.com conf]# vi file.conf
cm.sources=s1
cm.sinks=k1
cm.channels=c1
cm.sources.s1.type=spooldir
cm.sources.s1.spoolDir=/opt/data/users
cm.sources.s1.deserializer.maxLineLength=100000
cm.sources.s1.batchSize=500
cm.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
cm.sinks.k1.kafka.topic=user_friends
cm.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9092
cm.sinks.k1.kafka.flumeBatchSize=500
cm.channels.c1.type=file
cm.channels.c1.checkpointDir=/opt/db/usck
cm.channels.c1.dataDir=/opt/db/usdp
cm.sources.s1.channels=c1
cm.sinks.k1.channel=c1
~
//新建上面已经设置好的文件夹
[aaa@qq.com ~]# cd /opt/
[aaa@qq.com opt]# mkdir -p /opt/data/users
[aaa@qq.com opt]# mkdir -p /opt/db/usck
[aaa@qq.com opt]# mkdir -p /opt/db/usdp
//给访问权限
[aaa@qq.com opt]# chmod 777 -R /opt/data/
[aaa@qq.com opt]# chmod 777 -R /opt/db
//新建kafka消息列(高版本自己可以创建)
[aaa@qq.com opt]# kafka-topics.sh --create --topic user_friends --zookeeper 127.0.0.1:2181 --partitions 1 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "user_friends".
//查看是否建立成功
[aaa@qq.com opt]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
mydemo
user_friends
//监控kafka
[aaa@qq.com opt]# kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --bootstrap-server 127.0.0.1:9092 --topic user_friends.csv
//启动flume
[aaa@qq.com bin]# ./flume-ng agent -c conf -f /opt/bigdata/flume160/conf/file.conf -n cm -Dflume.root.logger=DEBUG,console
[aaa@qq.com ~]# ls
a chenming.jar json.txt phone_data.txt
anaconda-ks.cfg chmod k run.sh
ashgk dept.txt mydemo.sql stu.jar
a.txt emp.txt mywc-1.0-SNAPSHOT.jar test
b jdak181 opt user_friends.csv
c jdk orders.csv zookeeper.out
[aaa@qq.com ~]# cd /opt
[aaa@qq.com opt]# ls *.csv
user_friends.csv
[aaa@qq.com opt]# install -m 777 user_friends.csv /opt/data/users
[aaa@qq.com opt]# ls /opt/data/users
user_friends.csv.COMPLETED
[aaa@qq.com opt]# ll
total 2001680
打开flume官网:
//提前查看表中数据
[aaa@qq.com ~]# cd /root
//查看最短的长度
[aaa@qq.com ~]# wc -l user_friends.csv
38203 user_friends.csv
//查看最长的长度
[aaa@qq.com ~]# wc -L user_friends.csv
53295 user_friends.csv
//查看具体数据
[aaa@qq.com ~]# head user_friends.csv
//查看首行数据
[aaa@qq.com ~]# head -n 1 user_friends.csv
首先,我想把数据文件导入HBase中(使用Java文件读取),但是我发现文件太大,宕机了。所以就要把使文件导入数据仓库里面(数据壶),(HBase可以支持大数据量的随机读写数据,但是导入的时候出现问题,Sqoop也只能导入离线型数据,无法导入实时数据),需要自身监控,则使用flume可以解决一部分,但是如果我要是flume集群的数据的话,也无法解决此问题,此时就需要kafka,因为kafka输出端可以限制流量的,这时HBase就有时间把数据读出来了,不会因为大数据量的吞吐量导致自动上锁,HBase第二个功能就是可以去重(行间一样的时候,就可以覆盖)
数据清洗:可以使用HBase,Hive,Spark等等
//千万不要写错,详情请查看官网具体详情。
cm.sources=s1
cm.sinks=k1
cm.channels=c1
cm.sources.s1.type=spooldir
cm.sources.s1.spoolDir=/opt/data/users
cm.sources.s1.deserializer.maxLineLength=100000
cm.sources.s1.batchSize=500
cm.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
cm.sinks.k1.kafka.topic=user_friends
cm.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9092
cm.sinks.k1.kafka.flumeBatchSize=500
cm.channels.c1.type=file
cm.channels.c1.checkpointDir=/opt/db/usck
cm.channels.c1.dataDir=/opt/db/usdp
cm.sources.s1.channels=c1
cm.sinks.k1.channel=c1
~ :wq
多加练习,熟练掌握。
上一篇: ROS:kinect2的安装与使用
下一篇: Kinect 使用学习 (一)