Flume Study Notes: Custom Flume Agents


Contents

Custom Flume Agent

POM File

Custom Source

Testing

Custom Sink

Testing

Custom Interceptor

Testing

Custom Channel (a DualChannel implementation)

References

Custom Flume Agent

POM File

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FlumeAgent</groupId>
  <artifactId>FlumeAgent</artifactId>
  <version>1.0</version>
  
  <build>
    <pluginManagement>
      <plugins>
        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>
                      org.apache.maven.plugins
                    </groupId>
                    <artifactId>
                      maven-compiler-plugin
                    </artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore></ignore>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>maven</id>
      <url>http://central.maven.org/maven2/</url>
    </repository>
    <repository>
      <id>alimaven</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-resources-plugin</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-maven-plugins</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-app</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-api</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-client</artifactId>
      <version>2.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-common</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>1.1.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume.flume-ng-sources</groupId>
      <artifactId>flume-kafka-source</artifactId>
      <version>1.6.0-cdh5.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
  
</project>

Custom Source

package com.flume.source;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Random;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

	// backoff parameters used by the runner when process() returns BACKOFF
	public long getBackOffSleepIncrement() {
		return 0;
	}

	public long getMaxBackOffSleepInterval() {
		return 0;
	}

	public Status process() throws EventDeliveryException {

		// generate a random number in [0, 10) and build the event body text
		Random random = new Random();
		int randomNum = random.nextInt(10);
		String text = "myfirstSource" + randomNum;

		// build the event header
		HashMap<String, String> header = new HashMap<String, String>();
		header.put("id", Integer.toString(randomNum));

		// EventBuilder.withBody: wraps the given body and header into an Event with the specified charset
		// getChannelProcessor().processEvent(): puts the Event into every configured Channel
		this.getChannelProcessor().processEvent(EventBuilder.withBody(text, Charset.forName("UTF-8"), header));

		// READY means an Event was produced and can be taken; the other status,
		// BACKOFF, tells Flume to sleep for a while before polling again
		return Status.READY;
	}

	public void configure(Context context) {
	}

}

Testing

Build the jar, copy it to $FLUME_HOME/lib, and then run:

flume-ng agent -n a1 -c / -f /udagent.conf -Dflume.root.logger=INFO,console
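
The command above reads /udagent.conf, which the original post does not show. A minimal sketch of what it might contain (the logger sink and memory channel are assumptions; only the custom source class comes from the code above):

# udagent.conf -- hypothetical sketch
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = com.flume.source.MySource
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1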

Run result:

[screenshot not preserved; the console logs the events generated by MySource]

Custom Sink

package com.flume.sink;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {

	public Status process() throws EventDeliveryException {

		// initialize the status
		Status status = Status.READY;
		Transaction trans = null;
		try {
			// start the transaction
			Channel channel = getChannel();
			trans = channel.getTransaction();
			trans.begin();

			// take one Event from the channel per call
			Event event = channel.take();
			if (event != null) {
				// print the event body
				String body = new String(event.getBody());
				System.out.println(body);
			} else {
				// the channel is empty: tell the runner to back off for a while
				status = Status.BACKOFF;
			}

			trans.commit();
		} catch (Exception e) {
			// on failure, roll back (not commit) so the event stays in the channel
			if (trans != null) {
				trans.rollback();
			}
			throw new EventDeliveryException(e);
		} finally {
			if (trans != null) {
				trans.close();
			}
		}

		return status;
	}

	public void configure(Context arg0) {
	}

}

Testing

The configuration file is as follows.
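
The screenshot of the config is not preserved. A plausible reconstruction (the spooling-directory source and its path are assumptions; only the custom sink class comes from the code above):

# udsinkagent.conf -- hypothetical reconstruction
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /tmp/flume/spool
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = com.flume.sink.MySink
a1.sinks.k1.channel = c1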

Run Flume:

flume-ng agent -n a1 -c / -f udsinkagent.conf -Dflume.root.logger=INFO,console

Copy a test file into the monitored directory, and copy the custom sink jar into Flume's lib directory as well.
[screenshot not preserved]

Flume's console immediately prints the contents of the file.

[screenshot not preserved]

Custom Interceptor

package com.flume.interceptor;

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;

public class MyInterceptor implements Interceptor{

	public void close() {
	}

	public void initialize() {		
	}

	// per-event processing: rewrite the body of a single Event
	public Event intercept(Event event) {
		
		String body = new String(event.getBody(),Charsets.UTF_8);
		System.out.println(body);
		StringBuffer bodyString = new StringBuffer();
		
		// when the body starts with "test", replace the whole body with "123456"
		if(body.startsWith("test")){
			bodyString = bodyString.append("123456");
			event.setBody(bodyString.toString().getBytes());
		}
		
		return event;
	}

	// batch processing: apply the per-event logic to every Event in the list
	public List<Event> intercept(List<Event> events) {

		// rewrite each non-null Event in place
		for(Event event :events ){
			if (event!=null) intercept(event);
		}
		
		return events;
	}
	
	public static class Builder implements Interceptor.Builder{

		public void configure(Context arg0) {
			
		}

		public Interceptor build() {
			return new MyInterceptor();
		}		
	}
}

Testing

Test file:

[screenshot not preserved]

Run Flume:

flume-ng agent -n a2 -c / -f /myinterceptor.conf -Dflume.root.logger=INFO,console
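
The post does not include /myinterceptor.conf. A hedged sketch (the spooldir source and logger sink are assumptions; the interceptor is registered through its Builder class, as Flume requires):

# myinterceptor.conf -- hypothetical sketch
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = spooldir
a2.sources.r1.spoolDir = /tmp/flume/spool
a2.sources.r1.channels = c1
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.flume.interceptor.MyInterceptor$Builder

a2.channels.c1.type = memory

a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1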

In the test result, every string beginning with "test" has been replaced with "123456".
[screenshot not preserved]

Custom Channel (a DualChannel implementation)

This section draws on an article by the Meituan tech team:
https://tech.meituan.com/mt_log_system_optimization.html

Flume ships with MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is limited in size and not persisted; FileChannel is exactly the opposite. To get the best of both, Meituan built DualChannel, which switches intelligently between the two: while the sink keeps up and the channel is not buffering too many logs, DualChannel uses MemoryChannel; when the sink falls behind and the channel must buffer the logs the application sends, it switches to FileChannel.

/**
 * putToMemChannel indicates whether events are put into memChannel or fileChannel;
 * takeFromMemChannel indicates whether events are taken from memChannel or fileChannel.
 */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
    // switchon is a configuration flag (from the excerpt) enabling dual-channel mode
    if (switchon && putToMemChannel.get()) {
        // write to memChannel
        memTransaction.put(event);

        // when memChannel is full, or fileChannel still has a backlog,
        // route subsequent puts to fileChannel
        if (memChannel.isFull() || fileChannel.getQueueSize() > 100) {
            putToMemChannel.set(false);
        }
    } else {
        // write to fileChannel
        fileTransaction.put(event);
    }
}

Event doTake() {
    Event event = null;
    if (takeFromMemChannel.get()) {
        // take from memChannel
        event = memTransaction.take();
        if (event == null) {
            // memChannel is drained: start taking from fileChannel
            takeFromMemChannel.set(false);
        }
    } else {
        // take from fileChannel
        event = fileTransaction.take();
        if (event == null) {
            // fileChannel is drained too: switch both take and put back to memChannel
            takeFromMemChannel.set(true);
            putToMemChannel.set(true);
        }
    }
    return event;
}

Flume also provides NullSink, which discards unneeded logs instead of storing them. However, the Source must still put the events into a Channel before NullSink can take them out and throw them away. To improve performance, Meituan moved this step into the Channel itself and developed NullChannel.
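
Meituan's NullChannel code is not shown in the post. A minimal sketch of the idea, assuming Flume's AbstractChannel base class (an illustration, not the original implementation):

package com.flume.channel;

import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.AbstractChannel;

public class NullChannel extends AbstractChannel {

	// a no-op transaction: nothing is stored, so there is nothing to commit or roll back
	private static final Transaction NOOP_TX = new Transaction() {
		public void begin() {}
		public void commit() {}
		public void rollback() {}
		public void close() {}
	};

	@Override
	public void put(Event event) {
		// discard the event immediately instead of buffering it for a sink
	}

	@Override
	public Event take() {
		// nothing is ever stored, so there is never anything to take
		return null;
	}

	@Override
	public Transaction getTransaction() {
		return NOOP_TX;
	}
}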

References

http://flume.apache.org/releases/content/1.6.0/apidocs/overview-summary.html
