Flume Study Notes: Custom Flume Agents
The pom file
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FlumeAgent</groupId>
  <artifactId>FlumeAgent</artifactId>
  <version>1.0</version>

  <build>
    <pluginManagement>
      <plugins>
        <!-- This plugin's configuration is used to store Eclipse m2e settings only.
             It has no influence on the Maven build itself. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore></ignore>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>maven</id>
      <url>http://central.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>alimaven</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-resources-plugin</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-maven-plugins</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-app</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-api</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-client</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-common</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume.flume-ng-sources</groupId>
      <artifactId>flume-kafka-source</artifactId>
      <version>1.6.0-cdh5.14.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-metastore -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
</project>
Custom Source
package com.flume.source;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Random;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    public Status process() throws EventDeliveryException {
        // Generate a random number in [0, 10) and build the body text from it
        Random random = new Random();
        int randomNum = random.nextInt(10);
        String text = "myfirstSource" + randomNum;
        // Build the header
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("id", Integer.toString(randomNum));
        // EventBuilder.withBody: combines the given header and body into an Event, with the given charset
        // getChannelProcessor().processEvent(): puts the Event into every configured Channel
        this.getChannelProcessor().processEvent(
                EventBuilder.withBody(text, Charset.forName("UTF-8"), header));
        // READY means events can be fetched; the other status is BACKOFF,
        // which tells Flume to sleep for a while before polling this source again
        return Status.READY;
    }

    public void configure(Context arg0) {
    }
}
Testing
Build the jar (e.g. with mvn package) and copy it into $FLUME_HOME/lib.
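The note does not reproduce udagent.conf, so here is a minimal sketch of what it could look like. The agent name a1 matches the command below; the logger sink, memory channel, and component names are assumptions:

# udagent.conf -- hypothetical sketch: wires the custom source to a logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# custom source: the fully-qualified class name of MySource
a1.sources.r1.type = com.flume.source.MySource
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1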
Then run:
flume-ng agent -n a1 -c / -f /udagent.conf -Dflume.root.logger=INFO,console
Run output (the original post shows a console screenshot here).
Custom Sink
package com.flume.sink;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {

    public Status process() throws EventDeliveryException {
        // Initialize the status
        Status status = Status.READY;
        Transaction trans = null;
        try {
            // Start the transaction
            Channel channel = getChannel();
            trans = channel.getTransaction();
            trans.begin();
            // Take one Event from the channel; null means the channel is empty
            Event event = channel.take();
            if (event != null) {
                // Print the body
                String body = new String(event.getBody());
                System.out.println(body);
            } else {
                // Nothing to take: tell Flume to back off for a while
                status = Status.BACKOFF;
            }
            trans.commit();
        } catch (Exception e) {
            // On failure, roll back so the event stays in the channel and can be retried
            if (trans != null) {
                trans.rollback();
            }
            e.printStackTrace();
            status = Status.BACKOFF;
        } finally {
            if (trans != null) {
                trans.close();
            }
        }
        return status;
    }

    public void configure(Context arg0) {
    }
}
Testing
The configuration file is shown below. The original post only has a screenshot of it, so this is a minimal sketch, assuming a spooling-directory source watching an input folder (the directory path and component names are assumptions):
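# udsinkagent.conf -- hypothetical sketch: spooldir source feeding the custom sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /flume/spool
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# custom sink: the fully-qualified class name of MySink
a1.sinks.k1.type = com.flume.sink.MySink
a1.sinks.k1.channel = c1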
Run Flume:
flume-ng agent -n a1 -c / -f udsinkagent.conf -Dflume.root.logger=INFO,console
Copy a test file into the spooled folder, and copy the jar into Flume's lib directory as well.
Flume's console immediately prints the contents of the file.
Custom Interceptor
package com.flume.interceptor;

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;

public class MyInterceptor implements Interceptor {

    public void close() {
    }

    public void initialize() {
    }

    // Handle a single Event
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        System.out.println(body);
        StringBuffer bodyString = new StringBuffer();
        // When the body starts with "test", replace it with "123456"
        if (body.startsWith("test")) {
            bodyString = bodyString.append("123456");
            event.setBody(bodyString.toString().getBytes());
        }
        return event;
    }

    // Handle a batch of Events by applying the single-event logic to each one
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            if (event != null) intercept(event);
        }
        return events;
    }

    // Flume instantiates interceptors through their Builder
    public static class Builder implements Interceptor.Builder {
        public void configure(Context arg0) {
        }

        public Interceptor build() {
            return new MyInterceptor();
        }
    }
}
Testing
Test file (the original post shows a screenshot of its contents). The /myinterceptor.conf referenced below is not reproduced either; a minimal sketch follows, assuming a spooling-directory source as in the sink test (paths and component names are assumptions):
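# myinterceptor.conf -- hypothetical sketch: attaches MyInterceptor to a spooldir source
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = spooldir
a2.sources.r1.spoolDir = /flume/spool
a2.sources.r1.channels = c1

# interceptor: note the $Builder suffix -- Flume instantiates the inner Builder class
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.flume.interceptor.MyInterceptor$Builder

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1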
Run Flume:
flume-ng agent -n a2 -c / -f /myinterceptor.conf -Dflume.root.logger=INFO,console
In the test result, every string that starts with test has been replaced with 123456.
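As a hypothetical illustration, a test file containing:

testABC
hello flume
test123

would reach the sink as:

123456
hello flume
123456

(The System.out.println in intercept() still prints the original bodies, since it runs before the replacement.)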
Custom Channel (a DualChannel implementation)
This section draws on an article by the Meituan tech team:
https://tech.meituan.com/mt_log_system_optimization.html
Flume ships with MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is bounded and nothing is persisted; FileChannel is exactly the opposite. Ideally we would get the best of both: use MemoryChannel while the Sink keeps up and the Channel is not buffering too many logs, and fall back to FileChannel when the Sink cannot keep up and the Channel has to buffer the logs sent from the application side. To that end, Meituan developed DualChannel, which switches intelligently between the two channels. The snippet below is an excerpt from that class: switchon, memChannel, fileChannel, memTransaction, and fileTransaction are fields of the full DualChannel implementation.
/**
 * putToMemChannel indicates whether to put events into memChannel or fileChannel
 * takeFromMemChannel indicates whether to take events from memChannel or fileChannel
 */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
    if (switchon && putToMemChannel.get()) {
        // Write the event to memChannel
        memTransaction.put(event);
        // Once the memory channel fills up, or the file channel has a backlog,
        // switch puts over to the file channel
        if (memChannel.isFull() || fileChannel.getQueueSize() > 100) {
            putToMemChannel.set(false);
        }
    } else {
        // Write the event to fileChannel
        fileTransaction.put(event);
    }
}

Event doTake() {
    Event event = null;
    if (takeFromMemChannel.get()) {
        // Take from memChannel
        event = memTransaction.take();
        if (event == null) {
            // Memory channel drained: start taking from the file channel
            takeFromMemChannel.set(false);
        }
    } else {
        // Take from fileChannel
        event = fileTransaction.take();
        if (event == null) {
            // File channel drained too: switch both put and take back to memory
            takeFromMemChannel.set(true);
            putToMemChannel.set(true);
        }
    }
    return event;
}
Flume also provides NullSink, which can drop unwanted logs directly instead of storing them. However, the Source must still put the events into a Channel first, and only then does NullSink take them out and discard them. To improve performance, Meituan moved this step into the Channel itself and developed NullChannel.
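The article does not include NullChannel's code; below is a minimal sketch of the idea, assuming Flume's BasicChannelSemantics base class (the class itself and its placement are assumptions, not Meituan's implementation):

package com.flume.channel;

import org.apache.flume.Event;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;

// Hypothetical NullChannel: accepts every event and discards it immediately,
// so no sink is needed to drain the channel
public class NullChannel extends BasicChannelSemantics {

    @Override
    protected BasicTransactionSemantics createTransaction() {
        return new NullTransaction();
    }

    private static class NullTransaction extends BasicTransactionSemantics {
        @Override
        protected void doPut(Event event) {
            // Drop the event: nothing is buffered, nothing is persisted
        }

        @Override
        protected Event doTake() {
            // The channel always looks empty from the sink's point of view
            return null;
        }

        @Override
        protected void doCommit() {
            // No state to commit
        }

        @Override
        protected void doRollback() {
            // No state to roll back
        }
    }
}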
References
http://flume.apache.org/releases/content/1.6.0/apidocs/overview-summary.html