欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

哎呀,我老大写Bug啦——记一次MessageQueen的优化

程序员文章站 2022-04-14 10:18:47
MessageQueen,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了 ......

  messagequeen,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。哎呀,我老大写Bug啦——记一次MessageQueen的优化

之后我们的开发仍然有条不紊的开发着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,

                                                                                     哎呀,我老大写Bug啦——记一次MessageQueen的优化

这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再debug……,然后经过几天的修复,终于完成了跟自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。

                                                                                    哎呀,我老大写Bug啦——记一次MessageQueen的优化

都说祸不单行,古人是不会骗我们的,bug怎么会修得完呢?天真,要是bug能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳,

哎呀,我老大写Bug啦——记一次MessageQueen的优化哎呀,我老大写Bug啦——记一次MessageQueen的优化

what is it ? 

 哎呀,我老大写Bug啦——记一次MessageQueen的优化

 

来了,今天的主角登场了,我也要开始加班了。

rabbitmq

  这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:

哎呀,我老大写Bug啦——记一次MessageQueen的优化

 

rabbitmq内存使得整个服务器濒临瘫痪,远程登录服务器都差点挤不进去的状态,别看截图目前才1.3g,吃个午饭回来,就2.3g了,可怕不可怕?咋回事?

老板喊你回来加班啦

  先不管了,线上优先解决,手动先reset回收资源以释放空间,这个只是临时的办法,然后检查一下rabbitmq的配置有没有问题,路径在

 c:\users\administrator\appdata\roaming\rabbitmq 

哎呀,我老大写Bug啦——记一次MessageQueen的优化

完全是默认的配置,完全ojbk啊,那到底咋回事?继续检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【messagelib】的组件调用

哎呀,我老大写Bug啦——记一次MessageQueen的优化

哎呀,我老大写Bug啦——记一次MessageQueen的优化

好了,叫我老老大要这个组件的代码,他把git的地址就发给我,我把项目down下来,

哎呀,我老大写Bug啦——记一次MessageQueen的优化

这个封装的组件内容不多,主要的文件一目了然,其实就是用到这个两个组件来进行的二次封装来调用

哎呀,我老大写Bug啦——记一次MessageQueen的优化

主要的代码是在【messagequeue.cs】文件里,展示一下当时的代码情况:

哎呀,我老大写Bug啦——记一次MessageQueen的优化
using system;
using system.collections.generic;
using system.linq;
using system.text;
using system.threading.tasks;
using messagelib.classbean;
using easynetq;
using system.threading;

namespace messagelib
{
    public static class messagequeue
    {
        public static ibus bus = mqbusbuilder.createmessagebus();
        //消息队列
        private static queue<item> noticqueue = new queue<item>(5000);
        //日志队列
        private static queue<item> logqueue = new queue<item>(5000);
        //队列数目发布数量
        private static int max_count_to_pulish = 1000;

        /// <summary>
        /// 可供外部使用的消息入列操作
        /// </summary>
        public static void push(item item)
        {
            if (item.type == itemtype.notic)
            {
                noticqueue.enqueue(item);
            }

            if (item.type == itemtype.log)
            {
                logqueue.enqueue(item);
            }
        }

        /// <summary>
        /// 监听后需要调用的发布接口
        /// </summary>
        private static void pulish(object source, system.timers.elapsedeventargs e)
        {
            if (noticqueue.count > 0 || logqueue.count > 0)
            {
                if (bus == null || !bus.isconnected)
                {
                    bus = mqbusbuilder.createmessagebus();
                }

                if (bus.isconnected)
                {
                    send(itemtype.notic);
                    send(itemtype.log);
                }
            }
        }

        /// <summary>
        /// 程序自运行并开始监听
        /// </summary>
        public static void run()
        {
            system.timers.timer timer = new system.timers.timer();
            timer.interval = 1000;
            timer.elapsed += new system.timers.elapsedeventhandler(pulish);//到达时间的时候执行事件;    
            timer.autoreset = true;//设置是执行一次(false)还是一直执行(true);    
            timer.enabled = true;//是否执行system.timers.timer.elapsed事件;    
        }

        /// <summary>
        /// 启动线程异步调用
        /// </summary>
        /// <param name="channeltype"></param>
        private static void send(string channeltype)
        {
            thread thread = new thread(new parameterizedthreadstart(publishaction));
            thread.isbackground = true;
            thread.start(channeltype);
        }

        /// <summary>
        /// 调用发布日志及提醒两个接口
        /// </summary>
        /// <param name="channel"></param>
        private static void publishaction(object channel)
        {
            publislog();
            publisnotic();
        }

