利用redis实现一个Queue(使其接口同python的内置队列接口一致)
程序员文章站
2023-01-05 21:18:52
# coding:utf-8import timeimport pickleimport redisfrom six.moves import queue as BaseQueueclass Queue(object): """ A Queue like message built over redis """ Empty = BaseQueue.Empty Full = BaseQueue.Full max_timeout = 0.3...
# coding:utf-8
import time
import pickle
import redis
from six.moves import queue as BaseQueue
class Queue(object):
"""
A Queue like message built over redis
"""
Empty = BaseQueue.Empty
Full = BaseQueue.Full
max_timeout = 0.3
def __init__(self, maxsize=0, name=REDIS_QUEUE_NAME, host=REDIS_QUEUE_HOST, port=REDIS_QUEUE_PORT, db=REDIS_QUEUE_DB,
lazy_limit=True, password=None):
"""
Constructor for RedisQueue
maxsize: an integer that sets the upperbound limit on the number of
items that can be placed in the queue.
lazy_limit: redis queue is shared via instance, a lazy size limit is used
for better performance.
"""
self.name = name
self.redis = redis.StrictRedis(host=host, port=port, db=db)
self.maxsize = maxsize
self.lazy_limit = lazy_limit
self.last_qsize = 0
def qsize(self):
self.last_qsize = self.redis.llen(self.name)
return self.last_qsize
def empty(self):
if self.qsize() == 0:
return True
else:
return False
def full(self):
if self.maxsize and self.qsize() >= self.maxsize:
return True
else:
return False
def put_nowait(self, obj):
if self.lazy_limit and self.last_qsize < self.maxsize:
pass
elif self.full():
raise self.Full
self.last_qsize = self.redis.rpush(self.name, pickle.dumps(obj))
return True
def put(self, obj, block=True, timeout=None):
if not block:
return self.put_nowait(obj)
start_time = time.time()
while True:
try:
return self.put_nowait(obj)
except self.Full:
if timeout:
lasted = time.time() - start_time
if timeout > lasted:
time.sleep(min(self.max_timeout, timeout - lasted))
else:
raise
else:
time.sleep(self.max_timeout)
def get_nowait(self):
ret = self.redis.lpop(self.name)
if ret is None:
raise self.Empty
return pickle.loads(ret)
def get(self, block=True, timeout=None):
if not block:
return self.get_nowait()
start_time = time.time()
while True:
try:
return self.get_nowait()
except self.Empty:
if timeout:
lasted = time.time() - start_time
if timeout > lasted:
time.sleep(min(self.max_timeout, timeout - lasted))
else:
raise
else:
time.sleep(self.max_timeout)
本文地址:https://blog.csdn.net/weixin_44857400/article/details/107088134
上一篇: Linux服务器管理日记分享
下一篇: 冬日的牛肉粒杏鲍菇你值得拥有