RabbitMQ与.net core(二)Producer与Exchange
producer:消息的生产者,也就是创建消息的对象
exchange:消息的接受者,也就是用来接收消息的对象,exchange接收到消息后将消息按照规则发送到与他绑定的queue中。下面我们来定义一个producer与exchange。
1.新建.netcore console项目,并引入rabbitmq.client的nuget包
2.创建exchange
using rabbitmq.client; namespace rabbitmqconsole { class program { static void main(string[] args) { connectionfactory factory = new connectionfactory(); factory.hostname = "39.**.**.**"; factory.port = 5672; factory.virtualhost = "/"; factory.username = "root"; factory.password = "root"; var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (var connection = factory.createconnection()) { using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type:"direct", durable: true, autodelete: false); //创建exchange } } } } }
可以看到echange的参数有:
type:可选项为,fanout,direct,topic,headers。区别如下:
fanout:发送到所有与当前exchange绑定的queue中
direct:发送到与消息的routekey相同的rueue中
topic:fanout的模糊版本
headers:发送到与消息的header属性相同的queue中
durable:持久化
autodelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
运行程序,可以在可视化界面看到change2
接下来我们可以创建与change2绑定的queue
3.创建queue
using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); #创建queue2 channel.queuebind(queue, exchange, route); #将queue2绑定到exchange2 }
可以看到echange的参数有:
durable:持久化
exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失
autodelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
去可视化界面看queue
4.发送消息
using (var channel = connection.createmodel()) { channel.exchangedeclare(exchange, type: "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); var props = channel.createbasicproperties(); props.persistent = true; #持久化 channel.basicpublish(exchange, route, true, props, encoding.utf8.getbytes("hello rabbit")); }
5.消费消息
using rabbitmq.client; using system; using system.text; namespace rabbitmqclient { class program { private static readonly connectionfactory rabbitmqfactory = new connectionfactory() { hostname = "39.**.**.**", port = 5672, username = "root", password = "root", virtualhost = "/" }; static void main(string[] args) { var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (iconnection conn = rabbitmqfactory.createconnection()) using (imodel channel = conn.createmodel()) { channel.exchangedeclare(exchange, "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); while (true) { var message = channel.basicget(queue, true); #第二个参数说明自动释放消息,如为false需手动释放消息 if(message!=null) { var msgbody = encoding.utf8.getstring(message.body); console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody)); } system.threading.thread.sleep(timespan.fromseconds(1)); } } } } }
运行查看结果
查看可视化界面
6.手动释放消息
while (true) { var message = channel.basicget(queue, false);#设置为手动释放 if(message!=null) { var msgbody = encoding.utf8.getstring(message.body); console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody)); } channel.basicack(message.deliverytag, false); #手动释放 system.threading.thread.sleep(timespan.fromseconds(1)); }
我们再发一条消息,然后开始消费,加个断点调试一下
查看一下queue中消息状态
然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态
这么说来只要不走到 channel.basicack(message.deliverytag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态
如图已经被释放了
7.让失败的消息回到队列中
while (true) { var message = channel.basicget(queue, false); if(message!=null) { var msgbody = encoding.utf8.getstring(message.body); console.writeline(string.format("***接收时间:{0},消息内容:{1}", datetime.now.tostring("yyyy-mm-dd hh:mm:ss"), msgbody)); console.writeline(message.deliverytag); #当前消息被处理的次序数 if (1==1) channel.basicreject(message.deliverytag, true); } system.threading.thread.sleep(timespan.fromseconds(1)); }
重新发送4条消息
开始消费
我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾
8.监听消息
using (iconnection conn = rabbitmqfactory.createconnection()) using (imodel channel = conn.createmodel()) { channel.exchangedeclare(exchange, "direct", durable: true, autodelete: false); channel.queuedeclare(queue, durable: true, exclusive: false, autodelete: false); channel.queuebind(queue, exchange, route); channel.basicqos(prefetchsize: 0, prefetchcount: 10, global: false); #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷 eventingbasicconsumer consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { byte[] body = ea.body; string message = encoding.utf8.getstring(body); console.writeline( message+thread.currentthread.managedthreadid); channel.basicack(deliverytag: ea.deliverytag, multiple: false); }; channel.basicconsume(queue: queue, autoack: false, consumer: consumer); console.readline(); }
推荐阅读
-
Docker的使用初探(二):Docker与.NET Core的结合
-
RabbitMQ与.net core(四) 消息的优先级 与 死信队列
-
RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
-
abp(net core)+easyui+efcore实现仓储管理系统——ABP WebAPI与EasyUI结合增删改查之二(二十八)
-
Kafka与.net core(二)zookeeper
-
RabbitMQ与.net core(一)安装
-
RabbitMQ与.net core(二)Producer与Exchange
-
RabbitMQ与.net core(五) topic类型 与 headers类型 的Exchange
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间