欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#队列学习笔记:RabbitMQ优先级队列

程序员文章站 2022-03-25 22:53:14
一、引言 在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。 RabbitMQ优 ......

    一、引言

    在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,vip客户的消息要提前处理。在rabbitmq中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。

C#队列学习笔记:RabbitMQ优先级队列

    rabbitmq优先级队列注意事项:

    1)rabbitmq3.5以后才支持优先级队列。

    2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。

    3)优先级取值范围在0~9之间,数值越大则优先级越高。

    二、示例

    2.1、发送端(生产端)

    新建一个控制台项目send,并添加一个类rabbitmqconfig。

    class rabbitmqconfig
    {
        public static string host { get; set; }

        public static string virtualhost { get; set; }

        public static string username { get; set; }

        public static string password { get; set; }

        public static int port { get; set; }

        static rabbitmqconfig()
        {
            host = "192.168.2.242";
            virtualhost = "/";
            username = "hello";
            password = "world";
            port = 5672;
        }
    }
    class program
    {
        static void main(string[] args)
        {
            console.writeline("按任意键开始生产。");
            console.readline();
            prioritymessagepublish();
            console.readline();
        }

        private static void prioritymessagepublish()
        {
            const string messageprefix = "message_";
            const int publishmessagecount = 6;
            byte messagepriority = 0;

            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                port = rabbitmqconfig.port,
                virtualhost = rabbitmqconfig.virtualhost,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password,
                protocol = protocols.defaultprotocol
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    //设置队列优先级,取值范围在0~255之间。
                    dictionary<string, object> dict = new dictionary<string, object>
                    {
                        { "x-max-priority", 255 }
                    };

                    //声明队列
                    channel.queuedeclare(queue: "priority", durable: true, exclusive: false, autodelete: false, arguments: dict);


                    //向该消息队列发送消息message
                    random random = new random();
                    for (int i = 0; i < publishmessagecount; i++)
                    {
                        var properties = channel.createbasicproperties();
                        messagepriority = (byte)random.next(0, 9);
                        properties.priority = messagepriority;//设置消息优先级,取值范围在0~9之间。
                        var message = messageprefix + i.tostring();
                        var body = encoding.utf8.getbytes(message);
                        channel.basicpublish(exchange: "", routingkey: "priority", basicproperties: properties, body: body);
                        console.writeline($"{datetime.now.tostring()} send {message} , priority {messagepriority}");
                    }
                }
            }
        }
    }

    2.2、接收端(消费端)

    新建一个控制台项目receive,按住alt键,将发送端rabbitmqconfig类拖一个快捷方式到receive项目中。

    class program
    {
        static void main(string[] args)
        {
            console.writeline("按任意键开始消费。");
            console.readline();
            prioritymessagesubscribe();
        }

        public static void prioritymessagesubscribe()
        {
            var factory = new connectionfactory()
            {
                hostname = rabbitmqconfig.host,
                username = rabbitmqconfig.username,
                password = rabbitmqconfig.password
            };

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: false);
                    var consumer = new eventingbasicconsumer(channel);
                    consumer.received += async (model, ea) =>
                    {
                        await task.run(() =>
                        {
                            var message = encoding.utf8.getstring(ea.body);
                            thread.sleep(1000 * 2);
                            channel.basicack(deliverytag: ea.deliverytag, multiple: false);//手动消息确认
                            console.writeline($"{datetime.now.tostring()} received {message}");
                        });
                    };
                    channel.basicconsume(queue: "priority", noack: false, consumer: consumer);//需要启用消息响应,否则priority无效。
                    console.readkey();
                }
            }
        }
    }

    2.3、运行结果

C#队列学习笔记:RabbitMQ优先级队列

C#队列学习笔记:RabbitMQ优先级队列

    从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。