RabbitMQ死信队列另类用法之复合死信
前言
在业务开发过程中,我们常常需要做一些定时任务,这些任务一般用来做监控或者清理任务,比如在订单的业务场景中,用户在创建订单后一段时间内,没有完成支付,系统将自动取消该订单,并将库存返回到商品中,又比如在微信中,用户发出红包24小时后,需要对红包进行检查,是否已领取完成,如未领取完成,将剩余金额退回到发送者钱包中,同时销毁该红包。
在项目初始阶段,或者是一些小型的项目中,常常采用定时轮询的方法进行检查,但是我们都知道,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据库造成巨大的访问压力。
下面,本文将演示如何使用一个 rabbitmq 的死信队列同时监控多种业务(复合业务),达到模块解耦,释放压力的目的。
注意:名词“复合死信”是为了叙述方便临时创造的,如有不妥,欢迎指正
1. 什么是 rabbitmq 死信队列
dlx(dead letter exchanges)死信交换,死信队列本身也是一个普通的消息队列,在创建队列的时候,通过设置一些关键参数,可以将一个普通的消息队列设置为死信队列,与其它消息队列不同的是,其入栈的消息根据入栈时指定的过期时间/被拒绝/超出队列长度被移除,依次被转发到指定的消息队列中进行二次处理。这样说法比较拗口,其原理就是死信队列内位于顶部的消息过期时,该消息将被马上发送到另外一个订阅者(消息队列)中。
其原理入下图
由上图可以看到,目前有三种类型的业务需要使用 dlx 进行处理,因为每个业务的超时时间不一致的问题,如果将他们都放入一个 dlx 中进行处理,将会出现一个时序的问题,即消息队列总数处理顶部的消息,如果顶部的消息未过期,而底部的消息过期,这就麻烦了,因为过期的消息无法得到消费,将会造成延迟;所以正常情况下,最好的办法是每个业务都独立一个队列,这样就可以保证,即将过期的消息总是处于队列的顶部,从而被第一时间处理。
但是多个 dlx 又带来了管理上面的问题,随着业务的增加,越来越多的业务需要进入不同的 dlx ,这个时候我们发现,由于人手不足的原因,维护这么多 dlx 实在是太吃力了,如果能将这些消息都接入一个 dlx 中该多好呀,在一个 dlx 中进行消息订阅,然后进行分发或者处理,这就非常有趣了。
下面就按照这个思路,我们进行集中处理,也就是复合死信交换 cdlx(composite dead letter exchanges)
2. 如何创建死信队列
创建 dlx 队列的方式非常简单,我们使用 rabbitmq web 控制面板进行创建 exhcange(交换机)/consumer(死信消费队列)/cdlx(复合死信队列)
2.1 创建队列
创建交换机 cdlx-exchange
死信消费队列 cdlx-consumer
复合死信队列 cdlx-master
- 注意,这里添加死信队列必须同时设置死信转发交换机和路由,后续通过路由绑定实现消费队列
路由绑定
上面的路由绑定共有两个,分别是 master 和 consumer 用于消息路由到队列,为下面的业务消息做准备,建好后的队列如下
3.复合业务进入死信队列
当建立好队列以后,我们就可以专心的处理业务了,下面就来模拟3种业务将消息发送到死信队列的过程
3.1 发送死信消息到队列
发送消息使用了 asp.netcore轻松学-实现一个轻量级高可复用的rabbitmq客户端 中的轻量客户端,封装后的发送消息代码如下
public class cdlxmasterservice { private iconfiguration cfg = null; private ilogger logger = null; private string vhost = "test_mq"; private string exchange = "cdlx-exchange"; private string routekey = "master"; private static mqconnection connection = null; private mqconnection connection { get { if (connection == null || !connection.connection.isopen) { connection = new mqconnection( cfg["rabbitmq:username"], cfg["rabbitmq:password"], cfg["rabbitmq:host"], convert.toint32(cfg["rabbitmq:port"]), vhost, logger); } return connection; } } private static imodel channel = null; private imodel channel { get { if (channel == null || channel.isclosed) channel = connection.connection.createmodel(); return channel; } } public void sendmessage(object data) { string message = jsonconvert.serializeobject(data); this.connection.publish(this.channel, exchange, routekey, message); } }
3.2 将 cdlxmasterservice 注入到服务
public void configureservices(iservicecollection services) { services.addsingleton<cdlxmasterservice>(); ... }
3.3 模拟3种业务生产死信消息
public class homecontroller : controller { private cdlxmasterservice masterservice; public homecontroller(cdlxmasterservice masterservice) { this.masterservice = masterservice; } [httpget("publish")] public int publish() { contract contract = new contract(this.masterservice); for (int i = 0; i < 10; i++) { contract.publish(messagetype.redpackage, "红包信息,超时时间1024s"); contract.publish(messagetype.order, "订单信息,超时时间2048s"); contract.publish(messagetype.vote, "投票信息,超时时间4096s"); } return 0; } }
上面的接口 puhlish 模拟了业务消息,由于我们依次发布了 红包/订单/投票 消息,所以迭代发布 10 次后,正好形成了一个时序错乱的信息队列,按照自动过期时序计算,当第一个红包超时到达时,第四条消息(红包)也会接着超时,可是由于此时订单和投票消息位于红包消息上面,该红包消息在达到超时时间后并不会被投递到 consumer 消费队列,这是正确的,我们确实也是希望是这个结果
如果有一个办法把超时的消息自动将其提升到队列顶部就好了!
4. 处理复合死信
在 rabbitmq 提供的 api 接口中,没有什么直接可用的能将死信队列中超时消息提升到顶部的好办法;但是,我们可以利用部分 api 接口的特性来完成这件事情。
4.1 定时消费客户端
下面,我们将使用一个定时消费客户端来完成对死信队列的轮询,充分利用 rabbitmq 的消费特性来完成超时消息的位置提升。
过程如下图:
如上图所示,我们增加一个 dlx-timer 定时器,定时的发起对死信队列的消费,该消费者仅仅是消费,不确认消息,也就是不做 ack,然后将消息重新置入队列中;这个过程,就是将消息不断提升位置的过程。
4.2 定时消费客户端实现代码
public class cdlxtimerservice : mqservicebase { public override string vhost { get { return "test_mq"; } } public override string exchange { get { return "cdlx-exchange"; } } public override list<bindinfo> binds => new list<bindinfo>(); private string queue = "cdlx-master"; public cdlxtimerservice(iconfiguration cfg, ilogger logger) : base(cfg, logger) { } /// <summary> /// 检查死信队列 /// </summary> /// <returns></returns> public list<cdlxmessage> checkmessage() { long total = 0; list<cdlxmessage> list = new list<cdlxmessage>(); var connection = base.createconnection(); using (imodel channel = connection.connection.createmodel()) { bool latest = true; while (latest) { basicgetresult result = channel.basicget(this.queue, false); total++; latest = result != null; if (latest) { var json = encoding.utf8.getstring(result.body); list.add(jsonconvert.deserializeobject<cdlxmessage>(json)); } } channel.close(); connection.close(); } return list; } }
上面的代码首先在定时调用到来的时候,创建了一个 connection,然后利用此 connection 创建了了一个 channel,紧接着,使用该 channel 调用 basicget 方法,获得队列顶部的信息,且设置 autoack=false,表示仅检查消息,不确认,然后进入一个 while 迭代过程,一直读取到队列底部,获得所有队列中的信息,最后,关闭了通道释放连接。
这样,就完成了一次消息检查的过程,在调用 basicget 后,下一条信息将会出现在队列的顶部,同步,队列将自动对该消息进行超时检查,由于我们在调用 basicget 的时候,传入 autoack=false,不确认该消息,在 rabbitmq 控制台中,将显示为 unacted,所以在释放连接后,所有消息将会被重新置入队列中,这是一个自动的过程,无需我们做额外的工作。
4.3 consumer(死信消费队列)最终处理业务
配置队列管理随程序启动停止
private mqservciemanager servicemanager; // this method gets called by the runtime. use this method to configure the http request pipeline. public void configure(iapplicationbuilder app, ihostingenvironment env, iloggerfactory factory, iapplicationlifetime lifetime) { servicemanager = new mqservciemanager(this.configuration, factory.createlogger<mqservciemanager>()); lifetime.applicationstarted.register(() => { servicemanager.start(); }); lifetime.applicationstopping.register(() => { servicemanager.stop(); }); ... }
实现消费队列
public class cdlxconsumerservice : mqservicebase { public override string vhost { get { return "test_mq"; } } public override string exchange { get { return "cdlx-exchange"; } } private string queue = "cdlx-consumer"; private string routekey = "all"; private list<bindinfo> bs = new list<bindinfo>(); public override list<bindinfo> binds { get { return bs; } } public cdlxconsumerservice(iconfiguration cfg, ilogger logger) : base(cfg, logger) { this.bs.add(new bindinfo { exchangetype = exchangetype.direct, queue = this.queue, routerkey = this.routekey, onreceived = this.onreceived }); } private void onreceived(messagebody body) { var message = jsonconvert.deserializeobject<cdlxmessage>(body.content); console.writeline("类型:{0}\t 内容:{1}\t进入时间:{2}\t过期时间:{3}", message.type, message.data, message.createtime, message.createtime.addseconds(message.expire)); body.consumer.model.basicack(body.basicdeliver.deliverytag, true); } }
上面的代码,模拟了最终业务处理的过程,这里仅仅是简单演示,所以只是将消息打印到屏幕上;在实际的业务场景中,我们可以根据不同的 messagetype 进行消息的分发处理。
5. 消费过程演示
为了比较直观的观看死信消费过程,我们编写一个简单的列表页面,自动刷新后去消费死信队列,然后将消息输出到页面上,通过观察此页面,我们可以实时了解到死信队列的消费过程,实际的业务场景中,大家可以利用第三方定时器定时调用接口实现,或者使用内置的轻量主机做后台任务实现定时轮询,具体参考 asp.net core 轻松学-基于微服务的后台任务调度管理器
5.1 发布消息
浏览器访问本机地址:
下面将发布 30 条信息到 dlx 中,每个业务各 10 条信息。
通常情况下,红包的过期时间最短且超时时间一致,应该最快超时,意味着当第一条红包消息超时的时候,其余 9 条红包消息也会一并超时,但是由于红包消息混合的发布在队列中,且只有第一条红包消息位移队列顶部;所以,当第一条红包消息超时被消费后,其余 9 条红包由于不是位于队列顶部,虽然此时他们已经超时,但是 dlx 将无法处理;当我们使用 cdlx-timer(定时器)模拟调用 cdlxtimerservice 的时候(也就是刷新首页), cdlxtimerservice 服务将会对 dlx 进行检查。
查看消费状态
通过上图的观察得知,红色部分首先位于消息顶部被消费,然后就无法进行超时判断,接下来,由于使用了定时轮询,使得绿色部分消息得以浮动到消息顶部,然后被 dlx 进行处理后消费。
5.2 定时器检查死信队列
浏览器访问本机地址:
上图的每一次刷新,都是对 dlx 的一次轮询检查,随着轮询的深入,所有处于队列中不同位置的超时消息都有机会浮动到队列顶部进行消费处理。
结束语
业务的发展促进了架构的演进,每一个需求升级的背后,是程序员深深的思考;本文从 cdlx 的需求出发,充分利用了 rabbitmq dlx 对消息检查的特性,实现了对复合业务的集中处理。
演示代码下载
https://github.com/lianggx/examples/tree/master/rabbitmq.cdlx