欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ与.net core(二)Producer与Exchange

程序员文章站 2022-06-24 13:57:10
Producer:消息的生产者,也就是创建消息的对象 Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。 1.新建.netcore console项目,并引入Rabb ......

producer:消息的生产者,也就是创建消息的对象

exchange:消息的接受者,也就是用来接收消息的对象,exchange接收到消息后将消息按照规则发送到与他绑定的queue中。下面我们来定义一个producer与exchange。

1.新建.netcore console项目,并引入rabbitmq.client的nuget包

RabbitMQ与.net core(二)Producer与Exchange

2.创建exchange

using rabbitmq.client;

namespace rabbitmqconsole
{
    class program
    {
        static void main(string[] args)
        {
            connectionfactory factory = new connectionfactory();
            factory.hostname = "39.**.**.**";
            factory.port = 5672;
            factory.virtualhost = "/";
            factory.username = "root";
            factory.password = "root";

            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (var connection = factory.createconnection())
            {
                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type:"direct", durable: true, autodelete: false);   //创建exchange
                    
                }
            }
        }
    }
}

可以看到echange的参数有:

type:可选项为,fanout,direct,topic,headers。区别如下:

    fanout:发送到所有与当前exchange绑定的queue中

    direct:发送到与消息的routekey相同的rueue中

    topic:fanout的模糊版本

    headers:发送到与消息的header属性相同的queue中

durable:持久化

autodelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

 运行程序,可以在可视化界面看到change2

RabbitMQ与.net core(二)Producer与Exchange

接下来我们可以创建与change2绑定的queue

3.创建queue

                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "direct", durable: true, autodelete: false);
                    channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false);  #创建queue2
                    channel.queuebind(queue, exchange, route);  #将queue2绑定到exchange2
                }

可以看到echange的参数有:

durable:持久化

exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

autodelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

去可视化界面看queue

RabbitMQ与.net core(二)Producer与Exchange

4.发送消息

                using (var channel = connection.createmodel())
                {
                    channel.exchangedeclare(exchange, type: "direct", durable: true, autodelete: false);
                    channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false);
                    channel.queuebind(queue, exchange, route);
                    var props = channel.createbasicproperties();
                    props.persistent = true; #持久化
                    channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit"));
                }

RabbitMQ与.net core(二)Producer与Exchange

5.消费消息

using rabbitmq.client;
using system;
using system.text;

namespace rabbitmqclient
{
    class program
    {
        private static readonly connectionfactory rabbitmqfactory = new connectionfactory()
        {
            hostname = "39.**.**.**",
            port = 5672,
            username = "root",
            password = "root",
            virtualhost = "/"
        };
        static void main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";


            using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchange, "direct", durable: true, autodelete: false);
                channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queue, exchange, route);
                while (true)
                {
                    var message = channel.basicget(queue, true);  #第二个参数说明自动释放消息,如为false需手动释放消息
                    if(message!=null)
                    {
                        var msgbody = encoding.utf8.getstring(message.body);
                        console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody));
                    }
                    system.threading.thread.sleep(timespan.fromseconds(1));
                }
            }
        }
    }
}

运行查看结果

RabbitMQ与.net core(二)Producer与Exchange

查看可视化界面

RabbitMQ与.net core(二)Producer与Exchange

6.手动释放消息

                while (true)
                {
                    var message = channel.basicget(queue, false);#设置为手动释放
                    if(message!=null)
                    {
                        var msgbody = encoding.utf8.getstring(message.body);
                        console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody));
                    }
                    channel.basicack(message.deliverytag, false); #手动释放
                    system.threading.thread.sleep(timespan.fromseconds(1));
                }

我们再发一条消息,然后开始消费,加个断点调试一下

RabbitMQ与.net core(二)Producer与Exchange

查看一下queue中消息状态

RabbitMQ与.net core(二)Producer与Exchange

然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态

RabbitMQ与.net core(二)Producer与Exchange

这么说来只要不走到 channel.basicack(message.deliverytag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态

RabbitMQ与.net core(二)Producer与Exchange

如图已经被释放了

7.让失败的消息回到队列中

                while (true)
                {
                    var message = channel.basicget(queue, false);
                    if(message!=null)
                    {
                        var msgbody = encoding.utf8.getstring(message.body);
                        console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody));
                        console.writeline(message.deliverytag);    #当前消息被处理的次序数
                        if (1==1)
                            channel.basicreject(message.deliverytag, true);
                    }
                    
                    system.threading.thread.sleep(timespan.fromseconds(1));
                }

重新发送4条消息

RabbitMQ与.net core(二)Producer与Exchange

开始消费

RabbitMQ与.net core(二)Producer与Exchange

我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾

8.监听消息

 using (iconnection conn = rabbitmqfactory.createconnection())
            using (imodel channel = conn.createmodel())
            {
                channel.exchangedeclare(exchange, "direct", durable: true, autodelete: false);
                channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false);
                channel.queuebind(queue, exchange, route);

                channel.basicqos(prefetchsize: 0, prefetchcount: 10, global: false);  #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷
                eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
                consumer.received += (model, ea) =>
                {
                    byte[] body = ea.body;
                    string message = encoding.utf8.getstring(body);
                    console.writeline( message+thread.currentthread.managedthreadid);
                    channel.basicack(deliverytag: ea.deliverytag, multiple: false);
                };

                channel.basicconsume(queue: queue, autoack: false, consumer: consumer);
                console.readline();
            }