欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume架构与应用

程序员文章站 2022-06-14 11:17:37
...

 

- Flume定义

  1. Flume是分布式,高可用,基于流式计算的,用于收集、聚合、移动大量日志数据的框架。

- Flume模型 

Flume架构与应用

Source用于采集数据源的数据,然后封装成Event传输给Channel管道,期间也可以设置过滤器
Chanel接受来自Source传输过来的Event数据
Sink在Channel中拉取Event数据并将数输出,将数据写入存储设备上。
Event:有可以选的Header和载有数据的ByteAarry组成,Header是容纳了Key-value字符串对的无序集合,key在集合内是唯一的。Header可以在上线文路由中扩展。

- Flume的优点

  1. 可以和任何*数据库集中式集成
  2. 起到缓冲的作用,减轻存储设备的压力
  3. 提供了数据流的路线
  4. 事物基于channel,保证了数据的可靠性
  5. 高效收集日志
  6. 支持水平拓展支持多级跳跃。

- Flume架构介绍

Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。 
agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。 
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。
4、flume的运行机制 
flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。
5、flume的广义用法 
flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。

Flume架构与应用

接下来对Flume进行应用与测试

- Flume应用——日志采集

        1.案例一:Spooling Directory Source:监听一个知道你目录,即只要应用程序向这个指定的目录中添加新的文件,sourre组件就可以获取到该信息,并解析该文件的内容,然后写入到channel。写入完成后,标记该文文件写入完成后,标记该文件已完成或者删除该文件。其中channel为file类型,sink为hdfs存储类型。

①、Flume官网中Spooling Directory Source描述

 Property Name       Default      Description
 channels              –  
 type                  –          The component type name, needs to be spooldir.
 spoolDir              –          Spooling Directory Source监听的目录
 fileSuffix         .COMPLETED    文件内容写入到channel之后,标记该文件
 deletePolicy       never         文件内容写入到channel之后的删除策略: never or immediate
 fileHeader         false         Whether to add a header storing the absolute path filename.
 ignorePattern      ^$           Regular expression specifying which files to ignore (skip)
 interceptors          –          指定传输中event的head(头信息),常用timestamp

②、Spooling Directory Source的两个注意事项:

①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing. 
 即:拷贝到spool目录下的文件不可以再打开编辑
 ②If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
 即:不能将具有相同文件名字的文件拷贝到这个目录下

③、编写配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/LogForFlumeCollect
a1.sources.r1.fileHeader=true
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/LogForFlume/checkPoints
a1.channels.c1.dataDirs=/LogForFlume/datas

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/FlumeForSpoolDir
a1.sinks.k1.hdfs.writeFormat=text
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=10485760
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.filePrefix=/spoolDir-%Y%m%d
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

④、启动Flume agent a1服务端

flume-ng agent -c . -f spoolDirForFileToHdfs.conf -n a1 -Dflume.root.logger=INFO,console

⑤、使用cp命令向Spooling Directory 中发送数据

cp student.txt /LogForFlumeCollect/

⑥在控制台将收到到日志信息,并且在hdfs上会生成按配置文件规定的日志文件

2019-02-20 22:00:44,379 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:393)] Closing /FlumeForSpoolDir//spoolDir-20190220.1550728784201.tmp
2019-02-20 22:00:44,416 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:655)] Renaming /FlumeForSpoolDir/spoolDir-20190220.1550728784201.tmp to /FlumeForSpoolDir/spoolDir-20190220.1550728784201
2019-02-20 22:00:44,461 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
aaa@qq.com:/opt$ hdfs dfs -ls /FlumeForSpoolDir
Found 2 items
-rw-r--r--   3 bailing supergroup          1 2019-02-20 21:55 /FlumeForSpoolDir/spoolDir-20190220.1550728464520
-rw-r--r--   3 bailing supergroup        204 2019-02-20 22:00 /FlumeForSpoolDir/spoolDir-20190220.1550728784201

aaa@qq.com:/opt$ hdfs dfs -cat /FlumeForSpoolDir/spoolDir-20190220.1550728784201
54993,2016-11-2,2018-11-14,man,30
64993,2006-01-2,2008-10-24,man,31
24993,2006-12-2,2028-09-04,man,32
44993,2008-01-2,2038-08-21,man,33
14993,2010-11-2,2001-07-25,man,34
74993,2009-11-5,2004-06-26,man,35

        2.案例二:Spooling Directory Source:监听一个知道你目录,即只要应用程序向这个指定的目录中添加新的文件,sourre组件就可以获取到该信息,并解析该文件的内容,然后写入到channel。写入完成后,标记该文文件写入完成后,标记该文件已完成或者删除该文件。其中channel为memory类型,sink为local本地存储类型。(相对于案例一两个变化,channel类型,sink落地文件目录)

