欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用jedis操作redis之消息的发布和订阅

程序员文章站 2022-05-21 13:52:00
...

一般来说,发布和订阅(又称pub/sub)的特点是订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二级制字符串消息。每当有消息被发送至给定频道时,频道的所有订阅者都会受到消息。一个订阅者可以订阅多个频道,而发送者也能向多个频道发送消息。那么这里就分别来介绍命令行操作以及使用jedis操作redis来实现消息的发布和订阅。
关于使用jedis操作redis(参考使用jedis操作redis)。

一、命令行

使用ssh连接linux,打开三个窗口(一个发送消息,两个接收消息)启动redis服务,三个窗口都使用命令连接到redis客户端。

1、两个订阅者

127.0.0.1:6379> subscribe cctv1 
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "cctv1"
3) (integer) 1
127.0.0.1:6379> subscribe cctv1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "cctv1"
3) (integer) 1

订阅频道 :subscribe 频道
两个订阅者都订阅了频道 “cctv1”

2、发送者

127.0.0.1:6379> publish cctv1 'haha'
(integer) 2

向指定频道发送消息:publish 频道 消息内容
发送者向频道 “cctv1” 发送了一个消息, 内容是 “haha”

这个时候再看两个订阅者都已经接收到了这个消息

127.0.0.1:6379> subscribe cctv1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "cctv1"
3) (integer) 1
1) "message"
2) "cctv1"
3) "haha"
127.0.0.1:6379> subscribe cctv1 
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "cctv1"
3) (integer) 1
1) "message"
2) "cctv1"
3) "haha"

2、jedis操作

创建项目,导包

<dependencies>
    <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
     <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
       </dependency>
  </dependencies>

测试(只做简单测试,不再说场景应用):

需求:两个订阅者(订阅相同的频道),三个发送者。发送者每次发送消息,订阅者接收到消息后将消息打印到控制台。

public class PubSubTest {
    JedisPool jedispool = new JedisPool("192.168.25.128",6379);
    Jedis jedis;

    @Before//test方法执行之前执行
    public void init(){
        jedis = jedispool.getResource();
    }
    @After//test方法执行之后执行
    public void destroy(){
        jedispool.close();
        jedis.close();
    }
    /*
     * 测试订阅
     */
    @Test//订阅者1
    public void test0() throws IOException{
        MyJedisSub myJedisSub = new MyJedisSub();
        jedis.subscribe(myJedisSub, "china-1","china-2","china-3");
        System.in.read();//持续接收消息的状态,按回车线程才会结束。

    }
    @Test//订阅者2
    public void test1() throws IOException{
        MyJedisSub myJedisSub = new MyJedisSub();
        jedis.subscribe(myJedisSub, "china-1","china-2","china-3");
        System.in.read();

    }

解释:jedis的subscribe方法要求传入第一个参数为JedisPubSub对象,该对象中定义了一系列消息监等听方法,需要我们自己定义类继承并实现需要的方法,如onMessage()。
第二个参数是订阅的频道。每次发送者向该频道发送消息后,可以在JedisPubSub的方法里面进行逻辑操作。比如这里监听到了消息,那么就在控制台打印消息。

自定义MyJedisSub类如下

public class MyJedisSub extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
    //每次监听到频道channel发送的消息message后就在控制台打印
        System.out.println("频道"+channel+"发出消息:"+message);
    }
}
    /*
     * 测试发布消息
     */
    @Test//发布者1
    public void test2(){
        jedis.publish("china-1", "正在播放新闻联播");
    }
    @Test//发布者2
    public void test3(){
        jedis.publish("china-2", "正在播放射雕英雄传");
    }
    @Test//发布者3
    public void test4(){
        jedis.publish("china-3", "正在播放还珠格格");
    }

测试:先运行test0和test1方法,使用订阅者处于一直接受消息的状态。在分别运行test2-test4。可以在两个订阅者对应的控制台看到打印的消息。

订阅者1控制台:

使用jedis操作redis之消息的发布和订阅

订阅者2控制台:

使用jedis操作redis之消息的发布和订阅

注意:redis的发布与订阅模式非常有用,但是存在着一些问题。一个是因为旧版本的不稳的性原因(如果一个客户端订阅了某些频道,但它读取的速度不够快的话,那么不断积累的消息就会导致redis输出缓冲区的体积变得越来越大,这会导致redis速度变慢,甚至崩溃)。另一个原因和数据传输的可靠性有关,比如客户端在订阅操作过程中断线,那么客户端就会丢失在断线期间发生的所有消息(来自redis in action这本书,同时提供了解决办法)。