欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

log4j2异步日志解读(一)AsyncAppender

程序员文章站 2023-03-27 23:37:43
log4j、logback、log4j2 历史和关系,我们就在这里不展开讲了。直接上干货,log4j2突出于其他日志的优势,异步日志实现。 看一个东西,首先看官网文档,https://logging.apache.org/log4j/2.x/ ,因为前面文章已经讲解了disruptor源码【http ......

log4j2异步日志解读(一)AsyncAppender

 

log4j、logback、log4j2 历史和关系,我们就在这里不展开讲了。直接上干货,log4j2突出于其他日志的优势,异步日志实现。

看一个东西,首先看官网文档, ,因为前面文章已经讲解了disruptor源码【】,本文主要展开说说异步日志asyncappender和asynclogger(基于disruptor实现)。

asynclogger笔者下文展开讲。

 

一、asyncappender

log4j2异步日志解读(一)AsyncAppender

 

我们先来看看asyncapperder核心,就是logger将数据通过append方法放入到阻塞队列中,随后后台线程从队列中取出数据然后进行后续的操作。

那这样看来,就很简单了,一个append()方法,一个后台线程执行就是我们要看的核心代码了。围绕我们要看的类asyncappender,来看看类关系图。

log4j2异步日志解读(一)AsyncAppender

一、放入队列

主要实现就是logger将数据通过append方法放入到阻塞队列中。

//asyncappender.java
    /**
     * actual writing occurs here.
     *
     * @param logevent the logevent.
     */
    @override
    public void append(final logevent logevent) {
        if (!isstarted()) {
            throw new illegalstateexception("asyncappender " + getname() + " is not active");
        }
        //创建log4jlogevent的对象memento
        final log4jlogevent memento = log4jlogevent.creatememento(logevent, includelocation);
        internalasyncutil.makemessageimmutable(logevent.getmessage());
        //transfer(memento)将event放入队列
        //默认arrayblockingqueuefactory 大小1024
        if (!transfer(memento)) {
            if (blocking) {
                if (abstractlogger.getrecursiondepth() > 1) { // log4j2-1518, log4j2-2031
                    // if queue is full and we are in a recursive call, call appender directly to prevent deadlock
                    asyncqueuefullmessageutil.logwarningtostatuslogger();
                    logmessageincurrentthread(logevent);
                } else {
                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
                 
                    final eventroute route = asyncqueuefullpolicy.getroute(thread.getid(), memento.getlevel());
                    route.logmessage(this, memento);
                }
            } else {
                error("appender " + getname() + " is unable to write primary appenders. queue is full");
                logtoerrorappenderifnecessary(false, memento);
            }
        }
    }

    private boolean transfer(final logevent memento) {
        return queue instanceof transferqueue
            ? ((transferqueue<logevent>) queue).trytransfer(memento)
            : queue.offer(memento);
    }

log4j2异步日志解读(一)AsyncAppender

如流程图所示,首先会判断用户是否设置了blocking选项,默认是true,如果设置为false,则appender直接会toerrorappender,如果用户没有配置或者配置为true,则会按照一定的策略来处理这些消息。策略可以分为2种,他们分别为:

1、defaultasyncqueuefullpolicy---等待队列,转为同步操作策略

public class defaultasyncqueuefullpolicy implements asyncqueuefullpolicy {
    @override
    public eventroute getroute(final long backgroundthreadid, final level level) {

        // log4j2-471: prevent deadlock when ringbuffer is full and object
        // being logged calls logger.log() from its tostring() method
        if (thread.currentthread().getid() == backgroundthreadid) {
            return eventroute.synchronous;
        }
        return eventroute.enqueue;
    }


2、discardingasyncqueuefullpolicy---按照日志等级抛弃日志策略

//discardingasyncqueuefullpolicy.java
    @override
    public eventroute getroute(final long backgroundthreadid, final level level) {
        if (level.islessspecificthan(thresholdlevel)) {
            if (discardcount.getandincrement() == 0) {
                logger.warn("async queue is full, discarding event with level {}. " +
                        "this message will only appear once; future events from {} " +
                        "are silently discarded until queue capacity becomes available.",
                        level, thresholdlevel);
            }
            return eventroute.discard;
        }
        return super.getroute(backgroundthreadid, level);
    }

二、后台线程执行后续操作。

主要就是后台线程从队列中取出数据然后进行后续的操作。

//asyncappender.java
private class asyncthread extends log4jthread {

        private volatile boolean shutdown = false;
        private final list<appendercontrol> appenders;
        private final blockingqueue<logevent> queue;

        public asyncthread(final list<appendercontrol> appenders, final blockingqueue<logevent> queue) {
            super("asyncappender-" + thread_sequence.getandincrement());
            this.appenders = appenders;
            this.queue = queue;
            setdaemon(true);
        }

        @override
        public void run() {
            while (!shutdown) {
                logevent event;
                try {
                    event = queue.take();
                    if (event == shutdown_log_event) {
                        shutdown = true;
                        continue;
                    }
                } catch (final interruptedexception ex) {
                    break; // log4j2-830
                }
                event.setendofbatch(queue.isempty());
                final boolean success = callappenders(event);
                if (!success && errorappender != null) {
                    try {
                        errorappender.callappender(event);
                    } catch (final exception ex) {
                        // silently accept the error.
                    }
                }
            }
            // process any remaining items in the queue.
            logger.trace("asyncappender.asyncthread shutting down. processing remaining {} queue events.",
                queue.size());
            int count = 0;
            int ignored = 0;
            while (!queue.isempty()) {
                try {
                    final logevent event = queue.take();
                    if (event instanceof log4jlogevent) {
                        final log4jlogevent logevent = (log4jlogevent) event;
                        logevent.setendofbatch(queue.isempty());
                        callappenders(logevent);
                        count++;
                    } else {
                        ignored++;
                        logger.trace("ignoring event of class {}", event.getclass().getname());
                    }
                } catch (final interruptedexception ex) {
                    // may have been interrupted to shut down.
                    // here we ignore interrupts and try to process all remaining events.
                }
            }
            logger.trace("asyncappender.asyncthread stopped. queue has {} events remaining. "
                + "processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
        }

    ...
}

该线程会一直尝试从阻塞队列中获取logevent,如果获取成功,调用appenderref所引用appender的append方法。我们也可以看到,asyncappender实际上主要是类似于中转,日志异步化,当消息放入阻塞队列,返回成功,这样能够大幅提高日志记录的吞吐。用户可以在权衡性能与日志收集质量上进行权衡配置策略(设置blocking选项),当然也可以设置不同类型的阻塞队列已到达更好的日志记录吞吐。

asyncappender配置参数  

https://logging.apache.org/log4j/2.x/manual/appenders.html#asyncappender

log4j2异步日志解读(一)AsyncAppender