三.防止消息丢失
程序员文章站
2024-02-04 20:10:04
...
提供者和消费者的pom.xml和上一章一样
RabbitMQ中,消息丢失可以简单的分为两种:客户端丢失和服务端丢失。针对这两种消息丢失,RabbitMQ都给出了相应的解决方案。
默认情况下,RabbitMQ会平均的分发消费给多个消费者,假设一个消费者任务的执行时间非常长,在执行过程中,客户端挂了(连接断开),那么,该客户端正在处理且未完成的消息,以及分配给它还没来得及执行的消息,都将丢失。因为默认情况下,RabbitMQ分发完消息后,就会从内存中把消息删除掉。
解决方法:消息确认:
RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发出的确认信息,则会把消息转发给其他保持在线的消费者。
上代码:
一.Producer提供者:
1.发送消息的类
2.测试启动发送小的Main
和上一章的提供者一样
二.Consumer消费者
1.消费者的类:MessageRecive.java
分两个部分,重现消息丢失的情况和解决方案
package com.rabbitmq.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MessageRecive {
private Logger logger = LoggerFactory.getLogger(MessageRecive.class);
//测试客户端丢失消息的情况,设置10秒处理事件,处理的时候强制关闭
public boolean lostMessageConsumer(String queueName){
//连接rabbitmq
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
Channel channel =null;
//声明消费的queue
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName,false,false,false,null);
//这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
Consumer consumer = new DefaultConsumer(channel){
//重写父类方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
String message = new String(body, "UTF-8");
logger.info("接收到:" + message);
//休眠10秒,模拟10秒处理事件
try {
logger.info("开始处理消息(休眠)......");
Thread.sleep(10000);
System.out.println("处理完毕!");
} catch (Exception e) {
// TODO: handle exception
}
}
};
//上面是声明消费者,这里用 声明的消费者 消费 列队的消息
System.out.println("开始等待提供者的消息....");
//自动应答,消费者自动应答给提供者(手动应答为false,需要开发者手动应答)
boolean autoAck = true;
channel.basicConsume(queueName, autoAck,consumer);
//这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
} catch (Exception e) {
logger.error("出现异常:"+e);
return false;
}
return true;
}
//解决客户端丢失消息的情况,设置10秒处理事件,处理的时候强制关闭(原理:消息确认机制)
/**
* RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,
* 来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发
* 出的确认信息,则会把消息转发给其他保持在线的消费者。
* @param queueName
* @return
*/
public boolean resolveLostMessageConsumer(String queueName){
//连接rabbitmq
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
//解决内部类只能访问final修饰的局部变量
final Channel channel = connection.createChannel();
//声明消费的queue
channel.queueDeclare(queueName,false,false,false,null);
//在消息确认之前,不在处理其他消息
//prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
channel.basicQos(1);
//这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String(局部内部类)
Consumer consumer = new DefaultConsumer(channel){
//重写父类方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
String message = new String(body, "UTF-8");
logger.info("接收到:" + message);
//休眠10秒,模拟10秒处理事件
try {
logger.info("开始处理消息(休眠)......");
Thread.sleep(10000);
System.out.println("处理完毕!");
} catch (Exception e) {
// TODO: handle exception
}finally {
//手动应答,告诉服务器可以删除消息,否则不删除或给其他消费者
/**
* @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这(https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.deliver.delivery-tag)
* @param multiple true to acknowledge all messages up to and 为true的话,确认所有消息,为false只确认当前消息
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//上面是声明消费者,这里用 声明的消费者 消费 列队的消息
System.out.println("开始等待提供者的消息....");
//关闭自动应答(接收到消息提供者就删除消息),改为手动应答,很重要
boolean autoAsk = false;
channel.basicConsume(queueName, autoAsk,consumer);
//这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
} catch (Exception e) {
logger.error("出现异常:"+e);
return false;
}
return true;
}
}
2.测试消息丢失的情况的Main:LostMessageMain.java
package com.rabbitmq.main;
import com.rabbitmq.consumer.MessageRecive;
public class LostMessageMain {
//从哪个列队取消息
private final static String QUEUE_NAME = "hello";
//测试客户端丢失消息的情况,当程序正在执行时,出现异常(强制中断程序),正在处理的和等待处理的消息都会丢失
public static void main(String[] args) {
MessageRecive messageRecive = new MessageRecive();
messageRecive.lostMessageConsumer(QUEUE_NAME);
}
}
当程序执行到一般时强制关闭程序,然后在到RabbitMq的消息列队中查看消息(http://localhost:15672/),发现列队要发给此消费者的消息s已经被删除。
3.测试解决消息丢失(消息确认)的Main:ResolveLostMessageMain.java
package com.rabbitmq.main;
import com.rabbitmq.consumer.MessageRecive;
public class ResolveLostMessageMain {
//指定需要消费哪个列队的消息
private static final String QUEUE_NAME = "hello";
//解决客户端消息丢失的方法(消息确认机制),生产两条消息,执行时强制关闭程序,发现消息还在
//如果有其他消费者,会发送给其他消费者
/**
* RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,
* 来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发
* 出的确认信息,则会把消息转发给其他保持在线的消费者。
* 自动确认:接收到消息后就向提供者发送确认消息,提供者就删除消息
* 手动确认:开发者认为消息处理完毕手动向提供者发送确认消息,提供者才从列队删除消息
*
*/
public static void main(String[] args) {
MessageRecive messageRecive = new MessageRecive();
messageRecive.resolveLostMessageConsumer(QUEUE_NAME);
}
}
当消息执行到一半,强制关闭程序,再去查看列队消息,发现此消费者的消息s都还在,如果有多个消费者 ,其中一个消费者挂掉了,mq不会删除列队的消息,会发送给其他消费者消费。