欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis发布订阅模式浅析

程序员文章站 2022-05-21 13:48:41
...

最近写一个模块时,用到了缓存,由于是两个服务之间的通信,所以会遇到数据不一致的情况,最后选择了使用Redis的发布订阅模式进行解决。

什么是发布订阅?

这就如同是我们常用的朋友圈,我们发送朋友圈时就是一个发布者,而能看到这条朋友圈的人就是一个订阅者,订阅者可以根据自己的喜好对发布的消息进行点赞,评论等操作。

下面贴一张形象的图片:
Redis发布订阅模式浅析

一个完整的发布订阅需要经过三个步骤:

  1. 订阅者对发布者(频道)进行订阅:比如我们想要追剧,这时就可以订阅一个节目,每天更新都会为我们发送通知

  2. 发布者发送消息:发布者如同一个发令枪,发送一条消息给所有的订阅者

  3. 订阅者接收消息并做出动作:当发布者发布消息时,订阅者可以对消息进行处理或者做出响应的动作

什么时候需要使用?

在分布式场景下,如果需要更新每一个结点的状态则可以使用发布订阅模式。

业务场景:
前几天我在写一个批改服务,批改服务需要首先去题库服务调用题目已经用户答案,再批改完成后需要将批改后的结果传回题库服务进行存储。

解决方案:
1. 使用kafka作为数据传递者(快递员),实现异步(解决高并发问题)
2. 使用redis与cache作为缓存
3. 发布者为题库服务,订阅者为批改服务(题目服务更新完批改信息后发布消息)

示例代码

// 订阅者:

// 初始化时执行
subscribeRedisChannel(CHANNEL_NAME);

// 订阅频道
public void subscribeRedisChannel(final String channel) {
        log.info("add redis PUB/SUB , refresh Channel is " + channel);
        new Thread(() -> {
            while (true) {
                try {
                    jedis.subscribe(refreshCachePubSub, channel);
                } catch (Exception e) {
                    log.warn("Redis Sub Exception", e);
                    // 订阅出现异常时,暂停1s线程
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e2) {
                        log.warn("Sleep Exception", e2);
                    }
                }
            }
        }).start();
    }

    // 刷新缓存
private JedisPubSub refreshCachePubSub = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            log.info("refresh channel : " + channel + " || refresh key : " + message);
            if (CHANNEL_NAME.equals(channel)) {
                // 如果存在messge即只清空对应的缓存,如果不存在则清空该cache的所有缓存
                if (StringUtils.isBlank(message)) {
                    cacheApi.getCache().invalidateAll();
                } else {
                    cacheApi.getCache().invalidate(Long.parseLong(message));
                }
            }
        }
    };

    // 发布者:

    // 在更新完成后,主动调用,更新对应key的缓存
    public void invalidCache(String key) {
        redisService.publishRedisChannel(RedisService.CHANNEL_NAME, key);
    }

停更了许久的博客终于复更了。
人就不能闲下来。。。一旦闲下来就越来越懒
就如同减肥。。。总是说说,一直在增肥

生命的意义就在于,不断的充实自己,不断的挑战自己,不断的去拥抱变化。

相关标签: redis 发布订阅