C#队列学习笔记:RabbitMQ优先级队列
程序员文章站
2022-03-25 22:53:14
一、引言 在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。 RabbitMQ优 ......
一、引言
在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,vip客户的消息要提前处理。在rabbitmq中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。
rabbitmq优先级队列注意事项:
1)rabbitmq3.5以后才支持优先级队列。
2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
3)优先级取值范围在0~9之间,数值越大则优先级越高。
二、示例
2.1、发送端(生产端)
新建一个控制台项目send,并添加一个类rabbitmqconfig。
class rabbitmqconfig { public static string host { get; set; } public static string virtualhost { get; set; } public static string username { get; set; } public static string password { get; set; } public static int port { get; set; } static rabbitmqconfig() { host = "192.168.2.242"; virtualhost = "/"; username = "hello"; password = "world"; port = 5672; } }
class program { static void main(string[] args) { console.writeline("按任意键开始生产。"); console.readline(); prioritymessagepublish(); console.readline(); } private static void prioritymessagepublish() { const string messageprefix = "message_"; const int publishmessagecount = 6; byte messagepriority = 0; var factory = new connectionfactory() { hostname = rabbitmqconfig.host, port = rabbitmqconfig.port, virtualhost = rabbitmqconfig.virtualhost, username = rabbitmqconfig.username, password = rabbitmqconfig.password, protocol = protocols.defaultprotocol }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { //设置队列优先级,取值范围在0~255之间。 dictionary<string, object> dict = new dictionary<string, object> { { "x-max-priority", 255 } }; //声明队列 channel.queuedeclare(queue: "priority", durable: true, exclusive: false, autodelete: false, arguments: dict); //向该消息队列发送消息message random random = new random(); for (int i = 0; i < publishmessagecount; i++) { var properties = channel.createbasicproperties(); messagepriority = (byte)random.next(0, 9); properties.priority = messagepriority;//设置消息优先级,取值范围在0~9之间。 var message = messageprefix + i.tostring(); var body = encoding.utf8.getbytes(message); channel.basicpublish(exchange: "", routingkey: "priority", basicproperties: properties, body: body); console.writeline($"{datetime.now.tostring()} send {message} , priority {messagepriority}"); } } } } }
2.2、接收端(消费端)
新建一个控制台项目receive,按住alt键,将发送端rabbitmqconfig类拖一个快捷方式到receive项目中。
class program { static void main(string[] args) { console.writeline("按任意键开始消费。"); console.readline(); prioritymessagesubscribe(); } public static void prioritymessagesubscribe() { var factory = new connectionfactory() { hostname = rabbitmqconfig.host, username = rabbitmqconfig.username, password = rabbitmqconfig.password }; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: false); var consumer = new eventingbasicconsumer(channel); consumer.received += async (model, ea) => { await task.run(() => { var message = encoding.utf8.getstring(ea.body); thread.sleep(1000 * 2); channel.basicack(deliverytag: ea.deliverytag, multiple: false);//手动消息确认 console.writeline($"{datetime.now.tostring()} received {message}"); }); }; channel.basicconsume(queue: "priority", noack: false, consumer: consumer);//需要启用消息响应,否则priority无效。 console.readkey(); } } } }
2.3、运行结果
从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。
上一篇: 很羡慕村里一个小学同学
下一篇: python 发送get请求接口详解