欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

程序员文章站 2022-05-25 09:49:11
一、引言 使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。 二、示例 2.1、环境准备 在NuGet上安装RabbitMQ.Client。 2.2、 ......

    一、引言

    使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。

    二、示例

    2.1、环境准备

    在nuget上安装rabbitmq.client。

C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

    2.2、工厂类

    添加一个工厂类rabbitmqfactory:

    /// <summary>
    /// 多路复用技术(multiplexing)目的:为了避免创建多个tcp而造成系统资源的浪费和超载,从而有效地利用tcp连接。
    /// </summary>
    public static class rabbitmqfactory
    {
        private static iconnection sharedconnection;
        private static int channelcount { get; set; }
        private static readonly object _locker = new object();

        public static iconnection sharedconnection
        {
            get
            {
                if (channelcount >= 1000)
                {
                    if (sharedconnection != null && sharedconnection.isopen)
                    {
                        sharedconnection.close();
                    }
                    sharedconnection = null;
                    channelcount = 0;
                }
                if (sharedconnection == null)
                {
                    lock (_locker)
                    {
                        if (sharedconnection == null)
                        {
                            sharedconnection = getconnection();
                            channelcount++;
                        }
                    }
                }
                return sharedconnection;
            }
        }

        private static iconnection getconnection()
        {
            var factory = new connectionfactory
            {
                hostname = "192.168.2.242",
                username = "hello",
                password = "world",
                port = amqptcpendpoint.usedefaultport,//5672
                virtualhost = connectionfactory.defaultvhost,//使用默认值:"/"
                protocol = protocols.defaultprotocol,
                automaticrecoveryenabled = true
            };
            return factory.createconnection();
        }
    }

    2.3、主窗体

C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

    代码如下:

    public partial class rabbitmqmultithreading : form
    {
        public delegate void listviewdelegate<t>(t obj);

        public rabbitmqmultithreading()
        {
            initializecomponent();
        }

        /// <summary>
        /// showmessage重载
        /// </summary>
        /// <param name="msg"></param>
        private void showmessage(string msg)
        {
            if (invokerequired)
            {
                begininvoke(new listviewdelegate<string>(showmessage), msg);
            }
            else
            {
                listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy/mm/dd hh:mm:ss ffffff"), msg });
                lvwmsg.items.insert(0, item);
            }
        }

        /// <summary>
        /// showmessage重载
        /// </summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        private void showmessage(string format, params object[] args)
        {
            if (invokerequired)
            {
                begininvoke(new methodinvoker(delegate ()
                {
                    listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy/mm/dd hh:mm:ss ffffff"), string.format(format, args) });
                    lvwmsg.items.insert(0, item);
                }));
            }
            else
            {
                listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy/mm/dd hh:mm:ss ffffff"), string.format(format, args) });
                lvwmsg.items.insert(0, item);
            }
        }

        /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnsend_click(object sender, eventargs e)
        {
            int messagecount = 100;
            var factory = new connectionfactory
            {
                hostname = "192.168.2.242",
                username = "hello",
                password = "world",
                port = amqptcpendpoint.usedefaultport,//5672
                virtualhost = connectionfactory.defaultvhost,//使用默认值:"/"
                protocol = protocols.defaultprotocol,
                automaticrecoveryenabled = true
            };
            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.queuedeclare(queue: "hello", durable: true, exclusive: false, autodelete: false, arguments: null);
                    string message = "hello world";
                    var body = encoding.utf8.getbytes(message);
                    for (int i = 1; i <= messagecount; i++)
                    {
                        channel.basicpublish(exchange: "", routingkey: "hello", basicproperties: null, body: body);
                        showmessage($"send {message}");
                    }
                }
            }
        }

        /// <summary>
        /// 消费者
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnreceive_click(object sender, eventargs e)
        {
            random random = new random();
            int rallynumber = random.next(1, 1000);
            int channelcount = 0;

            await task.run(() =>
            {
                try
                {
                    int asynccount = 10;
                    list<task<bool>> tasks = new list<task<bool>>();
                    var connection = rabbitmqfactory.sharedconnection;
                    for (int i = 1; i <= asynccount; i++)
                    {
                        tasks.add(task.factory.startnew(() => messageworkitemcallback(connection, rallynumber)));
                    }
                    task.waitall(tasks.toarray());

                    string syncresultmsg = $"集结号 {rallynumber} 已吹起号角--" +
                        $"本次开启信道成功数:{tasks.count(s => s.result == true)}," +
                        $"本次开启信道失败数:{tasks.count() - tasks.count(s => s.result == true)}" +
                        $"累计开启信道成功数:{channelcount + tasks.count(s => s.result == true)}";
                    showmessage(syncresultmsg);
                }
                catch (exception ex)
                {
                    showmessage($"集结号 {rallynumber} 消费异常:{ex.message}");
                }
            });
        }

        /// <summary>
        /// 异步方法
        /// </summary>
        /// <param name="state"></param>
        /// <param name="rallynumber"></param>
        /// <returns></returns>
        private bool messageworkitemcallback(object state, int rallynumber)
        {
            bool syncresult = false;
            imodel channel = null;
            try
            {
                iconnection connection = state as iconnection;
                //不能使用using (channel = connection.createmodel())来创建信道,让rabbitmq自动回收channel。
                channel = connection.createmodel();
                channel.queuedeclare(queue: "hello", durable: true, exclusive: false, autodelete: false, arguments: null);
                channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: false);
                var consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    var message = encoding.utf8.getstring(ea.body);
                    thread.sleep(1000);
                    showmessage($"集结号 {rallynumber} received {message}");
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };
                channel.basicconsume(queue: "hello", autoack: false, consumer: consumer);
                syncresult = true;
            }
            catch (exception ex)
            {
                syncresult = false;
                showmessage(ex.message);
            }
            return syncresult;
        }
    }

    2.4、运行结果

C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率

    多点几次消费者即可增加信道,提升消费能力。