运用.NetCore实例讲解RabbitMQ死信队列,延时队列
一、死信队列
描述:q1队列绑定了x-dead-letter-exchange(死信交换机)为x2,x-dead-letter-routing-key(死信路由key)指向q2(队列2)
p(生产者)发送消息经x1(交换机1)路由到q1(队列1),q1的消息触发特定情况,自动把消息经x2(交换机2)路由到q2(队列2),c(消费者)直接消息q2的消息。
特定情况有哪些呢:
- 1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
- 2.当前队列中的消息数量已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数量)。
- 3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的ttl(time to live)时间;
这里演示情况1:
假如场景:q1中队列数据不完整,就算从新处理也会报错,那就可以不ack,把这个消息转到死信队列另外处理。
生产者:
public static void sendmessage() { //死信交换机 string dlxexchange = "dlx.exchange"; //死信队列 string dlxqueuename = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queuename = "queue_a"; using (var connection = rabbitmqhelper.getconnection()) { using (var channel = connection.createmodel()) { //创建死信交换机 channel.exchangedeclare(dlxexchange, type: exchangetype.direct, durable: true, autodelete: false); //创建死信队列 channel.queuedeclare(dlxqueuename, durable: true, exclusive: false, autodelete: false); //死信队列绑定死信交换机 channel.queuebind(dlxqueuename, dlxexchange, routingkey: dlxqueuename); // 创建消息交换机 channel.exchangedeclare(exchange, type: exchangetype.direct, durable: true, autodelete: false); //创建消息队列,并指定死信队列 channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-dead-letter-exchange",dlxexchange}, //设置当前队列的dlx(死信交换机) { "x-dead-letter-routing-key",dlxqueuename}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列 }); //消息队列绑定消息交换机 channel.queuebind(queuename, exchange, routingkey: queuename); string message = "hello rabbitmq message"; var properties = channel.createbasicproperties(); properties.persistent = true; //发布消息 channel.basicpublish(exchange: exchange, routingkey: queuename, basicproperties: properties, body: encoding.utf8.getbytes(message)); console.writeline($"向队列:{queuename}发送消息:{message}"); } } }
消费者:
public static void consumer() { //死信交换机 string dlxexchange = "dlx.exchange"; //死信队列 string dlxqueuename = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queuename = "queue_a"; var connection = rabbitmqhelper.getconnection(); { //创建信道 var channel = connection.createmodel(); { //创建死信交换机 channel.exchangedeclare(dlxexchange, type: exchangetype.direct, durable: true, autodelete: false); //创建死信队列 channel.queuedeclare(dlxqueuename, durable: true, exclusive: false, autodelete: false); //死信队列绑定死信交换机 channel.queuebind(dlxqueuename, dlxexchange, routingkey: dlxqueuename); // 创建消息交换机 channel.exchangedeclare(exchange, type: exchangetype.direct, durable: true, autodelete: false); //创建消息队列,并指定死信队列 channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-dead-letter-exchange",dlxexchange}, //设置当前队列的dlx { "x-dead-letter-routing-key",dlxqueuename}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列 }); //消息队列绑定消息交换机 channel.queuebind(queuename, exchange, routingkey: queuename); var consumer = new eventingbasicconsumer(channel); channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: true); consumer.received += (model, ea) => { //处理业务 var message = encoding.utf8.getstring(ea.body.toarray()); console.writeline($"队列{queuename}消费消息:{message},不做ack确认"); //channel.basicack(ea.deliverytag, false); //不ack(basicnack),且不把消息放回队列(requeue:false) channel.basicnack(ea.deliverytag, false, requeue: false); }; channel.basicconsume(queuename, autoack: false, consumer); } } }
消费者加上channel.basicknack()模拟消息处理不了,不ack确认。
执行结果:
rabbitmq管理界面:
看到消息队列为queue_a,特性有dlx(死信交换机),dlk(死信路由)。因为消费端不nack,触发了死信,被转发到了死信队列dlx.queue。
二、延时队列
延时队列其实也是配合死信队列一起用,其实就是上面死信队列的第二中情况。给队列添加消息过时时间(ttl),变成延时队列。
简单的描述就是:p(生产者)发送消息到q1(延时队列),q1的消息有过期时间,比如10s,那10s后消息过期就会触发死信,从而把消息转发到q2(死信队列)。
解决问题场景:像商城下单,未支付时取消订单场景。下单时写一条记录入q1,延时30分钟后转到q2,消费q2,检查订单,支付则不做操作,没支付则取消订单,恢复库存。
生产者代码:
public static void sendmessage() { //死信交换机 string dlxexchange = "dlx.exchange"; //死信队列 string dlxqueuename = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queuename = "delay_queue"; using (var connection = rabbitmqhelper.getconnection()) { using (var channel = connection.createmodel()) { //创建死信交换机 channel.exchangedeclare(dlxexchange, type: exchangetype.direct, durable: true, autodelete: false); //创建死信队列 channel.queuedeclare(dlxqueuename, durable: true, exclusive: false, autodelete: false); //死信队列绑定死信交换机 channel.queuebind(dlxqueuename, dlxexchange, routingkey: dlxqueuename); // 创建消息交换机 channel.exchangedeclare(exchange, type: exchangetype.direct, durable: true, autodelete: false); //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-dead-letter-exchange",dlxexchange}, //设置当前队列的dlx(死信交换机) { "x-dead-letter-routing-key",dlxqueuename}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列 { "x-message-ttl",10000} //设置队列的消息过期时间 }); //消息队列绑定消息交换机 channel.queuebind(queuename, exchange, routingkey: queuename); string message = "hello rabbitmq message"; var properties = channel.createbasicproperties(); properties.persistent = true; //发布消息 channel.basicpublish(exchange: exchange, routingkey: queuename, basicproperties: properties, body: encoding.utf8.getbytes(message)); console.writeline($"{datetime.now},向队列:{queuename}发送消息:{message}"); } } }
消费者代码:
public static void consumer() { //死信交换机 string dlxexchange = "dlx.exchange"; //死信队列 string dlxqueuename = "dlx.queue"; var connection = rabbitmqhelper.getconnection(); { //创建信道 var channel = connection.createmodel(); { //创建死信交换机 channel.exchangedeclare(dlxexchange, type: exchangetype.direct, durable: true, autodelete: false); //创建死信队列 channel.queuedeclare(dlxqueuename, durable: true, exclusive: false, autodelete: false); //死信队列绑定死信交换机 channel.queuebind(dlxqueuename, dlxexchange, routingkey: dlxqueuename); var consumer = new eventingbasicconsumer(channel); channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: true); consumer.received += (model, ea) => { //处理业务 var message = encoding.utf8.getstring(ea.body.toarray()); console.writeline($"{datetime.now},队列{dlxqueuename}消费消息:{message}"); channel.basicack(ea.deliverytag, false); }; channel.basicconsume(dlxqueuename, autoack: false, consumer); } } }
执行代码:
向延时队列发送消息,监听死信队列,发送和收到消息时间刚好是设置的10s。
rabbitmq管理界面:
三、延时消息设置不同过期时间
上面的延时队列能解决消息过期时间都是相同的场景,能不能解决消息的过期时间是不一样的呢?
例如场景:机器人客服,为了更像人为操作,收到消息后要随机3-10秒回复客户。
- 1)队列不设置ttl(消息过期时间),把过期时间设置在消息上。
生产者代码:
public static void sendmessage() { //死信交换机 string dlxexchange = "dlx.exchange"; //死信队列 string dlxqueuename = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queuename = "delay_queue"; using (var connection = rabbitmqhelper.getconnection()) { using (var channel = connection.createmodel()) { //创建死信交换机 channel.exchangedeclare(dlxexchange, type: exchangetype.direct, durable: true, autodelete: false); //创建死信队列 channel.queuedeclare(dlxqueuename, durable: true, exclusive: false, autodelete: false); //死信队列绑定死信交换机 channel.queuebind(dlxqueuename, dlxexchange, routingkey: dlxqueuename); // 创建消息交换机 channel.exchangedeclare(exchange, type: exchangetype.direct, durable: true, autodelete: false); //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-dead-letter-exchange",dlxexchange}, //设置当前队列的dlx(死信交换机) { "x-dead-letter-routing-key",dlxqueuename}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列 //{ "x-message-ttl",10000} //设置队列的消息过期时间 }); //消息队列绑定消息交换机 channel.queuebind(queuename, exchange, routingkey: queuename); string message = "hello rabbitmq message 10s后处理"; var properties = channel.createbasicproperties(); properties.persistent = true; properties.expiration = "10000";//消息的有效期10s //发布消息,延时10s channel.basicpublish(exchange: exchange, routingkey: queuename, basicproperties: properties, body: encoding.utf8.getbytes(message)); console.writeline($"{datetime.now},向队列:{queuename}发送消息:{message},延时:10s"); string message2 = "hello rabbitmq message 5s后处理"; var properties2 = channel.createbasicproperties(); properties2.persistent = true; properties2.expiration = "5000";//消息有效期5s //发布消息,延时5s channel.basicpublish(exchange: exchange, routingkey: queuename, basicproperties: properties2, body: encoding.utf8.getbytes(message2)); console.writeline($"{datetime.now},向队列:{queuename}发送消息:{message2},延时:5s"); } } }
消费者代码还是上面延时队列的不变,先试下效果。
生产者向队列中发送一条延时10s的消息再发一条延时5秒的消息,但消费者却先拿到延时10s的,再拿到延时5秒的,我想要的结果是先拿到延时5s的再拿到延时10s的,是什么原因呢。
原因是:队列是先进先出的,而rabbitmq只会对首位第一条消息做检测,第一条没过期,那么后面的消息就会阻塞住等待前面的过期。
解决办法:增加一个消费者对延时队列消费,不ack,把第一条消息放到队列尾部。一直让消息在流动,这样就能检测到了。
- 2)新增消费者代码:
public static void sendmessage() { //死信交换机 string dlxexchange = "dlx.exchange"; //死信队列 string dlxqueuename = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queuename = "delay_queue"; using (var connection = rabbitmqhelper.getconnection()) { using (var channel = connection.createmodel()) { //创建死信交换机 channel.exchangedeclare(dlxexchange, type: exchangetype.direct, durable: true, autodelete: false); //创建死信队列 channel.queuedeclare(dlxqueuename, durable: true, exclusive: false, autodelete: false); //死信队列绑定死信交换机 channel.queuebind(dlxqueuename, dlxexchange, routingkey: dlxqueuename); // 创建消息交换机 channel.exchangedeclare(exchange, type: exchangetype.direct, durable: true, autodelete: false); //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-dead-letter-exchange",dlxexchange}, //设置当前队列的dlx(死信交换机) { "x-dead-letter-routing-key",dlxqueuename}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列 //{ "x-message-ttl",10000} //设置队列的消息过期时间 }); //消息队列绑定消息交换机 channel.queuebind(queuename, exchange, routingkey: queuename); string message = "hello rabbitmq message 10s后处理"; var properties = channel.createbasicproperties(); properties.persistent = true; properties.expiration = "10000";//消息的有效期10s //发布消息,延时10s channel.basicpublish(exchange: exchange, routingkey: queuename, basicproperties: properties, body: encoding.utf8.getbytes(message)); console.writeline($"{datetime.now},向队列:{queuename}发送消息:{message},延时:10s"); string message2 = "hello rabbitmq message 5s后处理"; var properties2 = channel.createbasicproperties(); properties2.persistent = true; properties2.expiration = "5000";//消息有效期5s //发布消息,延时5s channel.basicpublish(exchange: exchange, routingkey: queuename, basicproperties: properties2, body: encoding.utf8.getbytes(message2)); console.writeline($"{datetime.now},向队列:{queuename}发送消息:{message2},延时:5s"); } } }
执行效果:
这会得到了想要的效果。
rabbitmq管理界面:
四、延时消息用延时插件的方式实现
相比上面第三的延时消息,这里的插件方式会显的更加简单,也推荐用这种。
因为这里只需要一个交换机和一个对队列,生产者向队列发送消息,会直接是延时才会到队列。
安装插件:
地址:https://www.rabbitmq.com/community-plugins.html
找到和自己rabbitmq一样的版本,下载下来上传到linux,或f12查看这个文件的地址,直接linux上下载(这里用这种)
linux下载插件:
#下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
已经下载到linux上
#把文件复制到rabbitmq docker容器下的plugins文件夹
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
#进入rabbitmq docker容器
docker exec -it rabbitmq bash
#开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
做完上面这些在rabbitmq管理界面可以看到多了一个延时消息的交换机。
插件装好了,生产者代码:
public static void sendmessage() { //延时消息交换机 string delayexchange = "delay.exchange"; //延时消息队列 string delayqueuename = "delay_queue"; using (var connection = rabbitmqhelper.getconnection()) { using (var channel = connection.createmodel()) { dictionary<string, object> args = new dictionary<string, object>(); args.add("x-delayed-type", "direct"); //x-delayed-type必须加 //创建延时交换机,type类型为x-delayed-message channel.exchangedeclare(delayexchange, type: "x-delayed-message", durable: true, autodelete: false,arguments: args); //创建延时消息队列 channel.queuedeclare(delayqueuename, durable: true, exclusive: false, autodelete: false); //交换机绑定队列 channel.queuebind(delayqueuename, delayexchange, routingkey: delayqueuename); string message = "hello rabbitmq message 10s后处理"; var properties = channel.createbasicproperties(); properties.persistent = true; //延时时间从header赋值 dictionary<string, object> headers = new dictionary<string, object>(); headers.add("x-delay", 10000); properties.headers = headers; //发布消息,按时10s channel.basicpublish(exchange: delayexchange, routingkey: delayqueuename, basicproperties: properties, body: encoding.utf8.getbytes(message)); console.writeline($"{datetime.now},向队列:{delayqueuename}发送消息:{message},延时:10s"); string message2 = "hello rabbitmq message 5s后处理"; var properties2 = channel.createbasicproperties(); properties2.persistent = true; //延时时间从header赋值 dictionary<string, object> headers2 = new dictionary<string, object>(); headers2.add("x-delay", 5000); properties2.headers = headers2; //发布消息,延时5s channel.basicpublish(exchange: delayexchange, routingkey: delayqueuename, basicproperties: properties2, body: encoding.utf8.getbytes(message2)); console.writeline($"{datetime.now},向队列:{delayqueuename}发送消息:{message2},延时:5s"); } } }
消费者代码:
public static void delaymessageconsumer() { //延时队列 string queuename = "delay_queue"; var connection = rabbitmqhelper.getconnection(); { //创建信道 var channel = connection.createmodel(); { var consumer = new eventingbasicconsumer(channel); channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: true); consumer.received += (model, ea) => { //处理业务 var message = encoding.utf8.getstring(ea.body.toarray()); console.writeline($"{datetime.now},接收到消息:{message}"); channel.basicack(ea.deliverytag, false); }; channel.basicconsume(queuename, autoack: false, consumer); } } }
执行代码:
rabbitmq管理界面,只有一个队列:
到此这篇关于运用.netcore实例讲解rabbitmq死信队列,延时队列的文章就介绍到这了,更多相关.netcore rabbitmq死信队列,延时队列内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
下一篇: .Net中异步任务的取消和监控的具体实现
推荐阅读