RabbitMq怎么防止消息丢失
怎么防止交换机、队列和消息都设置了持久化但消息仍然丢失的问题
情景一:
消费者订阅消费队列时,autoAck(自动确认)参数设置了true,当消费者接收到了消息自动确认了,但是消息还没来得及处理,就宕机了。
解决方法:
autoAck参数设置成false,然后消费者处理完之后,手动确认
情景二:
在持久化消息的时候,数据在操作系统缓存中,还没写入磁盘中时宕机。
解决方法:
这里需要引入RabbitMq的镜像队列机制,相当于配置了副本,如果主节点(master)发生了宕机,可以自动切换到从节点(slave),这样可以有效的保证了高可用性,除非整个集群挂掉。虽然这种方式不能完全保证消息不丢失,但是比没有引入此机制可靠性会高出很多。
情景三:
生产者把消息发出去后,没有到达RabbitMq服务器
解决方法:
- RabbitMq事务机制
RabbitMq客户端中与事务机制有关的方法有三个:channel.txSelect(将当前信道设置成事务信道)、channel.txCommit(提交事务)、channel.txRollback(回滚事务;当程序抛出异常时,catch到异常,调用方法)
提交事务:
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"TEST".getBytes("UTF-8"));
channel.txCommit();
AMQP协议流转过程:
1、客户端发送Tx.Select,将信道设置成事务模式
2、Broker回复Tx.Select-OK,确认将信道设置成事务模式
3、客户端发送消息
4、客户端发送完消息之后,发送Tx.Commit提交事务
5、Broker回复Tx.Commit-OK,确认事务已提交
事务回滚:
try{
channel.txSelect();
channel.basicPublish("",QUEUE_NAME_TWO,false,properties,"TEST".getBytes("UTF- 8"));
throw new RuntimeException();
}catch (Throwable throwable){
channel.txRollback();
}
AMQP协议流转过程:
1、客户端发送Tx.Select,将信道设置成事务模式
2、Broker回复Tx.Select-OK,确认将信道设置成事务模式
3、客户端发送消息
4、客户端发送Tx.Rollback,通知服务器回滚事务
5、Broker回复Tx.Rollback-OK,确认事务已回滚
如果发送多条消息,将channel.basicPublish、channel.txCommit、channel.txRollback方法放在同一个循环体内
channel.txSelect(); for (int i = 0;i < 100000; i++){ try { channel.basicPublish("",QUEUE_NAME_TWO,false,properties,"TEST".getBytes("UTF-8")); channel.txCommit(); }catch (Throwable throwable){ channel.txRollback(); } }
缺点:消耗服务器性能、严重降低RabbitMq的消息吞吐量
- 通过发送方确认(publisher confirm)机制实现
生产者先将一个信道设置成confirm信道,这条信道上的消息都会被指定一个唯一ID(从1开始),当消息到达指定队列后,RabbitMq会返回一个确认(Basic.Ack)给生产者,如果消息或队列指定了持久化,则消息在写入磁盘后才会返回确认
上一篇: 防止精度丢失
下一篇: 自定义JTabbedPane皮肤