自定义flume 拦截器(interceptor)
程序员文章站
2022-05-23 11:07:45
...
自定义Interceptor
自定义过滤器类:
新建Maven项目,新建类,实现Interceptor接口,重写intercept方法,如:将header添加到body中,
- 重写父类方法:close(),intiialize()
代码如下
@Override
public void close() {
}
@Override
public void initialize() {
}
2.重写父类方法:intercept()
自定义拦截器,这里是将header中的数据加入到body中,中间用Tab键分隔。
@Override
public Event intercept(Event ev) {
Map<String, String> event = ev.getHeaders();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
if (event.containsKey("timestamp")) {
timestampHeaderName = "timestamp=" + event.get("timestamp");
}
if (event.containsKey("host")) {
hostHeaderName = "host=" + event.get("host");
}
baos.write(ev.getBody());
baos.write("\t".getBytes(charsetName));
baos.write(timestampHeaderName.getBytes(charsetName));
baos.write("\t".getBytes(charsetName));
baos.write(hostHeaderName.getBytes(charsetName));
ev.setBody(baos.toByteArray());
} catch (IOException e) {
e.printStackTrace();
}
return ev;
}
3.重写方法intercept()
这里写的代码几乎是固定了的,主要实现拦截的方法在上边的方法中已经定义好了,这里只是实现了一下,多条数据的时候的实现
代码如下:
@Override
public List<Event> intercept(List<Event> event) {
List<Event> result = new ArrayList<>();
for (Event ev : event) {
result.add(ev);
}
return result;
}
4.自定义类种类:Builder
在builder的最后调用了主方法,但是在集群中调用的是这个方法。
public static class Builder implements Interceptor.Builder {
@Override
public void configure(Context arg0) {
}
@Override
public Interceptor build() {
return new MyApp();
}
}
5.导出jar包到集群中
在flume的安装目录下依次创建如下文件夹:plugins.d/自定义名称/lib/这里放jar包
总的路径如:
/usr/share/flume/plugins.d/intercept/lib/intercept.jar
6启动自定义properties文件,
启动命令:flume-ng agent -f properties配置文件名称 -n properties文件中的名称 -Dflume.root.logger=Info,console
如果不需要打印日志,-Dflume.root.logger=Info,console不需要添加。
properties配置文件见上篇博客:blog.csdn.net/James_JR10/article/details/78255910
推荐阅读