(一)flume的介绍和简单案例
一、flume 介绍
1、定义
flume 是 cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。flume 基于流式框架,灵活简单。如:
2、架构组成
(1)agent
agent 是 一个 JVM 进程,它以事件的形式将数据从源头送到目的地。
agent有三个组成部分:source、channel、sink。
(2)source
source是负责接收数据到 flume agent 的组件。source可以处理各种类型、各种格式的日志数据,包括avto、thift、exec、jms、sysloh、http等。
(3)sink
sink 不断地轮询channel 中的事件,且批量的移除它们,并将这些事件批量写入到存储系统或索引系统或者发送到其他的 flume agent。
sink组件的目的地包括:hdfs、logger、kafka、hbase、solr等。
(4)channel
channel 是位于 source 和 sink 之间的缓冲区。因此,channel 允许 source 和 sink 运作在不同的 速率上,可以避免积压。channel 是线程安全的,可以同时处理几个 source 的写入操作和 几个sink 的读取操作。
flume自带2 种channel:memory channel 和 file channel 。
memory channel是 内存中的队列。优点是速度快。缺点是不太安全。因为随着机器的宕机或重启,数据会全部丢失,且无法恢复。
file channel 是将所有事件写入到磁盘。与memory channel对比,缺点是速度相对慢,优点是更加安全,数据不会丢失。此外file channel可以存储的数据更多(磁盘容量大于内存容量)。
(5)event
传输单元,flume数据传输的基本单元,以 event 的形式将数据从源头送至目的地。
event 由 header 和 body 两部分组成。header 用来存放该event 的一些属性,类似于元数据,为K-V 结构。body 用来存放该条数据,形式是字节数组。如:
二、常见案例
1、监控端口数据官方案例
(1)案例需求
使用 flume 监听一个端口,收集该端口数据,并打印到控制台。
(2)配置文件(netcat-flume-conf.conf)
# example.conf: A single-node Flume configuration
# Name the components on this agent 第一步,给组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source 第二步,配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink 第三步,配置 sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory 第四步,配置channel
a1.channels.c1.type = memory
# 队列的容量
a1.channels.c1.capacity = 1000
# 转换的数目,需要小于容量,不然处理不了
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel 第五步,将source 和 sink绑定相应的 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3)启动
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
2、实时监控单个追加文件
(1)案例需求
实时监控日志文件,并上传到 hdfs。
(2)配置文件(netcat-flume-conf.conf)
# example.conf: A single-node Flume configuration
# Name the components on this agent 第一步,给组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source 第二步,配置 source
a1.sources.r1.type = exec
# 查看日志的命令
a1.sources.r1.command = tail -F xxxx.log
# Describe the sink 第三步,配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = xxxx
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = log-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 event 才 flush 到 hdfs 一次
a1.sinks.k1.hdfs.barchSize = 1000
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件,60s,单位秒
a1.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小,记住小于 128m(hdfs的块大小)
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory 第四步,配置channel
a1.channels.c1.type = memory
# 队列的容量
a1.channels.c1.capacity = 1000
# 转换的数目,需要小于容量,不然处理不了
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel 第五步,将source 和 sink绑定相应的 channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1