欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

四种途径提升RabbitMQ传输数据的可靠性

程序员文章站 2022-05-18 15:33:03
前言 RabbitMQ虽然有对队列及消息等的一些持久化设置,但其实光光只是这一个是不能够保障数据的可靠性的,下面我们提出这样的质疑: (1)RabbitMQ生产者是不知道自己发布的消息是否已经正确达到服务器呢,如果中间发生网络异常等情况呢?消息必然会丢失! (2)RabbitMQ如果没有设置队列持久 ......

前言

rabbitmq虽然有对队列及消息等的一些持久化设置,但其实光光只是这一个是不能够保障数据的可靠性的,下面我们提出这样的质疑:

(1)rabbitmq生产者是不知道自己发布的消息是否已经正确达到服务器呢,如果中间发生网络异常等情况呢?消息必然会丢失!

(2)rabbitmq如果没有设置队列持久化,rabbitmq服务器重后队列的元数据会丢失,消息自然也会丢失!

(3)rabbitmq如果消费者设置自动确认,即autoack为true,那么不管消费者发生什么情况,该消息会自动从队列中移除,实际上消费者有可能挂掉,消息必然会丢失!

(4)rabbitmq中的消息如果没有匹配到队列时,那么消息也会丢失!

本文其实也就是结合以上四个方面进行讲解的,主要参考《rabbitmq实战指南》(有需要pdf电子书的可以评论或者私信我),本文截图也来自其中,另外可以对一些rabbitmq的概念的认识可以参考我的上两篇博文认识rabbitmq交换机模型rabbitmq是如何运转的?

 


 

 

一、设置mandotory参数、ae备份交换器

针对前言中的第(4)个问题,我们可以通过设置mandotory参数与ae备份交换器来解决

1、mandotory参数

  1)当为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,此时rabbitmq会调用basic.return命令将消息返回给生产者,消息将不会丢失

  2)当为false时,消息将会被直接丢弃。

  3)rabbitmq通过addreturnlistener添加returnlisener监听器监听获取没有被正确路由到合适队列的消息

channel.basicpublish(exchange name, "", true, 
messageproperties.persistent_text_plain, 
"mandatory test".getbytes()); 
channel.addreturnlistener(new returnlistener(){ 
  public void handlereturn(int replycode, string replytext, 
                string exchange, string routingkey, 
                amqp.basicproperties basicproperties, 
                byte[] body) throws ioexception { 
    string message = new string(body); 
    system.out.println("basic.return 返回的结果是: " + message);
  }
}); .

 

2、ae备份交换器

  alternate exchange,简称ae,不设置mandatory参数,那么消息将会被丢失,设置mandatory参数的话,需要添加returnlistner监听器,增加复杂代码,如果既不想增加代码又不想消息丢失,则使用ae,将没有被路由的消息存储于rabbitmq中。当mandatory参数用ae一起使用时,mandatory将失效。在介绍ae之前,也认识rabbitmq对于消息的过期时间ttl设置以及队列的过期时间ttl设置

2.1 ttl过期时间设置

  可以对队列设置ttl与消息设置ttl,其中消息设置ttl经常用于死信队列、延迟队列等高级应用中。

  1)设置消息ttl

  设置ttl过期时间一般有两种当时:一是通过队列属性,对队列中所有消息设置相同的ttl。二就是对消息本身单独设置,每条消息ttl不同。如果一起使用时候,ttl小的为准,当一旦超过设置的ttl时间时,就会变成“死信”。

  方式一:针对每条消息设置ttl是通过增加expiration的属性参数实现的,不可能像方式二一样扫描整个队列再判断是否过期,只有当该消息即将被消费时再判定是否过期即可删除,也就是消息即使已经过期,但不一定立马被删除!

amqp.basicproperties.builder builder = new amqp.basicproperties.builder(); 
// 持久化消息
builder deliverymode(2);
// 设置 ttl=60000ms
builder expiration( 60000 ); 
amqp.basicproperties properties = builder. build(); 
channel.basicpublish(exchangename, routingkey, mandatory, properties, "ttltestmessage".getbytes());

  方式二:通过队列属性设置消息ttl是增加x-message-ttl参数实现的,只需要扫描整个队列头部即可立即删除,也就是消息一旦过期就会被删除!

map<string, object> argss = new hashmap<string , object>(); 
argss.put("x-message-ttl", 6000); 
channel.queuedeclare(queuename, durable, exclusive, autodelete, argss) ;

 

  2)设置队列ttl

  通过在队列中添加参数x-message-ttl参数实现设置队列被自动删除前处于未被使用状态的时间,注意是队列的使用状态,并不是消息是否被消费的状态

  设置ttl=30min的队列,时间一到rabbitmq会保证队列被删除,但是不会保证删除的速度有多快。

map<string, object> args = new hashmap<string, object>{); 
args.put("x-expires", 1800000);
channel.queuedeclare("myqueue", false, false, false, args);

 

 

2.2 ae备份交换器的使用

  声明交换器的时候,添加alternate-exchange参数实现,或通过策略实现。前者优先级高。从代码角度需要以下三个步骤,具体代码如下:

map<string, object> args = new hashmap<string, object>(); 
args.put("a1ternate-exchange", "myae"); 
channe1.exchangedec1are("norma1exchange", "direct", true, fa1se, args); 
channe1.exchangedec1are("myae", "fanout", true, fa1se, nu11) ; 
channe1.queuedec1are( "norma1queue", true, fa1se, fa1se, nu11); 
channe1.queueb nd("norma1queue", "norma1exchange", "norma1key"); 
channe1.queuedec1are("unroutedqueue", true, fa1se, fa1se, nu11);

  1)声明normalexchange类型为direct的交换器、类型为fanout的myae备份交换器;并且normalexchange的备份交换器为myae(备份交换器建议使用fanout类型交换器)

  2)声明normalqueue队列,声明unroutequeue队列;

  3)通过路由键normalkey绑定normalexchange与normalqueue,不适用路由键绑定unroutequeue与myae

 

        四种途径提升RabbitMQ传输数据的可靠性