Channel closed; cannot ack/nack 和 MQ的重复消费/重复发送消息问题(记RabbitMQ使用的几个小坑)
程序员文章站
2022-06-21 22:03:59
首先上改良后的代码(改良前的代码和基本的配置使用在:RabbitMQ的介绍/安装和SpringBoot集成简单使用(MacBook版)):Application.yml:spring: application: name: cloud-purchase-service rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: 123456Controller:@Autowired...
首先上改良后的代码(改良前的代码和基本的配置使用在:RabbitMQ的介绍/安装和SpringBoot集成简单使用(MacBook版)):
Application.yml:
spring: application: name: cloud-purchase-service
rabbitmq: host: 127.0.0.1 port: 5672 username: guest
password: 123456
Controller:
@Autowired private RabbitMqService rabbitMqService; @GetMapping("/rabbitMq") public void rabbitMq(){ String s = rabbitMqService.sendData(); System.out.println("发送出去的:"+s); }
Server:
@Service public class RabbitMqService { @Autowired RabbitTemplate rabbitTemplate; public String sendData() { String channelNo = "jojo123"; String randomNumber = new String(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("jojo_channel",null,randomNumber); return randomNumber; } }
消费端我配了监听器:
@Component public class RabbitMqAccept { HashSet hashSet = new HashSet(); @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "jojo_test_queue"), exchange = @Exchange(value = "jojo_channel",type = "fanout") ) ) @RabbitHandler public void handle(@Payload String msg, Channel channel, @Headers Map<String,Object> headers) throws IOException { // 获取到AMQP信道中的消息的唯一编号TagId,方便返回应答模式告知消息已接到 Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); try { // 应答模式确认接收到消息,后面那个参数如果是false,就是只确认签收当前消息,如果是true,则签收全部比当前TagId小的消息 channel.basicAck(tag,false); System.out.println(msg); hashSet.add(msg); System.out.println(hashSet); System.out.println("请求了"+hashSet.size()); } catch (Exception e) { e.printStackTrace(); System.out.println(e); channel.basicNack(tag,false,true); } } }
(1)小坑一:重复消费与重复发送
首先明确,我这里说的重复发送,指的是MQ往消费端推的发送,如果是生产者的重复发送,这个涉及到的接口幂等性问题可以看:接口幂等性问题
好了实验开始,疯狂点击刷新请求那个Controller,或者注释掉消费端,先开启生产端往MQ中写入30条数据。
开启消费端从MQ中读取:
读出了162条,其中细看的话很多关键信息是一样的,自己对比一下UUID+RandomNumber就可以知道。产生原因:我们为了防止消息丢失,一般都是默认开启应答模式,在应答模式中,如果没有及时返回ACK响应,那么MQ中的该条消息将不会移除,而是一直往消费端塞,但是另外一方面你的业务代码又确实拿到了数据,正在处理,而我们习惯开启手动ack的话把Ack放在业务代码的后面,这就会出现重复消费问题。
所以我就用了Set,因为这里只是测试MQ这个著名的BUG,如果是实际生产,可以传对象和Map,里面新增一个属性UUID拼随机数就行,用HashMap的Key放UUID也能达到排重的目的。我这里就简单Set,再试一次只发送十条。
这样就看到被限制住了。
(2)小坑二:Channel closed; cannot ack/nack 和 Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
这个问题网上答案很少,我找了一段时间所以赶紧记下来这个小坑。
原因:RabbitMQ默认情况下是自动ACK应答模式,也就是只要默认开启应答并且是自动应答,然后从队列中删除消息。从我的yml配置文件和代码中可以看到,我没有关闭RabbitMQ自带的应答模式又开启了自己的手动channel.BasicACK();那么就会出现channel already cloused 问题了。
解决方案:
1.改yml配置文件,代码不用改,关闭自动ACK,开启手动ACK:
# 设置监听为手动应答模式
listener: simple: acknowledge-mode: manual
测试成功不报错:
2.把代码中的channel.basicAck(tag,false);和channel.basicNack(tag,false,true);注释掉,使用默认的自动ACK:
小坑三:OOM?!我关闭了自动ACK,开启了手动ACK,然后开始写代码,有100行业务代码,最后才channel.BasicAck();平时没问题,忽然有一行跳Exception,就返回消息队列中,继续推, 实习生代码套了太多if,try catch finally,或者某种情况忘了手动ack。会发生什么?
我们关掉全部的ACK,然后发消息到MQ,看会怎样:
是堆积,即使消费端已经把东西放进了Set,消费了,还是堆积在队列中,得不到释放。所以这也是为什么MQ默认给我们自动ACK的原因~
自动应答好还是手动应答好。
普通业务逻辑,调用链不长的,执行时间较短的,使用自动ACK。
处理时间较长的,可以等待消费端处理完成之后手动去确认。
关于ACK的详细使用和介绍,在一位总排名800+ 的大脖上有更好的总结,膜拜:
总结:自己还是太菜了,连基本的使用都这么多坑,更别提看懂Rabbit底层源码和摸透那些工作模式,还要继续学习,继续踩坑。
本文地址:https://blog.csdn.net/whiteBearClimb/article/details/108844581