Redis 发布订阅 与 管道
程序员文章站
2022-05-21 21:26:20
...
订阅与发布
发布者发布了消息,所有的订阅者都可以收到,就是生产者消费者模型(后订阅了,无法获取历史消息)
发布者:服务器
订阅者:Dashboad和数据处理
API
publish channel message #发布命令
publish souhu:tv "hello world" #在souhu:tv频道发布一条hello world 返回订阅者个数
subscribe [channel] #订阅命令,可以订阅一个或多个
subscribe sohu:tv #订阅sohu:tv频道
unsubscribe [channel] #取消订阅一个或多个频道
unsubscribe sohu:tv #取消订阅sohu:tv频道
psubscribe [pattern...] #订阅模式匹配
psubscribe c* #订阅以c开头的频道
unpsubscribe [pattern...] #按模式退订指定频道
pubsub channels #列出至少有一个订阅者的频道,列出活跃的频道
pubsub numsub [channel...] #列出给定频道的订阅者数量
pubsub numpat #列出被订阅模式的数量
python中实现发布与订阅
案例:
- RedisHelper.py
import redis
class RedisHelper:
def __init__(self):
self.__conn = redis.Redis(host='IP')
self.chan_sub = 'fm104.5'
self.chan_pub = 'fm104.5'
发布者
def public(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True
订阅者
def subscribe(self):
pub = self.__conn.pubsub() # 打开收音机
pub.subscribe(self.chan_sub) # 调频道
pub.parse_response() # 准接收
return pub
- subscribes.py
订阅者
from monitor.RedisHelper import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()
while True:
msg= redis_sub.parse_response()
print(msg)
有消息就接收,没有就卡住
同时开启多个接收者,接受者们可同时接收到信息
- publics.py
发布者
from monitor.RedisHelper import RedisHelper
obj = RedisHelper()
obj.public('hello')
管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。
1次pipeline(n条命令)=1次网络时间+n次命令时间
import redis
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
r = redis.Redis(connection_pool=pool)
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
# 开启事务
pipe.multi()
pipe.set('name', 'xxx')
pipe.set('role', 'yyy')
pipe.execute() 当执行了这句代码后,上面的两个set操作才会执行
注意:
pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。不过pipeline事实上所能容忍的操作个数,和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;同时也意味着每个redis-server同时所能支撑的pipeline链接的个数,也是有限的,这将受限于server的物理内存或网络接口的缓冲能力
案例:
计数器
import redis
conn = redis.Redis(host='192.168.1.41',port=6379)
conn.set('count',1000)
with conn.pipeline() as pipe:
# 先监视,自己的值没有被修改过
conn.watch('count')
# 事务开始
pipe.multi()
old_count = conn.get('count')
count = int(old_count)
if count > 0: # 有库存
pipe.set('count', count - 1)
# 执行,把所有命令一次性推送过去
pipe.execute()
上一篇: Redis发布与订阅
下一篇: redis-发布与订阅