欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

四种途径提高RabbitMQ传输消息数据的可靠性(一)

程序员文章站 2022-12-14 22:39:50
前言 上一篇四种途径提高RabbitMQ传输消息数据的可靠性(一)已经介绍了两种方式提高数据可靠性传输的方法,本篇针对上一篇中提出的问题(1)与问题(2)提出解决常用的方法。 本文其实也就是结合以上四个方面进行讲解的,主要参考《RabbitMQ实战指南》(有需要PDF电子书的可以评论或者私信我),本 ......

 前言

上一篇四种途径提高rabbitmq传输消息数据的可靠性(一)已经介绍了两种方式提高数据可靠性传输的方法,本篇针对上一篇中提出的问题(1)与问题(2)提出解决常用的方法。

本文其实也就是结合以上四个方面进行讲解的,主要参考《rabbitmq实战指南》(有需要pdf电子书的可以评论或者私信我),本文截图也来自其中,另外可以对一些rabbitmq的概念的认识可以参考我的另外两篇博文认识rabbitmq交换机模型rabbitmq是如何运转的?

 


 

三、生产者确认机制

针对问题(1),我们可以通过生产者的确认消息机制来解决,主要分为两种:第一是事务机制、第二是发送方确认机制

1、事务机制

  与事务机制相关的有三种方法,分别是channel.txselect设置当前信道为事务模式、channel.txcommit提交事务和channel.txrollback事务回滚。如果事务提交成功,则消息一定是到达了rabbitmq中,如果事务提交之前由于发送异常或者其他原因,捕获后可以进行channel.txrollback回滚。

// 将信道设置为事务模式,开启事务
channel.txselect(); 
// 发送持久化消息
channel.basicpublish(exchange_name, routing_key, messageproperties.persistent_text_plain, "transaction messages".getbytes()); 
// 事务提交
channel.txcommit();

 

发生异常之后事务回滚

try {
      channel.txselect();
       channel.basicpublish(exchange, routingkey, messageproperties.persistent_text_plain, "transaction messages".getbytes());
       channel.txcommit();
  } catch (exception e){
       e.printstacktrace();
       channel.txrollback();
   }

 

2、确认机制

  生产者将信道设置为confirm确认模式,确认之后所有在信道上的消息将会被指派一个唯一的从1开始的id,一旦消息被正确匹配到所有队列后,rabbitmq就会发送一个确认basic.ack给生产者(包含消息的唯一id),生产者便知晓消息是否正确到达目的地了。

                  四种途径提高RabbitMQ传输消息数据的可靠性(一)

 

  消息如果是持久化的,那么确认消息会在消息写入磁盘之后发出。rabbitmq中的deliverytag包含了确认消息序号,还可以设置multiple参数,表示到这个序号之前的所有消息都已经得到处理。确认机制相对事务机制来说,相比较代码来说比较复杂,但会经常使用,主要有单条确认、批量确认、异步批量确认三种方式。

  2.1 单条确认

  此种方式比较简单,一般都是一条条的发送,代码如下:

try {
    connection connection = connectionfactory.newconnection();
    channel channel = connection.createchannel();
    //set channel publisher confirm mode 
    channel.confirmselect();
    // publish message
    channel.basicpublish("exchange", "routingkey", null, "publisher confirm test".getbytes());
    if (!channel.waitforconfirms()) {
        // publisher confirm failed handle
        system.out.println("send message failed!");
    }
} catch (exception e) {
    e.printstacktrace();
}

 

  2.2 批量确认

  问:批量确认comfirm需要解决出现返回的basic.nack或者超时情况的话,客户需要将这一批次消息全部重发,那么采用什么样的存储结构才能合适地将这些消息动态筛选出来。

  最好是需要增加一个缓存,将发送成功并且确认ack之后的消息去除,剩下nack或者超时的消息,改进之后的代码如下:

// take arraylist or blockingqueue as a cache
list<object> cache = new arraylist<>();
// set channel publisher confirm mode
channel.confirmselect();
for (int i=0; i < 20; i++) {
    // publish message
    string message = "publisher message["+ i +"]";
    cache.add(message);
    channel.basicpublish("exchange", "routingkey", null, message.getbytes());
    if (channel.waitforconfirms()) {
        // remove message publisher confirm
        cache.remove(i);
    }
    // todo handle nack message:republish
}
} catch (exception e) {
  e.printstacktrace();
  // todo handle nack message:republish
}

 

      2.3 异步批量确认

  异步确认方式通过在客户端addconfirmlistener增加confirmlistener回调接口,包括handleack与handlenack处理方法:

  每次发送消息confirmset集合元素加1,当消息被确认ack进入handleack方法时,“unconfirm”集合中删除响应的一条(multiple设置为false时)或者多条记录(multiple设置为true时),其中存储缓存最好采用sortedset数据结构

  代码如下:

