Python介绍RabbitMQ使用篇二WorkQueue
1. rabbitmq workqueue基本工作模式介绍
上一篇我们使用c#语言讲解了单个消费者从消息队列中处理消息的模型,这一篇我们使用python语言来讲解多个消费者同时工作从一个queue处理消息的模型。
工作队列(又称:任务队列——task queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。这个概念在网络应用中是非常有用的,它可以在短暂的http请求中处理一些复杂的任务,我么可以将耗时的请求放在任务队列,然后立马返回响应,接下来由多个worker去处理复杂的业务操作。(这种架构叫做"分布式异步队列",有时候用来方式d-dos攻击,12306网站就是采用这种模式)
用python操作python模块首先要到如pika这个包,利用pip install pika去安装。
我们首先写一个new_task.py用来向任务队列中写入任务,已备用。
import pika import sys with pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) as connection: channel = connection.channel() channel.queue_declare(queue = "hello") for index in range(0,10): channel.basic_publish(exchange="", routing_key="hello", body="["+str(index)+"]" + "hello world") connection.close()
接下来编写works.py程序,我们需要在works.py中创建消费者,让消费者从任务队列中提取任务去执行。
import pika import sys import time connection = pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue = "hello") channel.basic_ack() def callback(ch, method, properties, body): print(" [x] received %r" % (body.decode('utf-8'),)) time.sleep(3) # 我们在这里利用线程休息来模拟一个比较耗时的任务处理 print(" [x] done") channel.basic_consume(callback, queue='hello', no_ack= true) # 我们把no_ack标记为true用来屏蔽消息确认 channel.start_consuming() connection.close()
在callback函数中让当前线程休息5秒用来模拟一个耗时的任务。
接下来首先打开两个terminal窗口同时去运行works.py程序,然后运行new_task.py程序来查看效果。注意:在这里为了说明多个work能够同时分享任务队列中的队列,我们一定要先运行works.py,后运行new_task.py程序。具体原因后面在说明。
默认来说,rabbitmq会按顺序得把消息发送给两个消费者(consumer),平均每个消费者都会收到同等数量得消息,这种发送消息的方式叫做——轮询(round-robin)。这样做的好处就是我们在处理相同数量的task所用的时间成倍的减少了。work越多,我们处理任务队列所用的时间就越少,这在高并发系统中会非常有用。
2.消息确认
当前的代码中,当消息被rabbitmq发送给消费者(consumers)之后,马上就会在内存中移除。这种情况之下,假如其中一个工作者挂掉了,那么它正在处理的消息就会丢失,并且与此同时,后面所有发送到这个工作者的还没来得及处理的消息也都会丢失。这显然不是我们想看到的结果。我们不想丢失任何消息,如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
为了防止消息丢失,rabbitmq提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉rabbitmq已经收到并处理了某条消息,然后rabbitmq才会释放并删除这条消息,而不是这条消息一发送出去马上就从内存中删除。
如果消费者(consumer)挂掉了,没有发送响应,rabbitmq就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。消息是没有超时这个概念的;当工作者与它断开连的时候,rabbitmq会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。消息响应默认是开启的。之前的例子中我们可以使用no_ack=true标识把它关闭。接下来我们移除这个标识,当工作者(worker)完成了任务,就发送一个响应。
对我们的workers.py稍微进行一下改动:
1 import pika 2 import sys 3 import time 4 5 connection = pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello") 8 channel.basic_ack() 9 10 def callback(ch, method, properties, body): 11 print(" [x] received %r" % (body.decode('utf-8'),)) 12 time.sleep(3) 13 print(" [x] done") 14 ch.basic_ack(delivery_tag = method.delivery_tag) # 2. channel.basic_ack()方法用来执行消息确认操作 15 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack= false) # 1. no_ack告诉rabbitmq开启消息确认机制,也就是说消息需要被确认 19 20 channel.start_consuming() 21 connection.close()
先开启两个terinmal窗口执行workers.py然后执行new_task.py,当执行一半是利用ctrl+c关掉其中一个worker。可以看到rabbitmq将已经关掉的worker的没来得及处理的消息,再一次发给worker2。以此保证消息不会丢失。
一定一定不要忘记消息确认
在回调方法中一定要记得调用channel.basic_ack()方法用来确认消息。原因很容易理解,消息如果不确认,任务就算是被callback函数处理成功了,rabbitmq在内存中也不会删除这条任务,这条任务还会停留在内存中。这样无疑会带来一个比较大的bug。
3.消息持久化
rabbittmq如果意外崩溃的话,就会丢失所有的“队列”和“消息”。因此为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。下面的代码分别演示了如何进行队列持久化和消息持久化。
1 import pika 2 import sys 3 4 5 with pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) as connection: 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello",durable=true) # 1.queue持久化 8 for index in range(0,10): 9 channel.basic_publish(exchange="", 10 routing_key="hello", 11 body="["+str(index)+"]" + "hello world", 12 properties= pika.basicproperties( # 2.消息持久化 13 delivery_mode= 2 14 )) 15 connection.close()
4.公平调度/多劳多得
在实际生产中我们不一定所有的任务处理时都消耗同样多的时间,有的任务需要更长的时间,有的任务需要比较少的时间。这样就造成有的工作者比较繁忙,有的工作者比较轻松。然而rabbitmq并不知道这些,它仍然一如既往的派发消息。这样无疑会造成资源的浪费。
这时因为rabbitmq只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉rabbitmq,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,rabbitmq就会把消息分发给下一个空闲的工作者(worker)。这样能保证消息是一个一个发出去的,并且是一个处理完成了再发另一个,而不是一次性全部发分出去了。这样尽可能的保证了每个worker的工作时间相同(公平调度),并且在相同时间执行效率高的worker会分享到更多的消息(多劳多得)。
channe.basic_qos(prefetch_count=1)
当然,如果所有的worker都长时间处于繁忙状态,没有时间接收下一条消息,那么任务队列就有可能满了。我们可以增加worker的数量,或者想其他办法。
代码整合
1 import pika 2 import sys 3 4 5 with pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) as connection: 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello",durable = true) # 1.queue持久化 8 for index in range(0,10): 9 channel.basic_publish(exchange="", 10 routing_key="hello", 11 body="["+str(index)+"]" + "hello world", 12 properties= pika.basicproperties( # 2.消息持久化 13 delivery_mode= 2 14 )) 15 connection.close()
1 import pika 2 import sys 3 import time 4 5 connection = pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello",durable = true) # 队列持久化 8 9 def callback(ch, method, properties, body): 10 print(" [x] received %r" % (body.decode('utf-8'),)) 11 time.sleep(5) 12 print(" [x] done----%r" % time.strftime("%y-%m-%d %x",time.localtime())) 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 channel.basic_qos(prefetch_count = 1) # 用来告诉每个worker一次只能接受一条消息 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack = false) 19 channel.start_consuming() 20 connection.close()
1 import pika 2 import sys 3 import time 4 5 connection = pika.blockingconnection(pika.connection.connectionparameters(host="localhost")) 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello") 8 9 def callback(ch, method, properties, body): 10 print(" [x] received %r" % (body.decode('utf-8'),)) 11 time.sleep(1) 12 print(" [x] done----%r" % time.strftime("%y-%m-%d %x",time.localtime())) 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 channel.basic_qos(prefetch_count = 1) 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack = false) 19 channel.start_consuming() 20 connection.close()
上一篇: 手机卡顿与CPU、内存哪个关系更大?
下一篇: Cython 三分钟入门教程