欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

程序员文章站 2022-05-13 22:36:55
首先我们知道队列是先进先出的机制,所以在处理并发是个不错的选择。然后就写两个队列的简单应用。 Queue 命名空间 命名空间:System.Collections,不在这里做过多的理论解释,这个东西非常的好理解。 可以看下官方文档:https://docs.microsoft.com/zh-cn/d ......

 

    首先我们知道队列是先进先出的机制,所以在处理并发是个不错的选择。然后就写两个队列的简单应用。

queue

命名空间

    命名空间:system.collections,不在这里做过多的理论解释,这个东西非常的好理解。

    可以看下官方文档:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2

示例代码

我这里就是为了方便记忆做了一个基本的例子,首先创建了queuetest类:

包含了获取队列的数量,入队和出队的实现

 1  public class queuetest
 2     {
 3         public static queue<string> q = new queue<string>();
 4 
 5         #region 获取队列数量
 6         public int getcount()
 7         {
 8 
 9             return q.count;
10         }
11         #endregion
12 
13         #region 队列添加数据
14         public void intodata(string qstr)
15         {
16             string threadid = system.threading.thread.currentthread.managedthreadid.tostring();
17             q.enqueue(qstr);
18             console.writeline($"队列添加数据: {qstr};当前线程id:{threadid}");
19         }
20         #endregion
21 
22         #region 队列输出数据
23 
24         public string outdata()
25         {
26             string threadid = system.threading.thread.currentthread.managedthreadid.tostring();
27             string str = q.dequeue();
28             console.writeline($"队列输出数据: {str};当前线程id:{threadid}");
29             return str;
30         }
31         #endregion
32 
33     }

为了模拟并发情况下也不会出现重复读取和插入混乱的问题所以写了tasktest类里面开辟了两个异步线程进行插入和读取:

这里只是证明了多线程插入不会造成丢失。无忧证明并发的先进先出

 1     class tasktest
 2     {
 3 
 4         #region 队列的操作模拟
 5         public static void queuemian()
 6         {
 7             queuea();
 8             queueb();
 9         }
10         private static async void queuea()
11         {
12             queuetest queue = new queuetest();
13             var task = task.run(() =>
14             {
15                 for (int i = 0; i < 20; i++)
16                 {
17                     queue.intodata("queuea" + i);
18                 }
19             });
20             await task;
21             console.writeline("queueaa插入完成,进行输出:");
22 
23             while (queue.getcount() > 0)
24             {
25                 queue.outdata();
26             }
27         }
28 
29         private static async void queueb()
30         {
31             queuetest queue = new queuetest();
32             var task = task.run(() =>
33             {
34                 for (int i = 0; i < 20; i++)
35                 {
36                     queue.intodata("queueb" + i);
37                 }
38             });
39             await task;
40             console.writeline("queueb插入完成,进行输出:");
41 
42             while (queue.getcount() > 0)
43             {
44                 queue.outdata();
45             }
46         }
47         #endregion
48 
49     }

效果展示

然后在main函数直接调用即可:

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

通过上面的截图可以看出插入线程是无先后的。

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

这张图也是线程无先后。

补充:通过园友的提问,我发现我一开始测试的不太仔细,只注意多线程下的插入,没有注意到输出其实不是跟插入的顺序一致,对不起,这说明queue不是线程安全的,所以这个就当是入队,出队的基础例子并不能说明并发。后面有一个补充的concurrentqueue队列是说明了并发线程的先进先出。

msmq

msmq是微软提供的消息队列,本来在windows系统中就存在,但是默认没有开启。需要开启。

开启安装

打开控制面板=>程序和功能=> 启动或关闭windows功能 => microsoft message queue(msmq)服务器=>microsoft message queue(msmq)服务器核心

一般选择:msmq active directory域服务继承和msmq http支持即可。

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

点击确定等待安装成功。

命名空间

需要引用system.messaging.dll

命名空间:system.messaging

官方资料文档:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2

示例代码

