【Flume】Flume: A File Collection Framework
1. Flume Architecture
Flume is a framework originally developed by Cloudera (and now an Apache project) for collecting log data from files in real time.
Role | Description | Common Implementations |
---|---|---|
Source | Collects incoming data; the Source is where the data flow originates, and it passes the Events it produces on to the Channel | Avro Source, Exec Source, Spooling Directory Source, Kafka Source, Netcat Source, Syslog Sources, HTTP Source |
Channel | Connects Sources and Sinks, acting much like a queue | Memory Channel, JDBC Channel, Kafka Channel, File Channel |
Sink | Takes data from the Channel and writes it to the destination, which may be the Source of the next agent, or HDFS, HBase, etc. | HDFS Sink, Hive Sink, HBase Sinks, MorphlineSolrSink, ElasticSearchSink, Kafka Sink |
Event: the Event is the basic unit of data transfer in Flume; Flume delivers data from its origin to the final destination in the form of Events. An Event consists of an optional header and a payload carried as a byte array; the payload is opaque to Flume. The header is an unordered collection of key-value string pairs in which each key is unique; headers can be used for context-dependent routing and other extensions.
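For illustration, this is roughly how a logger sink (used in Example 1 below) renders an Event whose body is the ASCII text "hello flume"; the header map here is empty, and the hex dump shows the payload bytes:
Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 hello flume }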
The execution flow among Source, Channel, Sink, and Event: the Source wraps incoming data into Events and puts them into the Channel; the Sink takes Events from the Channel and delivers them to the destination.
2. Installing Flume
(1) Flume runtime requirements:
1) Flume runs where the logs are generated, so that the Source can collect them locally;
2) Operating system: Linux only;
3) A JVM/JDK is required;
4) Flume is a lightweight service; other lightweight services include ZooKeeper, JournalNode, ZKFC, and Sqoop.
(2) Installation steps:
1) Upload the downloaded Flume tarball to the Linux machine and grant it execute permission: $ chmod u+x flume-ng-1.5.0-cdh5.3.6.tar.gz
2) Extract the Flume package: $ tar -zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/
3) Rename the Flume directory: cdh-5.3.6]$ mv apache-flume-1.5.0-cdh5.3.6-bin/ flume-1.5.0-cdh5.3.6
4) Create /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/conf/flume-env.sh from its template and set JAVA_HOME in it:
flume-1.5.0-cdh5.3.6]$ cd conf
conf]$ cp flume-env.sh.template flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_67
5) Verify the installation: flume-1.5.0-cdh5.3.6]$ bin/flume-ng version
Output like the following indicates a successful installation:
Flume 1.5.0-cdh5.3.6
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: e97c9912e8b940cf493c2392d2b113b97194cffb
Compiled by jenkins on Tue Jul 28 15:21:40 PDT 2015
From source with checksum e4d03999f62abb1c9e9f34054fe59f06
3. The flume-ng Command
flume-1.5.0-cdh5.3.6]$ bin/flume-ng
Usage: bin/flume-ng <command> [options]...
commands:
agent run a Flume agent
global options:
--conf,-c <conf> use configs in <conf> directory
-Dproperty=value sets a Java system property value
agent options:
--name,-n <name> the name of this agent (required)
--conf-file,-f <file> specify a config file (required if -z missing)
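Putting these options together, a typical invocation looks like the following (the agent name and config file are placeholders; the concrete commands are shown in the examples below):
$ bin/flume-ng agent --conf conf --name <agent-name> --conf-file <config-file> -Dflume.root.logger=INFO,console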
4. Flume Example 1
The Source is a netcat listener fed interactively via telnet, the Channel is in memory, and the Sink is a logger; events and debug information are printed to the console.
1) Configuration file a1.conf:
# define agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# define sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-senior.ibeifeng.com
a1.sources.r1.port = 44444
# define sinks
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 1024
# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# define bindings
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) Run the agent:
bin/flume-ng agent \
-c conf \
-n a1 \
-f conf/a1.conf \
-Dflume.root.logger=DEBUG,console
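With the agent running, you can test it from a second terminal by connecting to the netcat source via telnet (host and port match the bind settings above); the source acknowledges each received line with OK, and the line shows up as an Event in the agent's console log:
$ telnet hadoop-senior.ibeifeng.com 44444
hello flume
OK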
5. Flume Example 2
The Source executes a command (tailing the Hive log), the Channel is in memory, and the Sink writes to HDFS; debug information is printed to the console. Note that an exec source offers no delivery guarantee: if the agent dies, data that tail -f has read but not yet committed to the channel is lost.
1) Configuration file flume-tail.conf:
# define agent
a2.sources = r2
a2.channels = c2
a2.sinks = k2
# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# define sinks
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.batchSize = 10
# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# define bindings
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
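Note that with the HDFS sink's default roll behavior, continuously tailing a log tends to produce many small files on HDFS. The optional properties below (illustrative values, not part of the original configuration) make the sink roll a new file purely on a time interval:
# optional: roll a new file every 60 seconds, ignoring size and event count
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 0
a2.sinks.k2.hdfs.rollCount = 0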
2) Run the agent:
bin/flume-ng agent \
-c conf \
-n a2 \
-f conf/flume-tail.conf \
-Dflume.root.logger=DEBUG,console
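To push some traffic through the agent, run any Hive statement so that hive.log grows, then list the sink's target directory (assuming the hdfs client is on the PATH; by default the sink writes files with the FlumeData prefix):
$ hdfs dfs -ls /user/beifeng/flume/hive-logs/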
6. Flume Example 3
The Source is a spooling directory, the Channel is a file channel, and the Sink writes to HDFS; debug information is printed to the console. Files whose names end in .log are ignored; all other files are transferred, and after transfer each original file is renamed with the suffix .delete. On HDFS, the Sink creates a directory named after the current date and writes its output files there.
1) Configuration file flume-app.conf:
# define agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs
a3.sources.r3.ignorePattern = ^(.)*\\.log$
a3.sources.r3.fileSuffix = .delete
# define sinks
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/splogs/%Y%m%d
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
# define channels
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/checkpoint
a3.channels.c3.dataDirs = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/data
# define bindings
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
Here spoolDir is the log directory being monitored: only complete log files are ingested, so files still being written must not be placed there. The file channel buffers events on the local file system, which is safer than the memory channel if the agent crashes. Because hdfs.path contains the date escape %Y%m%d, useLocalTimeStamp = true is needed (otherwise every event would have to carry a timestamp header). The data is stored on HDFS, for example in the directory backing a Hive table or any other HDFS directory.
2) Run the agent:
bin/flume-ng agent \
-c conf \
-n a3 \
-f conf/flume-app.conf \
-Dflume.root.logger=DEBUG,console
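To exercise this agent, copy a finished file into the spool directory (the file name below is hypothetical). Files matching the .log ignore pattern are skipped; each ingested file is renamed with the .delete suffix:
$ cp /tmp/app.2015-11-01.out /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs/
$ ls /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs/
app.2015-11-01.out.delete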
7. Usage Scenarios for the Flume Architecture