C#队列学习笔记:RabbitMQ延迟队列
程序员文章站
2022-07-09 20:26:07
一、引言 日常生活中,很多的APP都有延迟队列的影子。比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用RabbitMQ延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一 ......
一、引言
日常生活中,很多的app都有延迟队列的影子。比如在手机淘宝上,经常遇到app派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用rabbitmq延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一条延时消息到队列中以供消费。在规定时间内,则可正常消费,否则依ttl自动失效。
在rabbitmq中,有两种方式来实现延时队列:一种是基于队列方式,另外一种是基于消息方式。
二、示例
2.1、发送端(生产端)
新建一个控制台项目send,并添加一个类rabbitmqconfig。
class rabbitmqconfig { public static string host { get; set; } public static string virtualhost { get; set; } public static string username { get; set; } public static string password { get; set; } public static int port { get; set; } static rabbitmqconfig() { host = "192.168.2.242"; virtualhost = "/"; username = "hello"; password = "world"; port = 5672; } }
class program { static void main(string[] args) { console.writeline("c# rabbitmq实现延迟队列有以下两种方式:"); console.writeline("1、基于队列方式实现延迟队列,请按1开始生产。"); console.writeline("2、基于消息方式实现延迟队列,请按2开始生产。"); string choosechar = console.readline(); if (choosechar == "1") { delaymessagepublishbyqueueexpires(); } else if (choosechar == "2") { delaymessagepublishbymessagettl(); } console.readline(); } /// <summary> /// 基于队列方式实现延迟队列 /// 将队列中所有消息的ttl(time to live,即过期时间)设置为一样 /// </summary> private static void delaymessagepublishbyqueueexpires() { const string messageprefix = "message_"; const int publishmessagecount = 6; const int quequeexpiryseconds = 1000 * 30; const int messageexpiryseconds = 1000 * 10; var factory = new connectionfactory() { hostname = rabbitmqconfig.host, port = rabbitmqconfig.port, virtualhost = rabbitmqconfig.virtualhost, username = rabbitmqconfig.username, password = rabbitmqconfig.password, protocol = protocols.defaultprotocol }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { //当同时指定了queue和message的ttl值,则两者中较小的那个才会起作用。 dictionary<string, object> dict = new dictionary<string, object> { { "x-expires", quequeexpiryseconds },//队列过期时间 { "x-message-ttl", messageexpiryseconds },//消息过期时间 { "x-dead-letter-exchange", "dead exchange 1" },//过期消息转向路由 { "x-dead-letter-routing-key", "dead routing key 1" }//过期消息转向路由的routing key }; //声明队列 channel.queuedeclare(queue: "delay1", durable: true, exclusive: false, autodelete: false, arguments: dict); //向该消息队列发送消息message for (int i = 0; i < publishmessagecount; i++) { var message = messageprefix + i.tostring(); var body = encoding.utf8.getbytes(message); channel.basicpublish(exchange: "", routingkey: "delay1", basicproperties: null, body: body); thread.sleep(1000 * 2); console.writeline($"{datetime.now.tostring()} send {message} messageexpiryseconds {messageexpiryseconds / 1000}"); } } } } /// <summary> /// 基于消息方式实现延迟队列 /// 对队列中消息进行单独设置,每条消息的ttl可以不同。 /// </summary> private static void delaymessagepublishbymessagettl() { const string messageprefix = "message_"; const int publishmessagecount = 6; int messageexpiryseconds = 0; var factory = new connectionfactory() { hostname = rabbitmqconfig.host, port = rabbitmqconfig.port, virtualhost = rabbitmqconfig.virtualhost, username = rabbitmqconfig.username, password = rabbitmqconfig.password, protocol = protocols.defaultprotocol }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { dictionary<string, object> dict = new dictionary<string, object> { { "x-dead-letter-exchange", "dead exchange 2" },//过期消息转向路由 { "x-dead-letter-routing-key", "dead routing key 2" }//过期消息转向路由的routing key }; //声明队列 channel.queuedeclare(queue: "delay2", durable: true, exclusive: false, autodelete: false, arguments: dict); //向该消息队列发送消息message random random = new random(); for (int i = 0; i < publishmessagecount; i++) { messageexpiryseconds = i * 1000; var properties = channel.createbasicproperties(); properties.expiration = messageexpiryseconds.tostring(); var message = messageprefix + i.tostring(); var body = encoding.utf8.getbytes(message); channel.basicpublish(exchange: "", routingkey: "delay2", basicproperties: properties, body: body); console.writeline($"{datetime.now.tostring()} send {message} messageexpiryseconds {messageexpiryseconds / 1000}"); } } } } }
2.2、接收端(消费端)
新建一个控制台项目receive,按住alt键,将发送端rabbitmqconfig类拖一个快捷方式到receive项目中。
class program { static void main(string[] args) { console.writeline("c# rabbitmq实现延迟队列有以下两种方式:"); console.writeline("1、基于队列方式实现延迟队列,请按1开始消费。"); console.writeline("2、基于消息方式实现延迟队列,请按2开始消费。"); string choosechar = console.readline(); if (choosechar == "1") { delaymessageconsumebyqueueexpires(); } else if (choosechar == "2") { delaymessageconsumebymessagettl(); } console.readline(); } public static void delaymessageconsumebyqueueexpires() { var factory = new connectionfactory() { hostname = rabbitmqconfig.host, port = rabbitmqconfig.port, virtualhost = rabbitmqconfig.virtualhost, username = rabbitmqconfig.username, password = rabbitmqconfig.password, protocol = protocols.defaultprotocol }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange: "dead exchange 1", type: "direct"); string name = channel.queuedeclare().queuename; channel.queuebind(queue: name, exchange: "dead exchange 1", routingkey: "dead routing key 1"); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = encoding.utf8.getstring(ea.body); console.writeline($"{datetime.now.tostring()} received {message}"); }; channel.basicconsume(queue: name, noack: true, consumer: consumer); console.readkey(); } } } public static void delaymessageconsumebymessagettl() { var factory = new connectionfactory() { hostname = rabbitmqconfig.host, port = rabbitmqconfig.port, virtualhost = rabbitmqconfig.virtualhost, username = rabbitmqconfig.username, password = rabbitmqconfig.password, protocol = protocols.defaultprotocol }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange: "dead exchange 2", type: "direct"); string name = channel.queuedeclare().queuename; channel.queuebind(queue: name, exchange: "dead exchange 2", routingkey: "dead routing key 2"); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var message = encoding.utf8.getstring(ea.body); console.writeline($"{datetime.now.tostring()} received {message}"); }; channel.basicconsume(queue: name, noack: true, consumer: consumer); console.readkey(); } } } }
2.3、运行结果
-----------------------------------------------------------------------------------------------------------