欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

征服flume之三——使用log4j输出日志到flume 博客分类: 企业架构 标签: flume log4j avro sink

程序员文章站 2024-03-06 18:44:38
...
接下来的几篇文章,我们将逐步学习使用各种方式对日志进行采集。

本文讲述的是如何使用log4j直接输出日志到flume。
先上干货,再讲理论!

1、flume配置文件
# Flume agent "agent": one Avro source (so1), one memory channel (c1),
# one logger sink (s1).
agent.sources = so1
agent.channels = c1
agent.sinks = s1

# Source: Avro endpoint listening on every interface, port 44444.
# The log4j appender in log4j.xml targets this same port.
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444

# Sink: logger type.
agent.sinks.s1.type = logger

# Channel: in-memory buffer between the source and the sink.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
# This key was previously written as "tier1.channels.channel1.keep-alive",
# which names an agent (tier1) and a channel (channel1) that this file never
# declares, so the value never applied to c1.
agent.channels.c1.keep-alive = 30

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1


2、测试代码
/**
 * Minimal driver for the log4j-to-Flume setup: configures log4j from
 * {@code config/log4j.xml} and then emits one debug line per second.
 */
public class FlumeLogTest {
	private final Logger logger = LoggerFactory.getLogger(getClass());

	/**
	 * Loads the log4j configuration and starts the logging loop.
	 *
	 * @param args unused
	 * @throws Exception declared for convenience; nothing here throws a checked exception explicitly
	 */
	public static void main(String[] args) throws Exception {
		DOMConfigurator.configureAndWatch("config/log4j.xml");
		new FlumeLogTest().start();
	}

	/**
	 * Logs the current timestamp at debug level once per second until the
	 * calling thread is interrupted.
	 */
	public void start() {
		while (!Thread.currentThread().isInterrupted()) {
			logger.debug("flume log test:{}", System.currentTimeMillis());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Restore the interrupt flag so the loop condition sees it and
				// the method returns instead of looping forever.
				Thread.currentThread().interrupt();
			}
		}
	}
}


3、log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
	<!-- Sends log events to the Flume agent at hostname:port. The port matches
	     the Avro source port (44444) in the Flume agent configuration. -->
	<appender name="flume"
		class="org.apache.flume.clients.log4jappender.Log4jAppender">
		<param name="hostname" value="192.168.113.181" />
		<param name="port" value="44444" />
		<layout class="org.apache.log4j.PatternLayout">
			<param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
		</layout>
	</appender>
	<!-- Wraps the "flume" appender in an AsyncAppender with a 500-event buffer.
	     Blocking=false is the non-blocking mode the article requires so that a
	     Flume outage does not stall the application. -->
	<appender name="async" class="org.apache.log4j.AsyncAppender">
		<param name="Blocking" value="false" />
		<param name="BufferSize" value="500" />
		<appender-ref ref="flume" />
	</appender>

	<!-- Console output on System.out, filtered to the debug..info level range. -->
	<appender name="CONSOLE.OUT" class="org.apache.log4j.ConsoleAppender">
		<param name="target" value="System.out" />
		<layout class="org.apache.log4j.PatternLayout">
			<param name="ConversionPattern" value="[%d][%p, (%F:%L).%M] %m%n" />
		</layout>
		<!-- NOTE(review): with AcceptOnMatch=false, in-range events are presumably
		     passed on as neutral rather than accepted outright; confirm against
		     the LevelRangeFilter documentation. -->
		<filter class="org.apache.log4j.varia.LevelRangeFilter">
			<param name="LevelMin" value="debug" />
			<param name="LevelMax" value="info" />
			<param name="AcceptOnMatch" value="false" />
		</filter>
	</appender>

	<!-- Per-package level overrides. -->
	<logger name="org.springframework">
		<level value="ERROR" />
	</logger>
	<logger name="com.cp.flume">
		<level value="debug" />
	</logger>
	<!-- Root logger: info level, sent to both the async Flume path and the console. -->
	<root>
		<priority value="info"></priority>
		<appender-ref ref="async" />
		<appender-ref ref="CONSOLE.OUT" />
	</root>
