
Flume Study Notes: Custom Interceptors

程序员文章站 2022-06-14 13:06:58

Flume's internal execution flow


Channel Selector

1. Replicating Channel Selector (default)

Property Name      Default      Description
selector.type      replicating  The component type name, needs to be replicating
selector.optional  –            Set of channels to be marked as optional

Example for an agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

In the above configuration, c3 is an optional channel. Failure to
write to c3 is simply ignored. Since c1 and c2 are not marked
optional, failure to write to those channels will cause the
transaction to fail.
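
The difference between required and optional channels can be illustrated with a minimal sketch in plain Java. It does not use Flume's classes: `ReplicatingSketch`, `replicate`, and the predicate standing in for a channel write are hypothetical names chosen for illustration, and the real selector handles transactions in more detail than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the replicating selector's failure semantics; not Flume's API.
final class ReplicatingSketch {

    /**
     * Tries to write one event to every channel.
     *
     * @param writeSucceeds stand-in for the channel write; returns true if that channel accepted the event
     * @return the channels that accepted the event
     * @throws IllegalStateException if a required (non-optional) channel fails
     */
    static List<String> replicate(List<String> channels, Set<String> optional,
                                  Predicate<String> writeSucceeds) {
        List<String> written = new ArrayList<>();
        for (String channel : channels) {
            if (writeSucceeds.test(channel)) {
                written.add(channel);
            } else if (!optional.contains(channel)) {
                // A required channel failed, so the whole transaction fails.
                throw new IllegalStateException("required channel failed: " + channel);
            }
            // A failed optional channel is simply skipped.
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> channels = List.of("c1", "c2", "c3");
        Set<String> optional = Set.of("c3");
        // c3 is unavailable, but the event still reaches c1 and c2.
        System.out.println(replicate(channels, optional, c -> !c.equals("c3"))); // prints [c1, c2]
    }
}
```

If c1 or c2 were the failing channel instead, `replicate` would throw, which mirrors the failed transaction described above.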

2. Multiplexing Channel Selector

Property Name       Default                Description
selector.type       replicating            The component type name, needs to be multiplexing
selector.header     flume.selector.header
selector.default    –
selector.mapping.*  –

Example for an agent named a1 and its source called r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
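
How this configuration resolves a header value to channels can be sketched in a few lines of plain Java. This is an illustration only: `MultiplexingSketch` and `channelsFor` are hypothetical names, and the lookup table simply copies the mapping from the configuration above rather than reading it from Flume.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of the mapping above; not Flume's implementation.
final class MultiplexingSketch {

    // selector.mapping.CZ = c1, selector.mapping.US = c2 c3
    static final Map<String, List<String>> MAPPING = Map.of(
            "CZ", List.of("c1"),
            "US", List.of("c2", "c3"));

    // selector.default = c4
    static final List<String> DEFAULT = List.of("c4");

    /** Returns the channels an event goes to, based on its "state" header. */
    static List<String> channelsFor(Map<String, String> headers) {
        String state = headers.get("state");
        if (state == null) {
            // No "state" header at all: fall back to the default channel.
            return DEFAULT;
        }
        // Unmapped values also fall back to the default channel.
        return MAPPING.getOrDefault(state, DEFAULT);
    }

    public static void main(String[] args) {
        System.out.println(channelsFor(Map.of("state", "US"))); // prints [c2, c3]
        System.out.println(channelsFor(Map.of("state", "DE"))); // prints [c4]
    }
}
```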

3. Custom Channel Selector

Sink Processor

1. Default Sink Processor

The default sink processor accepts only a single sink. You do not need to create a processor (sink group) for a single sink; the plain source - channel - sink pattern is enough.

2. Failover Sink Processor


a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
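
In a failover group the sink with the largest priority value is preferred, so with the values above k2 (10) is tried before k1 (5), and `maxpenalty` caps the backoff period of a failed sink in milliseconds. The selection rule can be sketched in plain Java as follows; `FailoverSketch` and `pickSink` are hypothetical names, and the sketch leaves out the backoff timing entirely.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of failover selection; not Flume's implementation.
final class FailoverSketch {

    /** Picks the live sink with the highest priority value, if any sink is live. */
    static Optional<String> pickSink(Map<String, Integer> priorities, Set<String> failed) {
        return priorities.entrySet().stream()
                .filter(entry -> !failed.contains(entry.getKey()))
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = Map.of("k1", 5, "k2", 10);
        System.out.println(pickSink(priorities, Set.of()));     // prints Optional[k2]
        System.out.println(pickSink(priorities, Set.of("k2"))); // prints Optional[k1]
    }
}
```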

3. Load balancing Sink Processor


a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
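
With `selector = random` and `backoff = true`, the processor spreads events over the sinks in random order and temporarily leaves out sinks that have failed. A rough sketch of that idea in plain Java, assuming a simple set of backed-off sinks (`LoadBalanceSketch` and `tryOrder` are hypothetical names, not Flume's API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical model of random load balancing with backoff; not Flume's implementation.
final class LoadBalanceSketch {

    /** Returns the order in which sinks would be tried for the next batch. */
    static List<String> tryOrder(List<String> sinks, Set<String> backedOff, Random random) {
        List<String> order = new ArrayList<>(sinks);
        // Sinks that are currently backed off are not tried at all.
        order.removeAll(backedOff);
        // The remaining sinks are tried in random order.
        Collections.shuffle(order, random);
        return order;
    }

    public static void main(String[] args) {
        List<String> sinks = List.of("k1", "k2");
        // Either order of k1 and k2 is possible here.
        System.out.println(tryOrder(sinks, Set.of(), new Random()));
        // With k1 backed off, only k2 remains.
        System.out.println(tryOrder(sinks, Set.of("k1"), new Random()));
    }
}
```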

Custom Interceptor

Goal: separate events whose body contains the string hello from those that do not.
Add flume-ng-core.jar to the project's classpath.

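package com.xjq.Interceptor.flume_demo;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * Adds a "type" header to every event: "yes" if the body contains "hello",
 * "no" otherwise. A multiplexing channel selector can then route on it.
 */
public class TypeInterceptor implements Interceptor {

	@Override
	public void initialize() {
		// Nothing to set up: the interceptor keeps no state.
	}

	@Override
	public Event intercept(Event event) {
		Map<String, String> headers = event.getHeaders();
		// Decode explicitly instead of relying on the platform default charset
		// (assumes the event bodies are UTF-8 text).
		String body = new String(event.getBody(), StandardCharsets.UTF_8);
		headers.put("type", body.contains("hello") ? "yes" : "no");
		return event;
	}

	@Override
	public List<Event> intercept(List<Event> events) {
		// Events are tagged in place, so the incoming list is returned as is.
		for (Event event : events) {
			intercept(event);
		}
		return events;
	}

	@Override
	public void close() {
		// Nothing to release.
	}

	/** Referenced from the agent configuration as TypeInterceptor$Builder. */
	public static class Builder implements Interceptor.Builder {

		@Override
		public void configure(Context context) {
			// This interceptor takes no configuration properties.
		}

		@Override
		public Interceptor build() {
			return new TypeInterceptor();
		}

	}

}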

flume1.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node01
a1.sources.r1.port = 4444

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.xjq.Interceptor.flume_demo.TypeInterceptor$Builder

#Selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.yes = c1
a1.sources.r1.selector.mapping.no = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
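
To make the path of a single event through flume1.conf concrete, the following plain-Java sketch traces it by hand. It involves no Flume classes: `RoutingTrace` and `route` are hypothetical names, and the channel, sink and host names are copied from the configuration above.

```java
// Hypothetical hand-written trace of flume1.conf's routing; not Flume code.
final class RoutingTrace {

    /** Describes where an event with the given body ends up. */
    static String route(String body) {
        // The interceptor's rule: "yes" if the body contains "hello", otherwise "no".
        String type = body.contains("hello") ? "yes" : "no";
        // selector.mapping.yes = c1, selector.mapping.no = c2
        String channel = type.equals("yes") ? "c1" : "c2";
        // k1 drains c1 to node02:5555, k2 drains c2 to node03:5555
        String sink = channel.equals("c1") ? "k1 -> node02:5555" : "k2 -> node03:5555";
        return "type=" + type + " -> " + channel + " -> " + sink;
    }

    public static void main(String[] args) {
        System.out.println(route("hello flume")); // prints type=yes -> c1 -> k1 -> node02:5555
        System.out.println(route("goodbye"));     // prints type=no -> c2 -> k2 -> node03:5555
    }
}
```

Because the interceptor always sets the header to either yes or no, every event matches one of the two mappings.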

flume2.conf

a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = node02
a2.sources.r1.port = 5555

# Describe the sink
a2.sinks.k1.type = logger

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = node03
a3.sources.r1.port = 5555

# Describe the sink
a3.sinks.k1.type = logger

# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1