欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitmq的ttl和死信交换机

程序员文章站 2022-04-12 22:09:43
TTL:过期时间 * 1. 队列统一过期(整个队列设置了时间的过期) * 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉) * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 * 队列过期后,会将队列所有消息全部移除。 * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)成为死信的三种情况 * 1.进入队列的消息超过了队列本身的长度限制,这...

TTL:过期时间

消息的过期设置都是在生产者那一端进行的.
首先队列queue就会有永久的队列或者是带有时间的设置的队列.
然后我的队列中的消息也会存在永久或者带有时间的消息.
当我们队列和消息都设置了时间,就会以时间短的那个算起.
* 1. 队列统一过期(整个队列设置了时间的过期)
* 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉)
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)

成为死信的三种情况

 * 1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
 * 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
 * 3.原队列存在消息过期设置,消息到达超时时间未被消费

在生产者端

rabbitmq.properties

rabbitmq.host=192.168.88.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/

spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
        <!--设置queue的参数-->
        <rabbit:queue-arguments>
            <!--x-message-ttl指队列的过期时间-->
            <entry key="x-message-ttl" value="60000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_ttl" >
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--
      死信队列:
          1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
          2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
          3. 正常队列绑定死信交换机
              设置两个参数:
                  * x-dead-letter-exchange:死信交换机名称
                  * x-dead-letter-routing-key:发送给死信交换机的routingkey
  -->
    <!--
        1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    -->
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3. 正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--3.1 x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--
       2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
   -->
    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

测试方法用于检测时间的过期,和检测死信队列

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class TestProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * TTL:过期时间
     * 1. 队列统一过期(整个队列设置了时间的过期)
     * <p>
     * 2. 消息单独过期(在发送消息的时候,其中有一条消息有过期时间,而其他的消息都是正常的消息没有设置过期的消息时间限制,只有设置时间消息在队列顶端,才会判断其是否移除掉)
     * <p>
     * <p>
     * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     * 队列过期后,会将队列所有消息全部移除。
     * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
     */
    @Test
    public void testTtl() {
      /*  for (int i = 0; i < 10; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
        }*/
        // 消息后处理对象,设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1.设置message的信息
                message.getMessageProperties().setExpiration("5000");//消息的过期时间
                //2.返回该消息
                return message;
            }
        };
        //消息单独过期
        //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                //消息单独过期
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
            } else {
                //不过期的消息
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
            }
        }
    }
    /**
     * 测试死信队列
     * 成为死信的三种情况
     * 1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
     * 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)
     * 3.原队列存在消息过期设置,消息到达超时时间未被消费
     */
    @Test
    public void testDlx() {
       /* //3.原队列存在消息过期设置,消息到达超时时间未被消费
       rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");*/
       /* //1.进入队列的消息超过了队列本身的长度限制,这条消息会成为死信
        for (int i = 0; i < 20; i++) {        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");
        }*/
       rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.dlx","我是一条消息,我会成为死信马?");
    }

}

成为死信的第二种情况
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(新建一个消费者,去监听正常的队列)

@Component
public class SpringDlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//basicAck(long deliveryTag, boolean multiple)
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            //人为发送异常,消费者会拒绝接受消息
            int i = 1 / 0;
            //一次性设置1数据的传输
            if (deliveryTag % 1 == 0) {
                channel.basicAck(deliveryTag, true);
            }
        } catch (Exception e) {
        //消费者拒绝接受消息,并且不让消息重新进入队列,此时消息就会成为死信消息
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

rabbitmq.properties

rabbitmq.host=192.168.88.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/

spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
    />
    <context:component-scan base-package="com.itheima.rabbitmq.listener"/>
    <!--设置手动的签收,acknowledge="manual"-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" auto-declare="true">
        <rabbit:listener ref="springDlxListener" queue-names="test_queue_dlx"/>
    </rabbit:listener-container>
</beans>

测试方法用于消费者一直监听消息

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class TestConsumer {
    @Test
    public void test1(){
        boolean flag = true;
        while (true){
        }
    }
}

本文地址:https://blog.csdn.net/QiYang1024/article/details/107690987

相关标签: rabbitmq