解决python3 pika之连接断开的问题
问题描述
在消费rabbitmq队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitmq server发送ack信号以dequeue本条消息。
问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。
源码示例
#!/usr/bin #coding: utf-8 import pika import time user = 'guest' pwd = 'guest' test_queue = 'just4test' def callback(ch, method, properties, body): print(body) time.sleep(600) ch.basic_publish('', routing_key=test_queue, body="fortest") ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.blockingconnection( pika.connectionparameters('127.0.0.1', credentials=pika.plaincredentials(user, pwd))) chan = s_conn.channel() chan.queue_declare(queue=test_queue) chan.basic_publish('', routing_key=test_queue, body="fortest") chan.basic_consume(callback, queue=test_queue) chan.start_consuming() if __name__ == "__main__": test_main()
运行一段时间后, 就会报错:
[error][pika.adapters.base_connection][2017-08-18 12:33:49]error event 25, none [critical][pika.adapters.base_connection][2017-08-18 12:33:49]tried to handle an error where no error existed [error][pika.adapters.base_connection][2017-08-18 12:33:49]fatal socket error: brokenpipeerror(32, 'broken pipe')
问题排查
猜测:pika客户端没有及时发送心跳,连接被server断开
一开始修改了heartbeat_interval参数值, 示例如下:
def test_main(): s_conn = pika.blockingconnection( pika.connectionparameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.plaincredentials(user, pwd))) # ....
修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;
于是又加了个心跳线程, 示例如下:
#!/usr/bin #coding: utf-8 import pika import time import logging import threading user = 'guest' pwd = 'guest' test_queue = 'just4test' class heartbeat(threading.thread): def __init__(self, connection): super(heartbeat, self).__init__() self.lock = threading.lock() self.connection = connection self.quitflag = false self.stopflag = true self.setdaemon(true) def run(self): while not self.quitflag: time.sleep(10) self.lock.acquire() if self.stopflag : self.lock.release() continue try: self.connection.process_data_events() except exception as ex: logging.warn("error format: %s"%(str(ex))) self.lock.release() return self.lock.release() def startheartbeat(self): self.lock.acquire() if self.quitflag==true: self.lock.release() return self.stopflag=false self.lock.release() def callback(ch, method, properties, body): logging.info("recv_body:%s" % body) time.sleep(600) ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.blockingconnection( pika.connectionparameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.plaincredentials(user, pwd))) chan = s_conn.channel() chan.queue_declare(queue=test_queue) chan.basic_consume(callback, queue=test_queue) heartbeat = heartbeat(s_conn) heartbeat.start() #开启心跳线程 heartbeat.startheartbeat() chan.start_consuming() if __name__ == "__main__": test_main()
尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_interval: how often to send heartbeats. min between this value and server's proposal will be used. use 0 to deactivate heartbeats and none to accept server's proposal.
按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。
如果不指定heartbeat_interval, 它默认为none, 意味着按rabbitmq server的配置来检测心跳是否正常。
如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。
以上这篇解决python3 pika之连接断开的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
上一篇: 使用PHP实现二分查找算法代码分享
下一篇: 几个常见的Mysql索引问题
推荐阅读
-
完美解决spring websocket自动断开连接再创建引发的问题
-
解决python3 pika之连接断开的问题
-
完美解决spring websocket自动断开连接再创建引发的问题
-
解决python3 Pycharm上连接数据库时报错的问题
-
解决python3 pika之连接断开的问题
-
MySQL之—使用c3p0与DBCP连接池,造成的MySql 8小时问题的详细代码解决方案
-
MySQL之—— 使用Hibernate连接MySQL数据库,MySQL连接超时断开的问题
-
MySQL之-使用Hibernate连接MySQL数据库时连接超时断开的问题解决方法
-
MySQL之—— 使用Hibernate连接MySQL数据库,MySQL连接超时断开的问题
-
MySQL之-使用Hibernate连接MySQL数据库时连接超时断开的问题解决方法