        /// <summary>
        /// 日志消息发送至rabbitmq指定exchange、queue
        /// </summary>
        private static void publislog()
        {
            string channelname = itemtype.log;
            try
            {
                var routingkey = channelname;
                var mqqueue = bus.advanced.queuedeclare(string.format("queue.{0}", channelname));
                var exchange = bus.advanced.exchangedeclare(string.format("exchange.{0}",channelname), "direct");
                var binding = bus.advanced.bind(exchange, mqqueue, routingkey);
                while (logqueue.count > 0)
                {
                    item item = logqueue.dequeue();
                    if (item != null)
                    {
                        var properties = new messageproperties();
                        var message = new message<string>(newtonsoft.json.jsonconvert.serializeobject(item));
                        message.properties.appid = item.appid;
                        bus.advanced.publish(exchange, routingkey, false, message);
                    }

                }
            }
            catch (exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// 提醒消息发送至rabbitmq指定exchange、queue
        /// </summary>
        private static void publisnotic()
        {
            string channelname = itemtype.notic;
            var routingkey = channelname;
            var mqqueue = bus.advanced.queuedeclare(string.format("queue.{0}", channelname));
            var exchange = bus.advanced.exchangedeclare(string.format("exchange.{0}", channelname), "direct");
            var binding = bus.advanced.bind(exchange, mqqueue, routingkey);
            while(noticqueue.count > 0)
            {
                item item = noticqueue.dequeue();
                if (item != null)
                {
                    var properties = new messageproperties();
                    var message = new message<string>(newtonsoft.json.jsonconvert.serializeobject(item));
                    message.properties.appid = item.appid;
                    bus.advanced.publish(exchange, routingkey, false, message);
                }
            }
        }
    }
}
view code

然后我就发现了这一段代码!

        /// <summary>
        /// 程序自运行并开始监听
        /// </summary>
        public static void run()
        {
            system.timers.timer timer = new system.timers.timer();
            timer.interval = 1000;
            timer.elapsed += new system.timers.elapsedeventhandler(pulish);//到达时间的时候执行事件;    
            timer.autoreset = true;//设置是执行一次(false)还是一直执行(true);    
            timer.enabled = true;//是否执行system.timers.timer.elapsed事件;    
        }
        /// <summary>
        /// 启动线程异步调用
        /// </summary>
        /// <param name="channeltype"></param>
        private static void send(string channeltype)
        {
            thread thread = new thread(new parameterizedthreadstart(publishaction));
            thread.isbackground = true;
            thread.start(channeltype);
        }

哎呀,我老大写Bug啦——记一次MessageQueen的优化

  老老大写bug了,当run()起来之后,队列中【noticqueue】有内容,就开始推送消息,发送消息send(),每来一次推送new一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new了n多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是run()调用多次,后果更加不堪设想。

加班改起来

  开始动手吧,业务主要推送有普通消息、错误消息和通知消息,那么将队列与线程组装一起,新增一个类queuetask.cs:

    public class queuetask
    {
        private queue<item> noticqueue = new queue<item>(5000);
        //队列数目发布数量
        private int max_count_to_pulish = 1000;
        public  bool isrunning = false;
        private string itemtype = itemtype.info;
        private string messagerouter = itemtype.info;

        public queuetask(string itemtype,string messagerouter)
        {
            this.itemtype = itemtype;
            this.messagerouter = messagerouter;
        }

        /// <summary>
        /// 可供外部使用的消息入列操作
        /// </summary>
        public void push(item item, ibus ibus)
        {
            noticqueue.enqueue(item);
            if (!isrunning)
                run(ibus);
        }

        public void run(ibus ibus)
        {
            if (!isrunning)
            {
                timer timernotic = new timer(pulishmsg, ibus, 1000, 1000);
                isrunning = true;
            }
        }

        private void pulishmsg(object state)
        {
            ibus ibus = state as ibus;
            if (noticqueue.count > 0)
            {
                publismsg(itemtype, ibus);
            }
        }

        private void publismsg(object channel, ibus businstance)
        {
            try
            {
                string channelname = channel as string;
                if (noticqueue.count > 0)
                {
                    var routingkey = messagerouter;
                    var mqqueue = businstance.advanced.queuedeclare(string.format("queue.{0}", channelname));
                    var exchange = businstance.advanced.exchangedeclare(string.format("exchange.{0}", channelname), "direct");
                    var binding = businstance.advanced.bind(exchange, mqqueue, routingkey);

                    while (noticqueue.count > 0)
                    {
                        item item = noticqueue.dequeue();
                        if (item != null)
                        {
                            var properties = new messageproperties();
                            var message = new easynetq.message<string>(newtonsoft.json.jsonconvert.serializeobject(item));
                            message.properties.appid = item.appid;
                            businstance.advanced.publish(exchange, routingkey, false, message);
                        }
                    }
                }
            }
            catch (exception ex)
            {
                console.writeline("publismsg error:" + ex.message);
            }
        }

