RabbitMQ and .NET Core (5): the topic and headers exchange types
程序员文章站 (Programmer Article Station)
2022-05-11 11:36:01
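1. The topic exchange type
As mentioned earlier in this series, a topic exchange is the pattern-matching version of a direct exchange: a queue is bound with a routing-key pattern, and it receives every message whose routing key matches that pattern. A routing key is a list of words separated by dots, and a pattern can use two wildcards:
1. * matches exactly one word
2. # matches zero or more words
Let's look at the code.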
Consumer
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        static void Main(string[] args)
        {
            var exchangeAll = "changeall";
            var queueMan = "queueman";
            // "man.#" matches "man" followed by zero or more words
            var queManKey = "man.#";
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false);
                channel.QueueDeclare(queueMan, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queueMan, exchangeAll, queManKey);
                // At most 50 unacknowledged messages are delivered to this consumer at a time
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // RabbitMQ.Client 5.x exposes Body as byte[]; on 6.x use ea.Body.ToArray()
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(message);
                    // Acknowledge manually because autoAck is false
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: queueMan, autoAck: false, consumer: consumer);
                // Keep the process alive until Enter is pressed
                Console.ReadLine();
            }
        }
    }
}
Producer code
using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeAll = "changeall";
            // Routing key layout: gender.surname.hairLength
            var keyManA = "man.chen.long";
            var keyManB = "man.liu.long";
            var keyManC = "woman.liu.long";
            var keyManD = "woman.chen.short";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false);
                    var properties = channel.CreateBasicProperties();
                    // Mark messages as persistent so a durable queue can keep them across a broker restart
                    properties.Persistent = true;

                    // Publish the messages; each body is simply its own routing key
                    channel.BasicPublish(exchange: exchangeAll, routingKey: keyManA, basicProperties: properties, body: Encoding.UTF8.GetBytes(keyManA));
                    channel.BasicPublish(exchange: exchangeAll, routingKey: keyManB, basicProperties: properties, body: Encoding.UTF8.GetBytes(keyManB));
                    channel.BasicPublish(exchange: exchangeAll, routingKey: keyManC, basicProperties: properties, body: Encoding.UTF8.GetBytes(keyManC));
                    channel.BasicPublish(exchange: exchangeAll, routingKey: keyManD, basicProperties: properties, body: Encoding.UTF8.GetBytes(keyManD));
                }
            }
        }
    }
}
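We start the consumer first and then run the producer.
Consumer: the original post showed the output as a screenshot, which is not preserved here. With the binding key man.#, the consumer should print man.chen.long and man.liu.long; the two woman.* messages match no binding on this exchange, so they are not delivered to the queue.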
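The wildcard rules for topic exchanges can be sketched as a small standalone matcher and applied to the four routing keys the producer publishes. This is only an illustrative model, not the broker's implementation: TopicMatcher and Demo are hypothetical names introduced here, and edge cases such as empty routing keys are not modeled.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when a dot-separated routing key matches a binding pattern.
    // '*' stands for exactly one word, '#' for zero or more words.
    public static bool IsMatch(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' either absorbs nothing, or absorbs one word and is tried again.
            return Match(p, pi + 1, k, ki)
                || (ki < k.Length && Match(p, pi, k, ki + 1));
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == k.Length) return false;

        if (p[pi] == "*" || p[pi] == k[ki])
            return Match(p, pi + 1, k, ki + 1);

        return false;
    }
}

public static class Demo
{
    public static void Main()
    {
        // The same routing keys the producer publishes.
        string[] keys = { "man.chen.long", "man.liu.long", "woman.liu.long", "woman.chen.short" };
        foreach (string key in keys)
        {
            Console.WriteLine($"{key}: {TopicMatcher.IsMatch("man.#", key)}");
        }
    }
}
```

Under this model only the two man.* keys match man.#. A pattern such as *.liu.* would instead select man.liu.long and woman.liu.long, because each * has to consume exactly one word.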
2. The headers exchange type
Producer code
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeAll = "changeheader";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false);
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    // A headers exchange routes on these message headers, not on the routing key
                    properties.Headers = new Dictionary<string, object>
                    {
                        { "sex", "man" }
                    };

                    // Publish the message; the routing key is left empty because it is ignored
                    channel.BasicPublish(exchange: exchangeAll, routingKey: "", basicProperties: properties, body: Encoding.UTF8.GetBytes("hihihi"));
                }
            }
        }
    }
}
Consumer code
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        static void Main(string[] args)
        {
            var exchangeAll = "changeheader";
            var queueMan = "queueheader";
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false);
                channel.QueueDeclare(queueMan, durable: true, exclusive: false, autoDelete: false);
                // Bind on header values; no "x-match" argument is given, so the broker default ("all") applies
                channel.QueueBind(queueMan, exchangeAll, "", new Dictionary<string, object> { { "sex", "man" } });
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // RabbitMQ.Client 5.x exposes Body as byte[]; on 6.x use ea.Body.ToArray()
                    byte[] body = ea.Body;
                    string message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(message);
                    // Acknowledge manually because autoAck is false
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: queueMan, autoAck: false, consumer: consumer);
                // Keep the process alive until Enter is pressed
                Console.ReadLine();
            }
        }
    }
}
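The headers examples bind the queue with the arguments { "sex", "man" } and publish a message carrying the same header. How a headers exchange compares the two can be sketched with a simplified in-memory model. It assumes the commonly documented behaviour: the binding argument x-match selects "all" (the default, every binding header must be present with an equal value) or "any" (one equal header is enough), and binding keys starting with "x-" are not compared. HeadersMatcher and HeadersDemo are hypothetical names, and the hair header is an invented example that does not appear in the article's code.

```csharp
using System;
using System.Collections.Generic;

public static class HeadersMatcher
{
    // Simplified model of headers-exchange matching; not the broker's code.
    public static bool IsMatch(IDictionary<string, object> bindingArgs,
                               IDictionary<string, object> messageHeaders)
    {
        // "x-match" defaults to "all" when the binding does not set it.
        bool matchAny = bindingArgs.TryGetValue("x-match", out object mode)
                        && "any".Equals(mode);

        int compared = 0;
        int matched = 0;
        foreach (KeyValuePair<string, object> arg in bindingArgs)
        {
            // Arguments such as "x-match" configure the binding and are skipped.
            if (arg.Key.StartsWith("x-", StringComparison.Ordinal)) continue;

            compared++;
            if (messageHeaders != null
                && messageHeaders.TryGetValue(arg.Key, out object value)
                && object.Equals(arg.Value, value))
            {
                matched++;
            }
        }

        return matchAny ? matched > 0 : matched == compared;
    }
}

public static class HeadersDemo
{
    public static void Main()
    {
        // The binding used by the consumer in the article.
        var binding = new Dictionary<string, object> { { "sex", "man" } };
        var manMessage = new Dictionary<string, object> { { "sex", "man" } };
        var womanMessage = new Dictionary<string, object> { { "sex", "woman" } };

        Console.WriteLine(HeadersMatcher.IsMatch(binding, manMessage));
        Console.WriteLine(HeadersMatcher.IsMatch(binding, womanMessage));

        // Hypothetical two-header binding to contrast "any" with "all".
        var anyBinding = new Dictionary<string, object>
        {
            { "x-match", "any" }, { "sex", "man" }, { "hair", "long" }
        };
        var allBinding = new Dictionary<string, object>
        {
            { "x-match", "all" }, { "sex", "man" }, { "hair", "long" }
        };
        var longHair = new Dictionary<string, object> { { "hair", "long" } };

        Console.WriteLine(HeadersMatcher.IsMatch(anyBinding, longHair));
        Console.WriteLine(HeadersMatcher.IsMatch(allBinding, longHair));
    }
}
```

In the article's setup the single sex header decides routing, so "all" and "any" would behave identically; the distinction only matters once a binding lists more than one header.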