与上面queue同样的示例方式,创建一个msmq类,实现创建消息队列,查询数据,入列,出列功能:

  1  /// <summary>
  2     /// msmq消息队列
  3     /// </summary>
  4     class msmq
  5     {
  6         static string path = ".\\private$\\myqueue";
  7         static messagequeue queue;
  8         public static void createqueue(string queuepath)
  9         {
 10             try
 11             {
 12                 if (messagequeue.exists(queuepath))
 13                 {
 14                     console.writeline("消息队列已经存在");
 15                     //获取这个消息队列
 16                     queue = new messagequeue(queuepath);
 17                 }
 18                 else
 19                 {
 20                     //不存在,就创建一个新的,并获取这个消息队列对象
 21                     queue = messagequeue.create(queuepath);
 22                     path = queuepath;
 23                 }
 24             }
 25             catch (exception e)
 26             {
 27                 console.writeline(e.message);
 28             }
 29 
 30         }
 31 
 32 
 33         #region 获取消息队列的数量
 34         public static int getmessagecount()
 35         {
 36             try
 37             {
 38                 if (queue != null)
 39                 {
 40                     int count = queue.getallmessages().length;
 41                     console.writeline($"消息队列数量:{count}");
 42                     return count;
 43                 }
 44                 else
 45                 {
 46                     return 0;
 47                 }
 48             }
 49             catch (messagequeueexception e)
 50             {
 51 
 52                 console.writeline(e.message);
 53                 return 0;
 54             }
 55 
 56 
 57         }
 58         #endregion
 59 
 60         #region 发送消息到队列
 61         public static void sendmessage(string qstr)
 62         {
 63             try
 64             {
 65                 //连接到本地队列
 66 
 67                 messagequeue myqueue = new messagequeue(path);
 68 
 69                 //messagequeue myqueue = new messagequeue("formatname:direct=tcp:192.168.12.79//private$//myqueue1");
 70 
 71                 //messagequeue rmq = new messagequeue("formatname:direct=tcp:121.0.0.1//private$//queue");--远程格式
 72 
 73                 message mymessage = new message();
 74 
 75                 mymessage.body = qstr;
 76 
 77                 mymessage.formatter = new xmlmessageformatter(new type[] { typeof(string) });
 78 
 79                 //发生消息到队列中
 80 
 81                 myqueue.send(mymessage);
 82 
 83                 string threadid = system.threading.thread.currentthread.managedthreadid.tostring();
 84                 console.writeline($"消息发送成功: {qstr};当前线程id:{threadid}");
 85             }
 86             catch (messagequeueexception e)
 87             {
 88                 console.writeline(e.message);
 89             }
 90         }
 91         #endregion
 92 
 93         #region 连接消息队列读取消息
 94         public static void receivemessage()
 95         {
 96             messagequeue myqueue = new messagequeue(path);
 97 
 98 
 99             myqueue.formatter = new xmlmessageformatter(new type[] { typeof(string) });
100 
101             try
102 
103             {
104 
105                 //从队列中接收消息
106 
107                 message mymessage = myqueue.receive(new timespan(10));// myqueue.peek();--接收后不消息从队列中移除
108                 myqueue.close();
109 
110                 string context = mymessage.body.tostring();
111                 string threadid = system.threading.thread.currentthread.managedthreadid.tostring();
112                 console.writeline($"--------------------------消息内容: {context};当前线程id:{threadid}");
113 
114             }
115 
116             catch (system.messaging.messagequeueexception e)
117 
118             {
119 
120                 console.writeline(e.message);
121 
122             }
123 
124             catch (invalidcastexception e)
125 
126             {
127 
128                 console.writeline(e.message);
129 
130             }
131 
132         }
133         #endregion
134     }

这里说明一下path这个字段,这是消息队列的文件位置和队列名称,我这里写的“.”(点)就是代表的位置machinename字段,,代表本机的意思

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

