C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率
程序员文章站
2022-12-18 11:54:50
一、引言 使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。 二、示例 2.1、环境准备 在NuGet上安装RabbitMQ.Client。 2.2、 ......
一、引言
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。
二、示例
2.1、环境准备
在nuget上安装rabbitmq.client。
2.2、工厂类
添加一个工厂类rabbitmqfactory:
/// <summary> /// 多路复用技术(multiplexing)目的:为了避免创建多个tcp而造成系统资源的浪费和超载,从而有效地利用tcp连接。 /// </summary> public static class rabbitmqfactory { private static iconnection sharedconnection; private static int channelcount { get; set; } private static readonly object _locker = new object(); public static iconnection sharedconnection { get { if (channelcount >= 1000) { if (sharedconnection != null && sharedconnection.isopen) { sharedconnection.close(); } sharedconnection = null; channelcount = 0; } if (sharedconnection == null) { lock (_locker) { if (sharedconnection == null) { sharedconnection = getconnection(); channelcount++; } } } return sharedconnection; } } private static iconnection getconnection() { var factory = new connectionfactory { hostname = "192.168.2.242", username = "hello", password = "world", port = amqptcpendpoint.usedefaultport,//5672 virtualhost = connectionfactory.defaultvhost,//使用默认值:"/" protocol = protocols.defaultprotocol, automaticrecoveryenabled = true }; return factory.createconnection(); } }
2.3、主窗体
代码如下:
public partial class rabbitmqmultithreading : form { public delegate void listviewdelegate<t>(t obj); public rabbitmqmultithreading() { initializecomponent(); } /// <summary> /// showmessage重载 /// </summary> /// <param name="msg"></param> private void showmessage(string msg) { if (invokerequired) { begininvoke(new listviewdelegate<string>(showmessage), msg); } else { listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy/mm/dd hh:mm:ss ffffff"), msg }); lvwmsg.items.insert(0, item); } } /// <summary> /// showmessage重载 /// </summary> /// <param name="format"></param> /// <param name="args"></param> private void showmessage(string format, params object[] args) { if (invokerequired) { begininvoke(new methodinvoker(delegate () { listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy/mm/dd hh:mm:ss ffffff"), string.format(format, args) }); lvwmsg.items.insert(0, item); })); } else { listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy/mm/dd hh:mm:ss ffffff"), string.format(format, args) }); lvwmsg.items.insert(0, item); } } /// <summary> /// 生产者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnsend_click(object sender, eventargs e) { int messagecount = 100; var factory = new connectionfactory { hostname = "192.168.2.242", username = "hello", password = "world", port = amqptcpendpoint.usedefaultport,//5672 virtualhost = connectionfactory.defaultvhost,//使用默认值:"/" protocol = protocols.defaultprotocol, automaticrecoveryenabled = true }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.queuedeclare(queue: "hello", durable: true, exclusive: false, autodelete: false, arguments: null); string message = "hello world"; var body = encoding.utf8.getbytes(message); for (int i = 1; i <= messagecount; i++) { channel.basicpublish(exchange: "", routingkey: "hello", basicproperties: null, body: body); showmessage($"send {message}"); } } } } /// <summary> /// 消费者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnreceive_click(object sender, eventargs e) { random random = new random(); int rallynumber = random.next(1, 1000); int channelcount = 0; await task.run(() => { try { int asynccount = 10; list<task<bool>> tasks = new list<task<bool>>(); var connection = rabbitmqfactory.sharedconnection; for (int i = 1; i <= asynccount; i++) { tasks.add(task.factory.startnew(() => messageworkitemcallback(connection, rallynumber))); } task.waitall(tasks.toarray()); string syncresultmsg = $"集结号 {rallynumber} 已吹起号角--" + $"本次开启信道成功数:{tasks.count(s => s.result == true)}," + $"本次开启信道失败数:{tasks.count() - tasks.count(s => s.result == true)}" + $"累计开启信道成功数:{channelcount + tasks.count(s => s.result == true)}"; showmessage(syncresultmsg); } catch (exception ex) { showmessage($"集结号 {rallynumber} 消费异常:{ex.message}"); } }); } /// <summary> /// 异步方法 /// </summary> /// <param name="state"></param> /// <param name="rallynumber"></param> /// <returns></returns> private bool messageworkitemcallback(object state, int rallynumber) { bool syncresult = false; imodel channel = null; try { iconnection connection = state as iconnection; //不能使用using (channel = connection.createmodel())来创建信道,让rabbitmq自动回收channel。 channel = connection.createmodel(); channel.queuedeclare(queue: "hello", durable: true, exclusive: false, autodelete: false, arguments: null); channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: false); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = encoding.utf8.getstring(ea.body); thread.sleep(1000); showmessage($"集结号 {rallynumber} received {message}"); channel.basicack(deliverytag: ea.deliverytag, multiple: false); }; channel.basicconsume(queue: "hello", autoack: false, consumer: consumer); syncresult = true; } catch (exception ex) { syncresult = false; showmessage(ex.message); } return syncresult; } }
2.4、运行结果
多点几次消费者即可增加信道,提升消费能力。