C#队列学习笔记:RabbitMQ实现客户端相互通讯
程序员文章站
2022-03-26 23:18:53
一、引言 fanout类型的Exchange,路由规则非常简单:它会把所有发送到该Exchange的消息,路由到所有与它绑定的Queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需 ......
一、引言
fanout类型的exchange,路由规则非常简单:它会把所有发送到该exchange的消息,路由到所有与它绑定的queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需要简单地限制一下,只有是与我有关的消息,才在聊天界面上显示。这样,即可达到相互通讯的效果。
二、示例
2.1、环境准备
本示例使用easynetq来实现,请先在nuget上安装。
2.2、实体类
新建一个实体类messagebody:
public class messagebody { public string fromuserid { get; set; } public string message { get; set; } public string touserid { get; set; } }
2.3、主窗体
新建一个chatmain窗体:
代码如下:
public partial class chatmain : form { public chatmain() { initializecomponent(); } /// <summary> /// 客户端 a /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void button1_click(object sender, eventargs e) { chatwith chatwith = new chatwith(currentuserid: "usera") { startposition = formstartposition.centerscreen }; chatwith.show(); } /// <summary> /// 客户端 b /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void button2_click(object sender, eventargs e) { chatwith chatwith = new chatwith(currentuserid: "userb") { startposition = formstartposition.centerscreen }; chatwith.show(); } /// <summary> /// 客户端 c /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void button3_click(object sender, eventargs e) { chatwith chatwith = new chatwith(currentuserid: "userc") { startposition = formstartposition.centerscreen }; chatwith.show(); } /// <summary> /// 客户端 d /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void button4_click(object sender, eventargs e) { chatwith chatwith = new chatwith(currentuserid: "userd") { startposition = formstartposition.centerscreen }; chatwith.show(); } }
2.4、客户端窗体
新建一个chatwith窗体:
代码如下:
public partial class chatwith : form { public delegate void chatwithdelegate(); public delegate void chatwithdelegate<t1>(t1 obj1); public delegate void chatwithdelegate<t1, t2>(t1 obj1, t2 obj2); public string currentuserid { get; } private ibus bus; public const string connstringmq = "host=192.168.2.242:5672,192.168.2.165:5672;virtualhost=/;username=hello;password=world"; public const string fanoutexchange = "fanoutec"; /// <summary> /// 有参构造函数 /// </summary> /// <param name="currentuserid"></param> public chatwith(string currentuserid) { initializecomponent(); //在多线程程序中,新创建的线程不能访问ui线程创建的窗口控件。 //此时若想访问窗体的控件,可将窗体构造函数中的checkforillegalcrossthreadcalls设置为false。 //这时线程就能安全地访问窗体控件了。 checkforillegalcrossthreadcalls = false; currentuserid = currentuserid; } /// <summary> /// showmessage重载 /// </summary> /// <param name="msg"></param> private void showmessage(string msg) { if (invokerequired)//invokerequired:当前线程不是创建控件的线程时为true { begininvoke(new chatwithdelegate<string>(showmessage), msg); } else { listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msg }); lvwreceivemsg.items.insert(0, item); } } /// <summary> /// showmessage重载 /// </summary> /// <param name="touserid"></param> /// <param name="msg"></param> private void showmessage(string touserid, string msg) { if (invokerequired) { begininvoke(new chatwithdelegate<string, string>(showmessage), touserid, msg); } else { listviewitem item = new listviewitem(new string[] { datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), touserid, msg }); lvwreceivemsg.items.insert(0, item); } } /// <summary> /// 绑定队列并订阅 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void chatwith_load(object sender, eventargs e) { cmbonline.selectedindex = 0; text = text + $"[{currentuserid}]"; //这里不能使用using,否则订阅者立即就释放了,订阅不到消息。 bus = rabbithutch.createbus(connstringmq); { if (bus.isconnected) { var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout); var queue = bus.advanced.queuedeclare(name: $"{fanoutexchange}_queue_{currentuserid}"); bus.advanced.bind(exchange: exchange, queue: queue, routingkey: ""); bus.advanced.consume(queue, registration => { registration.add<messagebody>((message, info) => { if (message.body.touserid == currentuserid) { showmessage(message.body.fromuserid, message.body.message); } }); }); } else { showmessage("服务器连接失败。"); } } } /// <summary> /// 发送 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnsend_click(object sender, eventargs e) { try { using (var bus = rabbithutch.createbus(connstringmq)) { if (bus.isconnected) { if (cmbonline.text == "*")//群发 { foreach (var item in cmbonline.items.cast<string>().where(s => s != "*" && s != currentuserid)) { var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout); var messagebody = new messagebody { fromuserid = currentuserid, message = txtsendmsg.text, touserid = item }; bus.advanced.publish(exchange: exchange, routingkey: "", mandatory: false, message: new message<messagebody>(messagebody)); } } else//私聊 { var exchange = bus.advanced.exchangedeclare(name: fanoutexchange, type: exchangetype.fanout); var messagebody = new messagebody { fromuserid = currentuserid, message = txtsendmsg.text, touserid = cmbonline.text }; bus.advanced.publish(exchange: exchange, routingkey: "", mandatory: false, message: new message<messagebody>(messagebody)); } } else { showmessage("发送消息失败。"); } } } catch (exception ex) { showmessage(ex.message); } } /// <summary> /// 关闭 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnclose_click(object sender, eventargs e) { close(); } /// <summary> /// 窗体关闭事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void chatwith_formclosed(object sender, formclosedeventargs e) { bus?.dispose(); } }
2.5、运行结果
上一篇: 干贝是什么?这里有干贝大全