①、编写配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k2

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/LogForFlumeCollectLocal
a1.sources.r1.fileHeader=true
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000


a1.sinks.k2.type=file_roll
a1.sinks.k2.sink.directory=/FlumeLogsToLocal

a1.sources.r1.channels=c1
a1.sinks.k2.channel=c1

②、启动Flume agent a1服务端

flume-ng agent -c . -f spoolDirForFileToHdfs.conf -n a1 -Dflume.root.logger=INFO,console

③、使用cp命令向Spooling Directory 中发送数据

cp student.txt /LogForFlumeCollect/

④、在控制台将收到到日志信息,并且在hdfs上会生成按配置文件规定的日志文件

2019-02-20 22:59:03,517 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2019-02-20 22:59:03,523 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /LogForFlumeCollectLocal/student.txt to /LogForFlumeCollectLocal/student.txt.COMPLETED

        3.案例三:NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink为hdfs,Channel为memory。

①Flume官网中NetCat Source描述:

 Property Name Default Description 
 channels – 
 type – The component type name, needs to be netcat 
 bind – 日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听 
 port – 日志需要发送到的端口号,该端口号要有netcat类型的source在监听

②配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.2.159
a1.sources.r1.port=55555
a1.sources.r1.max-line-lenght=100000

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c1.keep-alive=30

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/netcatFlume
a1.sinks.k1.hdfs.filePrefix=%Y%m%d-
a1.sinks.k1.hdfs.writeFormat=text
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollSize=10485760
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

③、启动Flume agent a1服务端

aaa@qq.com:/opt/flume/conf$ flume-ng agent -c . -f FlumeNetcatToHdfs.conf -n a1 -Dflume.root.logger=INFO,console

④、向指定端口发送数据

aaa@qq.com:/LogForFlumeCollectLocal$ telnet 192.168.2.159 55555 
Trying 192.168.2.159...
Connected to 192.168.2.159.
Escape character is '^]'.
big data
OK

⑤、在控制台将收到到日志信息,并且在hdfs上会生成按配置文件规定的日志文件

2019-02-20 23:46:53,579 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2019-02-20 23:46:53,582 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.2.159:55555]
2019-02-20 23:46:59,761 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2019-02-20 23:47:00,302 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:251)] Creating /netcatFlume/20190220-.1550735219762.tmp

     4.案例四:NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink为file_roll,Channel为file。(于案例三发生两处变化,sink落地文件在local,channel类型为file)

①、配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.2.159
a1.sources.r1.port=55555
a1.sources.r1.max-line-lenght=100000

a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/flume/checkpointDir
a1.channels.c1.dataDirs=/flume/dataDir

a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/location

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

②、启动Flume agent a1服务端

aaa@qq.com:/opt/flume/conf$ flume-ng agent -c . -f FlumeNetcatToHdfs.conf -n a1 -Dflume.root.logger=INFO,console

③、向指定端口发送数据

aaa@qq.com:/$ telnet 192.168.2.159 55555
Trying 192.168.2.159...
Connected to 192.168.2.159.
Escape character is '^]'.
flume test
OK

④、在控制台将收到到日志信息,并且在hdfs上会生成按配置文件规定的日志文件

2019-02-21 02:02:13,779 (Log-BackgroundWorker-c1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:230)] Start checkpoint for /home/bailing/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2019-02-21 02:02:13,795 (Log-BackgroundWorker-c1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:255)] Updating checkpoint metadata: logWriteOrderID: 1550743304143, queueSize: 1, queueHead: 999999
2019-02-21 02:02:13,811 (Log-BackgroundWorker-c1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1065)] Updated checkpoint for file: /home/bailing/.flume/file-channel/data/log-1 position: 91 logWriteOrderID: 1550743304143
2019-02-21 02:02:43,818 (Log-BackgroundWorker-c1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:230)] Start checkpoint for /home/bailing/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2019-02-21 02:02:43,819 (Log-BackgroundWorker-c1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:255)] Updating checkpoint metadata: logWriteOrderID: 1550743304146, queueSize: 0, queueHead: 0
2019-02-21 02:02:43,821 (Log-BackgroundWorker-c1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1065)] Updated checkpoint for file: /home/bailing/.flume/file-channel/data/log-1 position: 168 logWriteOrderID: 1550743304146

⑤、查看本地sink落地文件

