欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis集群的发布订阅模式

程序员文章站 2022-03-04 20:02:28
...
  • 项目开发过程中,遇到需要发消息的情况,是不是脑海里不自主的浮现kafka、rabbitmq等常用的消息队列?但如果消息非常简单,并且用量也不大,消息队列就会有点大材小用了吧,忽然想起了redis 也有消息队列的功能,只不过我们经常把redis 用作缓存(这个是redis最大的卖点),忽略了它的辅助技能,今天我就简单讲解一下 redis 的发布订阅模式如何使用。
  • 发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
    redis集群的发布订阅模式
  • 方法的选择
    操作 redis 集群的方法一般有两种,一种是 redis 官方推荐的 JedisCluster,另一种是RedisTemplate,这两种客户端工具均能实现redis 的发布订阅模式,下面我会逐一讲解。
  • maven依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • Redsi 集群连接配置
    1、在application.yml 文件添加属性
spring:
  redis-config:
    max-total: 50
    max-idle: 10
    min-idle: 5
    max-wait-millis: 6000
    test-on-borrow: true
    test-on-return: false
    test-while-idle: false
    max-redirects: 10
    connection-timeout: 1000
    so-timeout: 1000
    address:
      - 192.168.xx.xxx:7360
      - 192.168.yy.yyy:7360
      - 192.168.zz.zzz:7360

2、注入config 文件

package com.jianmin.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * @Author: jianmin.li
 * @Description: redis配置
 * @Date: 2019/5/28 14:47
 * @Version: 1.0
 */
@Configuration
@Slf4j
@Data
@ConfigurationProperties(prefix = "spring.redis-config")
public class RedisConfig {
    private List<String> address;
    private Integer connectionTimeout;
    private Integer soTimeout;
    private Integer maxRedirects;

    @ConfigurationProperties(prefix = "spring.redis-config")
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        return new JedisPoolConfig();
    }


    @Bean
    public JedisCluster jedisCluster(JedisPoolConfig poolConfig) {
        Set<HostAndPort> hostAndPorts = new HashSet<>(address.size());
        for (String ipAndPort : address) {
            String[] arr = ipAndPort.split(":");
            hostAndPorts.add(new HostAndPort(arr[0],Integer.parseInt(arr[1])));
            log.warn("JedisCluster init Redis Address--->{}:{}",arr[0].trim(),arr[1]);
        }
        return new JedisCluster(hostAndPorts,connectionTimeout,soTimeout,maxRedirects,poolConfig);
    }

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
        Set<RedisNode> nodes = new HashSet<>(address.size());
        for (String ipPort : address) {
            String[] ipAndPort = ipPort.split(":");
            nodes.add(new RedisNode(ipAndPort[0].trim(),Integer.valueOf(ipAndPort[1])));
            log.warn("RedisTemplate init Redis Address--->{}:{}",ipAndPort[0].trim(),ipAndPort[1]);
        }
        redisClusterConfiguration.setClusterNodes(nodes);
        redisClusterConfiguration.setMaxRedirects(maxRedirects);
        return redisClusterConfiguration;
    }


    @Bean
    public JedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig,RedisClusterConfiguration
            redisClusterConfiguration) {
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration,
                jedisPoolConfig);
        jedisConnectionFactory.afterPropertiesSet();
        return jedisConnectionFactory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

   @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,
                                                                JedisConnectionFactory jedisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(jedisConnectionFactory);
        container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic
                ("channel-lijianmin-redistemplate")));
        return container;
    }
}

  • 使用方法
    1、JedisCluster 实现redis 的发布订阅
    <1>发送消息
@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {
    @Autowired
    private JedisCluster jedisCluster;
    
     /**
     * 发送消息非常简单,直接使用 JedisCluster 的 publish 方法即可。发送消息成
     * 功,会返回一个长整型数值0L。
     *
     * @param
     * @return : void
     * @Author: jianmin.li
     * @Date: 2019/10/25 13:38
     */
    @GetMapping("/jedisCluster")
    public void method() {
        Long publish = jedisCluster.publish("channel-lijianmin","这是来自JedisCluster发布者的消息");
        System.err.println(publish);
    }
}