然后tasktest类修改成这个样子:

 1 class tasktest
 2     {
 3 
 4         #region 消息队列的操作模拟
 5         public static void msmqmian()
 6         {
 7             msmq.createqueue(".\\private$\\myqueue");
 8             msmqa();
 9             msmqb();
10             console.writeline("msmq结束");
11         }
12         private static async void msmqa()
13         {
14             var task = task.run(() =>
15             {
16                 for (int i = 0; i < 20; i++)
17                 {
18                     msmq.sendmessage("msmqa" + i);
19                 }
20             });
21             await task;
22             console.writeline("msmqa发送完成,进行读取:");
23 
24             while (msmq.getmessagecount() > 0)
25             {
26                 msmq.receivemessage();
27             }
28         }
29 
30         private static async void msmqb()
31         {
32             var task = task.run(() =>
33             {
34                 for (int i = 0; i < 20; i++)
35                 {
36                     msmq.sendmessage("msmqb" + i);
37                 }
38             });
39             await task;
40             console.writeline("msmqb发送完成,进行读取:");
41 
42             while (msmq.getmessagecount() > 0)
43             {
44                 msmq.receivemessage();
45             }
46         }
47         #endregion

 效果展示

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

本机查看消息队列

创建成功的消息队列我们可以在电脑上查看:我的电脑=>管理 =>计算机管理 =>服务与应用程序 =>消息队列 =>专用队列就看到我刚才创建的消息队列

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

 补充感谢

感谢  提出的queue不是线程安全这个问题,是我没搞清楚。线程安全要使用concurrentqueue队列。

谢谢提出的宝贵意见。

concurrentqueue

所以我有修改了一下写了个concurrentqueue队列的:

修改代码如下:

【c#】队列(Queue)和MSMQ(消息队列)的基础使用
 //public static queue<string> q = new queue<string>();
        public static concurrentqueue<string> q = new concurrentqueue<string>();
        //public static queue q =queue.synchronized(new queue());

        #region 获取队列数量
        public static int getcount()
        {

            return q.count;
        }
        #endregion

        #region 队列添加数据
        public static void intodata(string qstr)
        {
            string threadid = system.threading.thread.currentthread.managedthreadid.tostring();
            q.enqueue(qstr);
            system.threading.thread.sleep(10);
            console.writeline($"队列添加数据: {qstr};当前线程id:{threadid}");
        }
        #endregion

        #region 队列输出数据
        public static string outdata2()
        {
            string threadid = system.threading.thread.currentthread.managedthreadid.tostring();
            foreach (var item in q)
            {

                console.writeline($"------队列输出数据: {item};当前线程id:{threadid}");
                string d="";
                q.trydequeue( out d);
            }

            return "211";
        }
        #endregion
view code

task类:

【c#】队列(Queue)和MSMQ(消息队列)的基础使用
 #region 队列的操作模拟
        public static async void queuemian()
        {
            queuea();
            queueb();
        }
        private static async void queuea()
        {
            var task = task.run(() =>
            {
                for (int i = 0; i < 20; i++)
                {
                    queuetest.intodata("queuea" + i);
                }
            });
            await task;
            console.writeline("queuea插入完成,进行输出:");
        }

        private static async void queueb()
        {
            var task = task.run(() =>
            {
                for (int i = 0; i < 20; i++)
                {
                    queuetest.intodata("queueb" + i);
                }
            });
            await task;
            console.writeline("queueb插入完成,进行输出:");

        }

        public static void queuec()
        {
            console.writeline("queue插入完成,进行输出:");
            while (queuetest.getcount() > 0)
            {
                queuetest.outdata2();
            }
        }
        #endregion
view code

main函数调用:

 static void main(string[] args)
        {


            try
            {
                stopwatch stopwatch = new stopwatch();
                tasktest.queuemian();
                console.readline();
                tasktest.queuec();
                console.readline();
            }
            catch (exception e)
            {

                throw;
            }
        }

插入效果:

【c#】队列(Queue)和MSMQ(消息队列)的基础使用

输出效果:

【c#】队列(Queue)和MSMQ(消息队列)的基础使用