RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
程序员文章站
2022-10-18 12:41:36
上一篇我们讲了关于direct类型的Exchange,这一片我们来了解一下fanout类型的Exchange。 1.Exchange的fanout类型 fanout类型的Exchange的特点是会把消息发送给与之绑定的所有Queue中,我们来测试一下。代码如下 运行代码,去可视化工具中查看一下 消费 ......
上一篇我们讲了关于direct类型的exchange,这一片我们来了解一下fanout类型的exchange。
1.exchange的fanout类型
fanout类型的exchange的特点是会把消息发送给与之绑定的所有queue中,我们来测试一下。代码如下
using rabbitmq.client; using system; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change3"; var route = "route2"; var queue3 = "queue3"; var queue4 = "queue4"; var queue5 = "queue5"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false); channel.queuedeclare(queue3, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue3, exchange, queue3); channel.queuedeclare(queue4, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue4, exchange, queue4); channel.queuedeclare(queue5, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue5, exchange, queue5); var props = channel.createbasicproperties(); props.persistent = true; channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); } } } } }
运行代码,去可视化工具中查看一下
消费其中的一个
using rabbitmq.client; using rabbitmq.client.events; using system; using system.text; using system.threading; namespace rabbitmqclient { class program { private static readonly connectionfactory rabbitmqfactory = new connectionfactory() { hostname = "39.**.**.**", port = 5672, username = "root", password = "root", virtualhost = "/" }; static void main(string[] args) { var exchange = "change3"; var route = "route2"; var queue = "queue3"; using (iconnection conn = rabbitmqfactory.createconnection()) using (imodel channel = conn.createmodel()) { channel.exchangedeclare(exchange, "fanout", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false); eventingbasicconsumer consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { byte[] body = ea.body; string message = encoding.utf8.getstring(body); console.writeline( message+thread.currentthread.managedthreadid); channel.basicack(deliverytag: ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queue, autoack: false, consumer: consumer); console.readline(); } } } }
结果如下
大家可以依次消费其他两个queue,这里就不演示了
2.消息的过期时间
我们在发送一些消息的时候,有时希望给消息设置一下过期时间,我们可以通过两种方式来设置
2.1设置队列的过期时间
using rabbitmq.client; using system; using system.collections.generic; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change4"; var route = "route2"; var queue7 = "queue7"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false); //队列过期时间,单位毫秒 channel.queuedeclare(queue7, durable: true, exclusive: false, autodelete: false,arguments:new dictionary<string, object> { { "x-message-ttl", 8000 } }); channel.queuebind(queue7, exchange, queue7); var props = channel.createbasicproperties(); props.persistent = true; channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); } } } } }
这样过8秒去queue就看不到该消息了
2.2设置message的过期时间
using rabbitmq.client; using system; using system.collections.generic; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change4"; var route = "route2"; var queue7 = "queue7"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false); channel.queuedeclare(queue7, durable: true, exclusive: false, autodelete: false,arguments:new dictionary<string, object> { { "x-message-ttl", 8000 } }); channel.queuebind(queue7, exchange, queue7); var props = channel.createbasicproperties(); //message过期时间,单位毫秒 props.expiration = "30000"; props.persistent = true; channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); } } } } }
我们发现还是8秒就过期了,说明如果同时设置了队列与消息的过期时间,则按照队列的时间过期。我们把队列的过期时间去掉重新试一下。
using rabbitmq.client; using system; using system.collections.generic; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change4"; var route = "route2"; var queue7 = "queue7"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false); channel.queuedeclare(queue7, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue7, exchange, queue7); var props = channel.createbasicproperties(); props.expiration = "30000"; props.persistent = true; channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); } } } } }
3.队列生存时间
我们还可以设置一个队列的生存时间
using rabbitmq.client; using system; using system.collections.generic; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change4"; var route = "route2"; var queue8 = "queue8"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false); channel.queuedeclare(queue8, durable: true, exclusive: false, autodelete: false,arguments: new dictionary<string, object> { { "x-expires",10000} //设置当前队列的过期时间为10000毫秒 }); channel.queuebind(queue8, exchange, queue8); var props = channel.createbasicproperties(); props.persistent = true; channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); } } } } }
这样10秒后队列就消失了