RabbitMQ(基于Spring)通过设置队列的过期时间延时时间,监听死信队列来实现延时取消订单
程序员文章站
2022-06-21 13:38:52
直接上步骤1、导入pom坐标 1.6.6 1.2.12
直接上步骤
1、导入pom坐标
<properties>
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
</properties><dependencies>
<!--log4j相关坐标-->
<!-- log start -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- log end -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
</dependencies>`
2、相关Spring的配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
username="admin" password="admin" virtual-host="/v_admin"/>
<!--2.定义Rabbit模板。指定连接工厂以及定义exchange-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange">
</rabbit:template>
<!--MQ的管理,包括队列、交换器声明等-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义队列,自动声明-->
<!--消费者start-->
<!--监听的队列,监听到之后的方法-->
<!--不知道为什么所有队列都能监听到-->
<bean id="foo" class="rabbitMQTest.MyConsumer"/>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="foo" method="listen" queue-names="my_dlx_queue"/>
</rabbit:listener-container>
<!--消费者end-->
<!--定义死信队列和死信交换机-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:direct-exchange name="my_dlx_exchange" auto-declare="true">
<rabbit:bindings>
<!--绑定路由键my_ttl_dlx my_max_dlx,可以将过期的消息转移到my_dlx_queue队列-->
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义一个队列,设置6秒过期-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投递到该队列的消息如果没有被消费都将在6秒后投递到私信交换机-->
<entry key="x-message-ttl" value-type="long" value="10000"/>
<!--设置当消息过期后投递到对应的死信交换机-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定义定向交换机 根据不同的路由key投递消息 ,这个交换机是测试代码投递消息的时候投递给的对象my_ttl_dlx-->
<rabbit:direct-exchange name="my_normal_exchange" id="my_normal_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
3、编写生产者,即发送消息的,将消息发送到延时队列my_ttl_dlx_queue中,该队列中所有数据会在上面设置的long类型的时间之后失效,并被投递到死信队列my_dlx_queue。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @Author Fy
* @Time 2020年7月16日 10:28:18
* @QQ 1057072154
*/
public class SpringMain {
public static void main(String[] args) throws Exception {
//RabbitMQ模板
AbstractApplicationContext ctx =new ClassPathXmlApplicationContext("classpath:applicationContext-rabbitmq.xml");
//发送消息
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
for (int i = 0; i < 10; i++) {
template.convertAndSend("my_normal_exchange",
"my_ttl_dlx","测试过期消息6秒之后将被投递到死信队列");
}
// ctx.destroy();
}
}
4、编写消费者
package rabbitMQTest;
public class MyConsumer {
// @RabbitListener(queues = "my_dlx_queue")
public void listen(String message){
System.out.println("消费者收到的消息为::"+message);
}
}
5、程序启动后会自动监听
6、10秒之后延时队列中的消息过期,被交换机投递到死信队列,消费者监听到,开始消费,
忽略图中写的6秒,其实我设置的延时是10秒。
7、根据自己需求往自己的项目中去进行集成。有的在SSM集成这个RabbitMQ的时候会报相关异常,可能添加这个坐标就可以解决
<!--添加了这个jar就不报错了-->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.0.3.RELEASE</version>
</dependency>
本文地址:https://blog.csdn.net/weixin_43620015/article/details/107377412
上一篇: 浅谈java中接口与抽象类之间的异同
下一篇: 关于json时间数据格式转换与修改