<2>监听消息
要想让redis 在web 容器开启时就一直处于订阅状态,考虑通过listener 实现: 自定义MyListener,实现ServletContextListener,开启异步线程去监听redis 的channel,因为JedisCluster 的发布和订阅是阻塞的,如果用同步监听,那么你的项目就起不来了,一直阻塞在JedisCluster 初始化的地方,所以这里必须用异步线程去监听redsi 的channel。

package com.jianmin.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisCluster;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

/**
 * @Author: jianmin.li
 * @Description: 消息监听
 * @Date: 2019/5/30 12:55
 * @Version: 1.0
 */
@Configuration
@WebListener
public class MyListener implements ServletContextListener {
    @Autowired
    private MySubscribe jedisPubSub;
    @Autowired
    private JedisCluster jedisCluster;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        new Thread(() -> jedisCluster.subscribe(jedisPubSub,"channel-lijianmin")).start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        System.out.println("ServletContext容器销毁了。。。");

    }
}

自定义 MySubscribe 并继承 JedisPubSub,实现对 redis 通道的监听,重写
onMessage 方法,该方法可以获取通道的名称以及监听到的消息,具体处理消
息的逻辑就写在这里。

package com.jianmin.util;

import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisPubSub;

/**
 * @Author: jianmin.li
 * @Description: 监听redis的通道
 * @Date: 2019/5/30 12:51
 * @Version: 1.0
 */
@Component
public class MySubscribe extends JedisPubSub {
    /**
     * 监听到的消息在这里进行业务处理
     *
     * @param channel
     * @param message
     * @return : void
     * @Author: jianmin.li
     * @Date: 2019/10/25 13:46
     */
    @Override
    public void onMessage(String channel,String message) {
        System.err.println("redis通道" + channel + "监听到的消息:" + message);
    }
}

<3>启动项目,触发接口

redis发布订阅模式--jedisCluster客户端


2、RedisTemplate 实现redis 的发布订阅
<1>发送消息

@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
  
    /**
     * 使用 RedisTemplate 发送消息也非常简单,
     * 直接调用 convertAndSend 方法即可。
     *
     * @param
     * @return : void
     * @Author: jianmin.li
     * @Date: 2019/10/25 14:10
     */
    @GetMapping("/redisTemplate")
    public void send() {
        redisTemplate.convertAndSend("channel-lijianmin-redistemplate","这是来自RedisTemplate发布者的消息");
    }
}

<2>监听消息
自定义RedisSubscriber 并继承MessageListenerAdapter,重写onMessage 方
法,该方法可以获取通道的名称以及监听到的消息,具体处理消息的逻辑就写
在这里。

package com.jianmin.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * @Author: jianmin.li
 * @Description: 监听redis通道
 * @Date: 2019/5/30 15:37
 * @Version: 1.0
 */
@Component
public class RedisSubscriber extends MessageListenerAdapter {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message,byte[] pattern) {
        System.err.println("通道----->" + redisTemplate.getKeySerializer().deserialize(message.getChannel()));
        System.err.println("消息----->" + redisTemplate.getValueSerializer().deserialize(message.getBody()));
    }
}

<3>在上面的RedisConfig 里面注入redis 消息监听容器 (上面代码已经注入)

@Bean
    RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,
                                                                JedisConnectionFactory jedisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(jedisConnectionFactory);
        container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic
                ("channel-lijianmin-redistemplate")));
        return container;
    }

<3>启动项目,触发接口

redis发布订阅模式--redisTemplate客户端

  • 总结:
    1、 redis 的发布订阅模式,监听了同一个通道的监听者都能收到相同的消息,在集群模式下,多个相同的监听者进行相同的信息消费,容易产生表单重复提交的问题,在实际开发中需要注意,建议采取分布式锁;
    2、 JedisCluster 的发布订阅,在容器启动时要异步去监听redis 通道;
    3、 RedisTemplate 的发布订阅,在监听到消息时,建议用redisTemplate 的KeySerializer反序列化通道,用ValueSerializer 反序列化消息,不建议用new String()反序列化,因为redisTemplate 在初始化时如果指定了相关的序列化转换器,在序列化和反序列化时就必须使用相同的序列化转换器,否则容易出现反序列化异常或乱码。