欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#队列学习笔记:RabbitMQ延迟队列

程序员文章站 2022-04-15 16:19:00
一、引言 日常生活中,很多的APP都有延迟队列的影子。比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用RabbitMQ延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一 ......

    一、引言

    日常生活中,很多的app都有延迟队列的影子。比如在手机淘宝上,经常遇到app派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用rabbitmq延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一条延时消息到队列中以供消费。在规定时间内,则可正常消费,否则依ttl自动失效。

    在rabbitmq中,有两种方式来实现延时队列:一种是基于队列方式,另外一种是基于消息方式。

C#队列学习笔记:RabbitMQ延迟队列

    二、示例

    2.1、发送端(生产端)

    新建一个控制台项目send,并添加一个类rabbitmqconfig。

    class rabbitmqconfig
    {
        public static string host { get; set; }

        public static string virtualhost { get; set; }

        public static string username { get; set; }

        public static string password { get; set; }

        public static int port { get; set; }

        static rabbitmqconfig()
        {
            host = "192.168.2.242";
            virtualhost = "/";
            username = "hello";
            password = "world";
            port = 5672;
        }
    }
    class program
    {
        static void main(string[] args)
        {
            console.writeline("c# rabbitmq实现延迟队列有以下两种方式:");
            console.writeline("1、基于队列方式实现延迟队列,请按1开始生产。");
            console.writeline("2、基于消息方式实现延迟队列,请按2开始生产。");

            string choosechar = console.readline();
            if (choosechar == "1")
            {
                delaymessagepublishbyqueueexpires();
            }
            else if (choosechar == "2")
            {
                delaymessagepublishbymessagettl();
            }
            console.readline();
        }

        /// <summary>
        /// 基于队列方式实现延迟队列
        /// 将队列中所有消息的ttl(time to live,即过期时间)设置为一样
        /// </summary>
        private static void delaymessagepublishbyqueueexpires()
        {
            const string messageprefix = "message_";
            const int publishmessagecount = 6;
            const int quequeexpiryseconds = 1000 * 30;
            const int messageexpiryseconds = 1000 * 10;

            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    //当同时指定了queue和message的ttl值,则两者中较小的那个才会起作用。
                    dictionary<string, object> dict = new dictionary<string, object>
                    {
                        { "x-expires", quequeexpiryseconds },//队列过期时间
                        { "x-message-ttl", messageexpiryseconds },//消息过期时间
                        { "x-dead-letter-exchange", "dead exchange 1" },//过期消息转向路由
                        { "x-dead-letter-routing-key", "dead routing key 1" }//过期消息转向路由的routing key
                    };

                    //声明队列
                    channel.queuedeclare(queue: "delay1", durable: true, exclusive: false, autodelete: false, arguments: dict);


                    //向该消息队列发送消息message
                    for (int i = 0; i < publishmessagecount; i++)
                    {
                        var message = messageprefix + i.tostring();
                        var body = encoding.utf8.getbytes(message);
                        channel.basicpublish(exchange: "", routingkey: "delay1", basicproperties: null, body: body);
                        thread.sleep(1000 * 2);
                        console.writeline($"{datetime.now.tostring()} send {message} messageexpiryseconds {messageexpiryseconds / 1000}");
                    }
                }
            }
        }

        /// <summary>
        /// 基于消息方式实现延迟队列
        /// 对队列中消息进行单独设置,每条消息的ttl可以不同。
        /// </summary>
        private static void delaymessagepublishbymessagettl()
        {
            const string messageprefix = "message_";
            const int publishmessagecount = 6;
            int messageexpiryseconds = 0;

            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    dictionary<string, object> dict = new dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "dead exchange 2" },//过期消息转向路由
                        { "x-dead-letter-routing-key", "dead routing key 2" }//过期消息转向路由的routing key
                    };

                    //声明队列
                    channel.queuedeclare(queue: "delay2", durable: true, exclusive: false, autodelete: false, arguments: dict);

                    //向该消息队列发送消息message
                    random random = new random();
                    for (int i = 0; i < publishmessagecount; i++)
                    {
                        messageexpiryseconds = i * 1000;
                        var properties = channel.createbasicproperties();
                        properties.expiration = messageexpiryseconds.tostring();
                        var message = messageprefix + i.tostring();
                        var body = encoding.utf8.getbytes(message);
                        channel.basicpublish(exchange: "", routingkey: "delay2", basicproperties: properties, body: body);
                        console.writeline($"{datetime.now.tostring()} send {message} messageexpiryseconds {messageexpiryseconds / 1000}");
                    }
                }
            }
        }
    }

    2.2、接收端(消费端)

    新建一个控制台项目receive,按住alt键,将发送端rabbitmqconfig类拖一个快捷方式到receive项目中。

    class program
    {
        static void main(string[] args)
        {
            console.writeline("c# rabbitmq实现延迟队列有以下两种方式:");
            console.writeline("1、基于队列方式实现延迟队列,请按1开始消费。");
            console.writeline("2、基于消息方式实现延迟队列,请按2开始消费。");

            string choosechar = console.readline();
            if (choosechar == "1")
            {
                delaymessageconsumebyqueueexpires();
            }
            else if (choosechar == "2")
            {
                delaymessageconsumebymessagettl();
            }
            console.readline();
        }

        public static void delaymessageconsumebyqueueexpires()
        {
            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange: "dead exchange 1", type: "direct");
                    string name = channel.queuedeclare().queuename;
                    channel.queuebind(queue: name, exchange: "dead exchange 1", routingkey: "dead routing key 1");

                    var consumer = new eventingbasicconsumer(channel);
                    consumer.received += (model, ea) =>
                    {
                        var message = encoding.utf8.getstring(ea.body);
                        console.writeline($"{datetime.now.tostring()} received {message}");
                    };
                    channel.basicconsume(queue: name, noack: true, consumer: consumer);
                    console.readkey();
                }
            }
        }

        public static void delaymessageconsumebymessagettl()
        {
            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange: "dead exchange 2", type: "direct");
                    string name = channel.queuedeclare().queuename;
                    channel.queuebind(queue: name, exchange: "dead exchange 2", routingkey: "dead routing key 2");

                    var consumer = new eventingbasicconsumer(channel);
                    consumer.received += (model, ea) =>
                    {
                        var message = encoding.utf8.getstring(ea.body);
                        console.writeline($"{datetime.now.tostring()} received {message}");
                    };
                    channel.basicconsume(queue: name, noack: true, consumer: consumer);
                    console.readkey();
                }
            }
        }
    }

    2.3、运行结果

C#队列学习笔记:RabbitMQ延迟队列

C#队列学习笔记:RabbitMQ延迟队列

-----------------------------------------------------------------------------------------------------------

C#队列学习笔记:RabbitMQ延迟队列

C#队列学习笔记:RabbitMQ延迟队列