欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ高级特性——消息的可靠投递(confirm模式与return模式)

程序员文章站 2022-05-31 18:05:41
...

大伙可以到我的RabbitMQ专栏获取更多信息

demo示例这里拿

概述

顾名思义,就是MQ在整个msg的生产到消费者接收到msg过程中的可靠性。RabbitMQ提供了一下两种方式来保证消息投递的可靠性:

  • confirm确认模式
  • return 退回模式

回顾一下rabbitmq整个消息的投递过程

producer--->rabbitmq broker--->exchanger--->queue--->cunsumer


confirm确认模式中

producer将消息发送给exchanger时候,将设置confirmCallback,之后消息到达或者不到达exchanger,这个confirmCallback都会被执行。成功时回调的参数会为true,反之为false。

return回退模式中

exchanger将消息分发给queue的过程中,将设置returnCallBack,之后消息到是否成功根据routing key分发到对应的queue,这个returnCallBack都会执行。成功时回调的参数会为true,反之为false。

confirm确认模式

springboot整合RabbitMQ可查阅我的另一篇文章:SpringBoot整合RabbitMQ,干净利索!(附项目地址)

1.开启confirm确认模式

server:
  port: 2001

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xxxxxxxx
    password: xxxxxxxx
    virtual-host: /LeoLee
    publisher-confirms: true #开启confirm确认模式

2.在RabbitTemplate定义ConfirmCallBack回调函数

ConfirmCallBack回调有三个参数:

  • correlationData 发送消息的时候的相关配置,发送消息时候没有设置则为空
  • ack exchange是否成功收到消息,true成功 
  • cause 失败原因
package com.leolee.rabbitmq;

import com.leolee.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.lang.Nullable;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @ClassName com.leolee.rabbitmq.ProducerTest
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/7
 * @Version V1.0
 **/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*
     * 功能描述: <br>
     * 〈确认模式〉
     *  1.开启确认模式publisher-confirms: true
     *  2.在RabbitTemplate定义ConfirmCallBack回调函数
     * @Param: []
     * @Return: void
     * @Author: LeoLee
     * @Date: 2020/11/7 15:58
     */
    @Test
    public void testConfirmModel() {

        //定义callback
        /*
         * correlationData 发送消息的时候的相关配置
         * ack exchange是否成功收到消息,true成功
         * cause 失败原因
         */
        rabbitTemplate.setConfirmCallback((@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) -> {
            System.out.println("confirmCallBack executed");

            if (ack) {
                System.out.println("exchange 接收消息成功");
            } else {
                System.out.println("exchange 接收消息失败");
                System.out.println("失败原因:" + cause);
            }
        });

        //发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send");
    }

}

发送消息后执行结果:

RabbitMQ高级特性——消息的可靠投递(confirm模式与return模式)

return回退模式

return模式与confirm模式不同,return模式的判断点在于exchange根据routingKey分发消息到queue的过程中:

  • 如果消息没有路由到Queue,丢弃消息,没有回调(默认设置)
  • 如果消息没有路由到Queue,返回给消息的发送方ReturnCallBack
package com.leolee.rabbitmq;

import com.leolee.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.lang.Nullable;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @ClassName com.leolee.rabbitmq.ProducerTest
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/7
 * @Version V1.0
 **/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend() {

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send");
    }

    /*
     * 功能描述: <br>
     * 〈确认模式〉
     *  1.开启确认模式publisher-confirms: true
     *  2.在RabbitTemplate定义ConfirmCallBack回调函数
     * @Param: []
     * @Return: void
     * @Author: LeoLee
     * @Date: 2020/11/7 15:58
     */
    @Test
    public void testConfirmModel() {

        //定义callback
        /*
         * correlationData 发送消息的时候的相关配置
         * ack exchange是否成功收到消息,true成功
         * cause 失败原因
         */
        rabbitTemplate.setConfirmCallback((@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) -> {
            System.out.println("confirmCallBack executed");

            if (ack) {
                System.out.println("exchange 接收消息成功");
            } else {
                System.out.println("exchange 接收消息失败");
                System.out.println("失败原因:" + cause);
            }
        });

        //发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send");
    }


    /*
     * 功能描述: <br>
     * 〈回退模式〉
     * 1.开启回退模式 publisher-returns: true #开启return回退模式
     * 2.设置returnCallBack
     * 3.设置exchange处理消息的模式:
     *     3.1.如果消息没有路由到Queue,丢弃消息,没有回调(默认设置)
     *     3.2.如果消息没有路由到Queue,返回给消息的发送方ReturnCallBack
     * @Param: []
     * @Return: void
     * @Author: LeoLee
     * @Date: 2020/11/7 16:31
     */
    @Test
    public void testReturnModel() {

        /**
         * @Param: [
         * message 消息对象
         * replyCode 返回code,失败码
         * replyText 错误信息
         * exchange 失败所处的交换机
         * routingKey 消息的routingKey
         * ]
         */
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
            System.out.println("returnCallBack executed");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
        });

        //设置交换机处理失败消息的模式为true,当消息没有路由到Queue,返回给消息的发送方ReturnCallBack
        rabbitTemplate.setMandatory(true);

        //发送消息(故意写错routingKey制造exchange消息分发错误)
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "11111.boot.test", "test msg send");
    }

}

执行结果:

RabbitMQ高级特性——消息的可靠投递(confirm模式与return模式)