RabbitMQ-消费者"未处理完的消息"丢失
一个关于客户端(消费者)开启自动应答,重启后"未处理消息丢失"的小坑。(主要是对rabbitmq理解不够)
首先,申明一下: 本文所谓的 "丢失消息" 不是指服务器宕机、重启等原因导致内存中消息丢失,也就是说不是关于消息持久化的问题。
使用c# 编写测试。
问题表象: 消费者开启自动应答,某时,消费者掉线(关闭/崩溃等),届时重启消费者,发现消费者未处理完的消息丢失。
条件: 服务器不宕机、不重启,只有一个消费者、一个生产者。
消息流向: 消息--->生产者--->交换器--->队列--->消费者
问题的处理: 消费者开启手动应答,若再出现之前情况,消息不丢失。
先给个代码。
生产者代码如下:
static void main(string[] args) { var factory = new connectionfactory() { hostname = "localhost" }; using (var connection = factory.createconnection()) using (var channel = connection.createmodel()) { //申明广播类型交换器 channel.exchangedeclare(exchange: "ex1", type: "fanout"); //申明队列 channel.queuedeclare(queue: "test1", durable: false, exclusive: false, autodelete: false, arguments: null); int count = 0; while (true) { count++; var body = encoding.utf8.getbytes(count.tostring());
//向key为 p 的交换器 ex1 上推数据 channel.basicpublish(exchange: "ex1", routingkey: "p", basicproperties: null, body: body); console.writeline($"send msg {count}"); system.threading.thread.sleep(1000); } } }
消费者代码如下(开启自动应答):
static void main(string[] args) { var factory = new connectionfactory() { hostname = "localhost" }; using (var connection = factory.createconnection()) using (var channel = connection.createmodel()) { //队列与交换器绑定 channel.queuebind(queue: "test1", exchange: "ex1", routingkey: "p"); var consumer = new eventingbasicconsumer(channel); consumer.received += (model, ea) => { var body = ea.body; var message = encoding.utf8.getstring(body); console.writeline($"收到消息 -- {message}"); system.threading.thread.sleep(2000); }; channel.basicconsume(queue: "test1", autoack: true, consumer: consumer); console.readline(); } }
*交换器、队列必须先申明,两样都存在后才能进行绑定。
生产者向队列推送消息,队列中展现状态为ready的数据就是未被消费的消息。
推送90个消息:
队列中存了90个未应答的消息
打开消费者:
发现现在消息已经全部被自动应答,队列已清空。
再启动消费者:
如预料,空空一片。
小结 :开启消费者(开启自动应答),发现队列中状态为ready的消息全部被应答,队列中状态为ready的消息清空,不等消费者处理完这些消息,关闭消费者,然后再开启消费者,消费者不会再收到消息,出现消费者"未处理"完的消息丢失的问题。
同之前先屯90个消息。
然后关闭自动应答。
开启消费者:
消息状态一次性全部变成unacked。 因为没有写手动处理消息的逻辑,所以unacked状态的消息不会变少。
然后关闭消费者:
rabbitmq 未删除无应答的消息,消息重新转为ready状态,继续等待连接消费者处理。
再开启消费者:
没有出现丢失未处理完消息的情况。
小结:开启消费者(关闭自动应答),发现队列中状态为ready的消息状态全部转变为unacked,队列中状态为ready的消息清空,随消费者应答,队列中状态为unacked的消息逐渐减少,关闭消费者,发现队列中状态为unacked的消息重新改变回ready状态,
结论:
关闭自动应答可避免这种消息"丢失的情况"。
另外在开启自动应答 ack=true 的情况下,需要保证一定有消费者在线,才能保证消息都被接收处理。开启手动应答必然消耗更多资源,因为 rabbitmq 需要根据应答标号去删除队列中对应的消息。
以上仅个人理解,若有错误,欢迎指正~