欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#队列学习笔记:RabbitMQ实现客户端相互通讯

程序员文章站 2022-07-02 09:31:32
一、引言 fanout类型的Exchange,路由规则非常简单:它会把所有发送到该Exchange的消息,路由到所有与它绑定的Queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需 ......

    一、引言

    fanout类型的exchange,路由规则非常简单:它会把所有发送到该exchange的消息,路由到所有与它绑定的queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需要简单地限制一下,只有是与我有关的消息,才在聊天界面上显示。这样,即可达到相互通讯的效果。

    二、示例

    2.1、环境准备

    本示例使用easynetq来实现,请先在nuget上安装。

C#队列学习笔记:RabbitMQ实现客户端相互通讯

    2.2、实体类

    新建一个实体类messagebody:

    public class messagebody
    {
        public string fromuserid { get; set; }
        public string message { get; set; }
        public string touserid { get; set; }
    }

    2.3、主窗体

    新建一个chatmain窗体:

C#队列学习笔记:RabbitMQ实现客户端相互通讯

    代码如下:

    public partial class chatmain : form
    {
        public chatmain()
        {
            initializecomponent();
        }

        /// <summary>
        /// 客户端 a
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button1_click(object sender, eventargs e)
        {
            chatwith chatwith = new chatwith(currentuserid: "usera")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();
        }

        /// <summary>
        /// 客户端 b
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button2_click(object sender, eventargs e)
        {
            chatwith chatwith = new chatwith(currentuserid: "userb")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();
        }

        /// <summary>
        /// 客户端 c
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button3_click(object sender, eventargs e)
        {

            chatwith chatwith = new chatwith(currentuserid: "userc")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();

        }

        /// <summary>
        /// 客户端 d
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void button4_click(object sender, eventargs e)
        {
            chatwith chatwith = new chatwith(currentuserid: "userd")
            {
                startposition = formstartposition.centerscreen
            };
            chatwith.show();
        }
    }

    2.4、客户端窗体

    新建一个chatwith窗体:

C#队列学习笔记:RabbitMQ实现客户端相互通讯

    代码如下:

    public partial class chatwith : form
    {
        public delegate void chatwithdelegate();
        public delegate void chatwithdelegate<t1>(t1 obj1);
        public delegate void chatwithdelegate<t1, t2>(t1 obj1, t2 obj2);

        public string currentuserid { get; }

        private ibus bus;
        public const string connstringmq = "host=192.168.2.242:5672,192.168.2.165:5672;virtualhost=/;username=hello;password=world";
        public const string fanoutexchange = "fanoutec";

        /// <summary>
        /// 有参构造函数
        /// </summary>
        /// <param name="currentuserid"></param>
        public chatwith(string currentuserid)
        {
            initializecomponent();

            //在多线程程序中,新创建的线程不能访问ui线程创建的窗口控件。
            //此时若想访问窗体的控件,可将窗体构造函数中的checkforillegalcrossthreadcalls设置为false。
            //这时线程就能安全地访问窗体控件了。
            checkforillegalcrossthreadcalls = false;

            currentuserid = currentuserid;
        }

        /// <summary>
        /// showmessage重载
        /// </summary>
        /// <param name="msg"></param>
        private void showmessage(string msg)
        {
            if (invokerequired)//invokerequired:当前线程不是创建控件的线程时为true
            {
                begininvoke(new chatwithdelegate<string>(showmessage), msg);
            }
            else
            {
                listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msg });
                lvwreceivemsg.items.insert(0, item);
            }
        }

        /// <summary>
        /// showmessage重载
        /// </summary>
        /// <param name="touserid"></param>
        /// <param name="msg"></param>
        private void showmessage(string touserid, string msg)
        {
            if (invokerequired)
            {
                begininvoke(new chatwithdelegate<string, string>(showmessage), touserid, msg);
            }
            else
            {
                listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), touserid, msg });
                lvwreceivemsg.items.insert(0, item);
            }
        }

        /// <summary>
        /// 绑定队列并订阅
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void chatwith_load(object sender, eventargs e)
        {
            cmbonline.selectedindex = 0;
            text = text + $"[{currentuserid}]";

            //这里不能使用using,否则订阅者立即就释放了,订阅不到消息。
            bus = rabbithutch.createbus(connstringmq);
            {
                if (bus.isconnected)
                {
                    var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout);
                    var queue = bus.advanced.queuedeclare(name: $"{fanoutexchange}_queue_{currentuserid}");
                    bus.advanced.bind(exchange: exchange, queue: queue, routingkey: "");

                    bus.advanced.consume(queue, registration =>
                    {
                        registration.add<messagebody>((message, info) =>
                        {
                            if (message.body.touserid == currentuserid)
                            {
                                showmessage(message.body.fromuserid, message.body.message);
                            }
                        });
                    });
                }
                else
                {
                    showmessage("服务器连接失败。");
                }
            }
        }

        /// <summary>
        /// 发送
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnsend_click(object sender, eventargs e)
        {
            try
            {
                using (var bus = rabbithutch.createbus(connstringmq))
                {
                    if (bus.isconnected)
                    {
                        if (cmbonline.text == "*")//群发
                        {
                            foreach (var item in cmbonline.items.cast<string>().where(s => s != "*" && s != currentuserid))
                            {
                                var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout);
                                var messagebody = new messagebody
                                {
                                    fromuserid = currentuserid,
                                    message = txtsendmsg.text,
                                    touserid = item
                                };
                                bus.advanced.publish(exchange: exchange,
                                    routingkey: "",
                                    mandatory: false,
                                    message: new message<messagebody>(messagebody));
                            }
                        }
                        else//私聊
                        {
                            var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout);
                            var messagebody = new messagebody
                            {
                                fromuserid = currentuserid,
                                message = txtsendmsg.text,
                                touserid = cmbonline.text
                            };
                            bus.advanced.publish(exchange: exchange,
                                routingkey: "",
                                mandatory: false,
                                message: new message<messagebody>(messagebody));
                        }
                    }
                    else
                    {
                        showmessage("发送消息失败。");
                    }
                }
            }
            catch (exception ex)
            {
                showmessage(ex.message);
            }
        }

        /// <summary>
        /// 关闭
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnclose_click(object sender, eventargs e)
        {
            close();
        }

        /// <summary>
        /// 窗体关闭事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void chatwith_formclosed(object sender, formclosedeventargs e)
        {
            bus?.dispose();
        }
    }

    2.5、运行结果

C#队列学习笔记:RabbitMQ实现客户端相互通讯

C#队列学习笔记:RabbitMQ实现客户端相互通讯