Flume Study Notes: Custom Interceptors
Flume internal execution flow
Channel Selector
1. Replicating Channel Selector (default)
Required properties are in bold.
Property Name       Default       Description
selector.type       replicating   The component type name, needs to be replicating
selector.optional   –             Set of channels to be marked as optional
Example for an agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
In the above configuration, c3 is an optional channel. Failure to
write to c3 is simply ignored. Since c1 and c2 are not marked
optional, failure to write to those channels will cause the
transaction to fail.
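The rule described above can be sketched in plain Java. This is a simplified, stand-alone model and not Flume's own implementation: the class ReplicatingSketch, the method deliver, and the writeOk predicate (which simulates whether a write to a channel succeeds) are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Stand-alone model of replicating delivery with optional channels (not Flume code). */
public class ReplicatingSketch {

    /**
     * Tries to write one event to every channel.
     * Returns true if the simulated transaction succeeds.
     */
    public static boolean deliver(List<String> channels,
                                  Set<String> optional,
                                  Predicate<String> writeOk) {
        for (String channel : channels) {
            boolean ok = writeOk.test(channel);
            if (!ok && !optional.contains(channel)) {
                // A required channel failed, so the whole transaction fails.
                return false;
            }
            // A failure on an optional channel is simply ignored.
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> channels = List.of("c1", "c2", "c3");
        Set<String> optional = Set.of("c3");

        // Only the optional channel c3 fails: the transaction still succeeds.
        System.out.println(deliver(channels, optional, ch -> !ch.equals("c3"))); // true

        // The required channel c1 fails: the transaction fails.
        System.out.println(deliver(channels, optional, ch -> !ch.equals("c1"))); // false
    }
}
```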
2. Multiplexing Channel Selector
Required properties are in bold.
Property Name        Default                 Description
selector.type        replicating             The component type name, needs to be multiplexing
selector.header      flume.selector.header
selector.default     –
selector.mapping.*   –
Example for an agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
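To make the routing in this example concrete, here is a minimal stand-alone sketch of how a header value selects channels. It only mirrors the mapping from the configuration above; the class MultiplexingSketch and the method route are hypothetical names and are not part of the Flume API.

```java
import java.util.List;
import java.util.Map;

/** Stand-alone model of the multiplexing example above (not Flume code). */
public class MultiplexingSketch {

    // Mirrors selector.mapping.CZ = c1 and selector.mapping.US = c2 c3.
    static final Map<String, List<String>> MAPPING = Map.of(
            "CZ", List.of("c1"),
            "US", List.of("c2", "c3"));

    // Mirrors selector.default = c4.
    static final List<String> DEFAULT_CHANNELS = List.of("c4");

    /** Returns the channels for an event, based on its "state" header. */
    public static List<String> route(Map<String, String> headers) {
        String state = headers.get("state");
        if (state == null) {
            // No "state" header at all: fall back to the default channel.
            return DEFAULT_CHANNELS;
        }
        return MAPPING.getOrDefault(state, DEFAULT_CHANNELS);
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("state", "CZ"))); // [c1]
        System.out.println(route(Map.of("state", "US"))); // [c2, c3]
        System.out.println(route(Map.of("state", "DE"))); // [c4]
        System.out.println(route(Map.of()));              // [c4]
    }
}
```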
3. Custom Channel Selector
Sink Processor
1. Default Sink Processor
The default sink processor accepts only a single sink. You do not have to create a processor (sink group) for a single sink; the usual source - channel - sink pattern is sufficient.
2. Failover Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
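In this configuration k2 has the larger priority value (10 versus 5), so it is the sink that is tried first, and k1 takes over while k2 is failed. The selection rule can be sketched as follows. This is a simplified model: the class FailoverSketch and the method pickSink are hypothetical, and the sketch does not model the back-off time that maxpenalty caps (in milliseconds).

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Stand-alone model of priority-based failover (not Flume code). */
public class FailoverSketch {

    /** Picks the sink with the highest priority value among those not currently failed. */
    public static Optional<String> pickSink(Map<String, Integer> priorities,
                                            Set<String> failed) {
        return priorities.entrySet().stream()
                .filter(e -> !failed.contains(e.getKey()))
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = Map.of("k1", 5, "k2", 10);

        System.out.println(pickSink(priorities, Set.of()).orElse("none"));           // k2
        System.out.println(pickSink(priorities, Set.of("k2")).orElse("none"));       // k1
        System.out.println(pickSink(priorities, Set.of("k1", "k2")).orElse("none")); // none
    }
}
```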
3. Load Balancing Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
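As a rough illustration of this configuration, the sketch below picks a sink at random and skips sinks that are currently backed off. It is a simplification under stated assumptions: the class LoadBalanceSketch, the method pickSink, and the backedOff set are hypothetical, and the real processor also handles retry order and back-off timing, which are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;

/** Stand-alone model of random load balancing with back-off (not Flume code). */
public class LoadBalanceSketch {

    /** Picks a random sink among those that are not currently backed off. */
    public static Optional<String> pickSink(List<String> sinks,
                                            Set<String> backedOff,
                                            Random random) {
        List<String> candidates = new ArrayList<>(sinks);
        candidates.removeAll(backedOff);
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(random.nextInt(candidates.size())));
    }

    public static void main(String[] args) {
        List<String> sinks = List.of("k1", "k2");
        Random random = new Random();

        // Either k1 or k2, chosen at random.
        System.out.println(pickSink(sinks, Set.of(), random).orElse("none"));

        // k1 is backed off, so only k2 remains.
        System.out.println(pickSink(sinks, Set.of("k1"), random).orElse("none"));
    }
}
```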
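Custom Interceptor
Goal: separate events whose body contains the string "hello" from events whose body does not.
Add flume-ng-core.jar to the project's classpath.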
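package com.xjq.Interceptor.flume_demo;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/** Tags each event with a "type" header: "yes" if the body contains "hello", otherwise "no". */
public class TypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No state to set up.
    }

    /** Tags a single event in place and returns it. */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Decode with an explicit charset (UTF-8 assumed) instead of the platform default.
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        headers.put("type", body.contains("hello") ? "yes" : "no");
        return event;
    }

    /** Tags every event of a batch. Events are modified in place, so the same list is returned. */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /** Flume creates the interceptor through this builder (hence the "$Builder" suffix in the agent configuration). */
    public static class Builder implements Interceptor.Builder {
        @Override
        public void configure(Context context) {
            // This interceptor takes no configuration.
        }

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }
    }
}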
flume1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node01
a1.sources.r1.port = 4444
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.xjq.Interceptor.flume_demo.TypeInterceptor$Builder
#Selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.yes = c1
a1.sources.r1.selector.mapping.no = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
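Putting the interceptor and the selector together: the interceptor writes the "type" header, and the multiplexing selector maps that header to a channel. The stand-alone sketch below imitates both steps without any Flume dependency, so the expected routing can be checked quickly. The class TypeRoutingSketch and its methods tag and channelFor are hypothetical names, and UTF-8 is assumed for the event body.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Stand-alone model of the header tagging and routing in flume1.conf (not Flume code). */
public class TypeRoutingSketch {

    // Mirrors selector.mapping.yes = c1 and selector.mapping.no = c2.
    static final Map<String, String> MAPPING = Map.of("yes", "c1", "no", "c2");

    /** Imitates the interceptor: builds the headers for an event body. */
    public static Map<String, String> tag(byte[] body) {
        Map<String, String> headers = new HashMap<>();
        String text = new String(body, StandardCharsets.UTF_8);
        headers.put("type", text.contains("hello") ? "yes" : "no");
        return headers;
    }

    /** Imitates the selector: looks up the channel for the "type" header. */
    public static Optional<String> channelFor(Map<String, String> headers) {
        String type = headers.get("type");
        // flume1.conf sets no selector.default, so an unmapped value has no channel here.
        return type == null ? Optional.empty() : Optional.ofNullable(MAPPING.get(type));
    }

    public static void main(String[] args) {
        byte[] first = "hello world".getBytes(StandardCharsets.UTF_8);
        byte[] second = "goodbye".getBytes(StandardCharsets.UTF_8);

        System.out.println(channelFor(tag(first)).orElse("none"));  // c1
        System.out.println(channelFor(tag(second)).orElse("none")); // c2
    }
}
```

With the bindings above, events on c1 are sent by sink k1 to node02 and events on c2 are sent by sink k2 to node03.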
flume2.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = node02
a2.sources.r1.port = 5555
# Describe the sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = node03
a3.sources.r1.port = 5555
# Describe the sink
a3.sinks.k1.type = logger
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1