RabbitMQ与.net core(四) 消息的优先级 与 死信队列
程序员文章站
2022-10-18 14:20:03
1.消息的优先级 假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性 Producer代码 consumer代码 运行producer 在运行consumer 可以看出消息是按优先级消费的 2 ......
1.消息的优先级
假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性
producer代码
using rabbitmq.client; using system; using system.collections.generic; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change4"; var route = "route2"; var queue9 = "queue9"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false); //x-max-priority属性必须设置,否则消息优先级不生效 channel.queuedeclare(queue9, durable: true, exclusive: false, autodelete: false,arguments: new dictionary<string, object> { { "x-max-priority", 50 } }); channel.queuebind(queue9, exchange, queue9); while(true) { var messagestr = console.readline(); var messagepri = console.readline(); var props = channel.createbasicproperties(); props.persistent = true; props.priority = (byte)int.parse(messagepri);//设置消息优先级 channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes(messagestr)); } } } } } }
consumer代码
using rabbitmq.client; using rabbitmq.client.events; using system; using system.collections.generic; using system.text; using system.threading; namespace rabbitmqclient { class program { private static readonly connectionfactory rabbitmqfactory = new connectionfactory() { hostname = "39.**.**.**", port = 5672, username = "root", password = "root", virtualhost = "/" }; static void main(string[] args) { var exchange = "change4"; var route = "route2"; var queue9 = "queue9"; using (iconnection conn = rabbitmqfactory.createconnection()) using (imodel channel = conn.createmodel()) { channel.exchangedeclare(exchange, "fanout", durable: true, autodelete: false); channel.queuedeclare(queue9, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-max-priority", 50 } }); channel.queuebind(queue9, exchange, route); channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false); eventingbasicconsumer consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { byte[] body = ea.body; string message = encoding.utf8.getstring(body); console.writeline( message); channel.basicack(deliverytag: ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queue9, autoack: false, consumer: consumer); console.readline(); } } } }
运行producer
在运行consumer
可以看出消息是按优先级消费的
2.死信队列
死信队列可以用来做容错机制,当我们的消息处理异常时我们可以把消息放入到死信队列中,以便后期处理,死信的产生有三种
1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
2.当前队列中的消息数量已经超过最大长度。
3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的ttl(time to live)时间;
看代码
using rabbitmq.client; using system; using system.collections.generic; using system.text; using system.threading; using system.threading.tasks; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchangea = "changea"; var routea = "routea"; var queuea = "queuea"; var exchanged = "changed"; var routed = "routed"; var queued = "queued"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchanged, type: "fanout", durable: true, autodelete: false); channel.queuedeclare(queued, durable: true, exclusive: false, autodelete: false); channel.queuebind(queued, exchanged, routed); channel.exchangedeclare(exchangea, type: "fanout", durable: true, autodelete: false); channel.queuedeclare(queuea, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-dead-letter-exchange",exchanged}, //设置当前队列的dlx { "x-dead-letter-routing-key",routed}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列 { "x-message-ttl",10000} //设置消息的存活时间,即过期时间 }); channel.queuebind(queuea, exchangea, routea); var properties = channel.createbasicproperties(); properties.persistent = true; //发布消息 channel.basicpublish(exchange: exchangea, routingkey: routea, basicproperties: properties, body: encoding.utf8.getbytes("message")); } } } } }
这样10秒后消息过期,我们可以看到queued中有了消息