欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

(一)flume的介绍和简单案例

程序员文章站 2022-06-15 14:10:07
...

一、flume 介绍

1、定义

flume 是 cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。flume 基于流式框架,灵活简单。如:

2、架构组成

(一)flume的介绍和简单案例

(1)agent

agent 是 一个 JVM 进程,它以事件的形式将数据从源头送到目的地。

agent有三个组成部分:source、channel、sink。

(2)source

source是负责接收数据到 flume agent 的组件。source可以处理各种类型、各种格式的日志数据,包括avto、thift、exec、jms、sysloh、http等。

(3)sink

sink 不断地轮询channel 中的事件,且批量的移除它们,并将这些事件批量写入到存储系统或索引系统或者发送到其他的 flume agent。

sink组件的目的地包括:hdfs、logger、kafka、hbase、solr等。

(4)channel

channel 是位于 source 和 sink 之间的缓冲区。因此,channel 允许 source 和 sink 运作在不同的 速率上,可以避免积压。channel 是线程安全的,可以同时处理几个 source 的写入操作和 几个sink 的读取操作。

flume自带2 种channel:memory channel 和 file channel 。

memory channel是 内存中的队列。优点是速度快。缺点是不太安全。因为随着机器的宕机或重启,数据会全部丢失,且无法恢复。

file channel 是将所有事件写入到磁盘。与memory channel对比,缺点是速度相对慢,优点是更加安全,数据不会丢失。此外file channel可以存储的数据更多(磁盘容量大于内存容量)。

(5)event

传输单元,flume数据传输的基本单元,以 event 的形式将数据从源头送至目的地。

event 由 headerbody 两部分组成。header 用来存放该event 的一些属性,类似于元数据,为K-V 结构。body 用来存放该条数据,形式是字节数组。如:
(一)flume的介绍和简单案例

二、常见案例

1、监控端口数据官方案例

(1)案例需求

使用 flume 监听一个端口,收集该端口数据,并打印到控制台。

(2)配置文件(netcat-flume-conf.conf)

# example.conf: A single-node Flume configuration

# Name the components on this agent 第一步,给组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 第二步,配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink	第三步,配置 sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory 第四步,配置channel
a1.channels.c1.type = memory
# 队列的容量
a1.channels.c1.capacity = 1000
# 转换的数目,需要小于容量,不然处理不了
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 第五步,将source 和 sink绑定相应的 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)启动

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

2、实时监控单个追加文件

(1)案例需求

实时监控日志文件,并上传到 hdfs。

(2)配置文件(netcat-flume-conf.conf)

# example.conf: A single-node Flume configuration

# Name the components on this agent 第一步,给组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 第二步,配置 source
a1.sources.r1.type = exec
# 查看日志的命令
a1.sources.r1.command = tail -F xxxx.log

# Describe the sink	第三步,配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = xxxx
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = log-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 event 才 flush 到 hdfs 一次
a1.sinks.k1.hdfs.barchSize = 1000
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件,60s,单位秒
a1.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小,记住小于 128m(hdfs的块大小)
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 event 数量无关
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory 第四步,配置channel
a1.channels.c1.type = memory
# 队列的容量
a1.channels.c1.capacity = 1000
# 转换的数目,需要小于容量,不然处理不了
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 第五步,将source 和 sink绑定相应的 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(三)启动