欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume入门案例(三个简单的小案例)

程序员文章站 2022-05-19 13:18:54
...

案例一:使用Flume监听一个端口,收集该端口数据,并打印到控制台。

1)安装netcat工具
[@hadoop102 software]$ sudo yum install -y nc

(2)判断44444端口是否被占用
[@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 444443)创建Flume Agent配置文件netcat-flume-logger.conf
在flume目录下创建job文件夹并进入job文件夹。
[@hadoop102 flume]$ mkdir job
[@hadoop102 flume]$ cd job/
在job文件夹下创建Flume Agent配置文件netcat-flume-logger.conf。
[@hadoop102 job]$ vim netcat-flume-logger.conf
在netcat-flume-logger.conf文件中添加如下内容。

# Name the components on this agent
a1.sources = r1    a1就是agent的名字
a1.sinks = k1
a1.channels = c1

# Describe/configure the source  这是r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink   这是sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory  这是channels
a1.channels.c1.type = memory  内存channels
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)先开启flume监听端口
[@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console

(5)使用netcat工具向本机的44444端口发送内容
[@hadoop102 ~]$ nc localhost 444446)在Flume监听页面观察接收数据情况

案例二:实时监控单个文件的追加

*Source 用到的是exec,channel是memory,sink是hdfs。
前期准备:因为/opt/module/hive/logs/hive.log本身日志太长,选择自己创建在/opt/module/datas/hive.log 自己通过echo 命令追加到至文件中。

在job中创建 vim file-flume-hdfs2.conf
配置如下的配置文件:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/datas/hive.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否对时间戳取整
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 10
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行Flume
[@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-file-hdfs.conf

echo “bigdataJ”>> /opt/module/datas/hive.log
在hdfs上查看文件

案例三:实时监控多目录下的多个追加文件

Taildir source ,memory channel,hdfs sink。
Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

1)在job中创建配置文件taildir-flume-hdfs.conf
添加如下内容:

# Name the components on this agent
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/file1/file1.txt
a1.sources.r1.filegroups.f2 = /opt/module/file2/file.*.txt
a1.sources.r1.positionFile = /opt/module/taildir/tail_dir.json
# /opt/module/taildir/tail_dir.json这是记录文件回滚的位置信息

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1



(2)启动监控文件夹命令
[@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/files-flume-hdfs.conf3)向files文件夹中追加内容

在/opt/module/flume目录下创建files文件夹
mkdir file1
mkdir file2
追加文件
echo hello >> file1/file1.txt
echo world >> file2/fileqwer.txt

(4)数据
(5)可以在/opt/module/taildir/tail_dir.json观察信息。