redis集群的发布订阅模式
程序员文章站
2022-03-04 20:02:28
...
- 项目开发过程中,遇到需要发消息的情况,是不是脑海里不自主的浮现kafka、rabbitmq等常用的消息队列?但如果消息非常简单,并且用量也不大,消息队列就会有点大材小用了吧,忽然想起了redis 也有消息队列的功能,只不过我们经常把redis 用作缓存(这个是redis最大的卖点),忽略了它的辅助技能,今天我就简单讲解一下 redis 的发布订阅模式如何使用。
- 发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
- 方法的选择
操作 redis 集群的方法一般有两种,一种是 redis 官方推荐的 JedisCluster,另一种是RedisTemplate,这两种客户端工具均能实现redis 的发布订阅模式,下面我会逐一讲解。 - maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- Redsi 集群连接配置
1、在application.yml 文件添加属性
spring:
redis-config:
max-total: 50
max-idle: 10
min-idle: 5
max-wait-millis: 6000
test-on-borrow: true
test-on-return: false
test-while-idle: false
max-redirects: 10
connection-timeout: 1000
so-timeout: 1000
address:
- 192.168.xx.xxx:7360
- 192.168.yy.yyy:7360
- 192.168.zz.zzz:7360
2、注入config 文件
package com.jianmin.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @Author: jianmin.li
* @Description: redis配置
* @Date: 2019/5/28 14:47
* @Version: 1.0
*/
@Configuration
@Slf4j
@Data
@ConfigurationProperties(prefix = "spring.redis-config")
public class RedisConfig {
private List<String> address;
private Integer connectionTimeout;
private Integer soTimeout;
private Integer maxRedirects;
@ConfigurationProperties(prefix = "spring.redis-config")
@Bean
public JedisPoolConfig jedisPoolConfig() {
return new JedisPoolConfig();
}
@Bean
public JedisCluster jedisCluster(JedisPoolConfig poolConfig) {
Set<HostAndPort> hostAndPorts = new HashSet<>(address.size());
for (String ipAndPort : address) {
String[] arr = ipAndPort.split(":");
hostAndPorts.add(new HostAndPort(arr[0],Integer.parseInt(arr[1])));
log.warn("JedisCluster init Redis Address--->{}:{}",arr[0].trim(),arr[1]);
}
return new JedisCluster(hostAndPorts,connectionTimeout,soTimeout,maxRedirects,poolConfig);
}
@Bean
public RedisClusterConfiguration redisClusterConfiguration() {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
Set<RedisNode> nodes = new HashSet<>(address.size());
for (String ipPort : address) {
String[] ipAndPort = ipPort.split(":");
nodes.add(new RedisNode(ipAndPort[0].trim(),Integer.valueOf(ipAndPort[1])));
log.warn("RedisTemplate init Redis Address--->{}:{}",ipAndPort[0].trim(),ipAndPort[1]);
}
redisClusterConfiguration.setClusterNodes(nodes);
redisClusterConfiguration.setMaxRedirects(maxRedirects);
return redisClusterConfiguration;
}
@Bean
public JedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig,RedisClusterConfiguration
redisClusterConfiguration) {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration,
jedisPoolConfig);
jedisConnectionFactory.afterPropertiesSet();
return jedisConnectionFactory;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,
JedisConnectionFactory jedisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory);
container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic
("channel-lijianmin-redistemplate")));
return container;
}
}
- 使用方法
1、JedisCluster 实现redis 的发布订阅
<1>发送消息
@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {
@Autowired
private JedisCluster jedisCluster;
/**
* 发送消息非常简单,直接使用 JedisCluster 的 publish 方法即可。发送消息成
* 功,会返回一个长整型数值0L。
*
* @param
* @return : void
* @Author: jianmin.li
* @Date: 2019/10/25 13:38
*/
@GetMapping("/jedisCluster")
public void method() {
Long publish = jedisCluster.publish("channel-lijianmin","这是来自JedisCluster发布者的消息");
System.err.println(publish);
}
}
<2>监听消息
要想让redis 在web 容器开启时就一直处于订阅状态,考虑通过listener 实现: 自定义MyListener,实现ServletContextListener,开启异步线程去监听redis 的channel,因为JedisCluster 的发布和订阅是阻塞的,如果用同步监听,那么你的项目就起不来了,一直阻塞在JedisCluster 初始化的地方,所以这里必须用异步线程去监听redsi 的channel。
package com.jianmin.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisCluster;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
/**
* @Author: jianmin.li
* @Description: 消息监听
* @Date: 2019/5/30 12:55
* @Version: 1.0
*/
@Configuration
@WebListener
public class MyListener implements ServletContextListener {
@Autowired
private MySubscribe jedisPubSub;
@Autowired
private JedisCluster jedisCluster;
@Override
public void contextInitialized(ServletContextEvent sce) {
new Thread(() -> jedisCluster.subscribe(jedisPubSub,"channel-lijianmin")).start();
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
System.out.println("ServletContext容器销毁了。。。");
}
}
自定义 MySubscribe 并继承 JedisPubSub,实现对 redis 通道的监听,重写
onMessage 方法,该方法可以获取通道的名称以及监听到的消息,具体处理消
息的逻辑就写在这里。
package com.jianmin.util;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisPubSub;
/**
* @Author: jianmin.li
* @Description: 监听redis的通道
* @Date: 2019/5/30 12:51
* @Version: 1.0
*/
@Component
public class MySubscribe extends JedisPubSub {
/**
* 监听到的消息在这里进行业务处理
*
* @param channel
* @param message
* @return : void
* @Author: jianmin.li
* @Date: 2019/10/25 13:46
*/
@Override
public void onMessage(String channel,String message) {
System.err.println("redis通道" + channel + "监听到的消息:" + message);
}
}
<3>启动项目,触发接口
redis发布订阅模式--jedisCluster客户端
2、RedisTemplate 实现redis 的发布订阅
<1>发送消息
@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 使用 RedisTemplate 发送消息也非常简单,
* 直接调用 convertAndSend 方法即可。
*
* @param
* @return : void
* @Author: jianmin.li
* @Date: 2019/10/25 14:10
*/
@GetMapping("/redisTemplate")
public void send() {
redisTemplate.convertAndSend("channel-lijianmin-redistemplate","这是来自RedisTemplate发布者的消息");
}
}
<2>监听消息
自定义RedisSubscriber 并继承MessageListenerAdapter,重写onMessage 方
法,该方法可以获取通道的名称以及监听到的消息,具体处理消息的逻辑就写
在这里。
package com.jianmin.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
* @Author: jianmin.li
* @Description: 监听redis通道
* @Date: 2019/5/30 15:37
* @Version: 1.0
*/
@Component
public class RedisSubscriber extends MessageListenerAdapter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message,byte[] pattern) {
System.err.println("通道----->" + redisTemplate.getKeySerializer().deserialize(message.getChannel()));
System.err.println("消息----->" + redisTemplate.getValueSerializer().deserialize(message.getBody()));
}
}
<3>在上面的RedisConfig 里面注入redis 消息监听容器 (上面代码已经注入)
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,
JedisConnectionFactory jedisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory);
container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic
("channel-lijianmin-redistemplate")));
return container;
}
<3>启动项目,触发接口
redis发布订阅模式--redisTemplate客户端
- 总结:
1、 redis 的发布订阅模式,监听了同一个通道的监听者都能收到相同的消息,在集群模式下,多个相同的监听者进行相同的信息消费,容易产生表单重复提交的问题,在实际开发中需要注意,建议采取分布式锁;
2、 JedisCluster 的发布订阅,在容器启动时要异步去监听redis 通道;
3、 RedisTemplate 的发布订阅,在监听到消息时,建议用redisTemplate 的KeySerializer反序列化通道,用ValueSerializer 反序列化消息,不建议用new String()反序列化,因为redisTemplate 在初始化时如果指定了相关的序列化转换器,在序列化和反序列化时就必须使用相同的序列化转换器,否则容易出现反序列化异常或乱码。