大数据实训笔记Day04
Flume
Source
AVRO Source
-
AVRO是Apache提供的一套序列化反序列化机制,AVRO的序列化机制能够跨平台跨语言的
-
AVRO Source实际上是用于接收被AVRO序列化之后的数据,结合AVRO Sink可以实现多级、扇入以及扇出流动
-
配置案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置AVRO Source a1.sources.s1.type = avro # 要监听的主机 a1.sources.s1.bind = 0.0.0.0 # 要监听的端口 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.conf -Dflume.root.logger=INFO,console
-
利用AVRO客户端将文件序列化之后发送给Flume
flume-ng avro-client -H hadoop -p 8090 -F a.txt
-
Exec Source
-
Exec Source会监听指定的命令,会将这个命令的执行结果作为日志来进行收集
-
案例:监听文件,如果文件中新添了数据,自动收集文件中新添的数据
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Exec Source a1.sources.s1.type = exec # 指定监听的命令 a1.sources.s1.command = tail -F /opt/a.txt # 指定命令类型 a1.sources.s1.shell = /bin/bash -c a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动Flume
flume-ng agent -n a1 -c $FLUME_HOME/conf -f execsource.conf -Dflume.root.logger=INFO,console
-
Sequence Generator Source
-
Sequence Generator Source是一个序列产生器,会从0开始递增,递增到totalEvents
-
totalEvents如果不指定,则默认是Long.MAX_VALUE,即263-1
-
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Sequence Generator Source a1.sources.s1.type = seq # 指定递增的最大值 a1.sources.s1.totalEvents = 1000 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f seqsource.conf -Dflume.root.logger=INFO,console
-
Spooling Directory Source
-
Spooling Directory Source会监听指定的目录,如果指定的目录下产生了新的文件,那么会将这个新文件中的内容自动的进行按行收集
-
被收集完的文件会自动添加一个后缀.COMPLETED
-
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置Spooling Directory Source a1.sources.s1.type = spooldir # 要监听的目录 a1.sources.s1.spoolDir = /opt/flumedata a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.conf -Dflume.root.logger=INFO,console
-
HTTP Source
-
HTTP Source监听HTTP请求,只能用于监听GET和POST请求。其中对于GET请求的监听只处于实验阶段,所以实际过程中只用这个Source来监听POST请求
-
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置HTTP Source a1.sources.s1.type = http # 要监听的端口 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/conf -f httpsource.conf -Dflume.root.logger=INFO,console
-
发送POST请求
curl -X POST -d '[{"headers":{"date":"2021-07-14"},"body":"hello class"}]' http://hadoop:8090
-
Custom Source
-
在Flume中,如果Flume原生提供的Source不能适用指定场景,那么此时就可以考虑自定义Source
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # 配置自定义Source - 指定全路径名 a1.sources.s1.type = cn.tedu.flume.source.AuthSource # 指定步长 a1.sources.s1.step = 5 # 指定终止范围 a1.sources.s1.end = 100 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
Channel
Memory Channel
-
Memory Channel将Source收集来的数据临时放到内存的队列中,因此这个Channel的读写速度相对较快但是不可靠
-
通过capacity属性来定义队列的容量。如果队列被放慢,那么后来的数据就会被阻塞。capactiy如果不指定则默认值为为100,实际开发中,一般会将这个值调节为300000~500000
-
属性transactionCapacity表示事务容量,实际上表示了每次发送或者接收的数据量。transactionCapacity默认也是100,实际开发过程中,会考虑将这个值调节为3000~5000
-
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 # 配置Memory Channel a1.channels.c1.type = memory # 指定容量 a1.channels.c1.capacity = 100000 # 指定数据的批量 a1.channels.c1.transactionCapacity = 1000 a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/bin -f memory.conf -Dflume.root.logger=INFO,console
-
发送数据
nc hadoop 8090
-
File Channel
-
File Channel将Source收集来的数据以文件形式存储到本地磁盘上,所以这个Channel的读写速度慢但是可靠
-
在存储的时候,如果不指定,那么会放在
~/.flume/file-channel/data
路径下 -
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 # 配置File Channel a1.channels.c1.type = file # 指定在磁盘上的临时存储路径 a1.channels.c1.dataDirs = /opt/flumedata a1.sinks.k1.type = logger a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/bin -f filechannel.conf -Dflume.root.logger=INFO,console
-
发送数据
nc hadoop 8090
-
JDBC Channel
- JDBC Channel是将Source收集的数据临时存储到数据库中,因为数据库存在索引的问题,所以理论上这个JDBC Channel的效率要高于File Channel但是低于Memory Channel
- JDBC Channel到目前为止仅仅支持Derby(微型数据库、单连接)。正因为采用的是Derby库,所以实际开发中不用这个JDBC Channel
Sink
HDFS Sink
-
HDFS Sink将数据写到HDFS上。在写数据的时候,默认是每隔30s在HDFS上生成一个新的文件,那么这会导致HDFS上生成大量的小文件,所以实际过程中需要改变这个值
-
HDFS Sink在讲数据写到HDFS上的时候,还需要考虑文件类型。Flume支持三种文件类型:SequenceFil(序列文件)、DataStream(文本文件)、CompressedStream(压缩文件)
-
案例
-
启动HDFS
start-dfs.sh
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 配置HDFS Sink a1.sinks.k1.type = hdfs # 指定在HDFS上的存储路径 a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/flumedata/test # 指定文件的滚动间隔时间 a1.sinks.k1.hdfs.rollInterval = 3600 # 指定文件的存储类型 a1.sinks.k1.hdfs.fileType = DataStream a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/bin -f hdfssink.conf -Dflume.root.logger=INFO,console
-
发送数据
nc hadoop 8090
-
Logger Sink
-
Logger Sink表示将数据打印到指定位置上,一般是控制台上
-
在打印的时候,为了防止过多的数据将控制台占满,所以默认打印body部分的数据不超过16个字节,可以通过maxBytesToLog来配置
-
Logger Sink在打印数据的时候,对中文支持不好
-
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 # 配置Logger Sink a1.sinks.k1.type = logger # 指定body部分打印的字节个数 a1.sinks.k1.maxBytesToLog = 20 a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/bin -f loggersink.conf -Dflume.root.logger=INFO,console
-
发送数据
nc hadoop 8090
-
File Roll Sink
-
File Roll Sink将数据最终写到执行的目录下。在写的时候同样是每隔30s会生成一个小文件,所以实际过程中需要调节大小
-
案例
-
格式文件
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 8090 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 配置File Roll Sink a1.sinks.k1.type = file_roll # 指定数据的存储目录 a1.sinks.k1.sink.directory = /opt/flumedata # 指定文件滚动的间隔时间 a1.sinks.k1.sink.rollInterval = 3600 a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
启动命令
flume-ng agent -n a1 -c $FLUME_HOME/bin -f filerollsink.conf -Dflume.root.logger=INFO,console
-
发送数据
nc hadoop 8090
-
上一篇: Python学习——matplotlib(介绍及安装)
下一篇: CSS,HTML列表属性
推荐阅读