Installing Flume and integrating it with Kafka
Official documentation: http://flume.apache.org/FlumeUserGuide.html
Reference book: Flume 构建高可用、可拓展的海量日志采集系统
Reference: http://www.aboutyun.com/forum.php?mod=viewthread&tid=20699
Kafka cluster deployment: https://blog.51cto.com/13323775/2063420
Flume
Communication between Flume agents (see the reference book)
Flume ships with a dedicated RPC sink-source pair for transferring data between agents.
A source is the component that receives data into a Flume agent. Built-in sources include the Avro Source, Thrift Source, HTTP Source, Spooling Directory Source, Syslog Source, Exec Source, JMS Source, and others.
A channel is the buffer between the source and the sink, and it is the key to not losing data.
A sink reads events from a channel; each sink can read from only one channel, and every sink must be assigned a channel, otherwise it is removed from the agent.
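A minimal sketch of that avro (RPC) pairing might look like the following; the agent names a1/a2, the host master, and port 4141 are hypothetical and not part of the setup below. The sending agent's avro sink simply points at the host and port where the receiving agent's avro source listens:
# sending agent: avro sink pointing at the receiving agent's host/port
a1.sinks.out.type = avro
a1.sinks.out.hostname = master
a1.sinks.out.port = 4141
# receiving agent: avro source listening on the same port
a2.sources.in.type = avro
a2.sources.in.bind = 0.0.0.0
a2.sources.in.port = 4141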
Installing Flume
Download and extract
cd /data/
wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar axf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
Set the environment variables
vim /etc/profile
#FLUME
export FLUME_HOME=/data/apache-flume-1.8.0-bin
export PATH=$PATH:${FLUME_HOME}/bin
export HADOOP_HOME=/data/hadoop
source /etc/profile
Edit the configuration file
cd ${FLUME_HOME}/conf/
cp flume-env.sh.template flume-env.sh
Edit flume-env.sh
export JAVA_HOME=/usr/local/jdk
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_HOME=/data/hadoop
Verify the installation
flume-ng version
Using Flume
Single-node agent example
cd ${FLUME_HOME}/conf/
Add a configuration file
vim avro.conf
#Name the components on this agent
agent.sources = avroSrc
agent.sinks = avroSink
agent.channels = avroChannel
#Describe/configure the source
agent.sources.avroSrc.type = netcat
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 62000
#Describe the sink
agent.sinks.avroSink.type = logger
#Use a channel that buffers events in memory
agent.channels.avroChannel.type = memory
agent.channels.avroChannel.capacity = 1000
agent.channels.avroChannel.transactionCapacity = 100
#Bind the source and sink to the channel
agent.sources.avroSrc.channels = avroChannel
agent.sinks.avroSink.channel = avroChannel
Note: when agent.sources.avroSrc.type was set to avro for a test, the telnet check below failed with:
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 1863125517 items! Connection closed
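That error is expected: plain telnet text is not a valid Avro RPC payload. If an avro source is really wanted here, the test data would be sent with the bundled avro client instead of telnet (a sketch; /tmp/test.txt is a hypothetical input file):
flume-ng avro-client -H localhost -p 62000 -F /tmp/test.txt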
Run the Flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/avro.conf -n agent -Dflume.root.logger=INFO,console
Test with a telnet connection
telnet localhost 62000
Check the events printed in the Flume agent console.
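If telnet is not installed, a quick alternative check (assuming nc/netcat is available; not part of the original post) is to pipe a line straight to the netcat source and then watch the agent console:
echo "hello flume" | nc localhost 62000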
Monitoring a local file with an exec source
cd ${FLUME_HOME}/conf/
Add a configuration file
vim exec.conf
#example.conf: A single-node Flume configuration
#Name the components on this agent
agentexec.sources = avroexec
agentexec.sinks = sinkexec
agentexec.channels = channelexec
#Describe/configure the source
agentexec.sources.avroexec.type = exec
agentexec.sources.avroexec.command = tail -F /tmp/testexec.log
#Describe the sink
agentexec.sinks.sinkexec.type = logger
#Use a channel which buffers events in memory
agentexec.channels.channelexec.type = memory
agentexec.channels.channelexec.capacity = 100000
agentexec.channels.channelexec.transactionCapacity = 10000
#Bind the source and sink to the channel
agentexec.sources.avroexec.channels = channelexec
agentexec.sinks.sinkexec.channel = channelexec
Run the Flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/exec.conf --name agentexec -Dflume.root.logger=INFO,console
Test
Awkwardly, only part of the data came through (no fix for this found yet).
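One way to drive the test (a hypothetical sketch, not from the original post) is to append lines to the tailed file and watch the agent console:
for i in $(seq 1 5); do echo "exec test line $i" >> /tmp/testexec.log; sleep 1; done
Note that the exec source is documented as best-effort: if the channel cannot accept an event, data read by tail -F can be silently dropped, which is the most likely reason only part of the data arrived.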
Monitoring logs with a spooldir source and writing them to Kafka
Prerequisite: a Kafka cluster is already installed
cd ${FLUME_HOME}/conf/
Add a configuration file
vim single_agent.conf
#agent name a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
#set source
# For testing the data is placed under /tmp; adjust the directory as needed
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir=/tmp/spooldir
a1.sources.source1.fileHeader = false
#set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= spooldir
#set channel
# For testing the data is placed under /tmp; adjust the directory as needed
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /tmp/flume_data/checkpoint
a1.channels.channel1.dataDirs= /tmp/flume_data/data
#bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
Create the directories
mkdir -pv /tmp/spooldir
mkdir -pv /tmp/flume_data/checkpoint
mkdir -pv /tmp/flume_data/data
Start the Kafka cluster (on all nodes)
kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties
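The command above runs the broker in the foreground; kafka-server-start.sh also accepts a -daemon flag if the broker should keep running in the background:
kafka-server-start.sh -daemon /data/kafka_2.11-1.0.0/config/server.properties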
Create a Kafka topic
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic spooldir --replication-factor 1 --partitions 3
List the topics
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
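To also check partition and replica placement, the topic can be described (same ZooKeeper connect string as above):
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --describe --topic spooldir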
Start a Kafka consumer
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir --from-beginning
Start the Flume agent (in a new window)
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
Write test
echo "hello ,test flume spooldir source" >> /tmp/spooldir/spool.txt
Flume agent output
Kafka consumer output
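If the event was picked up, the spooling directory source renames the ingested file by appending the default .COMPLETED suffix, which is easy to confirm:
ls /tmp/spooldir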
Writing log data to HBase
Prerequisite: an HBase cluster is already installed
cd ${FLUME_HOME}/conf/
mkdir hbase && cd hbase
Add the configuration files; two agents are needed here:
hbase-back.conf collects the local data and hbase-front.conf writes the data into HBase
vim hbase-back.conf
agent.sources =backsrc
agent.channels=memoryChannel
agent.sinks =remotesink
#Describe the sources
agent.sources.backsrc.type = exec
agent.sources.backsrc.command = tail -F /tmp/test/data/data.txt
agent.sources.backsrc.checkperiodic = 1000
agent.sources.backsrc.channels=memoryChannel
#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000
#Describe the sinks
agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = master
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel= memoryChannel
vim hbase-front.conf
agent.sources = frontsrc
agent.channels = memoryChannel
agent.sinks = fileSink
#Describe the sources
agent.sources.frontsrc.type = avro
agent.sources.frontsrc.bind = master
agent.sources.frontsrc.port = 9999
agent.sources.frontsrc.channels = memoryChannel
#Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity =1000
#Describe the sinks
agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel=memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = t
agent.sinks.fileSink.batchSize= 50
agent.sinks.fileSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.fileSink.zookeeperQuorum = master:2181,slave1:2181,slave2:2181
agent.sinks.fileSink.znodeParent = /hbase
agent.sinks.fileSink.timeout = 90000
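By default RegexHbaseEventSerializer stores each whole event body in a single column (payload) of the t column family; if the log lines have a fixed layout, the serializer can split them into columns instead (a hypothetical example, not part of the original setup):
agent.sinks.fileSink.serializer.regex = ^([^ ]+) (.*)$
agent.sinks.fileSink.serializer.colNames = first_field,rest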
Create the local file and directory
mkdir -pv /tmp/test/data && touch /tmp/test/data/data.txt
Create a table in HBase
hbase shell
Create the table
create 'access_log','t'
List the tables
list
Start the back agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent -Dflume.root.logger=INFO,console
An error is reported right after startup:
18/01/22 22:29:28 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: 192.168.3.58, port: 9999
org.apache.flume.FlumeException: NettyAvroRpcClient { host: master, port: 9999 }: RPC connection error
This is because the Avro connection has not been established yet: only the sink side is running and there is no source side. Once the front agent is started, the connection is reported as established.
Start the front agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-front.conf --name agent -Dflume.root.logger=INFO,console
Append content to the local file, then check it in HBase
echo "hello ,test flush to hbase">>/tmp/test/data/data.txt
Neither agent prints anything to its console while the data is being written.
View the data in HBase
hbase shell
scan "access_log"
There is some delay before Flume writes the entries into HBase.
Writing logs to Hadoop (HDFS)
The principle is the same as writing to HBase: once the HBase flow is understood, writing to other services is straightforward; see the official documentation for the detailed configuration.
Prerequisite: a Hadoop cluster is already installed
cd ${FLUME_HOME}/conf/
mkdir hdfs && cd hdfs
Add the configuration files; again two agents are needed:
hadoop-back.conf collects the local data and hadoop-front.conf writes the data into Hadoop
vim hadoop-back.conf
#Name the components
hadoop.sources= backsrc
hadoop.sinks= fileSink
hadoop.channels= memoryChannel
#Source
hadoop.sources.backsrc.type= spooldir
hadoop.sources.backsrc.spoolDir= /tmp/data/hadoop
hadoop.sources.backsrc.channels= memoryChannel
hadoop.sources.backsrc.fileHeader = true
#Channel
hadoop.channels.memoryChannel.type= memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000
#Sink
hadoop.sinks.fileSink.type= avro
hadoop.sinks.fileSink.hostname= master
hadoop.sinks.fileSink.port= 10000
hadoop.sinks.fileSink.channel= memoryChannel
vim hadoop-front.conf
#Name the components
hadoop.sources= frontsrc
hadoop.channels= memoryChannel
hadoop.sinks= remotesink
#Source
hadoop.sources.frontsrc.type= avro
hadoop.sources.frontsrc.bind= master
hadoop.sources.frontsrc.port= 10000
hadoop.sources.frontsrc.channels= memoryChannel
#Channel
hadoop.channels.memoryChannel.type= memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity =1000
#Sink
hadoop.sinks.remotesink.type= hdfs
hadoop.sinks.remotesink.hdfs.path=hdfs://master/flume
hadoop.sinks.remotesink.hdfs.rollInterval = 0
hadoop.sinks.remotesink.hdfs.idleTimeout = 10000
hadoop.sinks.remotesink.hdfs.fileType= DataStream
hadoop.sinks.remotesink.hdfs.writeFormat= Text
hadoop.sinks.remotesink.hdfs.threadsPoolSize = 20
hadoop.sinks.remotesink.channel= memoryChannel
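With hdfs.rollInterval = 0 the sink never rolls files on a timer; open files are closed after hdfs.idleTimeout seconds of inactivity. If size- or event-count-based rolling is preferred instead, the usual knobs are the following (values purely illustrative):
hadoop.sinks.remotesink.hdfs.rollSize = 134217728
hadoop.sinks.remotesink.hdfs.rollCount = 0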
Create the local directory and change its permissions
mkdir -pv /tmp/data/hadoop && chmod -R 777 /tmp/data/
Create the HDFS directory and change its permissions
hadoop fs -mkdir /flume
hadoop fs -chmod 777 /flume
hadoop fs -ls /
Write files into the local directory
echo "hello, test hadoop" >> /tmp/data/hadoop/hadoop.log
echo "hello, test flume" >> /tmp/data/hadoop/flume.log
echo "hello, test helloworld" >> /tmp/data/hadoop/helloworld.log
View the files and their contents in HDFS
hadoop fs -ls /flume
hadoop fs -cat /flume/FlumeData.1516634328510.tmp
References:
Official documentation: http://flume.apache.org/FlumeUserGuide.html
Book: Flume 构建高可用、可拓展的海量日志采集系统
Common Flume configurations: http://blog.csdn.net/sang1203/article/details/51474628
Flume installation and usage: http://www.aboutyun.com/forum.php?mod=viewthread&tid=20699
Reposted from: https://blog.51cto.com/13323775/2063751