Redis发布订阅模式浅析
程序员文章站
2022-05-21 13:48:41
...
最近写一个模块时,用到了缓存,由于是两个服务之间的通信,所以会遇到数据不一致的情况,最后选择了使用Redis的发布订阅模式进行解决。
什么是发布订阅?
这就如同是我们常用的朋友圈,我们发送朋友圈时就是一个发布者,而能看到这条朋友圈的人就是一个订阅者,订阅者可以根据自己的喜好对发布的消息进行点赞,评论等操作。
下面贴一张形象的图片:
一个完整的发布订阅需要经过三个步骤:
订阅者对发布者(频道)进行订阅:比如我们想要追剧,这时就可以订阅一个节目,每天更新都会为我们发送通知
发布者发送消息:发布者如同一个发令枪,发送一条消息给所有的订阅者
订阅者接收消息并做出动作:当发布者发布消息时,订阅者可以对消息进行处理或者做出响应的动作
什么时候需要使用?
在分布式场景下,如果需要更新每一个结点的状态则可以使用发布订阅模式。
业务场景:
前几天我在写一个批改服务,批改服务需要首先去题库服务调用题目已经用户答案,再批改完成后需要将批改后的结果传回题库服务进行存储。
解决方案:
1. 使用kafka作为数据传递者(快递员),实现异步(解决高并发问题)
2. 使用redis与cache作为缓存
3. 发布者为题库服务,订阅者为批改服务(题目服务更新完批改信息后发布消息)
示例代码
// 订阅者:
// 初始化时执行
subscribeRedisChannel(CHANNEL_NAME);
// 订阅频道
public void subscribeRedisChannel(final String channel) {
log.info("add redis PUB/SUB , refresh Channel is " + channel);
new Thread(() -> {
while (true) {
try {
jedis.subscribe(refreshCachePubSub, channel);
} catch (Exception e) {
log.warn("Redis Sub Exception", e);
// 订阅出现异常时,暂停1s线程
try {
Thread.sleep(1000);
} catch (Exception e2) {
log.warn("Sleep Exception", e2);
}
}
}
}).start();
}
// 刷新缓存
private JedisPubSub refreshCachePubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
log.info("refresh channel : " + channel + " || refresh key : " + message);
if (CHANNEL_NAME.equals(channel)) {
// 如果存在messge即只清空对应的缓存,如果不存在则清空该cache的所有缓存
if (StringUtils.isBlank(message)) {
cacheApi.getCache().invalidateAll();
} else {
cacheApi.getCache().invalidate(Long.parseLong(message));
}
}
}
};
// 发布者:
// 在更新完成后,主动调用,更新对应key的缓存
public void invalidCache(String key) {
redisService.publishRedisChannel(RedisService.CHANNEL_NAME, key);
}
停更了许久的博客终于复更了。
人就不能闲下来。。。一旦闲下来就越来越懒
就如同减肥。。。总是说说,一直在增肥
生命的意义就在于,不断的充实自己,不断的挑战自己,不断的去拥抱变化。