欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

程序员文章站 2022-10-18 14:20:03
1.消息的优先级 假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性 Producer代码 consumer代码 运行producer 在运行consumer 可以看出消息是按优先级消费的 2 ......

1.消息的优先级

假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性

producer代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue9 = "queue9";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
            //x-max-priority属性必须设置,否则消息优先级不生效
                    channel.queuedeclare(queue9, durable: true, exclusive: false, autodelete: false,arguments: new dictionary<string, object> { { "x-max-priority", 50 } });
                    channel.queuebind(queue9, exchange, queue9);
                    while(true)
                    {
                        var messagestr = console.readline();
                        var messagepri = console.readline();
                        var props = channel.createbasicproperties();
                        props.persistent = true;
                        props.priority = (byte)int.parse(messagepri);//设置消息优先级
                        channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes(messagestr));
                    }
                }
            }
        }
    }
}

consumer代码

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchange = "change4";
            var route = "route2";
            var queue9 = "queue9";


            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchange, "fanout", durable: true, autodelete: false);
                channel.queuedeclare(queue9, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> { { "x-max-priority", 50 } });
                channel.queuebind(queue9, exchange, route);

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queue9, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}

运行producer

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

在运行consumer

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

可以看出消息是按优先级消费的

2.死信队列

死信队列可以用来做容错机制,当我们的消息处理异常时我们可以把消息放入到死信队列中,以便后期处理,死信的产生有三种

1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);

2.当前队列中的消息数量已经超过最大长度。

3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的ttl(time to live)时间;

看代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchangea = "changea";
            var routea = "routea";
            var queuea = "queuea";

            var exchanged = "changed";
            var routed = "routed";
            var queued = "queued";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchanged, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queued, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queued, exchanged, routed);

                    channel.exchangedeclare(exchangea, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queuea, durable: true, exclusive: false, autodelete: false, arguments: new dictionary<string, object> {
                                         { "x-dead-letter-exchange",exchanged}, //设置当前队列的dlx
                                         { "x-dead-letter-routing-key",routed}, //设置dlx的路由key,dlx会根据该值去找到死信消息存放的队列
                                         { "x-message-ttl",10000} //设置消息的存活时间,即过期时间
                                         });
                    channel.queuebind(queuea, exchangea, routea);


                    var properties = channel.createbasicproperties();
                    properties.persistent = true;
                    //发布消息
                    channel.basicpublish(exchange: exchangea,
                                         routingkey: routea,
                                         basicproperties: properties,
                                         body: encoding.utf8.getbytes("message"));
                }
            }
        }
    }
}

这样10秒后消息过期,我们可以看到queued中有了消息


RabbitMQ与.net core(四) 消息的优先级 与 死信队列