try {
    connection connection = connectionfactory.newconnection();
    channel channel = connection.createchannel();
    // take  as a cache
    sortedset cache = new treeset();
    // set channel publisher confirm mode
    channel.confirmselect();
    for (int i = 0; i < 20; i++) {
        // publish message
        long nextseqno = channel.getnextpublishseqno();
        string message = "publisher message[" + i + "]";
        cache.add(message);
        channel.basicpublish("exchange", "routingkey", messageproperties.persistent_text_plain, message.getbytes());
        cache.add(nextseqno);

    }
    // add confirmcalback: handleack, handlenack
    channel.addconfirmlistener(new confirmlistener() {
        @override
        public void handleack(long deliverytag, boolean multiple) {
            if (multiple) {
                // batch remove ack message
                cache.headset(deliverytag - 1).clear();
            } else {
                // remove ack message
                cache.remove(deliverytag);
            }
        }
        @override
        public void handlenack(long deliverytag, boolean multiple) {
            // todo handle nack message:republish
        }
    });

} catch (exception e) {
    e.printstacktrace();
    // todo handle nack message:republish
}

3、总结比较

  1)是确认机制好呢?还是事务机制?两者可以共存吗?

确认机制相对于事务机制,最大的好处就是可以异步处理提高吞吐量,不需要额外等待消耗资源。但是两者时候不能同时共存的。

  

  2)那么确认机制的三种方式之间呢?实际产生环境是推荐哪一种呢?(其实毫无疑问当然是推荐异步批量确认方式)

批量确认的最大问题就是在于返回的nack消息需要重新发送,以上普通单条确认、批量确认、批量异步确认三种方法,在实际生产环境中强烈推荐使用批量异步确认方式。

 

四、消息与队列的持久化

针对的问题(2),我们可以通过增加队列与消息的持久化来实现。

1、交换器的持久化

  交换器的持久化是通过声明队列durable参数为true实现的,如果交换器不设置持久化,那么在rabbitmq服务器重启之后,相关的交换器元数据会丢失,消息不会丢失,只是不能将消息发送到这个交换器中。因此,都是建议将其置为持久化。

channel.exchangedeclare(exchange_name, "direct", true, false, null);

 2、队列的持久化

  队列持久化同理与交换器持久化,只是rabbitmq服务器重启之后,相关的元数据会丢失,数据也会跟着丢失,消息也自然丢失。

 channel.queuedeclare(queue_name, true, false, false, null);

3、消息的持久化

  队列的持久化不能保证内存存储的消息不会丢失,要确保消息不会丢失,需要将其通过设置basicproperties中的deliverymode属性为2可实现消息的持久化(persistent_text_plain实际上已经封装了这个属性),也就是说只有实现了队列与消息的持久化,才能保证消息不会丢失。

// 其中的2就是投递模式
public static class final basicproperties_persistent_text_plain = 
new basicproperties("text/plain", null, null, 2, null, null, null, null, null, null, null, null, null);

 

4、消息丢失的几种情况

  但实际上不是设置了交换器、队列、消息持久化就能一定保证消息不会被丢失,以下几种情况是可能丢失的,比如:

  1)设置autoack为true,消费者收到消息后,还没处理就宕机了,这样也算数据丢失,解决办法是设置为false,之后手动确认。

  2)在设置了持久化后消息存入rabbitmq之后,还需要一段时间才能存入磁盘之中(虽然很短,但不能忽视),rabbitmq并不会为每条消息都今次那个同步存盘,可能只会保存到操作系统缓存之中而不是物理磁盘中如果rabbitmq这个时间段内宕机、异常、重启等情况,消息也会丢失,解决办法是引入rabbitmq的镜像队列机制(类似于集群,master挂了切换到slave)

 

总结

  没有完全十全十美的方式能保证数据能100%不丢失,并且最大效率节约性能消耗等,两篇博文虽然已经提出常用的四种方式,当实际环境中整个rabbitmq环境在搭建没有结合实际的生产业务环境的话,也会发生消息丢失的等情况,解决这样的问题无非就完善消息备份,健全rabbitmq集群..........