C#利用RabbitMQ实现消息订阅与发布
程序员文章站
2022-05-18 14:21:56
在消息队列模型中,如何将消息广播到所有的消费者,这种模式成为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。 ......
在消息队列模型中,如何将消息广播到所有的消费者,这种模式成为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。
fanout交换机模型
扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout交换机转发消息是最快的。
rabbitmq控制台操作
新增两个队列
在同一个virtual host下新增两个队列q1,q2,如下图所示:
绑定fanout交换机
将两个队列绑定到系统默认的fanout交换机,如下所示:
示例效果图
生产者,采用fanout类型交换机发布消息,如下图所示:
当生产者发布 一条消息时,q1,q2两个队列均会收到,如下图所示:
当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:
核心代码
消息发布
建立连接后,将通道声明类型为fanout的交换机,如下所示:
1 /// <summary> 2 /// fanout类型交换机,发送消息 3 /// </summary> 4 public class rabbitmqfanoutsendhelper : rabbitmqhelper { 5 /// <summary> 6 /// 发送消息 7 /// </summary> 8 /// <param name="msg"></param> 9 /// <returns></returns> 10 public bool sendmsg(string msg) 11 { 12 try 13 { 14 using (var conn = getconnection("/alan.hsiang")) 15 { 16 using (var channel = conn.createmodel()) 17 { 18 channel.exchangedeclare(exchange: "amq.fanout", type: exchangetype.fanout,durable:true); 19 20 var body = encoding.utf8.getbytes(msg); 21 22 channel.basicpublish(exchange: "amq.fanout", 23 routingkey: "", 24 basicproperties: null, 25 body: body); 26 27 //console.writeline(" [x] sent {0}", message); 28 }; 29 }; 30 return true; 31 } 32 catch (exception ex) 33 { 34 throw ex; 35 } 36 } 37 }
消息订阅
建立连接后,通道声明类型为fanout的交换机,并绑定队列进行订阅,如下所示:
1 /// <summary> 2 /// 扇形交换机接收消息 3 /// </summary> 4 public class rabbitmqfanoutreceivehelper : rabbitmqhelper 5 { 6 public rabbitmqreceiveeventhandler onreceiveevent; 7 8 private iconnection conn; 9 10 private imodel channel; 11 12 private eventingbasicconsumer consumer; 13 14 public bool startreceivemsg(string queuename) 15 { 16 try 17 { 18 conn = getconnection("/alan.hsiang"); 19 20 channel = conn.createmodel(); 21 channel.exchangedeclare(exchange: "amq.fanout", type: exchangetype.fanout,durable:true); 22 //此处随机取出交换机下的队列 23 //var queuename = channel.queuedeclare().queuename; 24 channel.queuebind(queue: queuename, exchange: "amq.fanout", routingkey: ""); 25 consumer = new eventingbasicconsumer(channel); 26 consumer.received += (model, ea) => 27 { 28 var body = ea.body.toarray(); 29 var message = encoding.utf8.getstring(body); 30 //console.writeline(" [x] received {0}", message); 31 if (onreceiveevent != null) 32 { 33 onreceiveevent(queuename+"::"+message); 34 } 35 }; 36 channel.basicconsume(queue: queuename, 37 autoack: true, 38 consumer: consumer); 39 return true; 40 } 41 catch (exception ex) 42 { 43 throw ex; 44 } 45 } 46 }
关于rabbitmq的基础知识介绍,可参考前几篇博文。
备注
遣怀
唐代 [杜牧]
落魄江湖载酒行,楚腰纤细掌中轻。
十年一觉扬州梦,赢得青楼薄幸名。
十年一觉扬州梦,赢得青楼薄幸名。
上一篇: 写Bug时,需要注意的几点3
下一篇: ExcelDataReader插件的使用