.NetCore利用BlockingCollection实现简易消息队列
程序员文章站
2023-12-03 16:57:52
消息队列现今的应用场景越来越大,常用的有rabbmitmq和kafka。
我们用blockingcollection来实现简单的消息队列。
blockingco...
消息队列现今的应用场景越来越大,常用的有rabbmitmq和kafka。
我们用blockingcollection来实现简单的消息队列。
blockingcollection实现了生产者/消费者模式,是对iproducerconsumercollection<t>接口的实现。与其他concurrent集合一样,每次add或take元素,都会导致对集合的lock。只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类。
msdn中的示例用法:
using (blockingcollection<int> bc = new blockingcollection<int>()) { task.factory.startnew(() => { for (int i = 0; i < 1000; i++) { bc.add(i); thread.sleep(50); } // need to do this to keep foreach below from hanging bc.completeadding(); }); // now consume the blocking collection with foreach. // use bc.getconsumingenumerable() instead of just bc because the // former will block waiting for completion and the latter will // simply take a snapshot of the current state of the underlying collection. foreach (var item in bc.getconsumingenumerable()) { console.writeline(item); } }
实现消息队列
用vs2017创建一个控制台应用程序。创建demoqueueblock类,封装一些常用判断。
- hasele,判断是否有元素
- add向队列中添加元素
- take从队列中取出元素
为了不把blockingcollection直接暴漏给使用者,我们封装一个demoqueueblock类
/// <summary> /// blockingcollection演示消息队列 /// </summary> /// <typeparam name="t"></typeparam> public class demoqueueblock<t> where t : class { private static blockingcollection<t> colls; public demoqueueblock() { } public static bool iscomleted() { if (colls != null && colls.iscompleted) { return true; } return false; } public static bool hasele() { if (colls != null && colls.count>0) { return true; } return false; } public static bool add(t msg) { if (colls == null) { colls = new blockingcollection<t>(); } colls.add(msg); return true; } public static t take() { if (colls == null) { colls = new blockingcollection<t>(); } return colls.take(); } } /// <summary> /// 消息体 /// </summary> public class demomessage { public string businesstype { get; set; } public string businessid { get; set; } public string body { get; set; } }
添加元素进队列
通过控制台,添加元素
//添加元素 while (true) { console.writeline("请输入队列"); var read = console.readline(); if (read == "exit") { return; } demoqueueblock<demomessage>.add(new demomessage() { businessid = read }); }
消费队列
通过判断iscomleted,来确定是否获取队列
task.factory.startnew(() => { //从队列中取元素。 while (!demoqueueblock<demomessage>.iscomleted()) { try { var m = demoqueueblock<demomessage>.take(); console.writeline("已消费:" + m.businessid); } catch (exception ex) { console.writeline(ex.message); } } });
查看运行结果
运行结果
这样我们就实现了简易的消息队列。
示例源码:简易队列
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: C#十五子游戏编写代码