RabbitMQ高级特性——消息的可靠投递(confirm模式与return模式)
程序员文章站
2022-05-31 18:05:41
...
大伙可以到我的RabbitMQ专栏获取更多信息
demo示例这里拿
概述
顾名思义,就是MQ在整个msg的生产到消费者接收到msg过程中的可靠性。RabbitMQ提供了一下两种方式来保证消息投递的可靠性:
- confirm确认模式
- return 退回模式
回顾一下rabbitmq整个消息的投递过程
producer--->rabbitmq broker--->exchanger--->queue--->cunsumer
confirm确认模式中
producer将消息发送给exchanger时候,将设置confirmCallback,之后消息到达或者不到达exchanger,这个confirmCallback都会被执行。成功时回调的参数会为true,反之为false。
return回退模式中
exchanger将消息分发给queue的过程中,将设置returnCallBack,之后消息到是否成功根据routing key分发到对应的queue,这个returnCallBack都会执行。成功时回调的参数会为true,反之为false。
confirm确认模式
springboot整合RabbitMQ可查阅我的另一篇文章:SpringBoot整合RabbitMQ,干净利索!(附项目地址)
1.开启confirm确认模式
server:
port: 2001
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xxxxxxxx
password: xxxxxxxx
virtual-host: /LeoLee
publisher-confirms: true #开启confirm确认模式
2.在RabbitTemplate定义ConfirmCallBack回调函数
ConfirmCallBack回调有三个参数:
- correlationData 发送消息的时候的相关配置,发送消息时候没有设置则为空
- ack exchange是否成功收到消息,true成功
- cause 失败原因
package com.leolee.rabbitmq;
import com.leolee.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.lang.Nullable;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @ClassName com.leolee.rabbitmq.ProducerTest
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/7
* @Version V1.0
**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/*
* 功能描述: <br>
* 〈确认模式〉
* 1.开启确认模式publisher-confirms: true
* 2.在RabbitTemplate定义ConfirmCallBack回调函数
* @Param: []
* @Return: void
* @Author: LeoLee
* @Date: 2020/11/7 15:58
*/
@Test
public void testConfirmModel() {
//定义callback
/*
* correlationData 发送消息的时候的相关配置
* ack exchange是否成功收到消息,true成功
* cause 失败原因
*/
rabbitTemplate.setConfirmCallback((@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) -> {
System.out.println("confirmCallBack executed");
if (ack) {
System.out.println("exchange 接收消息成功");
} else {
System.out.println("exchange 接收消息失败");
System.out.println("失败原因:" + cause);
}
});
//发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send");
}
}
发送消息后执行结果:
return回退模式
return模式与confirm模式不同,return模式的判断点在于exchange根据routingKey分发消息到queue的过程中:
- 如果消息没有路由到Queue,丢弃消息,没有回调(默认设置)
- 如果消息没有路由到Queue,返回给消息的发送方ReturnCallBack
package com.leolee.rabbitmq;
import com.leolee.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.lang.Nullable;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @ClassName com.leolee.rabbitmq.ProducerTest
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/7
* @Version V1.0
**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send");
}
/*
* 功能描述: <br>
* 〈确认模式〉
* 1.开启确认模式publisher-confirms: true
* 2.在RabbitTemplate定义ConfirmCallBack回调函数
* @Param: []
* @Return: void
* @Author: LeoLee
* @Date: 2020/11/7 15:58
*/
@Test
public void testConfirmModel() {
//定义callback
/*
* correlationData 发送消息的时候的相关配置
* ack exchange是否成功收到消息,true成功
* cause 失败原因
*/
rabbitTemplate.setConfirmCallback((@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) -> {
System.out.println("confirmCallBack executed");
if (ack) {
System.out.println("exchange 接收消息成功");
} else {
System.out.println("exchange 接收消息失败");
System.out.println("失败原因:" + cause);
}
});
//发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send");
}
/*
* 功能描述: <br>
* 〈回退模式〉
* 1.开启回退模式 publisher-returns: true #开启return回退模式
* 2.设置returnCallBack
* 3.设置exchange处理消息的模式:
* 3.1.如果消息没有路由到Queue,丢弃消息,没有回调(默认设置)
* 3.2.如果消息没有路由到Queue,返回给消息的发送方ReturnCallBack
* @Param: []
* @Return: void
* @Author: LeoLee
* @Date: 2020/11/7 16:31
*/
@Test
public void testReturnModel() {
/**
* @Param: [
* message 消息对象
* replyCode 返回code,失败码
* replyText 错误信息
* exchange 失败所处的交换机
* routingKey 消息的routingKey
* ]
*/
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
System.out.println("returnCallBack executed");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
});
//设置交换机处理失败消息的模式为true,当消息没有路由到Queue,返回给消息的发送方ReturnCallBack
rabbitTemplate.setMandatory(true);
//发送消息(故意写错routingKey制造exchange消息分发错误)
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "11111.boot.test", "test msg send");
}
}
执行结果:
上一篇: CIO40:IT&EQ的碎碎念
下一篇: 封装弹窗并模拟confirm方法