欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

三.防止消息丢失

程序员文章站 2024-02-04 20:10:04
...

提供者和消费者的pom.xml和上一章一样

RabbitMQ中,消息丢失可以简单的分为两种:客户端丢失和服务端丢失。针对这两种消息丢失,RabbitMQ都给出了相应的解决方案。

默认情况下,RabbitMQ会平均的分发消费给多个消费者,假设一个消费者任务的执行时间非常长,在执行过程中,客户端挂了(连接断开),那么,该客户端正在处理且未完成的消息,以及分配给它还没来得及执行的消息,都将丢失。因为默认情况下,RabbitMQ分发完消息后,就会从内存中把消息删除掉。

解决方法:消息确认:
RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发出的确认信息,则会把消息转发给其他保持在线的消费者。

上代码:

一.Producer提供者:
1.发送消息的类
2.测试启动发送小的Main
和上一章的提供者一样

二.Consumer消费者
1.消费者的类:MessageRecive.java
分两个部分,重现消息丢失的情况和解决方案

package com.rabbitmq.consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class MessageRecive {

    private Logger logger = LoggerFactory.getLogger(MessageRecive.class);

    //测试客户端丢失消息的情况,设置10秒处理事件,处理的时候强制关闭
    public boolean lostMessageConsumer(String queueName){
        //连接rabbitmq
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        Channel channel =null;

        //声明消费的queue
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.queueDeclare(queueName,false,false,false,null);
            //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
            Consumer consumer = new DefaultConsumer(channel){
                //重写父类方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
                    String message = new String(body, "UTF-8");
                    logger.info("接收到:" + message);
                    //休眠10秒,模拟10秒处理事件
                    try {
                        logger.info("开始处理消息(休眠)......");
                        Thread.sleep(10000);
                        System.out.println("处理完毕!");
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
            };
            //上面是声明消费者,这里用 声明的消费者  消费  列队的消息
            System.out.println("开始等待提供者的消息....");
            //自动应答,消费者自动应答给提供者(手动应答为false,需要开发者手动应答)
            boolean autoAck = true;
            channel.basicConsume(queueName, autoAck,consumer);
            //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
        } catch (Exception e) {
            logger.error("出现异常:"+e);
            return false;
        }
        return true;
    }

    //解决客户端丢失消息的情况,设置10秒处理事件,处理的时候强制关闭(原理:消息确认机制)
    /**
     * RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,
     * 来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发
     * 出的确认信息,则会把消息转发给其他保持在线的消费者。
     * @param queueName
     * @return
     */
    public boolean resolveLostMessageConsumer(String queueName){
        //连接rabbitmq
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            //解决内部类只能访问final修饰的局部变量
            final Channel channel = connection.createChannel();
            //声明消费的queue
            channel.queueDeclare(queueName,false,false,false,null);
            //在消息确认之前,不在处理其他消息
            //prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
            channel.basicQos(1);
            //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String(局部内部类)
            Consumer consumer = new DefaultConsumer(channel){
                //重写父类方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
                    String message = new String(body, "UTF-8");
                    logger.info("接收到:" + message);
                    //休眠10秒,模拟10秒处理事件
                    try {
                        logger.info("开始处理消息(休眠)......");
                        Thread.sleep(10000);
                        System.out.println("处理完毕!");
                    } catch (Exception e) {
                        // TODO: handle exception
                    }finally {
                        //手动应答,告诉服务器可以删除消息,否则不删除或给其他消费者
                        /**
                         * @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这(https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.deliver.delivery-tag)
                         * @param multiple true to acknowledge all messages up to and 为true的话,确认所有消息,为false只确认当前消息
                         */
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //上面是声明消费者,这里用 声明的消费者  消费  列队的消息
            System.out.println("开始等待提供者的消息....");
            //关闭自动应答(接收到消息提供者就删除消息),改为手动应答,很重要
            boolean autoAsk = false;
            channel.basicConsume(queueName, autoAsk,consumer);
            //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
        } catch (Exception e) {
            logger.error("出现异常:"+e);
            return false;
        }
        return true;
    }

}

2.测试消息丢失的情况的Main:LostMessageMain.java

package com.rabbitmq.main;

import com.rabbitmq.consumer.MessageRecive;

public class LostMessageMain {

    //从哪个列队取消息
    private final static String QUEUE_NAME = "hello";

    //测试客户端丢失消息的情况,当程序正在执行时,出现异常(强制中断程序),正在处理的和等待处理的消息都会丢失
    public static void main(String[] args) {
        MessageRecive messageRecive = new MessageRecive();
        messageRecive.lostMessageConsumer(QUEUE_NAME);
    }

}

当程序执行到一般时强制关闭程序,然后在到RabbitMq的消息列队中查看消息(http://localhost:15672/),发现列队要发给此消费者的消息s已经被删除。

3.测试解决消息丢失(消息确认)的Main:ResolveLostMessageMain.java

package com.rabbitmq.main;

import com.rabbitmq.consumer.MessageRecive;

public class ResolveLostMessageMain {

    //指定需要消费哪个列队的消息
    private static final String QUEUE_NAME = "hello";

    //解决客户端消息丢失的方法(消息确认机制),生产两条消息,执行时强制关闭程序,发现消息还在
    //如果有其他消费者,会发送给其他消费者
    /**
     * RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,
     * 来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发
     * 出的确认信息,则会把消息转发给其他保持在线的消费者。
     * 自动确认:接收到消息后就向提供者发送确认消息,提供者就删除消息
     * 手动确认:开发者认为消息处理完毕手动向提供者发送确认消息,提供者才从列队删除消息
     * 
     */
    public static void main(String[] args) {
        MessageRecive messageRecive = new MessageRecive();
        messageRecive.resolveLostMessageConsumer(QUEUE_NAME);
    }

}

当消息执行到一半,强制关闭程序,再去查看列队消息,发现此消费者的消息s都还在,如果有多个消费者 ,其中一个消费者挂掉了,mq不会删除列队的消息,会发送给其他消费者消费。

相关标签: RabbitMQ