欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis 发布订阅 与 管道

程序员文章站 2022-05-21 21:26:20
...

订阅与发布

发布者发布了消息,所有的订阅者都可以收到,就是生产者消费者模型(后订阅了,无法获取历史消息)

Redis 发布订阅 与 管道

 

发布者:服务器

订阅者:Dashboad和数据处理

API

publish channel message #发布命令
publish souhu:tv "hello world" #在souhu:tv频道发布一条hello world  返回订阅者个数

subscribe [channel] #订阅命令,可以订阅一个或多个
subscribe sohu:tv  #订阅sohu:tv频道

unsubscribe [channel] #取消订阅一个或多个频道
unsubscribe sohu:tv  #取消订阅sohu:tv频道
    
psubscribe [pattern...] #订阅模式匹配
psubscribe c*  #订阅以c开头的频道

unpsubscribe [pattern...] #按模式退订指定频道

pubsub channels #列出至少有一个订阅者的频道,列出活跃的频道

pubsub numsub [channel...] #列出给定频道的订阅者数量

pubsub numpat #列出被订阅模式的数量

python中实现发布与订阅

案例:

  • RedisHelper.py
import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='IP')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'
    
    发布者
    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    订阅者
    def subscribe(self):
        pub = self.__conn.pubsub()  # 打开收音机
        pub.subscribe(self.chan_sub)  # 调频道
        pub.parse_response()  # 准接收
        return pub
  • subscribes.py
订阅者
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe()
 
while True:
    msg= redis_sub.parse_response()
    print(msg)  
有消息就接收,没有就卡住
同时开启多个接收者,接受者们可同时接收到信息
  • publics.py
发布者
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public('hello')

管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

1次pipeline(n条命令)=1次网络时间+n次命令时间

import redis
 
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
 
r = redis.Redis(connection_pool=pool)
 
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
# 开启事务
pipe.multi()
pipe.set('name', 'xxx')
pipe.set('role', 'yyy')
 
pipe.execute()  当执行了这句代码后,上面的两个set操作才会执行

注意:

pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。不过pipeline事实上所能容忍的操作个数,和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;同时也意味着每个redis-server同时所能支撑的pipeline链接的个数,也是有限的,这将受限于server的物理内存或网络接口的缓冲能力

案例:

计数器

import redis

conn = redis.Redis(host='192.168.1.41',port=6379)

conn.set('count',1000)

with conn.pipeline() as pipe:

    # 先监视,自己的值没有被修改过
    conn.watch('count')

    # 事务开始
    pipe.multi()
    old_count = conn.get('count')
    count = int(old_count)
    if count > 0:  # 有库存
        pipe.set('count', count - 1)

    # 执行,把所有命令一次性推送过去
    pipe.execute()

 

相关标签: Redis