log4j2异步日志解读(一)AsyncAppender
log4j、logback、log4j2 历史和关系,我们就在这里不展开讲了。直接上干货,log4j2突出于其他日志的优势,异步日志实现。
看一个东西,首先看官网文档, ,因为前面文章已经讲解了disruptor源码【】,本文主要展开说说异步日志asyncappender和asynclogger(基于disruptor实现)。
asynclogger笔者下文展开讲。
一、asyncappender
我们先来看看asyncapperder核心,就是logger将数据通过append方法放入到阻塞队列中,随后后台线程从队列中取出数据然后进行后续的操作。
那这样看来,就很简单了,一个append()方法,一个后台线程执行就是我们要看的核心代码了。围绕我们要看的类asyncappender,来看看类关系图。
一、放入队列
主要实现就是logger将数据通过append方法放入到阻塞队列中。
//asyncappender.java /** * actual writing occurs here. * * @param logevent the logevent. */ @override public void append(final logevent logevent) { if (!isstarted()) { throw new illegalstateexception("asyncappender " + getname() + " is not active"); } //创建log4jlogevent的对象memento final log4jlogevent memento = log4jlogevent.creatememento(logevent, includelocation); internalasyncutil.makemessageimmutable(logevent.getmessage()); //transfer(memento)将event放入队列 //默认arrayblockingqueuefactory 大小1024 if (!transfer(memento)) { if (blocking) { if (abstractlogger.getrecursiondepth() > 1) { // log4j2-1518, log4j2-2031 // if queue is full and we are in a recursive call, call appender directly to prevent deadlock asyncqueuefullmessageutil.logwarningtostatuslogger(); logmessageincurrentthread(logevent); } else { // delegate to the event router (which may discard, enqueue and block, or log in current thread) final eventroute route = asyncqueuefullpolicy.getroute(thread.getid(), memento.getlevel()); route.logmessage(this, memento); } } else { error("appender " + getname() + " is unable to write primary appenders. queue is full"); logtoerrorappenderifnecessary(false, memento); } } } private boolean transfer(final logevent memento) { return queue instanceof transferqueue ? ((transferqueue<logevent>) queue).trytransfer(memento) : queue.offer(memento); }
如流程图所示,首先会判断用户是否设置了blocking选项,默认是true,如果设置为false,则appender直接会toerrorappender,如果用户没有配置或者配置为true,则会按照一定的策略来处理这些消息。策略可以分为2种,他们分别为:
1、defaultasyncqueuefullpolicy---等待队列,转为同步操作策略
public class defaultasyncqueuefullpolicy implements asyncqueuefullpolicy { @override public eventroute getroute(final long backgroundthreadid, final level level) { // log4j2-471: prevent deadlock when ringbuffer is full and object // being logged calls logger.log() from its tostring() method if (thread.currentthread().getid() == backgroundthreadid) { return eventroute.synchronous; } return eventroute.enqueue; }
2、discardingasyncqueuefullpolicy---按照日志等级抛弃日志策略
//discardingasyncqueuefullpolicy.java @override public eventroute getroute(final long backgroundthreadid, final level level) { if (level.islessspecificthan(thresholdlevel)) { if (discardcount.getandincrement() == 0) { logger.warn("async queue is full, discarding event with level {}. " + "this message will only appear once; future events from {} " + "are silently discarded until queue capacity becomes available.", level, thresholdlevel); } return eventroute.discard; } return super.getroute(backgroundthreadid, level); }
二、后台线程执行后续操作。
主要就是后台线程从队列中取出数据然后进行后续的操作。
//asyncappender.java private class asyncthread extends log4jthread { private volatile boolean shutdown = false; private final list<appendercontrol> appenders; private final blockingqueue<logevent> queue; public asyncthread(final list<appendercontrol> appenders, final blockingqueue<logevent> queue) { super("asyncappender-" + thread_sequence.getandincrement()); this.appenders = appenders; this.queue = queue; setdaemon(true); } @override public void run() { while (!shutdown) { logevent event; try { event = queue.take(); if (event == shutdown_log_event) { shutdown = true; continue; } } catch (final interruptedexception ex) { break; // log4j2-830 } event.setendofbatch(queue.isempty()); final boolean success = callappenders(event); if (!success && errorappender != null) { try { errorappender.callappender(event); } catch (final exception ex) { // silently accept the error. } } } // process any remaining items in the queue. logger.trace("asyncappender.asyncthread shutting down. processing remaining {} queue events.", queue.size()); int count = 0; int ignored = 0; while (!queue.isempty()) { try { final logevent event = queue.take(); if (event instanceof log4jlogevent) { final log4jlogevent logevent = (log4jlogevent) event; logevent.setendofbatch(queue.isempty()); callappenders(logevent); count++; } else { ignored++; logger.trace("ignoring event of class {}", event.getclass().getname()); } } catch (final interruptedexception ex) { // may have been interrupted to shut down. // here we ignore interrupts and try to process all remaining events. } } logger.trace("asyncappender.asyncthread stopped. queue has {} events remaining. " + "processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); } ... }
该线程会一直尝试从阻塞队列中获取logevent,如果获取成功,调用appenderref所引用appender的append方法。我们也可以看到,asyncappender实际上主要是类似于中转,日志异步化,当消息放入阻塞队列,返回成功,这样能够大幅提高日志记录的吞吐。用户可以在权衡性能与日志收集质量上进行权衡配置策略(设置blocking选项),当然也可以设置不同类型的阻塞队列已到达更好的日志记录吞吐。
asyncappender配置参数
https://logging.apache.org/log4j/2.x/manual/appenders.html#asyncappender
上一篇: 心情有点沉重
下一篇: Java 边学边做(一)过一下基础