ASP.NET Core 2.0利用MassTransit集成RabbitMQ
在asp.net core上利用masstransit来集成使用rabbitmq真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用rabbitmq的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。
masstransit
先看看masstransit是个什么宝贝(masstransit官网的简介):
masstransit是一个免费的开源轻量级消息总线,用于使用.net框架创建分布式应用程序。masstransit在现有的*消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。
通俗描述:
masstransit就是一套基于消息服务的高级封装类库,下游可联接rabbitmq、redis、mongodb等服务。
github官网:https://github.com/masstransit/masstransit
rabbitmq
rabbitmq是成熟的mq队列服务,是由 erlang 语言开发的 amqp 的开源实现。关于介绍rabbitmq的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:
官网:
下载与安装:
实现代码
通过上面的介绍,咱们已对masstransit与rabbitmq有了初步了解,那么现在来看看如何在asp.net core上优雅的使用rabbitmq吧。
1、创建一个名为“rabbitmqhelp.cs”公共类,用于封装操作rabbitmq的公共方法,并通过nuget来管理并引用“masstransit”与“masstransit.rabbitmq”类库。
2、“rabbitmqhelp.cs”公共类主要对外封装两个静态方法,其代码如下:
1 using masstransit; 2 using masstransit.rabbitmqtransport; 3 using system; 4 using system.collections.generic; 5 using system.text; 6 using system.threading.tasks; 7 8 namespace lezhima.comm 9 { 10 /// <summary> 11 /// rabbitmq公共操作类,基于masstransit库 12 /// </summary> 13 public class rabbitmqhelp 14 { 15 #region 交换器 16 17 /// <summary> 18 /// 操作日志交换器 19 /// 同时需在rabbitmq的管理后台创建同名交换器 20 /// </summary> 21 public static readonly string actionlogexchange = "lezhima.actionlogexchange"; 22 23 24 #endregion 25 26 27 #region 声明变量 28 29 /// <summary> 30 /// mq联接地址,建议放到配置文件 31 /// </summary> 32 private static readonly string mqurl = "rabbitmq://192.168.1.181/"; 33 34 /// <summary> 35 /// mq联接账号,建议放到配置文件 36 /// </summary> 37 private static readonly string mquser = "admin"; 38 39 /// <summary> 40 /// mq联接密码,建议放到配置文件 41 /// </summary> 42 private static readonly string mqpwd = "admin"; 43 44 #endregion 45 46 /// <summary> 47 /// 创建连接对象 48 /// 不对外公开 49 /// </summary> 50 private static ibuscontrol createbus(action<irabbitmqbusfactoryconfigurator, irabbitmqhost> registrationaction = null) 51 { 52 //通过masstransit创建mq联接工厂 53 return bus.factory.createusingrabbitmq(cfg => 54 { 55 var host = cfg.host(new uri(mqurl), hst => 56 { 57 hst.username(mquser); 58 hst.password(mqpwd); 59 }); 60 registrationaction?.invoke(cfg, host); 61 }); 62 } 63 64 65 /// <summary> 66 /// mq生产者 67 /// 这里使用fanout的交换类型 68 /// </summary> 69 /// <param name="obj"></param> 70 public async static task pushmessage(string exchange, object obj) 71 { 72 var bus = createbus(); 73 var sendtouri = new uri($"{mqurl}{exchange}"); 74 var endpoint = await bus.getsendendpoint(sendtouri); 75 await endpoint.send(obj); 76 } 77 78 /// <summary> 79 /// mq消费者 80 /// 这里使用fanout的交换类型 81 /// consumer必需是实现iconsumer接口的类实例 82 /// </summary> 83 /// <param name="obj"></param> 84 public static void receivemessage(string exchange, object consumer) 85 { 86 var bus = createbus((cfg, host) => 87 { 88 //从指定的消息队列获取消息 通过consumer来实现消息接收 89 cfg.receiveendpoint(host, exchange, e => 90 { 91 e.instance(consumer); 92 }); 93 }); 94 bus.start(); 95 } 96 } 97 } 98
3、“rabbitmqhelp.cs”公共类已经有了mq“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递json、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过iconsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:
1 using masstransit; 2 using system; 3 using system.collections.generic; 4 using system.text; 5 using system.threading.tasks; 6 7 namespace lezhima.storage.consumer 8 { 9 /// <summary> 10 /// 从mq接收并处理数据 11 /// 实现masstransit的iconsumer接口 12 /// </summary> 13 public class logconsumer : iconsumer<actionlog> 14 { 15 /// <summary> 16 /// 实现consume方法 17 /// 接收并处理数据 18 /// </summary> 19 /// <param name="context"></param> 20 /// <returns></returns> 21 public task consume(consumecontext<actionlog> context) 22 { 23 return task.run(async () => 24 { 25 //获取接收到的对象 26 var amsg = context.message; 27 console.writeline($"recevied by consumer:{amsg}"); 28 console.writeline($"recevied by consumer:{amsg.actionlogid}"); 29 }); 30 } 31 } 32 } 33
调用代码
1、生产者调用代码如下:
1 /// <summary> 2 /// 测试mq生产者 3 /// </summary> 4 /// <returns></returns> 5 [httpget] 6 public async task<mobiresult> addmessagetest() 7 { 8 //声明一个实体对象 9 var model = new actionlog(); 10 model.actionlogid = guid.newguid(); 11 model.createtime = datetime.now; 12 model.updatetime = datetime.now; 13 //调用mq 14 await rabbitmqhelp.pushmessage(rabbitmqhelp.actionlogexchange, model); 15 16 return new mobiresult(1000, "操作成功"); 17 }
2、消费者调用代码如下:
1 using lezhima.storage.consumer; 2 using microsoft.extensions.configuration; 3 using system; 4 using system.io; 5 6 namespace lezhima.storage 7 { 8 class program 9 { 10 static void main(string[] args) 11 { 12 var conf = new configurationbuilder() 13 .setbasepath(directory.getcurrentdirectory()) 14 .addjsonfile("appsettings.json", true, true) 15 .build(); 16 17 //调用接收者 18 rabbitmqhelp.receivemessage(rabbitmqhelp.actionlogexchange, 19 new logconsumer() 20 ); 21 22 console.readline(); 23 } 24 } 25 } 26
总结
1、基于masstransit库使得我们使用rabbitmq变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。
2、rabbitmq的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。
推荐阅读
-
ASP.NET Core 2.0集成Office Online Server(OWAS)实现办公文档的在线预览与编辑(支持word\excel\ppt\pdf等格式)
-
ASP.NET Core 2.0利用Jwt实现授权认证
-
ASP.NET Core 2.0利用MassTransit集成RabbitMQ
-
asp.net core 2.0 webapi集成signalr(实例讲解)
-
ASP.NET Core 2.0集成Office Online Server(OWAS)实现办公文档的在线预览与编辑(支持word\excel\ppt\pdf等格式)
-
ASP.NET Core 2.0利用Jwt实现授权认证
-
ASP.NET Core 2.0利用MassTransit集成RabbitMQ
-
Asp.Net Core利用xUnit进行主机级别的网络集成测试详解
-
Asp.Net Core 轻松学-利用xUnit进行主机级别的网络集成测试