欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMq怎么防止消息丢失

程序员文章站 2024-02-04 20:05:58
...

怎么防止交换机、队列和消息都设置了持久化但消息仍然丢失的问题

情景一:

消费者订阅消费队列时,autoAck(自动确认)参数设置了true,当消费者接收到了消息自动确认了,但是消息还没来得及处理,就宕机了。

解决方法:

autoAck参数设置成false,然后消费者处理完之后,手动确认

 

情景二:

在持久化消息的时候,数据在操作系统缓存中,还没写入磁盘中时宕机。

解决方法:

这里需要引入RabbitMq的镜像队列机制,相当于配置了副本,如果主节点(master)发生了宕机,可以自动切换到从节点(slave),这样可以有效的保证了高可用性,除非整个集群挂掉。虽然这种方式不能完全保证消息不丢失,但是比没有引入此机制可靠性会高出很多。

 

情景三:

生产者把消息发出去后,没有到达RabbitMq服务器

解决方法:

  • RabbitMq事务机制

RabbitMq客户端中与事务机制有关的方法有三个:channel.txSelect(将当前信道设置成事务信道)、channel.txCommit(提交事务)、channel.txRollback(回滚事务;当程序抛出异常时,catch到异常,调用方法)

提交事务:

        channel.txSelect();
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"TEST".getBytes("UTF-8"));
        channel.txCommit();

AMQP协议流转过程:

1、客户端发送Tx.Select,将信道设置成事务模式

2、Broker回复Tx.Select-OK,确认将信道设置成事务模式

3、客户端发送消息

4、客户端发送完消息之后,发送Tx.Commit提交事务

5、Broker回复Tx.Commit-OK,确认事务已提交

事务回滚:

        try{
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME_TWO,false,properties,"TEST".getBytes("UTF-                8"));    
            throw new RuntimeException();
        }catch (Throwable throwable){
            channel.txRollback();
        }

AMQP协议流转过程:

1、客户端发送Tx.Select,将信道设置成事务模式

2、Broker回复Tx.Select-OK,确认将信道设置成事务模式

3、客户端发送消息

4、客户端发送Tx.Rollback,通知服务器回滚事务

5、Broker回复Tx.Rollback-OK,确认事务已回滚

 

如果发送多条消息,将channel.basicPublish、channel.txCommit、channel.txRollback方法放在同一个循环体内

        channel.txSelect();
        for (int i = 0;i < 100000; i++){
            try {
                channel.basicPublish("",QUEUE_NAME_TWO,false,properties,"TEST".getBytes("UTF-8"));
                channel.txCommit();
            }catch (Throwable throwable){
                channel.txRollback();
            }
        }

 

缺点:消耗服务器性能、严重降低RabbitMq的消息吞吐量

  • 通过发送方确认(publisher confirm)机制实现

生产者先将一个信道设置成confirm信道,这条信道上的消息都会被指定一个唯一ID(从1开始),当消息到达指定队列后,RabbitMq会返回一个确认(Basic.Ack)给生产者,如果消息或队列指定了持久化,则消息在写入磁盘后才会返回确认