[Flume] The Flume File Collection Framework

1. Flume Architecture

Flume is a framework originally developed by Cloudera (now an Apache project) for collecting data from files in real time.
(Figure: Flume agent architecture: Source, Channel, Sink)

A Flume agent is built from three roles:

Source: collects incoming data. The Source is where the data flow originates, and it forwards the events it produces to a Channel. Common types: Avro Source, Exec Source, Spooling Directory Source, Kafka Source, Netcat Source, Syslog Sources, HTTP Source.

Channel: connects Sources and Sinks, behaving much like a queue. Common types: Memory Channel, JDBC Channel, Kafka Channel, File Channel.

Sink: drains events from the Channel and writes them to the destination, which can be the next agent's Source or a store such as HDFS or HBase. Common types: HDFS Sink, Hive Sink, HBase Sinks, MorphlineSolrSink, ElasticSearchSink, Kafka Sink.

Events: the Event is the basic unit of data transfer in Flume; Flume carries data from its origin to the final destination in the form of events. An Event consists of an optional header and a payload carried as a byte array. The payload is opaque to Flume; the header is an unordered collection of key-value string pairs in which each key is unique, and headers can be used to extend or route events in context.
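
As a concrete illustration (the header names and values here are hypothetical), an event carrying the text "hello" can be pictured roughly the way the logger sink in Example 1 below prints it, with the body shown as hex bytes plus a printable preview:

Event: { headers:{host=node1, timestamp=1436712800000} body: 68 65 6C 6C 6F    hello }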

The execution flow through Source, Channel, Sink, and Event is as follows:
(Figure: execution flow of Source, Channel, Sink, and Event)

2. Installing Flume

(1) Runtime environment:
1) Flume runs where the logs are produced, so that the generated logs can be collected locally by a Source;
2) Operating system: only Linux is supported;
3) A JVM/JDK is required;
4) Flume is a lightweight service, in the same category as zookeeper, journalnode, zkfc, sqoop, and so on.
(2) Installation steps:
1) Upload the downloaded Flume tarball to the Linux system and grant it execute permission:
$ chmod u+x flume-ng-1.5.0-cdh5.3.6.tar.gz
2) Extract the Flume archive:
$ tar -zxf flume-ng-1.5.0-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/
3) Rename the Flume directory:
cdh-5.3.6]$ mv apache-flume-1.5.0-cdh5.3.6-bin/ flume-1.5.0-cdh5.3.6
4) Edit the configuration file /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/conf/flume-env.sh and set JAVA_HOME:
flume-1.5.0-cdh5.3.6]$ cd conf
conf]$ cp flume-env.sh.template flume-env.sh

export JAVA_HOME=/opt/modules/jdk1.7.0_67
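
A quick sanity check (using the paths from this walkthrough) confirms that flume-env.sh now points at a working JDK:

conf]$ grep JAVA_HOME flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_67
conf]$ /opt/modules/jdk1.7.0_67/bin/java -version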

5) Verify the installation:
flume-1.5.0-cdh5.3.6]$ bin/flume-ng version
Output like the following indicates a successful installation:

Flume 1.5.0-cdh5.3.6
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: e97c9912e8b940cf493c2392d2b113b97194cffb
Compiled by jenkins on Tue Jul 28 15:21:40 PDT 2015
From source with checksum e4d03999f62abb1c9e9f34054fe59f06

3. Flume Commands

flume-1.5.0-cdh5.3.6]$ bin/flume-ng
Usage: bin/flume-ng <command> [options]...

commands:
  agent                     run a Flume agent

global options:
  --conf,-c <conf>          use configs in <conf> directory
  -Dproperty=value          sets a Java system property value

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
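
Putting these options together, every example below starts an agent with the same invocation pattern (the agent name and config file here are placeholders to fill in):

# --conf/-c: directory containing flume-env.sh and log4j.properties
# --name/-n: agent name; must match the property prefix used in the config file (a1, a2, ...)
# --conf-file/-f: properties file defining the sources, channels, and sinks
# -Dflume.root.logger: optional log4j override that sends the agent's log to the console
bin/flume-ng agent \
-c conf \
-n <agent-name> \
-f conf/<agent>.conf \
-Dflume.root.logger=DEBUG,console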

4. Flume Usage Example 1

The Source is a netcat (telnet) input, the Channel is memory-backed, the Sink writes events to the log, and debug output is printed to the console.
1) Configuration file a1.conf:

# define agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# define sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-senior.ibeifeng.com
a1.sources.r1.port = 44444

# define sinks
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 1024

# define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# define bindings
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Run the agent:

bin/flume-ng agent \
-c conf \
-n a1 \
-f conf/a1.conf \
-Dflume.root.logger=DEBUG,console
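
With the agent running, a simple test (hostname and port come from a1.conf; the typed text is arbitrary) is to connect from a second terminal and send a line; the agent's console should then print an Event line like the one sketched in section 1:

$ telnet hadoop-senior.ibeifeng.com 44444
hello flume
OK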

5. Flume Usage Example 2

The Source executes a command (tailing a log file), the Channel is memory-backed, the Sink writes to HDFS, and debug output is printed to the console.
1) Configuration file flume-tail.conf:

# define agent
a2.sources = r2
a2.channels = c2
a2.sinks = k2

# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# define sinks
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.batchSize = 10

# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# define bindings
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
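
Note that the HDFS sink rolls to a new file every 30 seconds by default, which can leave many small files behind when tailing a busy log. The standard roll properties can be tuned if needed; the values below are illustrative, not part of the original setup:

# roll by time only: start a new HDFS file every 10 minutes
a2.sinks.k2.hdfs.rollInterval = 600
# disable size-based and event-count-based rolling
a2.sinks.k2.hdfs.rollSize = 0
a2.sinks.k2.hdfs.rollCount = 0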

2) Run the agent:

bin/flume-ng agent \
-c conf \
-n a2 \
-f conf/flume-tail.conf \
-Dflume.root.logger=DEBUG,console
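
To verify the pipeline end to end (paths are the ones from flume-tail.conf; the hdfs command assumes a Hadoop client on the PATH), append a line to the tailed file and list the sink directory:

$ echo "flume test $(date)" >> /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
$ hdfs dfs -ls /user/beifeng/flume/hive-logs/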

6. Flume Usage Example 3

The Source is a spooling directory, the Channel is a file channel, the Sink writes to HDFS, and debug output is printed to the console. Only files whose names do not end in .log are transferred; once a file has been transferred, the original is renamed with the suffix .delete. On HDFS, the Sink creates a directory named after the current date and writes the output files into it.
1) Configuration file flume-app.conf:

# define agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3

# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs
a3.sources.r3.ignorePattern = ^(.)*\\.log$
a3.sources.r3.fileSuffix = .delete

# define sinks
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/splogs/%Y%m%d
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10

# define channels
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/checkpoint
a3.channels.c3.dataDirs = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/data

# define bindings
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Here, spoolDir is the log directory being monitored: complete log files are ingested, while files still being written to are not. The FileChannel buffers events on the local filesystem, which is safer than a memory channel. The data lands on HDFS, either in the directory backing a Hive table or in an ordinary HDFS directory.
2) Run the agent:

bin/flume-ng agent \
-c conf \
-n a3 \
-f conf/flume-app.conf \
-Dflume.root.logger=DEBUG,console
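
To exercise this agent (the file name below is made up; any completed file whose name does not end in .log will do), drop a file into the spool directory, then check both the rename and the HDFS output:

$ cp /tmp/app.2015-07-28.txt /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs/
$ ls /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs/
app.2015-07-28.txt.delete
$ hdfs dfs -ls /user/beifeng/flume/splogs/$(date +%Y%m%d)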

7. Flume Usage Scenarios

(Figures: typical Flume deployment topologies)