flume拦截器
摘要:
拦截器是简单的插件式组件,设置在source和channel之间。source接收到的时间,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。
flume内置了很多拦截器,并且会定期的添加一些拦截器,在这里列出一些flume内置的,经常使用的拦截器。
一、拦截器的种类介绍
1、Timestamp Interceptor(时间戳拦截器)
flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置。 参数 默认值 描述 type 类型名称timestamp,也可以使用类名的全路径 preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值
source连接到时间戳拦截器的配置:
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type=timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting=false
2、Host Interceptor(主机拦截器)
主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。时间报头中的key使用hostHeader配置,默认是host。主机拦截器的配置参数 默认值 描述 type 类型名称host hostHeader host 事件投的key useIP true 如果设置为false,host键插入主机名 preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换host报头的值
source连接到主机拦截器的配置:
a1.sources.r1.interceptors = host
a1.sources.r1.interceptors.host.type=host
a1.sources.r1.interceptors.host.useIP=false
a1.sources.r1.interceptors.timestamp.preserveExisting=true
3、静态拦截器(Static Interceptor)
静态拦截器的作用是将k/v插入到事件的报头中。配置如下参数 默认值 描述 type 类型名称static key key 事件头的key value value key对应的value值 preserveExisting true 如果设置为true,若事件中报头已经存在该key,不会替换value的值source连接到静态拦截器的配置:
a1.sources.r1.interceptors = static
a1.sources.r1.interceptors.static.type=static
a1.sources.r1.interceptors.static.key=logs
a1.sources.r1.interceptors.static.value=logFlume
a1.sources.r1.interceptors.static.preserveExisting=false
4、正则过滤拦截器(Regex Filtering Interceptor)
在日志采集的时候,可能有一些数据是我们不需要的,这样添加过滤拦截器,可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述 type 类型名称REGEX_FILTER regex .* 匹配除“\n”之外的任何个字符 excludeEvents false 默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。
source连接到正则过滤拦截器的配置:
a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type=REGEX_FILTER
a1.sources.r1.interceptors.regex.regex=(rm)|(kill)
a1.sources.r1.interceptors.regex.excludeEvents=false
这样配置的拦截器就只会接收日志消息中带有rm 或者kill的日志。
5、Regex Extractor Interceptor
通过正则表达式来在header中添加指定的key,value则为正则匹配的部分
6、UUID Interceptor
用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。根据上面的source,拦截器的配置如下:
# source 拦截器
a1.sources.sources1.interceptors = i1
a1.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.sources1.interceptors.i1.headerName = uuid
a1.sources.sources1.interceptors.i1.preserveExisting = true
a1.sources.sources1.interceptors.i1.prefix = UUID-
7、Morphline Interceptor
Morphline拦截器,该拦截器使用Morphline对每个events数据做相应的转换。关于Morphline的使用,可参考
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
8、Search and Replace Interceptor
此拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。还可以进行回溯/群组捕捉。此拦截器使用与Java Matcher.replaceAll()方法中相同的规则
# 拦截器别名
a1.sources.avroSrc.interceptors = search-replace
# 拦截器类型,必须是search_replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
#删除事件正文中的前导字母数字字符,根据正则匹配event内容
a1.sources.avroSrc.interceptors.search-replace.searchPattern = 快速褐色([az] +)跳过懒惰([az] +)
# 替换匹配到的event内容
a1.sources.avroSrc.interceptors.search-replace.replaceString = 饥饿的$ 2吃了不小心$ 1
# 设置字符集,默认是utf8
a1.sources.avroSrc.interceptors.search-replace.charset = utf8
二、静态拦截器的使用(静态拦截器)
假设有两台服务器(A、B )收集实时生产日志,日志类型分别是:access.log、nginx.log、web.log
将两台服务器的日志汇总到一台服务器上(C),然后统计存储到HDFS文件系统中,且存储目录结构为
/source/logs/access/20180911/events.x
/source/logs/nginx/20180911/ events.x
/source/logs/web/20180911/ events.x
1、在服务器 node01、node02 上创建配置文件 exec_source.properties
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 元数据类型
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/logs/access.log
# 拦截器别名
a1.sources.r1.interceptors = i1
# 拦截器类型
a1.sources.r1.interceptors.i1.type = static
# 拦截的键值对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/data/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/data/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# 下沉的目的地
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 100000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
2、在服务器 node03 上创建配置文件avro_source_hdfs.properties
# 定义agent名, source、channel、sink的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port =41414
# 添加时间拦截器,将没有时间戳的event添加上时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# 定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# 定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# 时间类型
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
# 生成的文件不按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
# 生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#a1.sinks.k1.hdfs.rollSize =0
# 批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 20
# flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000
# 组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、测试
先启动node03上的flume:
bin/flume-ng agent -c conf/ -f conf/avro_source_hdfs.properties -n a1 -Dflume.root.logger=INFO,console
再分别启动node01、node02上的flume:
bin/flume-ng agent -c conf/ -f conf/exec_source.properties -n a1 -Dflume.root.logger=INFO,console
接着在node01、node02产生模拟数据:
while true;do echo access...running >> /export/data/logs/access.log;echo nginx...running >> /export/data/logs/nginx.log;echo web...running >> /export/data/logs/web.log;sleep 0.5;done
4、运行结果
node03的控制台输出
使用HDFS的web-ui查看生成的文件:http://node01:50070/explorer.html#/source/logs
三、自定义拦截器
1、添加依赖
<dependencies>
<!-- flume核心依赖 -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
2、定义实现类MyInterceptor ,只需要实现Interceptor接口,定义内部类Builder实现Interceptor.Builder即可
package com.theone.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName MyInterceptor
* @Description TODO
* @Author Pureman
* @Date 2018/7/30 14:53
**/
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public void close() {
}
/**
* 拦截source发送到通道channel中的消息
*
* @param event 接收过滤的event
* @return event 根据业务处理后的event
*/
@Override
public Event intercept(Event event) {
// 获取事件对象中的字节数据
byte[] arr = event.getBody();
// 将获取的数据转换成大写
event.setBody(new String(arr).toUpperCase().getBytes());
// 返回到消息中
return event;
}
// 接收被过滤事件集合
@Override
public List<Event> intercept(List<Event> events) {
List<Event> list = new ArrayList<>();
for (Event event : events) {
list.add(intercept(event));
}
return list;
}
public static class Builder implements Interceptor.Builder {
// 获取配置文件的属性
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
3、并打包(不指定主类),将jar包上传至node01
4、在node01上创建配置文件 myInterceptor-hdfs.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/logs/text.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.theone.flume.MyInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /theone/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5、启动节点node01上的 flume
flume-ng agent -c conf/ -f conf/myInterceptor-hdfs.properties -n a1 Dflume.root.logger=DEBUG,console
6、在复制的node01中循环追加内容
while true;do echo one world one dream >>/export/data/logs/text.log;sleep 0.5;done