Spring Boot与RabbitMQ结合实现延迟队列的示例
背景
何为延迟队列?
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。
延迟队列能做什么?
延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:
1、延迟消费。比如:
- 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
- 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
2、延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。
如何实现?
别急,在下文中,我们将详细介绍如何利用spring boot加rabbitmq来实现延迟队列。
本文出现的示例代码都已push到github仓库中:https://github.com/lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue
实现思路
在介绍具体的实现思路之前,我们先来介绍一下rabbitmq的两个特性,一个是time-to-live extensions,另一个是dead letter exchanges。
time-to-live extensions
rabbitmq允许我们为消息或者队列设置ttl(time to live),也就是过期时间。ttl表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了ttl或者当某条消息进入了设置了ttl的队列时,这条消息会在经过ttl秒后“死亡”,成为dead letter。如果既配置了消息的ttl,又配置了队列的ttl,那么较小的那个值会被取用。更多资料请查阅官方文档。
dead letter exchange
刚才提到了,被设置了ttl的消息在过期后会成为dead letter。其实在rabbitmq中,一共有三种消息的“死亡”形式:
- 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
- 消息因为设置了ttl而过期。
- 消息进入了一条已经达到最大长度的队列。
如果队列设置了dead letter exchange(dlx),那么这些dead letter就会被重新publish到dead letter exchange,通过dead letter exchange路由到其他队列。更多资料请查阅官方文档。
流程图
聪明的你肯定已经想到了,如何将rabbitmq的ttl和dlx特性结合在一起,实现一个延迟队列。
针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:
延迟消费
延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过rabbitmq提供的ttl扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的dlx转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试
延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。
如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。
代码实现
接下来我们将介绍如何在spring boot中实现基于rabbitmq的延迟队列。我们假设读者已经拥有了spring boot与rabbitmq的基本知识。
初始化工程
首先我们在intellij中创建一个spring boot工程,并且添加spring-boot-starter-amqp扩展。
配置队列
从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在rabbitmq中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:
- delay_queue_per_message_ttl:ttl配置在消息上的缓冲队列。
- delay_queue_per_queue_ttl:ttl配置在队列上的缓冲队列。
- delay_process_queue:实际消费队列。
我们通过java config的方式将上述的队列配置为bean。由于我们添加了spring-boot-starter-amqp扩展,spring boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的dlx配置为同一个,且过期的消息都会通过dlx转发到delay_process_queue。
delay_queue_per_message_ttl
首先介绍delay_queue_per_message_ttl的配置代码:
@bean queue delayqueuepermessagettl() { return queuebuilder.durable(delay_queue_per_message_ttl_name) .withargument("x-dead-letter-exchange", delay_exchange_name) // dlx,dead letter发送到的exchange .withargument("x-dead-letter-routing-key", delay_process_queue_name) // dead letter携带的routing key .build(); }
其中,x-dead-letter-exchange声明了队列里的死信转发到的dlx名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。
delay_queue_per_queue_ttl
类似地,delay_queue_per_queue_ttl的配置代码:
@bean queue delayqueueperqueuettl() { return queuebuilder.durable(delay_queue_per_queue_ttl_name) .withargument("x-dead-letter-exchange", delay_exchange_name) // dlx .withargument("x-dead-letter-routing-key", delay_process_queue_name) // dead letter携带的routing key .withargument("x-message-ttl", queue_expiration) // 设置队列的过期时间 .build(); }
delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl,该配置用来设置队列的过期时间。
delay_process_queue
delay_process_queue的配置最为简单:
@bean queue delayprocessqueue() { return queuebuilder.durable(delay_process_queue_name) .build(); }
配置exchange
配置dlx
首先,我们需要配置dlx,代码如下:
@bean directexchange delayexchange() { return new directexchange(delay_exchange_name); }
然后再将该dlx绑定到实际消费队列即delay_process_queue上。这样所有的死信都会通过dlx被转发到delay_process_queue:
@bean binding dlxbinding(queue delayprocessqueue, directexchange delayexchange) { return bindingbuilder.bind(delayprocessqueue) .to(delayexchange) .with(delay_process_queue_name); }
配置延迟重试所需的exchange
从延迟重试的流程图中我们可以看到,消息处理失败之后,我们需要将消息转发到缓冲队列,所以缓冲队列也需要绑定一个exchange。在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列。具体代码是如何配置的,这里就不赘述了,大家可以查阅我github中的代码。
定义消费者
我们创建一个最简单的消费者processreceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:
- 如果消息里的消息体不等于fail_message,那么他会输出消息体。
- 如果消息里的消息体恰好是fail_message,那么他会模拟抛出异常,然后将该消息重定向到缓冲队列(对应延迟重试场景)。
另外,我们还需要新建一个监听容器用于存放消费者,代码如下:
@bean simplemessagelistenercontainer processcontainer(connectionfactory connectionfactory, processreceiver processreceiver) { simplemessagelistenercontainer container = new simplemessagelistenercontainer(); container.setconnectionfactory(connectionfactory); container.setqueuenames(delay_process_queue_name); // 监听delay_process_queue container.setmessagelistener(new messagelisteneradapter(processreceiver)); return container; }
至此,我们前置的配置代码已经全部编写完成,接下来我们需要编写测试用例来测试我们的延迟队列。
编写测试用例
延迟消费场景
首先我们编写用于测试ttl设置在消息上的测试代码。
我们借助spring-rabbit包下提供的rabbittemplate类来发送消息。由于我们添加了spring-boot-starter-amqp扩展,spring boot会在初始化时自动地将rabbittemplate当成bean加载到容器中。
解决了消息的发送问题,那么又该如何为每个消息设置ttl呢?这里我们需要借助messagepostprocessor。
messagepostprocessor通常用来设置消息的header以及消息的属性。我们新建一个expirationmessagepostprocessor类来负责设置消息的ttl属性:
/** * 设置消息的失效时间 */ public class expirationmessagepostprocessor implements messagepostprocessor { private final long ttl; // 毫秒 public expirationmessagepostprocessor(long ttl) { this.ttl = ttl; } @override public message postprocessmessage(message message) throws amqpexception { message.getmessageproperties() .setexpiration(ttl.tostring()); // 设置per-message的失效时间 return message; } }
然后在调用rabbittemplate的convertandsend方法时,传入expirationmessagepostporcessor即可。我们向缓冲队列中发送3条消息,过期时间依次为1秒,2秒和3秒。具体的代码如下所示:
@test public void testdelayqueuepermessagettl() throws interruptedexception { processreceiver.latch = new countdownlatch(3); for (int i = 1; i <= 3; i++) { long expiration = i * 1000; rabbittemplate.convertandsend(queueconfig.delay_queue_per_message_ttl_name, (object) ("message from delay_queue_per_message_ttl with expiration " + expiration), new expirationmessagepostprocessor(expiration)); } processreceiver.latch.await(); }
细心的朋友一定会问,为什么要在代码中加一个countdownlatch呢?这是因为如果没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,我们就看不到消息被延迟消费的表现了。
那么类似地,测试ttl设置在队列上的代码如下:
@test public void testdelayqueueperqueuettl() throws interruptedexception { processreceiver.latch = new countdownlatch(3); for (int i = 1; i <= 3; i++) { rabbittemplate.convertandsend(queueconfig.delay_queue_per_queue_ttl_name, "message from delay_queue_per_queue_ttl with expiration " + queueconfig.queue_expiration); } processreceiver.latch.await(); }
我们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过期。
延迟重试场景
我们同样还需测试延迟重试场景。
@test public void testfailmessage() throws interruptedexception { processreceiver.latch = new countdownlatch(6); for (int i = 1; i <= 3; i++) { rabbittemplate.convertandsend(queueconfig.delay_process_queue_name, processreceiver.fail_message); } processreceiver.latch.await(); }
我们向delay_process_queue发送3条会触发fail的消息,理论上这3条消息会在4秒后自动重试。
查看测试结果
延迟消费场景
延迟消费的场景测试我们分为了ttl设置在消息上和ttl设置在队列上两种。首先,我们先看一下ttl设置在消息上的测试结果:
从上图中我们可以看到,processreceiver分别经过1秒、2秒、3秒收到消息。测试结果表明消息不仅被延迟消费了,而且每条消息的延迟时间是可以被个性化设置的。ttl设置在消息上的延迟消费场景测试成功。
然后,ttl设置在队列上的测试结果如下图:
从上图中我们可以看到,processreceiver经过了4秒的延迟之后,同时收到了3条消息。测试结果表明消息不仅被延迟消费了,同时也证明了当ttl设置在队列上的时候,消息的过期时间是固定的。ttl设置在队列上的延迟消费场景测试成功。
延迟重试场景
接下来,我们再来看一下延迟重试的测试结果:
processreceiver首先收到了3条会触发fail的消息,然后将其移动到缓冲队列之后,过了4秒,又收到了刚才的那3条消息。延迟重试场景测试成功。
总结
本文首先介绍了延迟队列的概念以及用途,并且通过代码详细讲解了如何通过spring boot和rabbitmq实现一个延迟队列。希望本文能够对大家平时的学习和工作能有所启发和帮助。也希望大家多多支持。
上一篇: Java日期时间以及日期相互转换
下一篇: Jdbc连接数据库基本步骤详解