欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring Boot Redis队列-发布订阅模式

程序员文章站 2024-02-03 16:45:34
...

Spring Boot Redis队列-发布订阅模式

说明

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

Spring Boot Redis队列-发布订阅模式

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

Spring Boot Redis队列-发布订阅模式

Redis 相关命令

以下实例演示了发布订阅是如何工作的。

开启客户端作为消息订阅者,订阅通道的同时会创建通道,此处通道名命名为“ demoChannel ”:

> subscribe demoChannel

另外开启客户端作为消息发布者,然后向刚前创建的通道 “ demoChannel ” 中发布消息,订阅者就能接收到消息。

> publish demoChannel "hello world!"

订阅者就会显示消息:

"hello world!"

Spring Boot 中使用

1. 订阅者订阅配置


package com.jeiker.redis.config;

import com.jeiker.redis.listener.RedisMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Description: Redis 消息订阅配置
 * User: jeikerxiao
 * Date: 2019-12-23 12:56
 */
@Configuration
public class RedisSubConfig {

    /**
     * 创建Redis连接工厂类
     *
     * @param connectionFactory
     * @param adapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 监听对应的channel
        container.addMessageListener(adapter, new PatternTopic("demoChannel"));
        return container;
    }

    @Bean
    public MessageListenerAdapter adapter(RedisMessageListener messageListener) {
        // onMessage 如果RedisMessage 中 没有实现接口,
        // 这个参数必须跟RedisMessage中的读取信息的方法名称一样
        return new MessageListenerAdapter(messageListener);
    }
}

2. 订阅者监听通道消息

package com.jeiker.redis.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

/**
 * Description: 订阅者监听通道消息
 * User: jeikerxiao
 * Date: 2019-12-23 12:53
 */
@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] bytes) {
        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
        String msg = serializer.deserialize(message.getBody());
        System.out.println("接收到的消息是:" + msg);
    }
}

3. 发布者发布消息


package com.jeiker.redis;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.Assert;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
@ExtendWith(SpringExtension.class)
class SpringBootRedisApplicationTests {

    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    public void testMq() {
        redisTemplate.convertAndSend("demoChannel", "hello world");
    }
}

测试

执行单元测试,可以查看结果

接收到的消息是:hello world
相关标签: Spring Boot Redis