欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ-消费者"未处理完的消息"丢失

程序员文章站 2022-03-26 09:31:37
一个关于客户端(消费者)开启自动应答,重启后"未处理消息丢失"的小坑。(主要是对RabbitMQ理解不够) 首先,申明一下: 本文所谓的 "丢失消息" 不是指服务器宕机、重启等原因导致内存中消息丢失,也就是说不是关于消息持久化的问题。 使用C# 编写测试。 问题表象: 消费者开启自动应答,某时,消费 ......

一个关于客户端(消费者)开启自动应答,重启后"未处理消息丢失"的小坑。(主要是对rabbitmq理解不够)

首先,申明一下: 本文所谓的 "丢失消息" 不是指服务器宕机、重启等原因导致内存中消息丢失,也就是说不是关于消息持久化的问题

 

  使用c# 编写测试。

  问题表象:  消费者开启自动应答,某时,消费者掉线(关闭/崩溃等),届时重启消费者,发现消费者未处理完的消息丢失

  条件: 服务器不宕机、不重启,只有一个消费者、一个生产者。

  消息流向:  消息--->生产者--->交换器--->队列--->消费者

  问题的处理: 消费者开启手动应答,若再出现之前情况,消息不丢失。

  先给个代码。

  生产者代码如下:

static void main(string[] args)
        {
            var factory = new connectionfactory() { hostname = "localhost" };
            using (var connection = factory.createconnection())
            using (var channel = connection.createmodel())
            {
          //申明广播类型交换器
                channel.exchangedeclare(exchange: "ex1", type: "fanout");
          //申明队列
                channel.queuedeclare(queue: "test1",
                                     durable: false,
                                     exclusive: false,
                                     autodelete: false,
                                     arguments: null);

                int count = 0;
                
                while (true)
                {
                    count++;
                    var body = encoding.utf8.getbytes(count.tostring());
            //向key为 p 的交换器 ex1 上推数据 channel.basicpublish(exchange: "ex1", routingkey: "p", basicproperties: null, body: body); console.writeline($"send msg {count}"); system.threading.thread.sleep(1000); } } }

 

 消费者代码如下(开启自动应答):

static void main(string[] args)
        {
            var factory = new connectionfactory() { hostname = "localhost" };
            using (var connection = factory.createconnection())
            using (var channel = connection.createmodel())
            {
          //队列与交换器绑定
                channel.queuebind(queue: "test1",
                              exchange: "ex1",
                              routingkey: "p");

                var consumer = new eventingbasicconsumer(channel);

                consumer.received += (model, ea) =>
                {
                    var body = ea.body;
                    var message = encoding.utf8.getstring(body);

                    console.writeline($"收到消息 -- {message}");

                    system.threading.thread.sleep(2000);
                };

                channel.basicconsume(queue: "test1",
                                     autoack: true,
                                     consumer: consumer);

                console.readline();
            }

        }

*交换器、队列必须先申明,两样都存在后才能进行绑定。 

 

 

生产者向队列推送消息,队列中展现状态为ready的数据就是未被消费的消息。

推送90个消息:

RabbitMQ-消费者"未处理完的消息"丢失

 

队列中存了90个未应答的消息

RabbitMQ-消费者"未处理完的消息"丢失

 

打开消费者:

RabbitMQ-消费者"未处理完的消息"丢失

发现现在消息已经全部被自动应答,队列已清空。

 

再启动消费者:

RabbitMQ-消费者"未处理完的消息"丢失

如预料,空空一片。

 

        小结 :开启消费者(开启自动应答),发现队列中状态为ready的消息全部被应答,队列中状态为ready的消息清空,不等消费者处理完这些消息,关闭消费者,然后再开启消费者,消费者不会再收到消息,出现消费者"未处理"完的消息丢失的问题。

 

同之前先屯90个消息。

然后关闭自动应答。

RabbitMQ-消费者"未处理完的消息"丢失

 

开启消费者:

RabbitMQ-消费者"未处理完的消息"丢失

 

消息状态一次性全部变成unacked。 因为没有写手动处理消息的逻辑,所以unacked状态的消息不会变少。

 

然后关闭消费者:

RabbitMQ-消费者"未处理完的消息"丢失

rabbitmq 未删除无应答的消息,消息重新转为ready状态,继续等待连接消费者处理。

 

再开启消费者:

RabbitMQ-消费者"未处理完的消息"丢失

没有出现丢失未处理完消息的情况。

 

     小结:开启消费者(关闭自动应答),发现队列中状态为ready的消息状态全部转变为unacked,队列中状态为ready的消息清空,随消费者应答,队列中状态为unacked的消息逐渐减少,关闭消费者,发现队列中状态为unacked的消息重新改变回ready状态,

       

  结论:

  关闭自动应答可避免这种消息"丢失的情况"。

  另外在开启自动应答 ack=true 的情况下,需要保证一定有消费者在线,才能保证消息都被接收处理。开启手动应答必然消耗更多资源,因为 rabbitmq 需要根据应答标号去删除队列中对应的消息。

 

以上仅个人理解,若有错误,欢迎指正~