.net平台的rabbitmq使用封装demo详解
前言
rabbitmq大家再熟悉不过,这篇文章主要针对rabbitmq学习后封装rabbitmq.client的一个分享。文章最后,我会把封装组件和demo奉上。
什么是rabbitmq
rabbitmq是一个由erlang开发的amqp(advanced message queue 高级消息队列协议 )的开源实现,能够实现异步消息处理
rabbitmq是一个消息代理:它接受和转发消息。
你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,rabbitmq是邮政信箱,邮局和邮递员。
rabbitmq和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块
优点:异步消息处理
业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),将下单操作主流程:扣减库存、生成订单,然后通过mq消息队列完成通知,发红包、发短信,错峰流控 (通知量 消息量 订单量大的情况实现mq消息队列机制,淡季情况下访问量会少)
灵活的路由(flexible routing)
在消息进入队列之前,通过 exchange 来路由消息的。对于典型的路由功能,rabbitmq 已经提供了一些内置的 exchange 来实现。针对更复杂的路由功能,可以将多个 exchange 绑定在一起,也通过插件机制实现自己的 exchange 。
rabbitmq网站端口号:15672
程序里面实现的端口为:5672
rabbitmq的关键术语
1、绑定器(binding):根据路由规则绑定queue和exchange。
2、路由键(routing key):exchange根据关键字进行消息投递。
3、交换机(exchange):指定消息按照路由规则进入指定队列
4、消息队列(queue):消息的存储载体
5、生产者(producer):消息发布者。
6、消费者(consumer):消息接收者。
rabbitmq的运作
从下图可以看出,发布者(publisher)是把消息先发送到交换器(exchange),再从交换器发送到指定队列(queue),而先前已经声明交换器与队列绑定关系,最后消费者(customer)通过订阅或者主动取指定队列消息进行消费。
那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。
推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)
拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)
使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。
可是程序偶尔会出异常,例如网络或者db超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(ispostsuccess == true),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。
这个时候不需要及时的去处理消息,有个job定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。
publish(发布)的封装
步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将model存到了concurrentdictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。
/// <summary> /// 交换器声明 /// </summary> /// <param name="imodel"></param> /// <param name="exchange">交换器</param> /// <param name="type">交换器类型: /// 1、direct exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog /// 2、fanout exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout /// 交换机转发消息是最快的。 /// 3、topic exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” /// 只会匹配到“audit.irs”。</param> /// <param name="durable">持久化</param> /// <param name="autodelete">自动删除</param> /// <param name="arguments">参数</param> private static void exchangedeclare(imodel imodel, string exchange, string type = exchangetype.direct, bool durable = true, bool autodelete = false, idictionary<string, object> arguments = null) { exchange = exchange.isnullorwhitespace() ? "" : exchange.trim(); imodel.exchangedeclare(exchange, type, durable, autodelete, arguments); } /// <summary> /// 队列声明 /// </summary> /// <param name="channel"></param> /// <param name="queue">队列</param> /// <param name="durable">持久化</param> /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见, /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param> /// <param name="autodelete">自动删除</param> /// <param name="arguments">参数</param> private static void queuedeclare(imodel channel, string queue, bool durable = true, bool exclusive = false, bool autodelete = false, idictionary<string, object> arguments = null) { queue = queue.isnullorwhitespace() ? "undefinedqueuename" : queue.trim(); channel.queuedeclare(queue, durable, exclusive, autodelete, arguments); } /// <summary> /// 获取model /// </summary> /// <param name="exchange">交换机名称</param> /// <param name="queue">队列名称</param> /// <param name="routingkey"></param> /// <param name="isproperties">是否持久化</param> /// <returns></returns> private static imodel getmodel(string exchange, string queue, string routingkey, bool isproperties = false) { return modeldic.getoradd(queue, key => { var model = _conn.createmodel(); exchangedeclare(model, exchange, exchangetype.fanout, isproperties); queuedeclare(model, queue, isproperties); model.queuebind(queue, exchange, routingkey); modeldic[queue] = model; return model; }); } /// <summary> /// 发布消息 /// </summary> /// <param name="routingkey">路由键</param> /// <param name="body">队列信息</param> /// <param name="exchange">交换机名称</param> /// <param name="queue">队列名</param> /// <param name="isproperties">是否持久化</param> /// <returns></returns> public void publish(string exchange, string queue, string routingkey, string body, bool isproperties = false) { var channel = getmodel(exchange, queue, routingkey, isproperties); try { channel.basicpublish(exchange, routingkey, null, body.serializeutf8()); } catch (exception ex) { throw ex.getinnestexception(); } }
下次是本机测试的发布速度截图:
4.2w/s属于稳定速度,把反序列化(tojson)会稍微快一些。
subscribe(订阅)的封装
发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的job进行定时重发,因此,finally是应答成功的。
/// <summary> /// 获取model /// </summary> /// <param name="queue">队列名称</param> /// <param name="isproperties"></param> /// <returns></returns> private static imodel getmodel(string queue, bool isproperties = false) { return modeldic.getoradd(queue, value => { var model = _conn.createmodel(); queuedeclare(model, queue, isproperties); //每次消费的消息数 model.basicqos(0, 1, false); modeldic[queue] = model; return model; }); } /// <summary> /// 接收消息 /// </summary> /// <typeparam name="t"></typeparam> /// <param name="queue">队列名称</param> /// <param name="isproperties"></param> /// <param name="handler">消费处理</param> /// <param name="isdeadletter"></param> public void subscribe<t>(string queue, bool isproperties, action<t> handler, bool isdeadletter) where t : class { //队列声明 var channel = getmodel(queue, isproperties); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var body = ea.body; var msgstr = body.deserializeutf8(); var msg = msgstr.fromjson<t>(); try { handler(msg); } catch (exception ex) { ex.getinnestexception().writetofile("队列接收消息", "rabbitmq"); if (!isdeadletter) publishtodead<deadletterqueue>(queue, msgstr, ex); } finally { channel.basicack(ea.deliverytag, false); } }; channel.basicconsume(queue, false, consumer); }
下次是本机测试的发布速度截图:
快的时候有1.9k/s,慢的时候也有1.7k/s
pull(拉)的封装
直接上代码:
/// <summary> /// 获取消息 /// </summary> /// <typeparam name="t"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingkey"></param> /// <param name="handler">消费处理</param> private void poll<t>(string exchange, string queue, string routingkey, action<t> handler) where t : class { var channel = getmodel(exchange, queue, routingkey); var result = channel.basicget(queue, false); if (result.isnull()) return; var msg = result.body.deserializeutf8().fromjson<t>(); try { handler(msg); } catch (exception ex) { ex.getinnestexception().writetofile("队列接收消息", "rabbitmq"); } finally { channel.basicack(result.deliverytag, false); } }
快的时候有1.8k/s,稳定是1.5k/s
rpc(远程调用)的封装
首先说明下,rabbitmq只是提供了这个rpc的功能,但是并不是真正的rpc,为什么这么说:
1、传统rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常
2、rabbitmq的rpc是基于消息的,消费者消费后,通过新队列返回响应结果。
/// <summary> /// rpc客户端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingkey"></param> /// <param name="body"></param> /// <param name="isproperties"></param> /// <returns></returns> public string rpcclient(string exchange, string queue, string routingkey, string body, bool isproperties = false) { var channel = getmodel(exchange, queue, routingkey, isproperties); var consumer = new queueingbasicconsumer(channel); channel.basicconsume(queue, true, consumer); try { var correlationid = guid.newguid().tostring(); var basicproperties = channel.createbasicproperties(); basicproperties.replyto = queue; basicproperties.correlationid = correlationid; channel.basicpublish(exchange, routingkey, basicproperties, body.serializeutf8()); var sw = stopwatch.startnew(); while (true) { var ea = consumer.queue.dequeue(); if (ea.basicproperties.correlationid == correlationid) { return ea.body.deserializeutf8(); } if (sw.elapsedmilliseconds > 30000) throw new exception("等待响应超时"); } } catch (exception ex) { throw ex.getinnestexception(); } } /// <summary> /// rpc服务端 /// </summary> /// <typeparam name="t"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isproperties"></param> /// <param name="handler"></param> /// <param name="isdeadletter"></param> public void rpcservice<t>(string exchange, string queue, bool isproperties, func<t, t> handler, bool isdeadletter) { //队列声明 var channel = getmodel(queue, isproperties); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var body = ea.body; var msgstr = body.deserializeutf8(); var msg = msgstr.fromjson<t>(); var props = ea.basicproperties; var replyprops = channel.createbasicproperties(); replyprops.correlationid = props.correlationid; try { msg = handler(msg); } catch (exception ex) { ex.getinnestexception().writetofile("队列接收消息", "rabbitmq"); } finally { channel.basicpublish(exchange, props.replyto, replyprops, msg.tojson().serializeutf8()); channel.basicack(ea.deliverytag, false); } }; channel.basicconsume(queue, false, consumer); }
可以用,但不建议去用。可以考虑其他的rpc框架。grpc、thrift等。
结尾
本篇文章,没有过多的写rabbitmq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 https://github.com/skychensky/sikiro.mq.rabbit。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。
到此这篇关于.net平台的rabbitmq使用封装的文章就介绍到这了,更多相关.net使用rabbitmq内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!