
Custom Flume Source

程序员文章站 2022-06-14 13:04:26

Introduction

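A Source is the component that receives data into a Flume Agent. Source components can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The official distribution already ships many source types, but they do not always cover what a real project needs; in that case we have to write a custom source for the requirement at hand.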

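The official documentation describes the interface for custom sources:
https://flume.apache.org/FlumeDeveloperGuide.html#source
According to that guide, a custom MySource has to extend the AbstractSource class and implement the Configurable and PollableSource interfaces.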

Requirements

Use Flume to receive data, add a prefix to each record, and print it to the console. The prefix can be set in the Flume configuration file.

Writing the code

Create a Maven project.

Dependency

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

The code


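// Package must match the fully qualified class name used in the agent config
package com.zjj.flume.custom;

import java.util.ArrayList;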
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

/*
 * Receives data with Flume, adds a prefix to each record, and prints it to the console.
 * The prefix can be set in the Flume configuration file.
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

	private String prefix;

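	// The core method: process() creates events and puts them into the channel.
	// Status { READY, BACKOFF }
	// READY: the source built events and stored them in the channel successfully
	// BACKOFF: the source could not build events or could not store them in the channel
	// process() is called in a loop by the thread that runs the source!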
	@Override
	public Status process() {

		Status status = Status.READY;

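		// Build the events
		List<Event> datas = new ArrayList<>();

		for (int i = 0; i < 10; i++) {

			SimpleEvent e = new SimpleEvent();

			// Put the data into the event body, with an explicit charset
			// so the bytes do not depend on the platform default
			e.setBody((prefix + "hello" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));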

			datas.add(e);

		}

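		// Put the data into the channel,
		// using the ChannelProcessor that belongs to this source
		try {

			// Throttle the demo: one batch every 5 seconds
			Thread.sleep(5000);

			ChannelProcessor cp = getChannelProcessor();

			cp.processEventBatch(datas);

		} catch (InterruptedException e) {

			// Restore the interrupt flag so the runner thread can stop cleanly
			Thread.currentThread().interrupt();

			status = Status.BACKOFF;

		} catch (Exception e) {

			status = Status.BACKOFF;

			e.printStackTrace();
		}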

		return status;
	}

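	// When process() returns BACKOFF, the source's thread rests for a while. The sleep time is
	// this value multiplied by the number of consecutive backoffs, capped by getMaxBackOffSleepInterval()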
	@Override
	public long getBackOffSleepIncrement() {
		return 2000;
	}

	@Override
	public long getMaxBackOffSleepInterval() {
		return 5000;
	}

	// Read settings from the configuration
	@Override
	public void configure(Context context) {

		// Read the value of the "prefix" key from the config file; if it is not set, fall back to "default:"
		prefix = context.getString("prefix", "default:");

	}

}
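
To make the back-off comment above concrete, here is a minimal sketch of how the sleep time could be derived from the two getters. It assumes the runner multiplies the increment by the number of consecutive BACKOFF results and caps the result at the maximum interval; the class and method names (BackoffSketch, sleepMillis) are made up for illustration and are not part of Flume, so check the runner of the Flume version you use before relying on the exact numbers.

```java
// Hypothetical illustration, not Flume code: models the back-off sleep
// under the assumption "sleep = min(consecutive backoffs * increment, max)".
final class BackoffSketch {

    /**
     * @param consecutiveBackoffs how many times in a row process() returned BACKOFF (1 for the first)
     * @param increment           value of getBackOffSleepIncrement(), in milliseconds
     * @param maxInterval         value of getMaxBackOffSleepInterval(), in milliseconds
     * @return the assumed sleep time in milliseconds
     */
    static long sleepMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // Same values as MySource: increment 2000 ms, maximum 5000 ms
        for (long n = 1; n <= 4; n++) {
            System.out.println(n + " consecutive backoff(s) -> " + sleepMillis(n, 2000, 5000) + " ms");
        }
    }
}
```

With the values returned by MySource, the assumed sleep grows from 2000 ms to 4000 ms and then stays at the 5000 ms cap.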

Packaging with Maven

Run Maven clean first, then package.

Once Flume-0.0.1-SNAPSHOT.jar has been built, copy it into the apache-flume-1.7.0/lib directory.

Writing the configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

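# Describe/configure the source
a1.sources.r1.type = com.zjj.flume.custom.MySource
# Note: MySource only reads the "prefix" key, so "delay" below is ignored by it;
# "prefix" is not set here, so the default value "default:" is used
a1.sources.r1.delay = 1000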

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
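
The configuration above never sets a prefix, so MySource falls back to its default. The sketch below imitates that lookup with plain java.util.Properties instead of Flume's Context, so it runs without Flume on the classpath. It is only a simplified stand-in: the class PrefixLookupSketch, the helper prefixFor, and the sample value demo: are hypothetical, and the real agent hands the source a Context whose keys are already relative to a1.sources.r1.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for context.getString("prefix", "default:"),
// reading the agent config text directly with java.util.Properties.
final class PrefixLookupSketch {

    /** Returns the configured prefix for the given agent and source, or "default:" if unset. */
    static String prefixFor(String config, String agent, String source) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            // Not expected for an in-memory string; rethrow unchecked to keep the sketch short
            throw new UncheckedIOException(e);
        }
        return props.getProperty(agent + ".sources." + source + ".prefix", "default:");
    }

    public static void main(String[] args) {
        String withoutPrefix = "a1.sources.r1.type = com.zjj.flume.custom.MySource\n";
        // "demo:" is an example value chosen for this sketch only
        String withPrefix = withoutPrefix + "a1.sources.r1.prefix = demo:\n";

        System.out.println(prefixFor(withoutPrefix, "a1", "r1")); // falls back to the default
        System.out.println(prefixFor(withPrefix, "a1", "r1"));    // uses the configured value
    }
}
```

In the real agent, adding a line of the form a1.sources.r1.prefix = some-value to the config file is what would change the value that configure() reads.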

Starting the agent


[aaa@qq.com job]# flume-ng agent -n a1  -c conf/  -f  "/root/soft/apache-flume-1.7.0/conf/job/demo1.conf"  -Dflume.root.logger=DEBUG,console

After a few seconds, the console prints the following:

20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 30       default:hello0 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 31       default:hello1 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 32       default:hello2 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 33       default:hello3 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 34       default:hello4 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 35       default:hello5 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 36       default:hello6 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 37       default:hello7 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 38       default:hello8 }
20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 39       default:hello9 }
20/10/26 15:24:24 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 30       default:hello0 }

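This shows that the custom source was loaded and is running. Because no prefix is set in the configuration file, the default prefix default: appears in front of every event.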
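
The hexadecimal column in the logger output is simply the event body byte by byte. As a quick cross-check, the following small sketch (the class HexBodySketch and its toHex helper are made up for this illustration and are not part of Flume) encodes the same string as UTF-8 and prints it in the same two-digit uppercase form.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for checking the hex column of the logger output by hand.
final class HexBodySketch {

    /** Encodes the text as UTF-8 and renders each byte as two uppercase hex digits, space separated. */
    static String toHex(String body) {
        StringBuilder sb = new StringBuilder();
        for (byte b : body.getBytes(StandardCharsets.UTF_8)) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 30
        System.out.println(toHex("default:hello0"));
    }
}
```

The result matches the body of the first logged event, where 64 65 66 61 75 6C 74 3A is the default: prefix and the remaining bytes spell hello0.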

Source code

https://gitee.com/crow1/flume-my-source
