运用.net core中实例讲解RabbitMQ
一、rabbitmq简介
是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,rabbitmq是使用erlang(高并发语言)语言来编写的,并且rabbitmq是基于amqp协议的。
(1) amqp协议
advanced message queuing protocol(高级消息队列协议)
(2)amqp专业术语
(多路复用->在同一个线程中开启多个通道进行操作)
- server:又称broker,接受客户端的链接,实现amqp实体服务
- connection:连接,应用程序与broker的网络连接
- channel:网络信道,几乎所有的操作都在channel中进行,channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
- message:消息,服务器与应用程序之间传送的数据,由properties和body组成.properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;body则是消息体内容。
- virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个exchange和queue,同一个virtual host 里面不能有相同名称的exchange 或 queue。
- exchange:交换机,接收消息,根据路由键转单消息到绑定队列
- binding: exchange和queue之间的虚拟链接,binding中可以包换routing key
- routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)
(3)rabbitmq整体架构
clienta(生产者)发送消息到exchange1(交换机),同时带上routekey(路由key),exchange1找到绑定交换机为它和绑定传入的routekey的队列,把消息转发到对应的队列,消费者client1,client2,client3只需要指定对应的队列名即可以消费队列数据。
交换机和队列多对多关系,实际开发中一般是一个交换机对多个队列,防止设计复杂化。
二、安装rabbitmq
安装方式不影响下面的使用,这里用docker安装
#15672端口为web管理端的端口,5672为rabbitmq服务的端口
docker run -d --name rabbitmq -e rabbitmq_default_user=admin -e rabbitmq_default_pass=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
输入:ip:5672访问验证。
建一个名为develop的virtual host(虚拟主机)使用,项目中一般是一个项目建一个virtual host用,能够隔离队列。
切换virtual host
三、rabbitmq六种队列模式在.netcore中使用
(1)简单队列
最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式
描述:一个生产者 p 发送消息到队列 q,一个消费者 c 接收
建一个rabbitmqhelper.cs类
/// <summary> /// rabbitmq帮助类 /// </summary> public class rabbitmqhelper { private static connectionfactory factory; private static object lockobj = new object(); /// <summary> /// 获取单个rabbitmq连接 /// </summary> /// <returns></returns> public static iconnection getconnection() { if (factory == null) { lock (lockobj) { if (factory == null) { factory = new connectionfactory { hostname = "172.16.2.84",//ip port = 5672,//端口 username = "admin",//账号 password = "123456",//密码 virtualhost = "develop" //虚拟主机 }; } } } return factory.createconnection(); } }
生产者代码:
新建发送类send.cs
public static void simplesendmsg() { string queuename = "simple_order";//队列名 //创建连接 using (var connection = rabbitmqhelper.getconnection()) { //创建信道 using (var channel = connection.createmodel()) {//创建队列 channel.queuedeclare(queuename, durable: false, exclusive: false, autodelete: false, arguments: null); for (var i = 0; i < 10; i++) { string message = $"hello rabbitmq messagehello,{i + 1}"; var body = encoding.utf8.getbytes(message);//发送消息 channel.basicpublish(exchange: "", routingkey: queuename, mandatory: false, basicproperties: null, body); console.writeline($"发送消息到队列:{queuename},内容:{message}"); } } } }
创建队列参数解析:
durable:是否持久化。
exclusive:排他队列,只有创建它的连接(connection)能连,创建它的连接关闭,会自动删除队列。
autodelete:被消费后,消费者数量都断开时自动删除队列。
arguments:创建队列的参数。
发送消息参数解析:
exchange:交换机,为什么能传空呢,因为rabbitmq内置有一个默认的交换机,如果传空时,就会用默认交换机。
routingkey:路由名称,这里用队列名称做路由key。
mandatory:true告诉服务器至少将消息route到一个队列种,否则就将消息return给发送者;false:没有找到路由则消息丢弃。
执行效果:
队列产生10条消息。
消费者代码:
新建recevie.cs类
public static void simpleconsumer() { string queuename = "simple_order"; var connection = rabbitmqhelper.getconnection(); { //创建信道 var channel = connection.createmodel(); { //创建队列 channel.queuedeclare(queuename, durable: false, exclusive: false, autodelete: false, arguments: null); var consumer = new eventingbasicconsumer(channel); int i = 0; consumer.received += (model, ea) => { //消费者业务处理 var message = encoding.utf8.getstring(ea.body.toarray()); console.writeline($"{i},队列{queuename}消费消息长度:{message.length}"); i++; }; channel.basicconsume(queuename, true, consumer); } } }
消费者只需要知道队列名就可以消费了,不需要exchange和routingkey。
注:消费者这里有一个创建队列,它本身不需要,是预防消费端程序先执行,没有队列会报错。
执行效果:
消息已经被消费完。
(2)工作队列模式
一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
生产者p发送消息到队列,多个消费者c消费队列的数据。
工作队列也称为公平性队列模式,循环分发,rabbitmq将按顺序将每条消息发送给下一个消费者,每个消费者将获得相同数量的消息。
生产者:
send.cs代码:
/// <summary> /// 工作队列模式 /// </summary> public static void workersendmsg() { string queuename = "worker_order";//队列名 //创建连接 using (var connection = rabbitmqhelper.getconnection()) { //创建信道 using (var channel = connection.createmodel()) { //创建队列 channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: null); var properties = channel.createbasicproperties(); properties.persistent = true; //消息持久化 for ( var i=0;i<10;i++) { string message = $"hello rabbitmq messagehello,{i+1}"; var body = encoding.utf8.getbytes(message); //发送消息到rabbitmq channel.basicpublish(exchange: "", routingkey: queuename, mandatory: false, basicproperties: properties, body); console.writeline($"发送消息到队列:{queuename},内容:{message}"); } } } }
参数durable:true,需要持久化,实际项目中肯定需要持久化的,不然重启rabbitmq数据就会丢失了。
执行效果:
写入10条数据,有持久化标识d
消费端:
recevie代码:
public static void workerconsumer() { string queuename = "worker_order"; var connection = rabbitmqhelper.getconnection(); { //创建信道 var channel = connection.createmodel(); { //创建队列 channel.queuedeclare(queuename, durable: false, exclusive: false, autodelete: false, arguments: null); var consumer = new eventingbasicconsumer(channel); //prefetchcount:1来告知rabbitmq,不要同时给一个消费者推送多于 n 个消息,也确保了消费速度和性能 channel.basicqos(prefetchsize: 0, prefetchcount:1, global: false); int i = 1; int index = new random().next(10); consumer.received += (model, ea) => { //处理业务 var message = encoding.utf8.getstring(ea.body.toarray()); console.writeline($"{i},消费者:{index},队列{queuename}消费消息长度:{message.length}"); channel.basicack(ea.deliverytag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了 thread.sleep(1000); i++; }; channel.basicconsume(queuename,autoack:false, consumer); } } }
basicqos参数解析:
prefetchsize:每条消息大小,一般设为0,表示不限制。
prefetchcount:1,作用限流,告诉rabbitmq不要同时给一个消费者推送多于n个消息,消费者会把n条消息缓存到本地一条条消费,如果不设,rabbitmq会进可能快的把消息推到客户端,导致客户端内存升高。设置合理可以不用频繁从rabbitmq 获取能提升消费速度和性能,设的太多的话则会增大本地内存,需要根据机器性能合理设置,官方建议设为30。
global:是否为全局设置。
这些限流设置针对消费者autoack:false时才有效,如果是自动ack的,限流不生效。
执行两个消费者,效果:
可以看到消费者号的标识,8,2,8,2是平均的,一个消费者5个,rabbitmq上也能看到有2个消费者,unacked数是2,因为每个客户端的限流数是1。
工作队列模式也是很常用的队列模式。
(3)发布订阅模式
pulish/subscribe,无选择接收消息,一个消息生产者,一个交换机(交换机类型为fanout),多个消息队列,多个消费者。称为发布/订阅模式
在应用中,只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
生产者p只需把消息发送到交换机x,绑定这个交换机的队列都会获得一份一样的数据。
应用场景:适合于用同一份数据源做不同的业务。
生产者代码:
/// <summary> /// 发布订阅, 扇形队列 /// </summary> public static void sendmessagefanout() { //创建连接 using (var connection = rabbitmqhelper.getconnection()) { //创建信道 using (var channel = connection.createmodel()) { string exchangename = "fanout_exchange"; //创建交换机,fanout类型 channel.exchangedeclare(exchangename, exchangetype.fanout); string queuename1 = "fanout_queue1"; string queuename2 = "fanout_queue2"; string queuename3 = "fanout_queue3"; //创建队列 channel.queuedeclare(queuename1, false, false, false); channel.queuedeclare(queuename2, false, false, false); channel.queuedeclare(queuename3, false, false, false); //把创建的队列绑定交换机,routingkey不用给值,给了也没意义的 channel.queuebind(queue: queuename1, exchange: exchangename, routingkey: ""); channel.queuebind(queue: queuename2, exchange: exchangename, routingkey: ""); channel.queuebind(queue: queuename3, exchange: exchangename, routingkey: ""); var properties = channel.createbasicproperties(); properties.persistent = true; //消息持久化 //向交换机写10条消息 for (int i = 0; i < 10; i++) { string message = $"rabbitmq fanout {i + 1} message"; var body = encoding.utf8.getbytes(message); channel.basicpublish(exchangename, routingkey: "", null, body); console.writeline($"发送fanout消息:{message}"); } } } }
执行代码:
向交换机发送10条消息,则绑定这个交换机的3个队列都会有10条消息。
消费端的代码和工作队列的一样,只需知道队列名即可消费,声明时要和生产者的声明一样。
(4)路由模式(推荐使用)
在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
上图是一个结合日志消费级别的配图,在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 queue 中,此模式也就是 exchange 模式中的direct
模式。
生产者p发送数据是要指定交换机(x)和routing发送消息 ,指定的routingkey=error,则队列q1和队列q2都会有一份数据,如果指定routingkey=into,或=warning,交换机(x)只会把消息发到q2队列。
生产者代码:
/// <summary> /// 路由模式,点到点直连队列 /// </summary> public static void sendmessagedirect() { //创建连接 using (var connection = rabbitmqhelper.getconnection()) { //创建信道 using (var channel = connection.createmodel()) { //声明交换机对象,fanout类型 string exchangename = "direct_exchange"; channel.exchangedeclare(exchangename, exchangetype.direct); //创建队列 string queuename1 = "direct_errorlog"; string queuename2 = "direct_alllog"; channel.queuedeclare(queuename1, true, false, false); channel.queuedeclare(queuename2, true, false, false); //把创建的队列绑定交换机,direct_errorlog队列只绑定routingkey:error channel.queuebind(queue: queuename1, exchange: exchangename, routingkey: "error"); //direct_alllog队列绑定routingkey:error,info channel.queuebind(queue: queuename2, exchange: exchangename, routingkey: "info"); channel.queuebind(queue: queuename2, exchange: exchangename, routingkey: "error"); var properties = channel.createbasicproperties(); properties.persistent = true; //消息持久化 //向交换机写10条错误日志和10条info日志 for (int i = 0; i < 10; i++) { string message = $"rabbitmq direct {i + 1} error message"; var body = encoding.utf8.getbytes(message); channel.basicpublish(exchangename, routingkey: "error", properties, body); console.writeline($"发送direct消息error:{message}"); string message2 = $"rabbitmq direct {i + 1} info message"; var body2 = encoding.utf8.getbytes(message); channel.basicpublish(exchangename, routingkey: "info", properties, body2); console.writeline($"info:{message2}"); } } } }
这里创建一个direct类型的交换机,两个路由key,一个error,一个info,两个队列,一个队列只绑定error,一个队列绑定error和info,向error和info各发10条消息。
执行代码:
查看rabbitmq管理界面,direct_errorlog队列10条,而direct_alllog有20条,因为direct_alllog队列两个routingkey的消息都进去了。
点进去看下两个队列绑定的交换机和routingkey
消费者代码:
消费者和工作队列一样,只需根据队列名消费即可,这里只消费direct_errorlog队列作示例
public static void directconsumer() { string queuename = "direct_errorlog"; var connection = rabbitmqhelper.getconnection(); { //创建信道 var channel = connection.createmodel(); { //创建队列 channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: null); var consumer = new eventingbasicconsumer(channel); ///prefetchcount:1来告知rabbitmq,不要同时给一个消费者推送多于 n 个消息,也确保了消费速度和性能 ///global:是否设为全局的 ///prefetchsize:单条消息大小,通常设0,表示不做限制 //是autoack=false才会有效 channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: true); int i = 1; consumer.received += (model, ea) => { //处理业务 var message = encoding.utf8.getstring(ea.body.toarray()); console.writeline($"{i},队列{queuename}消费消息长度:{message.length}"); channel.basicack(ea.deliverytag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了 i++; }; channel.basicconsume(queuename, autoack: false, consumer); } } }
普通场景中推荐使用路由模式,因为路由模式有交换机,有路由key,能够更好的拓展各种应用场景。
(5)主题模式
topics(主题)模式跟routing路由模式类似,只不过路由模式是指定固定的路由键 routingkey,而主题模式是可以模糊匹配路由键 routingkey,类似于sql中 = 和 like 的关系。
p 表示为生产者、 x 表示交换机、c1c2 表示为消费者,红色表示队列。
topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingkey,必须由一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),比如 "lazy.orange.a"。topics routingkey 中可以存在两种特殊字符"*"与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
以上图为例:
如果发送消息的routingkey设置为:
aaa.orange.rabbit,那么消息会路由到q1与q2,
routingkey=aaa.orange.bb的消息会路由到q1,
routingkey=lazy.aa.bb.cc的消息会路由到q2;
routingkey=lazy.aa.rabbit的消息会路由到 q2(只会投递给q2一次,虽然这个routingkey 与 q2 的两个 bindingkey 都匹配);
没匹配routingkey的消息将会被丢弃。
生产者代码:
public static void sendmessagetopic() { //创建连接 using (var connection = rabbitmqhelper.getconnection()) { //创建信道 using (var channel = connection.createmodel()) { //声明交换机对象,fanout类型 string exchangename = "topic_exchange"; channel.exchangedeclare(exchangename, exchangetype.topic); //队列名 string queuename1 = "topic_queue1"; string queuename2 = "topic_queue2"; //路由名 string routingkey1 = "*.orange.*"; string routingkey2 = "*.*.rabbit"; string routingkey3 = "lazy.#"; channel.queuedeclare(queuename1, true, false, false); channel.queuedeclare(queuename2, true, false, false); //把创建的队列绑定交换机,routingkey指定routingkey channel.queuebind(queue: queuename1, exchange: exchangename, routingkey: routingkey1); channel.queuebind(queue: queuename2, exchange: exchangename, routingkey: routingkey2); channel.queuebind(queue: queuename2, exchange: exchangename, routingkey: routingkey3); //向交换机写10条消息 for (int i = 0; i < 10; i++) { string message = $"rabbitmq direct {i + 1} message"; var body = encoding.utf8.getbytes(message); channel.basicpublish(exchangename, routingkey: "aaa.orange.rabbit", null, body); channel.basicpublish(exchangename, routingkey: "lazy.aa.rabbit", null, body); console.writeline($"发送topic消息:{message}"); } } } }
这里演示了 routingkey为aaa.orange.rabbit,和lazy.aa.rabbit的情况,第一个匹配到q1和q2,第二个匹配到q2,所以应该q1是10条,q2有20条,
执行后看rabbitmq界面:
(6)rpc模式
与上面其他5种所不同之处,该模式是拥有请求/回复的。也就是有响应的,上面5种都没有。
rpc是指远程过程调用,也就是说两台服务器a,b,一个应用部署在a服务器上,想要调用b服务器上应用提供的处理业务,处理完后然后在a服务器继续执行下去,把异步的消息以同步的方式执行。
客户端(c)声明一个排他队列自己订阅,然后发送消息到rpc队列同时也把这个排他队列名也在消息里传进去,服务端监听rpc队列,处理完业务后把处理结果发送到这个排他队列,然后客户端收到结果,继续处理自己的逻辑。
rpc的处理流程:
- 当客户端启动时,创建一个匿名的回调队列。
- 客户端为rpc请求设置2个属性:replyto:设置回调队列名字;correlationid:标记request。
- 请求被发送到rpc_queue队列中。
- rpc服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyto设定的回调队列。
- 客户端监听回调队列,当有消息时,检查correlationid属性,如果与request中匹配,那就是结果了。
服务端代码:
public class rpcserver { public static void rpchandle() { var connection = rabbitmqhelper.getconnection(); { var channel = connection.createmodel(); { string queuename = "rpc_queue"; channel.queuedeclare(queue: queuename, durable: false, exclusive: false, autodelete: false, arguments: null); channel.basicqos(0, 1, false); var consumer = new eventingbasicconsumer(channel); channel.basicconsume(queue: queuename, autoack: false, consumer: consumer); console.writeline("【服务端】等待rpc请求..."); consumer.received += (model, ea) => { string response = null; var body = ea.body.toarray(); var props = ea.basicproperties; var replyprops = channel.createbasicproperties(); replyprops.correlationid = props.correlationid; try { var message = encoding.utf8.getstring(body); console.writeline($"【服务端】接收到数据:{ message},开始处理"); response = $"消息:{message},处理完成"; } catch (exception e) { console.writeline("错误:" + e.message); response = ""; } finally { var responsebytes = encoding.utf8.getbytes(response); channel.basicpublish(exchange: "", routingkey: props.replyto, basicproperties: replyprops, body: responsebytes); channel.basicack(deliverytag: ea.deliverytag, multiple: false); } }; } } } }
客户端:
public class rpcclient { private readonly iconnection connection; private readonly imodel channel; private readonly string replyqueuename; private readonly eventingbasicconsumer consumer; private readonly blockingcollection<string> respqueue = new blockingcollection<string>(); private readonly ibasicproperties props; public rpcclient() { connection = rabbitmqhelper.getconnection(); channel = connection.createmodel(); replyqueuename = channel.queuedeclare().queuename; consumer = new eventingbasicconsumer(channel); props = channel.createbasicproperties(); var correlationid = guid.newguid().tostring(); props.correlationid = correlationid; //给消息id props.replyto = replyqueuename;//回调的队列名,client关闭后会自动删除 consumer.received += (model, ea) => { var body = ea.body.toarray(); var response = encoding.utf8.getstring(body); //监听的消息id和定义的消息id相同代表这条消息服务端处理完成 if (ea.basicproperties.correlationid == correlationid) { respqueue.add(response); } }; channel.basicconsume( consumer: consumer, queue: replyqueuename, autoack: true); } public string call(string message) { var messagebytes = encoding.utf8.getbytes(message); //发送消息 channel.basicpublish( exchange: "", routingkey: "rpc_queue", basicproperties: props, body: messagebytes); //等待回复 return respqueue.take(); } public void close() { connection.close(); } }
执行代码:
static void main(string[] args) { console.writeline("hello world!"); //启动服务端,正常逻辑是在另一个程序 rpcserver.rpchandle(); //实例化客户端 var rpcclient = new rpcclient(); string message = $"消息id:{new random().next(1, 1000)}"; console.writeline($"【客服端】rpc请求中,{message}"); //向服务端发送消息,等待回复 var response = rpcclient.call(message); console.writeline("【客服端】收到回复响应:{0}", response); rpcclient.close(); console.readkey(); }
测试效果:
z执行完,客服端close后,可以接着自己的下一步业务处理。
总结
以上便是rabbitmq的6中模式在.net core中实际使用,其中(1)简单队列,(2)工作队列,(4)路由模式,(6)rpc模式的交换机类型都是direct,(3)发布订阅的交换机是fanout,(5)topics的交换机是topic。正常场景用的是direct,默认交换机也是direct类型的,推荐用(4)路由模式,因为指定交换机名比起默认的交换机会容易扩展场景,其他的交换机看业务场景所需使用。
下面位置可以看到交换机类型,amq.开头那几个是内置的,避免交换机过多可以直接使用。
到此这篇关于运用.net core中实例讲解rabbitmq的文章就介绍到这了,更多相关.net core rabbitmq内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
推荐阅读
-
深入讲解.Net Core中的Api版本控制
-
.NET Core中本地化机制的深入讲解
-
运用.net core中实例讲解RabbitMQ高可用集群构建
-
运用.net core中实例讲解RabbitMQ
-
运用.NetCore实例讲解RabbitMQ死信队列,延时队列
-
如何在ASP.NET Core中给上传图片功能添加水印实例代码
-
asp.net core 2.0 webapi集成signalr(实例讲解)
-
.net core2.0下使用Identity改用dapper存储数据(实例讲解)
-
ASP.NET Core SignalR中的流式传输深入讲解
-
在ASP.NET Core中实现一个Token base的身份认证实例