欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间

程序员文章站 2022-10-18 12:41:36
上一篇我们讲了关于direct类型的Exchange,这一片我们来了解一下fanout类型的Exchange。 1.Exchange的fanout类型 fanout类型的Exchange的特点是会把消息发送给与之绑定的所有Queue中,我们来测试一下。代码如下 运行代码,去可视化工具中查看一下 消费 ......

上一篇我们讲了关于direct类型的exchange,这一片我们来了解一下fanout类型的exchange。

1.exchange的fanout类型

fanout类型的exchange的特点是会把消息发送给与之绑定的所有queue中,我们来测试一下。代码如下

using rabbitmq.client;
using system;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change3";
            var route = "route2";
            var queue3 = "queue3";
            var queue4 = "queue4";
            var queue5 = "queue5";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queue3, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queue3, exchange, queue3);

                    channel.queuedeclare(queue4, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queue4, exchange, queue4);

                    channel.queuedeclare(queue5, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queue5, exchange, queue5);

                  
                    var props = channel.createbasicproperties();
                    props.persistent = true;
                    channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit"));

                }
            }
        }
    }
}

运行代码,去可视化工具中查看一下

RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间

消费其中的一个

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchange = "change3";
            var route = "route2";
            var queue = "queue3";


            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchange, "fanout", durable: true, autodelete: false);
                channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queue, exchange, route);

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message+thread.currentthread.managedthreadid);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queue, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}

结果如下

RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间

大家可以依次消费其他两个queue,这里就不演示了

2.消息的过期时间

我们在发送一些消息的时候,有时希望给消息设置一下过期时间,我们可以通过两种方式来设置

2.1设置队列的过期时间

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
            //队列过期时间,单位毫秒
                    channel.queuedeclare(queue7, durable: true, exclusive: false, autodelete: false,arguments:new dictionary<string, object> { { "x-message-ttl", 8000 } });
                    channel.queuebind(queue7, exchange, queue7);

                    var props = channel.createbasicproperties();
                    props.persistent = true;
                    channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit"));

                }
            }
        }
    }
}

这样过8秒去queue就看不到该消息了

2.2设置message的过期时间

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queue7, durable: true, exclusive: false, autodelete: false,arguments:new dictionary<string, object> { { "x-message-ttl", 8000 } });
                    channel.queuebind(queue7, exchange, queue7);

                    var props = channel.createbasicproperties();
            //message过期时间,单位毫秒
                    props.expiration = "30000";
                    props.persistent = true;
                    channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit"));

                }
            }
        }
    }
}

我们发现还是8秒就过期了,说明如果同时设置了队列与消息的过期时间,则按照队列的时间过期。我们把队列的过期时间去掉重新试一下。

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queue7, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queue7, exchange, queue7);

                    var props = channel.createbasicproperties();
                    props.expiration = "30000";
                    props.persistent = true;
                    channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit"));

                }
            }
        }
    }
}

3.队列生存时间

我们还可以设置一个队列的生存时间

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue8 = "queue8";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "fanout", durable: true, autodelete: false);
                    channel.queuedeclare(queue8, durable: true, exclusive: false, autodelete: false,arguments: new dictionary<string, object> {
                        { "x-expires",10000} //设置当前队列的过期时间为10000毫秒
                    });
                    channel.queuebind(queue8, exchange, queue8);

                    var props = channel.createbasicproperties();
                    props.persistent = true;
                    channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit"));

                }
            }
        }
    }
}

这样10秒后队列就消失了