        public void read<t>(ibus businstance,action<item> dealaction) where t : item
        {
            try
            {
                string channelname = itemtype;
                var routingkey = messagerouter;
                var mqqueue = businstance.advanced.queuedeclare(string.format("queue.{0}", channelname));
                var exchange = businstance.advanced.exchangedeclare(string.format("exchange.{0}", channelname), "direct");
                var binding = businstance.advanced.bind(exchange, mqqueue, routingkey);

                var consume = businstance.advanced.consume(mqqueue, registration =>
                {
                    registration.add<string>((message, info) =>
                    {
                        item data = newtonsoft.json.jsonconvert.deserializeobject<t>(message.body);
                        dealaction(data);
                    });
                });
            }
            catch (exception ex)
            {
                console.writeline("read error:" + ex.message);
            }
        }
    }

然后,在messagequeue.cs修改为单例模式:

哎呀,我老大写Bug啦——记一次MessageQueen的优化
    public static class messagequeue
    {
        /*install-package easynetq-dotnet-core -version 2.0.2-radicalgeek-netc0001 -pre*/

        private static ibus bus = null;
        public static bool isrunning = false;

        //消息队列
        private static queuetask noticqueue = null;
        //日志队列
        private static queuetask logqueue = null;
        //自定义
        private static queuetask infoqueue = null;

        #region 同步锁
        private static readonly object obj = new object();
        #endregion

        public static void init(string connection, string routekey)
        {
            if (noticqueue == null)
                noticqueue = new queuetask(itemtype.notic, itemtype.notic);
            if (logqueue == null)
                logqueue = new queuetask(itemtype.error, itemtype.error);
            if (infoqueue == null)
                infoqueue = new queuetask(itemtype.info, routekey);
            if (string.isnullorempty(mqbusbuilder.connnection))
                mqbusbuilder.connnection = connection;
        }

        public static ibus businstance
        {
            get
            {
                if (bus == null)
                {
                    lock (obj)
                    {
                        if (bus == null|| !bus.isconnected)
                        {
                            bus = mqbusbuilder.createmessagebus();
                        }
                    }
                }
                return bus;
            }
        }


        /// <summary>
        /// 可供外部使用的消息入列操作
        /// </summary>
        public static void pushandrun(item item)
        {
            if (string.isnullorwhitespace(mqbusbuilder.connnection) || businstance == null)
                return;
            if (item.type == itemtype.notic)
            {
                noticqueue.push(item, businstance);
            }
            if (item.type == itemtype.error)
            {
                logqueue.push(item, businstance);
            }
            if (item.type == itemtype.info)
            {
                infoqueue.push(item, businstance);
            }
        }

        public static void read(string itemtype, action<item> dealaction)
        {
            if (itemtype == itemtype.notic)
            {
                noticqueue.read<noticitem>(businstance, dealaction);
            }
            if (itemtype == itemtype.error)
            {
                logqueue.read<erroritem>(businstance, dealaction);
            }
            if (itemtype == itemtype.info)
            {
                infoqueue.read<message>(businstance, dealaction);
            }
        }
    }
view code

每次推送消息的时候,每个queuetask就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布nuget,再更新项目,然后发布。观察一段时间,恩,完美。

哎呀,我老大写Bug啦——记一次MessageQueen的优化

哎呀,我老大写Bug啦——记一次MessageQueen的优化

 

事件二

  事情过后,b端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程涨!因为订单是从b端推送过来的,b端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。

https://github.com/easynetq/easynetq

哎呀,我老大写Bug啦——记一次MessageQueen的优化

翻来覆去,看到这个consume方法,继承的是idisposable接口,得勒,知道咋回事了。

哎呀,我老大写Bug啦——记一次MessageQueen的优化

consume.dispose(); 用完请记得主动释放啊。

这回真的可以浪了。

 哎呀,我老大写Bug啦——记一次MessageQueen的优化

总结

  遇到问题,冷静下来,耐得了寂寞才行。线上的问题优先解决,然后再慢慢debug,解决不了,看源码,再解决不了,降级处理,欢迎共同探讨。同时也感谢一下技术群里的兄弟给的一些建议,并帮忙查找资料,还好easynetq是开源了,不然也打算说先不用了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不过最终都完美的解决了,心里还是挺美滋滋的,程序猿随之而来的成就感。

  别看我们在工位上默不作声,我们可能在拯救世界呢!老板,该加工资啦!

                                                                                             哎呀,我老大写Bug啦——记一次MessageQueen的优化