欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

程序员文章站 2022-05-11 11:36:01
1.topic类型的Exchange 我们之前说过Topic类型的Exchange是direct类型的模糊查询模式,可以通过routkey来实现模糊消费message,topic的模糊匹配有两种模式: 1. 使用*来匹配一个单词 2.使用#来匹配0个或多个单词 我们来看代码 消费端 生产者代码 我们 ......

1.topic类型的exchange

我们之前说过topic类型的exchange是direct类型的模糊查询模式,可以通过routkey来实现模糊消费message,topic的模糊匹配有两种模式:

1. 使用*来匹配一个单词

2.使用#来匹配0个或多个单词

我们来看代码

消费端

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchangeall = "changeall";
            var queueman = "queueman";
            var quemankey = "man.#";

            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchangeall, type: "topic", durable: true, autodelete: false);
                channel.queuedeclare(queueman, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queueman, exchangeall, quemankey);

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queueman, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}

生产者代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchangeall = "changeall";
            //性别.姓氏.头发长度
            var keymana = "man.chen.long";
            var keymanb = "man.liu.long";
            var keymanc = "woman.liu.long";
            var keymand = "woman.chen.short";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchangeall, type: "topic", durable: true, autodelete: false);

                    var properties = channel.createbasicproperties();
                    properties.persistent = true;
                    //发布消息
                    channel.basicpublish(exchange: exchangeall,
                    routingkey: keymana,
                    basicproperties: properties,
                    body: encoding.utf8.getbytes(keymana));
                    channel.basicpublish(exchange: exchangeall,
                     routingkey: keymanb,
                     basicproperties: properties,
                     body: encoding.utf8.getbytes(keymanb));
                    channel.basicpublish(exchange: exchangeall,
                     routingkey: keymanc,
                     basicproperties: properties,
                     body: encoding.utf8.getbytes(keymanc));
                    channel.basicpublish(exchange: exchangeall,
                     routingkey: keymand,
                     basicproperties: properties,
                     body: encoding.utf8.getbytes(keymand));
                }
            }
        }
    }
}

我们先运行消费端再运行生产段,结果如下

消费端:

RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange

2.headers类型的exchange

生成者代码

using rabbitmq.client;
using system;
using system.collections.generic;
using system.text;
using system.threading;
using system.threading.tasks;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchangeall = "changeheader";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchangeall, type: "headers", durable: true, autodelete: false);

                    var properties = channel.createbasicproperties();
                    properties.persistent = true;
                    properties.headers = new dictionary<string, object> {
                        { "sex","man"}
                    };
                    //发布消息
                    channel.basicpublish(exchange: exchangeall,
                    routingkey: "",
                    basicproperties: properties,
                    body: encoding.utf8.getbytes("hihihi"));
                }
            }
        }
    }
}

消费端代码

using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;
using system.threading;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchangeall = "changeheader";
            var queueman = "queueheader";

            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchangeall, type: "headers", durable: true, autodelete: false);
                channel.queuedeclare(queueman, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queueman, exchangeall, "",new dictionary<string, object> { { "sex","man" } });

                channel.basicqos(prefetchsize: 0, prefetchcount: 50, global: false);
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queueman, autoack: false, consumer: consumer);
                console.readline();
            }
        }
    }
}