欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis发布订阅模式

程序员文章站 2022-07-05 10:34:59
...

redis发布订阅架构
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。 
Redis发布订阅模式
发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。

JAVA实现方式
redis配置

@Configuration
public class RedisConfig {

	@Value("${redis.pool.minIdle}")
	private int minIdle;
	@Value("${redis.pool.maxTotal}")
	private int maxTotal;
	@Value("${redis.pool.maxWaitMills}")
	private int maxWaitMills;
	@Value("${redis.pool.testWhileIdle}")
	private boolean testWhileIdle;
	@Value("${redis.pool.testOnBorrow}")
	private boolean testOnBorrow;
	@Value("${redis.pool.testOnReturn}")
	private boolean testOnReturn;
	@Value("${redis.pool.maxIdle}")
	private int maxIdle;
	@Value("${redis.pool.blockWhenExhausted}")
	private boolean blockWhenExhausted;
	@Value("${redis.pool.minEvictableIdleTimeMillis}")
	private int minEvictableIdleTimeMillis;
	@Value("${redis.pool.timeBetweenEvictionRunsMillis}")
	private int timeBetweenEvictionRunsMillis;
	@Value("${redis.pool.numTestsPerEvictionRun}")
	private int numTestsPerEvictionRun;

	@Value("${redis.cluster.host}")
	private String host;
	@Value("${redis.cluster.port}")
	private int port;
	@Value("${redis.cluster.Password}")
	private String redisPassword;

	@Bean
	public JedisPoolConfig jedisPoolConfig() {
		JedisPoolConfig poolConfig = new JedisPoolConfig();
		poolConfig.setMaxTotal(maxTotal);
		poolConfig.setMaxIdle(maxIdle);
		poolConfig.setMinIdle(minIdle);
		poolConfig.setMaxWaitMillis(maxWaitMills);
		poolConfig.setTestOnBorrow(testOnBorrow);
		poolConfig.setTestOnReturn(testOnReturn);
		poolConfig.setTestWhileIdle(testWhileIdle);
		poolConfig.setBlockWhenExhausted(blockWhenExhausted);
		poolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
		poolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
		poolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
		return poolConfig;
	}

	@Bean
	public JedisPool jedisPool(){
		return new JedisPool(jedisPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, redisPassword);
	}
}

发布者

public class Publisher extends Thread {

    private String channel;

    private JedisPool jedisPool;

    public Publisher(String channel, JedisPool jedisPool) {
        this.channel = channel;
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();
        while (true) {
            try {
                String line = reader.readLine();
                if (StringUtils.isNotEmpty(line)) {
                    if ("quit".equals(line)) {
                        System.exit(0);
                    }
                    jedis.publish(channel, line);
                } else {
                    System.out.println("请输入消息");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

订阅者

public class Subscriber extends Thread {

    private String channel;

    private JedisPool jedisPool;

    private MessageListener messageListener;

    public Subscriber(String channel, JedisPool jedisPool, MessageListener messageListener) {
        this.channel = channel;
        this.jedisPool = jedisPool;
        this.messageListener = messageListener;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", Constant.CHANNEL));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(messageListener, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

@Component
public class MessageListener extends JedisPubSub {

    /**
     * 功能描述: 收到消息会调用<br>
     *
     * @Param: [channel, message]
     * @Return: void
     * @Author: jiangtj
     * @Date: 2020/4/22 15:01
     */
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }

    /**
     * 功能描述: 订阅了频道会调用<br>
     *
     * @Param: [channel, subscribedChannels]
     * @Return: void
     * @Author: jiangtj
     * @Date: 2020/4/22 15:01
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }

    /**
     * 功能描述: 取消订阅会调用<br>
     *
     * @Param: [channel, subscribedChannels]
     * @Return: void
     * @Author: jiangtj
     * @Date: 2020/4/22 15:01
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
}

使用CommandLineRunner在启动时触发事件

public class Constant {
    public static final String CHANNEL = "TEST";
}

@Component
public class StartRunner implements CommandLineRunner {

    @Autowired
    private JedisPool jedisPool;

    @Autowired
    private MessageListener messageListener;

    @Override
    public void run(String... args) throws Exception {
        Subscriber subscriber = new Subscriber(Constant.CHANNEL, jedisPool, messageListener);
        subscriber.start();
        Publisher publisher = new Publisher(Constant.CHANNEL, jedisPool);
        publisher.start();
    }
}

启动类

@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class RedisApplication {

    public static void main(String[] args) {
        try {
            SpringApplication.run(RedisApplication.class, args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结:
虽然redis实现了发布订阅(publish/subscribe)的功能,但是在通常的情况下是不推荐使用的,如果想使用消息队列这种功能,最好还是使用专业的各种MQ中间件,例如rabbitMQ,rockedMQ,activitedMQ等,下面主要讲一下不推荐使用redis的发布订阅功能的原因。

概要说一下就是,PUBLISH和SUBSCRIBE的缺陷在于客户端必须一直在线才能接收到消息,断线可能会导致客户端丢失消息,除此之外,旧版的redis可能会由于订阅者消费不够快而变的不稳定导致崩溃,甚至被管理员杀掉

第一个原因是和redis系统的稳定性有关。对于旧版的redis来说,如果一个客户端订阅了某个或者某些频道,但是它读取消息的速度不够快,那么不断的积压的消息就会使得redis输出缓冲区的体积越来越大,这可能会导致redis的速度变慢,甚至直接崩溃。也可能会导致redis被操作系统强制杀死,甚至导致操作系统本身不可用。新版的redis不会出现这种问题,因为它会自动断开不符合client-output-buffer-limit pubsub配置选项要求的订阅客户端。

第二个原因是和数据传输的可靠性有关。任何网络系统在执行操作时都可能会遇到断网的情况。而断线产生的连接错误通常会使得网络连接两端中的一端进行重新连接。如果客户端在执行订阅操作的过程中断线,那么客户端将会丢失在断线期间的消息,这在很多业务场景下是不可忍受的。