Flume入门案例(三个简单的小案例)
程序员文章站
2022-05-19 13:18:54
...
案例一:使用Flume监听一个端口,收集该端口数据,并打印到控制台。
(1)安装netcat工具
[@hadoop102 software]$ sudo yum install -y nc
(2)判断44444端口是否被占用
[@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444
(3)创建Flume Agent配置文件netcat-flume-logger.conf
在flume目录下创建job文件夹并进入job文件夹。
[@hadoop102 flume]$ mkdir job
[@hadoop102 flume]$ cd job/
在job文件夹下创建Flume Agent配置文件netcat-flume-logger.conf。
[@hadoop102 job]$ vim netcat-flume-logger.conf
在netcat-flume-logger.conf文件中添加如下内容。
# Name the components on this agent
a1.sources = r1 a1就是agent的名字
a1.sinks = k1
a1.channels = c1
# Describe/configure the source 这是r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink 这是sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory 这是channels
a1.channels.c1.type = memory 内存channels
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)先开启flume监听端口
[@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
(5)使用netcat工具向本机的44444端口发送内容
[@hadoop102 ~]$ nc localhost 44444
(6)在Flume监听页面观察接收数据情况
案例二:实时监控单个文件的追加
*Source 用到的是exec,channel是memory,sink是hdfs。
前期准备:因为/opt/module/hive/logs/hive.log本身日志太长,选择自己创建在/opt/module/datas/hive.log 自己通过echo 命令追加到至文件中。
在job中创建 vim file-flume-hdfs2.conf
配置如下的配置文件:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/datas/hive.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否对时间戳取整
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 10
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行Flume
[@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-file-hdfs.conf
echo “bigdataJ”>> /opt/module/datas/hive.log
在hdfs上查看文件
案例三:实时监控多目录下的多个追加文件
Taildir source ,memory channel,hdfs sink。
Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
(1)在job中创建配置文件taildir-flume-hdfs.conf
添加如下内容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/file1/file1.txt
a1.sources.r1.filegroups.f2 = /opt/module/file2/file.*.txt
a1.sources.r1.positionFile = /opt/module/taildir/tail_dir.json
# /opt/module/taildir/tail_dir.json这是记录文件回滚的位置信息
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)启动监控文件夹命令
[@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/files-flume-hdfs.conf
(3)向files文件夹中追加内容
在/opt/module/flume目录下创建files文件夹
mkdir file1
mkdir file2
追加文件
echo hello >> file1/file1.txt
echo world >> file2/fileqwer.txt
(4)数据
(5)可以在/opt/module/taildir/tail_dir.json观察信息。
上一篇: Python2至Python3的bin文件操作变化
下一篇: java:数据交换