DotNet Core中使用RabbitMQ
上一篇随笔记录到rabbitmq的安装,安装完成,我们就开始使用吧。
rabbitmq简介
amqp,即advanced message queuing protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
amqp的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
rabbitmq是一个开源的amqp实现,服务器端用erlang语言编写,支持多种客户端,如:python、ruby、.net、java、jms、c、php、actionscript、xmpp、stomp等,支持ajax。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
rabbitmq提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、cqrs等应用场景。
dotnet core使用rabbitmq
通过nuget安装:https://www.nuget.org/packages/rabbitmq.client/
定义生产者:
//创建连接工厂 connectionfactory factory = new connectionfactory { username = "guest",//用户名 password = "guest",//密码 hostname = "127.0.0.1"//rabbitmq ip }; //创建连接 var connection = factory.createconnection(); //创建通道 var channel = connection.createmodel(); //声明一个队列 channel.queuedeclare("hello", false, false, false, null); console.writeline("\nrabbitmq连接成功,请输入消息,输入exit退出!"); string input; do { input = console.readline(); var sendbytes = encoding.utf8.getbytes(input); //发布消息 channel.basicpublish("", "hello", null, sendbytes); } while (input.trim().tolower() != "exit"); channel.close(); connection.close();
定义消费者:
//创建连接工厂 connectionfactory factory = new connectionfactory { username = "guest",//用户名 password = "guest",//密码 hostname = "127.0.0.1"//rabbitmq ip }; //创建连接 var connection = factory.createconnection(); //创建通道 var channel = connection.createmodel(); //事件基本消费者 eventingbasicconsumer consumer = new eventingbasicconsumer(channel); //接收到消息事件 consumer.received += (ch, ea) => { var message = encoding.utf8.getstring(ea.body); console.writeline($"收到消息: {message}"); //确认该消息已被消费 channel.basicack(ea.deliverytag, false); }; //启动消费者 设置为手动应答消息 channel.basicconsume("hello", false, consumer); console.writeline("消费者已启动"); console.readkey(); channel.dispose(); connection.close();
演示如下:
启动了一个生产者,两个消费者,可以看见两个消费者都能接收到消息,消息投递到哪个消费者是由rabbitmq决定的。
rabbitmq消费失败的处理
rabbitmq采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后rabbitmq才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么rabbitmq会将这个消息重新投递。
我们来修改一下消费者的代码:
//接收到消息事件 consumer.received += (ch, ea) => { var message = encoding.utf8.getstring(ea.body); console.writeline($"收到消息: {message}"); console.writeline($"收到该消息[{ea.deliverytag}] 延迟10s发送回执"); thread.sleep(10000); //确认该消息已被消费 channel.basicack(ea.deliverytag, false); console.writeline($"已发送回执[{ea.deliverytag}]"); };
演示如下:
从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被rabbitmq重新投递。
使用rabbitmq的exchange
前面的例子,我们可以看到生产者将消息投递到queue中,实际上这种方式在rabbitmq中永远都不会发生的。实际的情况是,生产者将消息发送到exchange(交换器),下图中的x,由exchange(交换器)将消息路由到一个或多个queue中(或者丢弃)。
amqp协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由exchange来接收,然后exchange按照特定的策略转发到queue进行存储。同理,消费者也是如此。exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
exchange types(交换器类型)
rabbitmq常用的exchange type有fanout、direct、topic、headers这四种
1、fanout:
这种类型的exchange路由规则非常简单,它会把所有发送到该exchange的消息路由到所有与它绑定的queue中,这时routing key不起作用
fanout exchange 不需要处理routekey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,fanout exchange 转发消息是最快的。
为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。
static void main(string[] args) { string exchangename = "testfanoutchange"; string queuename1 = "hello1"; string queuename2 = "hello2"; string routekey = ""; //创建连接工厂 connectionfactory factory = new connectionfactory { username = "guest",//用户名 password = "guest",//密码 hostname = "127.0.0.1"//rabbitmq ip }; //创建连接 var connection = factory.createconnection(); //创建通道 var channel = connection.createmodel(); //定义一个direct类型交换机 channel.exchangedeclare(exchangename, exchangetype.fanout, false, false, null); //定义队列1 channel.queuedeclare(queuename1, false, false, false, null); //定义队列2 channel.queuedeclare(queuename2, false, false, false, null); //将队列绑定到交换机 channel.queuebind(queuename1, exchangename, routekey, null); channel.queuebind(queuename2, exchangename, routekey, null); //生成两个队列的消费者 consumergenerator(queuename1); consumergenerator(queuename2); console.writeline($"\nrabbitmq连接成功,\n\n请输入消息,输入exit退出!"); string input; do { input = console.readline(); var sendbytes = encoding.utf8.getbytes(input); //发布消息 channel.basicpublish(exchangename, routekey, null, sendbytes); } while (input.trim().tolower() != "exit"); channel.close(); connection.close(); }
/// <summary> /// 根据队列名称生成消费者 /// </summary> /// <param name="queuename"></param> static void consumergenerator(string queuename) { //创建连接工厂 connectionfactory factory = new connectionfactory { username = "guest",//用户名 password = "guest",//密码 hostname = "127.0.0.1"//rabbitmq ip }; //创建连接 var connection = factory.createconnection(); //创建通道 var channel = connection.createmodel(); //事件基本消费者 eventingbasicconsumer consumer = new eventingbasicconsumer(channel); //接收到消息事件 consumer.received += (ch, ea) => { var message = encoding.utf8.getstring(ea.body); console.writeline($"queue:{queuename}收到消息: {message}"); //确认该消息已被消费 channel.basicack(ea.deliverytag, false); }; //启动消费者 设置为手动应答消息 channel.basicconsume(queuename, false, consumer); console.writeline($"queue:{queuename},消费者已启动"); }
运行效果如下:
2、direct
这种类型的exchange路由规则也很简单,它会把消息路由到哪些binding key与routingkey完全匹配的queue中。
direct模式,可以使用rabbitmq自带的exchange:default exchange 。所以不需要将exchange进行任何绑定(binding)操作 。消息传递时,routekey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
static void main(string[] args) { string exchangename = "testchange"; string queuename = "hello"; string routekey = "helloroutekey"; //创建连接工厂 connectionfactory factory = new connectionfactory { username = "guest",//用户名 password = "guest",//密码 hostname = "127.0.0.1"//rabbitmq ip }; //创建连接 var connection = factory.createconnection(); //创建通道 var channel = connection.createmodel(); //定义一个direct类型交换机 channel.exchangedeclare(exchangename, exchangetype.direct, false, false, null); //定义一个队列 channel.queuedeclare(queuename, false, false, false, null); //将队列绑定到交换机 channel.queuebind(queuename, exchangename, routekey, null); console.writeline($"\nrabbitmq连接成功,exchange:{exchangename},queue:{queuename},route:{routekey},\n\n请输入消息,输入exit退出!"); string input; do { input = console.readline(); var sendbytes = encoding.utf8.getbytes(input); //发布消息 channel.basicpublish(exchangename, routekey, null, sendbytes); } while (input.trim().tolower() != "exit"); channel.close(); connection.close();
运行效果如下:
3、topic
这种类型的exchange的路由规则支持 binding key 和 routing key 的模糊匹配,会把消息路由到满足条件的queue。 binding key 中可以存在两种特殊字符 *与 #,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配0个或多个单词,单词以符号“.”为分隔符。
以上图中的配置为例,routingkey=”quick.orange.rabbit”的消息会同时路由到q1与q2,routingkey=”lazy.orange.fox”的消息会路由到q1与q2,routingkey=”lazy.brown.fox”的消息会路由到q2,routingkey=”lazy.pink.rabbit”的消息会路由到q2(只会投递给q2一次,虽然这个routingkey与q2的两个bindingkey都匹配);routingkey=”quick.brown.fox”、routingkey=”orange”、routingkey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingkey。
static void main(string[] args) { string exchangename = "testtopicchange"; string queuename = "hello"; string routekey = "testroutekey.*"; //创建连接工厂 connectionfactory factory = new connectionfactory { username = "guest",//用户名 password = "guest",//密码 hostname = "127.0.0.1"//rabbitmq ip }; //创建连接 var connection = factory.createconnection(); //创建通道 var channel = connection.createmodel(); //定义一个direct类型交换机 channel.exchangedeclare(exchangename, exchangetype.topic, false, false, null); //定义队列1 channel.queuedeclare(queuename, false, false, false, null); //将队列绑定到交换机 channel.queuebind(queuename, exchangename, routekey, null); console.writeline($"\nrabbitmq连接成功,\n\n请输入消息,输入exit退出!"); string input; do { input = console.readline(); var sendbytes = encoding.utf8.getbytes(input); //发布消息 channel.basicpublish(exchangename, "testroutekey.one", null, sendbytes); } while (input.trim().tolower() != "exit"); channel.close(); connection.close(); }
运行效果如下:
4、headers
这种类型的exchange不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
推荐阅读
-
在.NET Core 3.0中的WPF中使用IOC图文教程
-
03 .NET CORE 2.2 使用OCELOT -- Docker中的Consul
-
.net core中的Authorization过滤器使用
-
.net core中serilog的基本使用
-
详解iOS应用开发中Core Data数据存储的使用
-
在.net Core 中像以前那样的使用HttpContext.Current
-
使用dotnet-dump 查找 .net core 3.0 占用CPU 100%的原因
-
使用dotnet-dump 查找 .net core 3.0 占用CPU 100%的原因解析
-
.NET Core中如何实现或使用对象池?
-
Dotnet Core中使用AutoMapper