欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ASP.NET Core 2.0利用MassTransit集成RabbitMQ

程序员文章站 2022-05-03 14:00:14
在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。基于MassTransit库使得我们使用Rabbi... ......

在asp.net core上利用masstransit来集成使用rabbitmq真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用rabbitmq的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。

 

masstransit

先看看masstransit是个什么宝贝(masstransit官网的简介):

masstransit是一个免费的开源轻量级消息总线,用于使用.net框架创建分布式应用程序。masstransit在现有的*消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。

通俗描述:

masstransit就是一套基于消息服务的高级封装类库,下游可联接rabbitmq、redis、mongodb等服务。

github官网:https://github.com/masstransit/masstransit

 

rabbitmq

rabbitmq是成熟的mq队列服务,是由 erlang 语言开发的 amqp 的开源实现。关于介绍rabbitmq的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:

官网:

下载与安装:

 

实现代码

通过上面的介绍,咱们已对masstransit与rabbitmq有了初步了解,那么现在来看看如何在asp.net core上优雅的使用rabbitmq吧。

1、创建一个名为“rabbitmqhelp.cs”公共类,用于封装操作rabbitmq的公共方法,并通过nuget来管理并引用“masstransit”与“masstransit.rabbitmq”类库。

2、“rabbitmqhelp.cs”公共类主要对外封装两个静态方法,其代码如下:

  1 using masstransit;
  2 using masstransit.rabbitmqtransport;
  3 using system;
  4 using system.collections.generic;
  5 using system.text;
  6 using system.threading.tasks;
  7 
  8 namespace lezhima.comm
  9 {
 10     /// <summary>
 11     /// rabbitmq公共操作类,基于masstransit库
 12     /// </summary>
 13     public class rabbitmqhelp
 14     {
 15         #region 交换器
 16 
 17         /// <summary>
 18         /// 操作日志交换器
 19         /// 同时需在rabbitmq的管理后台创建同名交换器
 20         /// </summary>
 21         public static readonly string actionlogexchange = "lezhima.actionlogexchange";
 22 
 23 
 24         #endregion
 25 
 26 
 27         #region 声明变量
 28 
 29         /// <summary>
 30         /// mq联接地址,建议放到配置文件
 31         /// </summary>
 32         private static readonly string mqurl = "rabbitmq://192.168.1.181/";
 33 
 34         /// <summary>
 35         /// mq联接账号,建议放到配置文件
 36         /// </summary>
 37         private static readonly string mquser = "admin";
 38 
 39         /// <summary>
 40         /// mq联接密码,建议放到配置文件
 41         /// </summary>
 42         private static readonly string mqpwd = "admin";
 43 
 44         #endregion
 45 
 46         /// <summary>
 47         /// 创建连接对象
 48         /// 不对外公开
 49         /// </summary>
 50         private static ibuscontrol createbus(action<irabbitmqbusfactoryconfigurator, irabbitmqhost> registrationaction = null)
 51         {
 52             //通过masstransit创建mq联接工厂
 53             return bus.factory.createusingrabbitmq(cfg =>
 54             {
 55                 var host = cfg.host(new uri(mqurl), hst =>
 56                 {
 57                     hst.username(mquser);
 58                     hst.password(mqpwd);
 59                 });
 60                 registrationaction?.invoke(cfg, host);
 61             });
 62         }
 63 
 64 
 65         /// <summary>
 66         /// mq生产者
 67         /// 这里使用fanout的交换类型
 68         /// </summary>
 69         /// <param name="obj"></param>
 70         public async static task pushmessage(string exchange, object obj)
 71         {
 72             var bus = createbus();
 73             var sendtouri = new uri($"{mqurl}{exchange}");
 74             var endpoint = await bus.getsendendpoint(sendtouri);
 75             await endpoint.send(obj);
 76         }
 77 
 78         /// <summary>
 79         /// mq消费者
 80         /// 这里使用fanout的交换类型
 81         /// consumer必需是实现iconsumer接口的类实例
 82         /// </summary>
 83         /// <param name="obj"></param>
 84         public static void receivemessage(string exchange, object consumer)
 85         {
 86             var bus = createbus((cfg, host) =>
 87             {
 88                 //从指定的消息队列获取消息 通过consumer来实现消息接收
 89                 cfg.receiveendpoint(host, exchange, e =>
 90                 {
 91                     e.instance(consumer);
 92                 });
 93             });
 94             bus.start();
 95         }
 96     }
 97 }
 98 

 

3、“rabbitmqhelp.cs”公共类已经有了mq“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递json、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过iconsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:

  1 using masstransit;
  2 using system;
  3 using system.collections.generic;
  4 using system.text;
  5 using system.threading.tasks;
  6 
  7 namespace lezhima.storage.consumer
  8 {
  9     /// <summary>
 10     /// 从mq接收并处理数据
 11     /// 实现masstransit的iconsumer接口
 12     /// </summary>
 13     public class logconsumer : iconsumer<actionlog>
 14     {
 15         /// <summary>
 16         /// 实现consume方法
 17         /// 接收并处理数据
 18         /// </summary>
 19         /// <param name="context"></param>
 20         /// <returns></returns>
 21         public task consume(consumecontext<actionlog> context)
 22         {
 23             return task.run(async () =>
 24             {
 25                 //获取接收到的对象
 26                 var amsg = context.message;
 27                 console.writeline($"recevied by consumer:{amsg}");
 28                 console.writeline($"recevied by consumer:{amsg.actionlogid}");
 29             });
 30         }
 31     }
 32 }
 33 

 

调用代码

1、生产者调用代码如下:

  1 /// <summary>
  2 /// 测试mq生产者
  3 /// </summary>
  4 /// <returns></returns>
  5 [httpget]
  6 public async task<mobiresult> addmessagetest()
  7 {
  8     //声明一个实体对象
  9     var model = new actionlog();
 10     model.actionlogid = guid.newguid();
 11     model.createtime = datetime.now;
 12     model.updatetime = datetime.now;
 13     //调用mq
 14     await rabbitmqhelp.pushmessage(rabbitmqhelp.actionlogexchange, model);
 15 
 16     return new mobiresult(1000, "操作成功");
 17 }

 

2、消费者调用代码如下:

  1 using lezhima.storage.consumer;
  2 using microsoft.extensions.configuration;
  3 using system;
  4 using system.io;
  5 
  6 namespace lezhima.storage
  7 {
  8     class program
  9     {
 10         static void main(string[] args)
 11         {
 12             var conf = new configurationbuilder()
 13               .setbasepath(directory.getcurrentdirectory())
 14               .addjsonfile("appsettings.json", true, true)
 15               .build();
 16 
 17             //调用接收者
 18             rabbitmqhelp.receivemessage(rabbitmqhelp.actionlogexchange,
 19              new logconsumer()
 20             );
 21 
 22             console.readline();
 23         }
 24     }
 25 }
 26 

 

总结

1、基于masstransit库使得我们使用rabbitmq变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。

2、rabbitmq的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。