
[Open Source] A Simple KafkaAppender Implementation Based on Log4Net


Background

  1. A previous post, "A simple local logging service based on Log4Net", implemented local file logging. As the project evolved, local logs stopped being enough: in pre-production and production environments you cannot expect developers to log on to servers and read local log files to troubleshoot.
  2. A Kafka + ELK logging stack solves this by making logs available online and adds rich reporting and analysis on top;
  3. Full source code: MasterChief
  4. NuGet: Install-Package MasterChief.DotNet.Core.KafkaLog
  5. Stars and issues are welcome;

Source Code

  1. Implement an appender that talks to Kafka on top of log4net's AppenderSkeleton (the using directives shown assume the kafka-net client package; a quick smoke test follows the class below)

     using System;
     using System.IO;
     using System.Text;
     using KafkaNet;            // Producer, BrokerRouter (kafka-net client)
     using KafkaNet.Model;      // KafkaOptions
     using KafkaNet.Protocol;   // Message
     using log4net.Appender;
     using log4net.Core;

     public class KafkaAppender : AppenderSkeleton
     {
         #region Fields

         /// <summary>
         ///     Kafka producer
         /// </summary>
         private Producer _kafkaProducer;

         #endregion Fields

         #region Properties

         /// <summary>
         ///     Broker endpoint(s)
         /// </summary>
         public string Brokers { get; set; }

         /// <summary>
         ///     Topic
         /// </summary>
         public string Topic { get; set; }

         #endregion Properties

         #region Methods

         /// <summary>
         ///     Initialize the appender based on the options set.
         /// </summary>
         /// <remarks>
         ///     <para>
         ///         This is part of the <see cref="T:log4net.Core.IOptionHandler" /> delayed object
         ///         activation scheme. The <see cref="M:log4net.Appender.AppenderSkeleton.ActivateOptions" /> method must
         ///         be called on this object after the configuration properties have
         ///         been set. Until <see cref="M:log4net.Appender.AppenderSkeleton.ActivateOptions" /> is called this
         ///         object is in an undefined state and must not be used.
         ///     </para>
         ///     <para>
         ///         If any of the configuration properties are modified then
         ///         <see cref="M:log4net.Appender.AppenderSkeleton.ActivateOptions" /> must be called again.
         ///     </para>
         /// </remarks>
         public override void ActivateOptions()
         {
             base.ActivateOptions();
             InitKafkaProducer();
         }

         /// <summary>
         ///     Subclasses of <see cref="T:log4net.Appender.AppenderSkeleton" /> should implement this method
         ///     to perform actual logging.
         /// </summary>
         /// <param name="loggingEvent">The event to append.</param>
         /// <remarks>
         ///     <para>
         ///         A subclass must implement this method to perform
         ///         logging of the <paramref name="loggingEvent" />.
         ///     </para>
         ///     <para>
         ///         This method will be called by <see cref="M:DoAppend(LoggingEvent)" />
         ///         if all the conditions listed for that method are met.
         ///     </para>
         ///     <para>
         ///         To restrict the logging of events in the appender
         ///         override the <see cref="M:PreAppendCheck()" /> method.
         ///     </para>
         /// </remarks>
         protected override void Append(LoggingEvent loggingEvent)
         {
             try
             {
                 var message = GetLogMessage(loggingEvent);
                 var topic = GetTopic(loggingEvent);

                 // Fire-and-forget: the task is discarded so logging never blocks the caller.
                 _ = _kafkaProducer.SendMessageAsync(topic, new[] {new Message(message)});
             }
             catch (Exception ex)
             {
                 ErrorHandler.Error("KafkaProducer SendMessageAsync", ex);
             }
         }

         /// <summary>
         ///     Raises the Close event.
         /// </summary>
         /// <remarks>
         ///     <para>
         ///         Releases any resources allocated within the appender such as file handles,
         ///         network connections, etc.
         ///     </para>
         ///     <para>
         ///         It is a programming error to append to a closed appender.
         ///     </para>
         /// </remarks>
         protected override void OnClose()
         {
             base.OnClose();
             StopKafkaProducer();
         }

         private string GetLogMessage(LoggingEvent loggingEvent)
         {
             var builder = new StringBuilder();
             using (var writer = new StringWriter(builder))
             {
                 Layout.Format(writer, loggingEvent);

                 // If the layout does not render the exception itself, append it here.
                 if (Layout.IgnoresException && loggingEvent.ExceptionObject != null)
                     writer.Write(loggingEvent.GetExceptionString());

                 return writer.ToString();
             }
         }

         private string GetTopic(LoggingEvent loggingEvent)
         {
             // Fall back to the application domain name when no topic is configured.
             return string.IsNullOrEmpty(Topic) ? Path.GetFileNameWithoutExtension(loggingEvent.Domain) : Topic;
         }

         /// <summary>
         ///     Initialize the Kafka producer
         /// </summary>
         private void InitKafkaProducer()
         {
             try
             {
                 // Default to the standard Kafka broker port when nothing is configured.
                 if (string.IsNullOrEmpty(Brokers)) Brokers = "http://localhost:9092";

                 if (_kafkaProducer == null)
                 {
                     var brokerUri = new Uri(Brokers);
                     var kafkaOptions = new KafkaOptions(brokerUri)
                     {
                         // KafkaLog is the project's IKafkaLog adapter for the client's internal logging.
                         Log = new KafkaLog()
                     };
                     _kafkaProducer = new Producer(new BrokerRouter(kafkaOptions));
                 }
             }
             catch (Exception ex)
             {
                 ErrorHandler.Error("InitKafkaProducer", ex);
             }
         }

         /// <summary>
         ///     Stop the Kafka producer
         /// </summary>
         private void StopKafkaProducer()
         {
             try
             {
                 _kafkaProducer?.Stop();
             }
             catch (Exception ex)
             {
                 ErrorHandler.Error("StopKafkaProducer", ex);
             }
         }

         #endregion Methods
     }
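
   For reference, a minimal console smoke test might look like the sketch below. It assumes log4net.config sits next to the executable and that a logger named KafkaLogger is wired to KafkaAppender (see the configuration in step 3); the file name and program shape are illustrative, not part of the project.

     using System.IO;
     using log4net;
     using log4net.Config;

     internal static class Program
     {
         private static void Main()
         {
             // Load appender/logger definitions from log4net.config (path is an assumption).
             XmlConfigurator.ConfigureAndWatch(new FileInfo("log4net.config"));

             // "KafkaLogger" must match the logger name used by KafkaLogService and the config.
             var logger = LogManager.GetLogger("KafkaLogger");
             logger.Info("Hello Kafka from log4net");
         }
     }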
    
  2. Implement KafkaLogService against the logging interface defined in the previous post (a sketch of the assumed interface follows the class below)

    using System;
    using log4net;

    public sealed class KafkaLogService : ILogService
    {
        #region Constructors

        /// <summary>
        ///     Initializes the <see cref="KafkaLogService" /> class.
        /// </summary>
        static KafkaLogService()
        {
            KafkaLogger = LogManager.GetLogger(KafkaLoggerName);
        }

        #endregion Constructors

        #region Fields

        /// <summary>
        ///     Kafka logger name
        /// </summary>
        public const string KafkaLoggerName = "KafkaLogger";

        /// <summary>
        ///     Kafka logger
        /// </summary>
        public static readonly ILog KafkaLogger;

        #endregion Fields

        #region Methods

        /// <summary>
        ///     Debug entry
        /// </summary>
        /// <param name="message">log message</param>
        public void Debug(string message)
        {
            if (KafkaLogger.IsDebugEnabled) KafkaLogger.Debug(message);
        }

        /// <summary>
        ///     Debug entry
        /// </summary>
        /// <param name="message">log message</param>
        /// <param name="ex">exception</param>
        public void Debug(string message, Exception ex)
        {
            if (KafkaLogger.IsDebugEnabled) KafkaLogger.Debug(message, ex);
        }

        /// <summary>
        ///     Error entry
        /// </summary>
        /// <param name="message">log message</param>
        public void Error(string message)
        {
            if (KafkaLogger.IsErrorEnabled) KafkaLogger.Error(message);
        }

        /// <summary>
        ///     Error entry
        /// </summary>
        /// <param name="message">log message</param>
        /// <param name="ex">exception</param>
        public void Error(string message, Exception ex)
        {
            if (KafkaLogger.IsErrorEnabled) KafkaLogger.Error(message, ex);
        }

        /// <summary>
        ///     Fatal entry
        /// </summary>
        /// <param name="message">log message</param>
        public void Fatal(string message)
        {
            if (KafkaLogger.IsFatalEnabled) KafkaLogger.Fatal(message);
        }

        /// <summary>
        ///     Fatal entry
        /// </summary>
        /// <param name="message">log message</param>
        /// <param name="ex">exception</param>
        public void Fatal(string message, Exception ex)
        {
            if (KafkaLogger.IsFatalEnabled) KafkaLogger.Fatal(message, ex);
        }

        /// <summary>
        ///     Info entry
        /// </summary>
        /// <param name="message">log message</param>
        public void Info(string message)
        {
            if (KafkaLogger.IsInfoEnabled) KafkaLogger.Info(message);
        }

        /// <summary>
        ///     Info entry
        /// </summary>
        /// <param name="message">log message</param>
        /// <param name="ex">exception</param>
        public void Info(string message, Exception ex)
        {
            if (KafkaLogger.IsInfoEnabled) KafkaLogger.Info(message, ex);
        }

        /// <summary>
        ///     Warn entry
        /// </summary>
        /// <param name="message">log message</param>
        public void Warn(string message)
        {
            if (KafkaLogger.IsWarnEnabled) KafkaLogger.Warn(message);
        }

        /// <summary>
        ///     Warn entry
        /// </summary>
        /// <param name="message">log message</param>
        /// <param name="ex">exception</param>
        public void Warn(string message, Exception ex)
        {
            if (KafkaLogger.IsWarnEnabled) KafkaLogger.Warn(message, ex);
        }

        #endregion Methods
    }
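
   The ILogService interface itself is defined in the earlier post and the MasterChief project; judging from the methods implemented above, it presumably looks roughly like the sketch below (an assumption, not the project's exact definition).

     using System;

     public interface ILogService
     {
         void Debug(string message);
         void Debug(string message, Exception ex);
         void Error(string message);
         void Error(string message, Exception ex);
         void Fatal(string message);
         void Fatal(string message, Exception ex);
         void Info(string message);
         void Info(string message, Exception ex);
         void Warn(string message);
         void Warn(string message, Exception ex);
     }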
  3. Modify log4net.config to define the Kafka topic and brokers (the logger wiring that routes KafkaLogger to this appender is sketched after the snippet)

        <appender name="KafkaAppender" type="MasterChief.DotNet.Core.KafkaLog.KafkaAppender, MasterChief.DotNet.Core.KafkaLog">
            <param name="Topic" value="beats" />
            <param name="Brokers" value="http://localhost:9092" />
            <layout type="log4net.Layout.PatternLayout">
                <conversionPattern value="Time: %date%newline Level: %-5level%newline Logger: %logger%newline Message: %message%newline" />
            </layout>
        </appender>
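
   The appender also has to be attached to the KafkaLogger logger that KafkaLogService resolves. A minimal wiring inside the same <log4net> element might look like the following; the level and additivity values are placeholders, not the project's exact settings.

        <logger name="KafkaLogger" additivity="false">
            <level value="ALL" />
            <appender-ref ref="KafkaAppender" />
        </logger>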

Usage

  1. Because everything goes through the logging interface from the previous post, you can switch implementations via IoC without touching the business code that calls it (see the sketch after this list);
  2. Depending on your requirements, you can also keep writing a local log in parallel, so that logging still works when the network is flaky or Kafka is unavailable;
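
   As item 1 suggests, moving from the local file implementation (presumably FileLogService from the previous post) to Kafka is just a binding change at the composition root. A minimal constructor-injection sketch is shown below; OrderService and CompositionRoot are illustrative names, not part of the project, and a real IoC container would replace the manual wiring.

     using System;

     public class OrderService
     {
         private readonly ILogService _logger;

         // Business code depends only on the ILogService abstraction.
         public OrderService(ILogService logger)
         {
             _logger = logger;
         }

         public void PlaceOrder(string orderId)
         {
             _logger.Info($"order {orderId} placed");
         }
     }

     internal static class CompositionRoot
     {
         public static OrderService Create()
         {
             // Swap KafkaLogService for FileLogService (or register both) in one place;
             // the rest of the code base is unaffected.
             ILogService log = new KafkaLogService();
             return new OrderService(log);
         }
     }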

Closing

  1. This is a modest implementation; feedback and corrections are welcome;