欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flume拦截器

程序员文章站 2022-06-13 20:56:13
...

摘要:

拦截器是简单的插件式组件,设置在source和channel之间。source接收到的时间,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。

flume内置了很多拦截器,并且会定期的添加一些拦截器,在这里列出一些flume内置的,经常使用的拦截器。

一、拦截器的种类介绍

1、Timestamp Interceptor(时间戳拦截器)

flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置。 参数 默认值 描述 type   类型名称timestamp,也可以使用类名的全路径 preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换时间戳报头的值

source连接到时间戳拦截器的配置:

a1.sources.r1.interceptors = timestamp 
a1.sources.r1.interceptors.timestamp.type=timestamp 
a1.sources.r1.interceptors.timestamp.preserveExisting=false

 2、Host Interceptor(主机拦截器)

主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。时间报头中的key使用hostHeader配置,默认是host。主机拦截器的配置参数 默认值 描述 type   类型名称host hostHeader host 事件投的key useIP true 如果设置为false,host键插入主机名 preserveExisting false 如果设置为true,若事件中报头已经存在,不会替换host报头的值

source连接到主机拦截器的配置:

a1.sources.r1.interceptors = host 
a1.sources.r1.interceptors.host.type=host 
a1.sources.r1.interceptors.host.useIP=false 
a1.sources.r1.interceptors.timestamp.preserveExisting=true

3、静态拦截器(Static Interceptor)

静态拦截器的作用是将k/v插入到事件的报头中。配置如下参数 默认值 描述 type   类型名称static key key 事件头的key value value key对应的value值 preserveExisting true 如果设置为true,若事件中报头已经存在该key,不会替换value的值source连接到静态拦截器的配置:

a1.sources.r1.interceptors = static 
a1.sources.r1.interceptors.static.type=static 
a1.sources.r1.interceptors.static.key=logs 
a1.sources.r1.interceptors.static.value=logFlume 
a1.sources.r1.interceptors.static.preserveExisting=false

4、正则过滤拦截器(Regex Filtering Interceptor)

在日志采集的时候,可能有一些数据是我们不需要的,这样添加过滤拦截器,可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。参数默认值描述 type 类型名称REGEX_FILTER regex .* 匹配除“\n”之外的任何个字符 excludeEvents false 默认收集匹配到的事件。如果为true,则会删除匹配到的event,收集未匹配到的。

source连接到正则过滤拦截器的配置:

a1.sources.r1.interceptors = regex 
a1.sources.r1.interceptors.regex.type=REGEX_FILTER 
a1.sources.r1.interceptors.regex.regex=(rm)|(kill) 
a1.sources.r1.interceptors.regex.excludeEvents=false

这样配置的拦截器就只会接收日志消息中带有rm 或者kill的日志。

5、Regex Extractor Interceptor

通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

6、UUID Interceptor

用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。根据上面的source,拦截器的配置如下:

# source 拦截器
a1.sources.sources1.interceptors = i1
a1.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.sources1.interceptors.i1.headerName = uuid
a1.sources.sources1.interceptors.i1.preserveExisting = true
a1.sources.sources1.interceptors.i1.prefix = UUID-

7、Morphline Interceptor

Morphline拦截器,该拦截器使用Morphline对每个events数据做相应的转换。关于Morphline的使用,可参考

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
8、Search and Replace Interceptor

此拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。还可以进行回溯/群组捕捉。此拦截器使用与Java Matcher.replaceAll()方法中相同的规则

# 拦截器别名
a1.sources.avroSrc.interceptors = search-replace 
# 拦截器类型,必须是search_replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

#删除事件正文中的前导字母数字字符,根据正则匹配event内容
a1.sources.avroSrc.interceptors.search-replace.searchPattern = 快速褐色([az] +)跳过懒惰([az] +)
# 替换匹配到的event内容
a1.sources.avroSrc.interceptors.search-replace.replaceString = 饥饿的$ 2吃了不小心$ 1
# 设置字符集,默认是utf8
a1.sources.avroSrc.interceptors.search-replace.charset = utf8

二、静态拦截器的使用(静态拦截器)

假设有两台服务器(AB )收集实时生产日志,日志类型分别是:access.lognginx.logweb.log

将两台服务器的日志汇总到一台服务器上(C),然后统计存储到HDFS文件系统中,且存储目录结构为

/source/logs/access/20180911/events.x

/source/logs/nginx/20180911/ events.x

/source/logs/web/20180911/ events.x

flume拦截器

1、在服务器 node01node02 创建配置文件 exec_source.properties

# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 元数据类型
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/logs/access.log
# 拦截器别名
a1.sources.r1.interceptors = i1
# 拦截器类型
a1.sources.r1.interceptors.i1.type = static
# 拦截的键值对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/data/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/data/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# 下沉的目的地
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 100000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

2、在服务器 node03 上创建配置文件avro_source_hdfs.properties

# 定义agent名, source、channel、sink的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1


# 定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port =41414

# 添加时间拦截器,将没有时间戳的event添加上时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


# 定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# 定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# 时间类型
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
# 生成的文件不按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
# 生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize  = 10485760
#a1.sinks.k1.hdfs.rollSize  =0
# 批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 20
# flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
# 操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000

# 组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、测试

先启动node03上的flume:

bin/flume-ng agent -c conf/ -f conf/avro_source_hdfs.properties -n a1 -Dflume.root.logger=INFO,console

再分别启动node01、node02上的flume:

bin/flume-ng agent -c conf/ -f conf/exec_source.properties -n a1 -Dflume.root.logger=INFO,console

接着在node01、node02产生模拟数据:

while true;do echo access...running >> /export/data/logs/access.log;echo nginx...running >> /export/data/logs/nginx.log;echo web...running >> /export/data/logs/web.log;sleep 0.5;done

4、运行结果

node03的控制台输出

flume拦截器

使用HDFS的web-ui查看生成的文件:http://node01:50070/explorer.html#/source/logs

flume拦截器

三、自定义拦截器

1、添加依赖

    <dependencies>
        <!-- flume核心依赖 -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

2、定义实现类MyInterceptor ,只需要实现Interceptor接口,定义内部类Builder实现Interceptor.Builder即可

package com.theone.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName MyInterceptor
 * @Description TODO
 * @Author Pureman
 * @Date 2018/7/30 14:53
 **/
public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    /**
     * 拦截source发送到通道channel中的消息
     *
     * @param event 接收过滤的event
     * @return event    根据业务处理后的event
     */
    @Override
    public Event intercept(Event event) {
        // 获取事件对象中的字节数据
        byte[] arr = event.getBody();
        // 将获取的数据转换成大写
        event.setBody(new String(arr).toUpperCase().getBytes());
        // 返回到消息中
        return event;
    }
    // 接收被过滤事件集合
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new ArrayList<>();
        for (Event event : events) {
            list.add(intercept(event));
        }
        return list;
    }

    public static class Builder implements Interceptor.Builder {
        // 获取配置文件的属性
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

3、并打包(不指定主类),将jar包上传至node01

4、node01上创建配置文件 myInterceptor-hdfs.properties

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/logs/text.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.theone.flume.MyInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /theone/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5、启动节点node01上的 flume

flume-ng agent -c conf/ -f conf/myInterceptor-hdfs.properties -n a1 Dflume.root.logger=DEBUG,console

6、在复制的node01中循环追加内容

while true;do echo one world one dream >>/export/data/logs/text.log;sleep 0.5;done