rabbitmq的ttl和死信交换机
程序员文章站
2022-04-12 22:09:43
TTL:过期时间 * 1. 队列统一过期(整个队列设置了时间的过期) * 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉) * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 * 队列过期后,会将队列所有消息全部移除。 * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)成为死信的三种情况 * 1.进入队列的消息超过了队列本身的长度限制,这...
TTL:过期时间
消息的过期设置都是在生产者那一端进行的.
首先队列queue就会有永久的队列或者是带有时间的设置的队列.
然后我的队列中的消息也会存在永久或者带有时间的消息.
当我们队列和消息都设置了时间,就会以时间短的那个算起.
* 1. 队列统一过期(整个队列设置了时间的过期)
* 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉)
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
成为死信的三种情况
* 1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
* 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
* 3.原队列存在消息过期设置,消息到达超时时间未被消费
在生产者端
rabbitmq.properties
rabbitmq.host=192.168.88.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="60000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
死信队列:
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3. 正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
测试方法用于检测时间的过期,和检测死信队列
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* TTL:过期时间
* 1. 队列统一过期(整个队列设置了时间的过期)
* <p>
* 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉)
* <p>
* <p>
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
*/
@Test
public void testTtl() {
/* for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}*/
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
}
};
//消息单独过期
//rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
for (int i = 0; i < 10; i++) {
if (i == 5) {
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
} else {
//不过期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
}
/**
* 测试死信队列
* 成为死信的三种情况
* 1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
* 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
* 3.原队列存在消息过期设置,消息到达超时时间未被消费
*/
@Test
public void testDlx() {
/* //3.原队列存在消息过期设置,消息到达超时时间未被消费
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");*/
/* //1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");
}*/
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");
}
}
成为死信的第二种情况
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
@Component
public class SpringDlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//basicAck(long deliveryTag, boolean multiple)
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑");
//人为发送异常,消费者会拒绝接受消息
int i = 1 / 0;
//一次性设置1数据的传输
if (deliveryTag % 1 == 0) {
channel.basicAck(deliveryTag, true);
}
} catch (Exception e) {
//消费者拒绝接受消息,并且不让消息重新进入队列,此时消息就会成为死信消息
channel.basicNack(deliveryTag, true, false);
}
}
}
rabbitmq.properties
rabbitmq.host=192.168.88.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<context:component-scan base-package="com.itheima.rabbitmq.listener"/>
<!--设置手动的签收,acknowledge="manual"-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" auto-declare="true">
<rabbit:listener ref="springDlxListener" queue-names="test_queue_dlx"/>
</rabbit:listener-container>
</beans>
测试方法用于消费者一直监听消息
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class TestConsumer {
@Test
public void test1(){
boolean flag = true;
while (true){
}
}
}
本文地址:https://blog.csdn.net/QiYang1024/article/details/107690987
推荐阅读
-
Python通过RabbitMQ服务器实现交换机功能的实例教程
-
SpringBoot集成RabbitMQ的方法(死信队列)
-
Python通过RabbitMQ服务器实现交换机功能的实例教程
-
PHP基于rabbitmq操作类的生产者和消费者功能示例
-
java操作RabbitMQ添加队列、消费队列和三个交换机
-
rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)
-
RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)
-
路由器和交换机的区别
-
主机和交换机端口位置的有效定位方法
-
浅谈surging服务引擎中的rabbitmq组件和容器化部署