Conquering Flume, Part 3: Sending Logs to Flume with log4j
2024-03-06 18:44:38
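Over the next few articles, we will work through the different ways of collecting logs, one at a time.
This article shows how to send logs from log4j directly to Flume.
Code first, theory afterwards!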
1. Flume configuration file

agent.sources = so1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444

# Each sink's type must be defined
agent.sinks.s1.type = logger

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.channels.c1.keep-alive = 30

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1
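2. Test code

// Package assumed from log4j.xml, where the com.cp.flume logger is set to debug
package com.cp.flume;

import org.apache.log4j.xml.DOMConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlumeLogTest {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        // Load config/log4j.xml and reload it whenever the file changes
        DOMConfigurator.configureAndWatch("config/log4j.xml");
        new FlumeLogTest().start();
    }

    /** Logs one debug message per second until the thread is interrupted. */
    public void start() {
        while (true) {
            logger.debug("flume log test:{}", System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}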
3. log4j.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>

    <!-- Sends events to the Flume Avro source -->
    <appender name="flume" class="org.apache.flume.clients.log4jappender.Log4jAppender">
        <param name="hostname" value="192.168.113.181" />
        <param name="port" value="44444" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
        </layout>
    </appender>

    <!-- Non-blocking async wrapper around the Flume appender -->
    <appender name="async" class="org.apache.log4j.AsyncAppender">
        <param name="Blocking" value="false" />
        <param name="BufferSize" value="500" />
        <appender-ref ref="flume" />
    </appender>

    <appender name="CONSOLE.OUT" class="org.apache.log4j.ConsoleAppender">
        <param name="target" value="System.out" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%d][%p, (%F:%L).%M] %m%n" />
        </layout>
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="LevelMin" value="debug" />
            <param name="LevelMax" value="info" />
            <param name="AcceptOnMatch" value="false" />
        </filter>
    </appender>

    <logger name="org.springframework">
        <level value="ERROR" />
    </logger>

    <logger name="com.cp.flume">
        <level value="debug" />
    </logger>

    <root>
        <priority value="info"></priority>
        <appender-ref ref="async" />
        <appender-ref ref="CONSOLE.OUT" />
    </root>

</log4j:configuration>
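The async appender above sets Blocking to false with a BufferSize of 500. As a rough illustration of what that trade-off means, here is a minimal sketch using only the JDK. It is not log4j's implementation; the class name NonBlockingBufferSketch and its methods are hypothetical. It shows a bounded buffer whose append call returns immediately and drops the event when the buffer is full, instead of making the caller wait.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Illustration only (hypothetical class, not log4j code): a bounded buffer
 * that drops events when full instead of blocking the calling thread.
 */
final class NonBlockingBufferSketch {

    private final BlockingQueue<String> buffer;
    private long dropped = 0;

    NonBlockingBufferSketch(int bufferSize) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
    }

    /** Never blocks: returns false and counts a drop when the buffer is full. */
    synchronized boolean append(String event) {
        // offer() returns immediately; put() would block until space is free
        boolean accepted = buffer.offer(event);
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    synchronized long droppedCount() {
        return dropped;
    }

    int queued() {
        return buffer.size();
    }

    public static void main(String[] args) {
        // Nothing drains the queue here, which mimics a Flume agent that is down
        NonBlockingBufferSketch sketch = new NonBlockingBufferSketch(500);
        for (int i = 0; i < 600; i++) {
            sketch.append("event-" + i);
        }
        System.out.println("queued=" + sketch.queued() + ", dropped=" + sketch.droppedCount());
    }
}
```

With nothing consuming the buffer, the run above keeps 500 events and drops the remaining 100, and the logging thread never waits. The price of non-blocking mode is lost log events during an outage, so the buffer size should reflect how much loss is acceptable.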
4. pom.xml

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.7.0-SNAPSHOT</version>
</dependency>
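A few points to note:
1. Flume's Log4jAppender must be wrapped in log4j's AsyncAppender; otherwise, as soon as the log server goes down, it can take the application server down with it. The AsyncAppender's queue must also be set to non-blocking mode (Blocking=false) with a suitable BufferSize; otherwise the application server will still go down when the Flume server is unavailable. In non-blocking mode, events that do not fit in the buffer are discarded.
2. When the Flume server is down, the append method of Log4jAppender throws a FlumeException. This exception must be caught, or it will likewise bring the application server down. I handled this by extending Log4jAppender and overriding append; see the FailoverLog4jAppender class below.
3. Under normal conditions, Log4jAppender reconnects automatically (tested).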
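Besides catching the exception, the subclass referred to in point 2 also asks Flume for a failover RPC client, which it configures through a Properties object built from a whitespace-separated host list. The sketch below shows roughly what that Properties object contains, using only the JDK. The class name FailoverPropertiesSketch is hypothetical, and the literal keys (hosts, hosts.<alias>, client.type, max-attempts, connect-timeout, request-timeout) are my understanding of the values behind RpcClientConfigurationConstants; check them against your Flume version before relying on them.

```java
import java.util.Properties;

/**
 * Illustration only (hypothetical class): builds failover client properties
 * with literal keys that are assumed to match Flume's configuration constants.
 */
final class FailoverPropertiesSketch {

    static Properties build(String hosts, String maxAttempts, long timeoutMillis) {
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("hosts must not be empty");
        }
        if (maxAttempts == null || maxAttempts.trim().isEmpty()) {
            throw new IllegalArgumentException("maxAttempts must not be empty");
        }
        Properties props = new Properties();
        String[] hostsAndPorts = hosts.trim().split("\\s+");
        StringBuilder names = new StringBuilder();
        for (int i = 0; i < hostsAndPorts.length; i++) {
            String name = "h" + i;                            // alias for the i-th host
            props.setProperty("hosts." + name, hostsAndPorts[i]);
            names.append(name).append(' ');
        }
        props.setProperty("hosts", names.toString().trim()); // e.g. "h0 h1"
        props.setProperty("client.type", "default_failover"); // assumed key and value
        props.setProperty("max-attempts", maxAttempts.trim());
        props.setProperty("connect-timeout", String.valueOf(timeoutMillis));
        props.setProperty("request-timeout", String.valueOf(timeoutMillis));
        return props;
    }

    public static void main(String[] args) {
        // The second host is made up for illustration; only the first appears in the article
        Properties props = build("192.168.113.181:44444 192.168.113.182:44444", "3", 20000L);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each host gets an alias (h0, h1, and so on), and the hosts key lists the aliases in the order they were given.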
package org.apache;

import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcClientFactory.ClientType;
import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/**
 * @project: flume-log4j-test
 * @Title: FailoverLog4jAppender.java
 * @Package: org.apache
 * @author: chenpeng
 * @email: 46731706@qq.com
 * @date: 2016-02-24 14:12:16
 * @description: Log4jAppender that uses a failover RPC client and swallows
 *               FlumeException in append().
 */
public class FailoverLog4jAppender extends Log4jAppender {

    /** Whitespace-separated list of host:port pairs. */
    private String hosts;
    private String maxAttempts;
    private boolean configured = false;

    public void setHosts(String hostNames) {
        this.hosts = hostNames;
    }

    public void setMaxAttempts(String maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public synchronized void append(LoggingEvent event) {
        if (!configured) {
            String errorMsg = "Flume Log4jAppender not configured correctly! Cannot"
                    + " send events to Flume.";
            LogLog.error(errorMsg);
            if (getUnsafeMode()) {
                return;
            }
            throw new FlumeException(errorMsg);
        }
        try {
            super.append(event);
        } catch (FlumeException e) {
            // Swallow the failure so a Flume outage cannot propagate into the application
            LogLog.error("Failed to send event to Flume.", e);
        }
    }

    /**
     * @throws FlumeException
     *             if the FailoverRpcClient cannot be instantiated.
     */
    @Override
    public void activateOptions() throws FlumeException {
        try {
            final Properties properties = getProperties(hosts, maxAttempts, getTimeout());
            rpcClient = RpcClientFactory.getInstance(properties);
            if (layout != null) {
                layout.activateOptions();
            }
            configured = true;
        } catch (Exception e) {
            String errormsg = "RPC client creation failed! " + e.getMessage();
            LogLog.error(errormsg);
            if (getUnsafeMode()) {
                return;
            }
            throw new FlumeException(e);
        }
    }

    /** Builds the failover client properties from the host list, attempt limit and timeout. */
    private Properties getProperties(String hosts, String maxAttempts, long timeout)
            throws FlumeException {
        if (StringUtils.isEmpty(hosts)) {
            throw new FlumeException("hosts must not be null");
        }
        if (StringUtils.isEmpty(maxAttempts)) {
            throw new FlumeException("maxAttempts must not be null");
        }

        Properties props = new Properties();
        String[] hostsAndPorts = hosts.split("\\s+");
        StringBuilder names = new StringBuilder();
        for (int i = 0; i < hostsAndPorts.length; i++) {
            String hostAndPort = hostsAndPorts[i];
            String name = "h" + i; // alias for the i-th host
            props.setProperty(
                    RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name, hostAndPort);
            names.append(name).append(" ");
        }
        props.put(RpcClientConfigurationConstants.CONFIG_HOSTS, names.toString());
        props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
                ClientType.DEFAULT_FAILOVER.toString());
        props.put(RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS, maxAttempts);
        props.setProperty(
                RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, String.valueOf(timeout));
        props.setProperty(
                RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, String.valueOf(timeout));
        return props;
    }
}