</log4j:configuration>

4、pom.xml
		<!-- Flume log4j appender client library; presumably the artifact that
		     provides the Log4jAppender class referenced in log4j.xml.
		     NOTE(review): this is a SNAPSHOT version, so confirm it can be
		     resolved from your repositories or switch to a released version. -->
		<dependency>
			<groupId>org.apache.flume.flume-ng-clients</groupId>
			<artifactId>flume-ng-log4jappender</artifactId>
			<version>1.7.0-SNAPSHOT</version>
		</dependency>


这里要说明的几点:
1、flume的Log4j Appender必须通过Log4j的异步Appender(AsyncAppender)来挂载,并将其消息队列设置为非阻塞模式(Blocking=false),同时设置合适的BufferSize;否则一旦flume服务器宕机,写日志的线程会被阻塞,进而导致应用服务器宕机。
2、当flume服务器宕机时,Log4jAppender类的append方法会抛出FlumeException,需要对该异常进行捕获,否则同样会引起应用服务器的宕机。这部分我是通过继承Log4jAppender并重写append方法实现的,参见下述代码。
3、正常状况下Log4jAppender本身有自动重连机制(已测试)

package org.apache;


import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcClientFactory.ClientType;
import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/** 
* @project:		flume-log4j-test
* @Title:		FailoverLog4jAppender.java
* @Package:		org.apache
  @author: 		chenpeng
* @email: 		46731706@qq.com
* @date:		2016年2月24日下午2:12:16
* @description:
* @version:
*/ 
public class FailoverLog4jAppender extends Log4jAppender {

	private String hosts;
	private String maxAttempts;
	private boolean configured = false;

	public void setHosts(String hostNames) {
		this.hosts = hostNames;
	}

	public void setMaxAttempts(String maxAttempts) {
		this.maxAttempts = maxAttempts;
	}

	@Override
	public synchronized void append(LoggingEvent event) {
		if (!configured) {
			String errorMsg = "Flume Log4jAppender not configured correctly! Cannot"
					+ " send events to Flume.";
			LogLog.error(errorMsg);
			if (getUnsafeMode()) {
				return;
			}
			throw new FlumeException(errorMsg);
		}
		try {
			super.append(event);
		} catch (FlumeException e) {
			e.printStackTrace();
		}
	}

	/**
	 *
	 * @throws FlumeException
	 *             if the FailoverRpcClient cannot be instantiated.
	 */
	@Override
	public void activateOptions() throws FlumeException {
		try {
			final Properties properties = getProperties(hosts, maxAttempts,
					getTimeout());
			rpcClient = RpcClientFactory.getInstance(properties);
			if (layout != null) {
				layout.activateOptions();
			}
			configured = true;
		} catch (Exception e) {
			String errormsg = "RPC client creation failed! " + e.getMessage();
			LogLog.error(errormsg);
			if (getUnsafeMode()) {
				return;
			}
			throw new FlumeException(e);
		}

	}

	/**
	*/
	private Properties getProperties(String hosts, String maxAttempts,
			long timeout) throws FlumeException {

		if (StringUtils.isEmpty(hosts)) {
			throw new FlumeException("hosts must not be null");
		}

		Properties props = new Properties();
		String[] hostsAndPorts = hosts.split("\\s+");
		StringBuilder names = new StringBuilder();
		for (int i = 0; i < hostsAndPorts.length; i++) {
			String hostAndPort = hostsAndPorts[i];
			String name = "h" + i;
			props.setProperty(
					RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,
					hostAndPort);
			names.append(name).append(" ");
		}
		props.put(RpcClientConfigurationConstants.CONFIG_HOSTS,
				names.toString());
		props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
				ClientType.DEFAULT_FAILOVER.toString());

		if (StringUtils.isEmpty(maxAttempts)) {
			throw new FlumeException("hosts must not be null");
		}

		props.put(RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS,
				maxAttempts);

		props.setProperty(
				RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
				String.valueOf(timeout));
		props.setProperty(
				RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
				String.valueOf(timeout));
		return props;
	}

}