aaa@qq.com:/location$ ls
1550744413761-1  1550744413761-2  1550744413761-3
aaa@qq.com:/location$ cat *
flume test
aaa@qq.com:/location$ 

        5.案例五:Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:hdfs Channel:channel .这个案列为了方便显示Exec Source的运行效果,结合Hive中的external table进行来说明。

①、配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /flume/listen.txt

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c1.keep-alive=30

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/NetcatFlumeHive
a1.sinks.k1.hdfs.rollSize=10485760
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.writeFormat=text
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.filePrefix=%Y%m%d-
a1.sinks.k1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

②、在hive中创建外部表并且指定location

hive (default)> create table test(
              > text string)
              > row format delimited
              > location "/NetcatFlumeHive";
OK
Time taken: 1.459 seconds

③、启动flume agent a1 服务端:

aaa@qq.com:/opt/flume/conf$ flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console

④、向/flume/listen.txt中发送数据

echo "flume exec" >> listen.txt

⑤、在HDFS和Hive分别中查看flume收集到的日志数据:

hive (default)> select * from test;
OK
test.text
flume exec
Time taken: 1.03 seconds, Fetched: 1 row(s)
aaa@qq.com:/flume$ hdfs dfs -cat /NetcatFlumeHive/20190221-.1550802462481
flume exec

6、案例六:Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:file_roll Channel:file .这个案列为了方便显示Exec Source的运行效果,结合Hive中的external table进行来说明。(与案例的发生两个不同,发送数据和查看结果不变)

配置文件:

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /flume/listen.txt

a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/flume/checkpointDir
a1.channels.c1.dataDir=/flume/dataDir

a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/location

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

7.案例七:Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。 其中 Sink:hdfs Channel:file (注:Avro和Thrift都是一些序列化的网络端口–通过这些网络端口可以接受或者发送信息,Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制)。

Flume架构与应用

①、Flume官网中Avro Source的描述:

Property     Name   Default Description
 channels      –  
 type          –     The component type name, needs to be avro
 bind          –     日志需要发送到的主机名或者ip,该主机运行着ARVO类型的source
 port          –     日志需要发送到的端口号,该端口要有ARVO类型的source在监听

②配置文件

a1.sources = r1
 a1.sinks = k1
 a1.channels = c1
 
 # Describe/configure the source
 a1.sources.r1.type = avro
 a1.sources.r1.bind = 192.168.2.159
 a1.sources.r1.port = 55555
 
 # Describe the sink
 a1.sinks.k1.type = hdfs
 a1.sinks.k1.hdfs.path =/dataoutput
 a1.sinks.k1.hdfs.writeFormat = Text
 a1.sinks.k1.hdfs.fileType = DataStream
 a1.sinks.k1.hdfs.rollInterval = 10
 a1.sinks.k1.hdfs.rollSize = 0
 a1.sinks.k1.hdfs.rollCount = 0
 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
 a1.sinks.k1.hdfs.useLocalTimeStamp = true
 
 # Use a channel which buffers events in file
 a1.channels.c1.type = file
 a1.channels.c1.checkpointDir = /usr/flume/checkpoint
 a1.channels.c1.dataDirs = /usr/flume/data
 
 # Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

③、启动flume agent a1 服务端

flume-ng agent -c . -f avroFlume.conf -n a1 -Dflume.root.logger=INFO,console

④、使用avro-client发送文件

flume-ng avro-client -c . -H 192.168.2.159 -p55555 -F /location/test.txt 

⑤、在控制台将收到到日志信息,并且在hdfs上会生成按配置文件规定的日

aaa@qq.com:~$ hdfs dfs -ls /dataoutput
Found 7 items
-rw-r--r--   3 bailing supergroup          9 2019-02-19 18:17 /dataoutput/2019-02-19-18-17-11.1550629031059
-rw-r--r--   3 bailing supergroup         18 2019-02-19 18:17 /dataoutput/2019-02-19-18-17-13.1550629033682
-rw-r--r--   3 bailing supergroup          9 2019-02-19 18:17 /dataoutput/2019-02-19-18-17-15.1550629035102
-rw-r--r--   3 bailing supergroup         11 2019-02-21 19:28 /dataoutput/2019-02-21-19-28-12.1550806092252
-rw-r--r--   3 bailing supergroup          9 2019-02-21 19:48 /dataoutput/2019-02-21-19-48-31.1550807311274
-rw-r--r--   3 bailing supergroup          9 2019-02-20 01:23 /dataoutput/20190220-.1550654583919
-rw-r--r--   3 bailing supergroup          9 2019-02-20 01:23 /dataoutput/20190220-.1550654583920

志文件