欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis实现消息队列(发布/订阅模式)

程序员文章站 2024-02-03 15:40:34
...

redis的列表类型天生支持用作消息队列(类似于MQ的队列模型–任何时候都可以消费,一条消息只能消费一次),学习过程借鉴https://www.cnblogs.com/qlqwjy/p/9763754.html

关于redis的list操作https://blog.csdn.net/weixin_43113679/article/details/90080933

java程序实现消息队列

先生产者
本人下面的目录结构参考https://blog.csdn.net/weixin_43113679/article/details/90413124
测试类

@Test
 	public void putMessage() throws Exception {
 		ctx = new ClassPathXmlApplicationContext("spring-service.xml");
        userService = ctx.getBean(UserService.class);
        for(int i=0;i<20;i++) {
        	userService.putMessage(i);
        }
 		
 	}
@Override
	public void putMessage(int message) throws Exception {
			//把消息发布
			String messageKey = "message:queue";
			redisDao.lpush(messageKey, String.valueOf(message));
		
		
	}
@Override
	public void lpush(String messageKey, String message) {
		jedisPool.getResource().lpush(messageKey, message);
		
	}

结果
redis实现消息队列(发布/订阅模式)
这样信息就发布成功了,这说一下,redis是单线程,如果上面用多个线程实现在缓存中的排列方式还是这样
准备消费者
这我用while循环来看点有意思的结果
先说一句:按以前redis在cmd操作的方式,当value值没有了,那key就会销毁不存在
测试类

@Test
 	public void getMessage() throws Exception {
 		ctx = new ClassPathXmlApplicationContext("spring-service.xml");
        userService = ctx.getBean(UserService.class);
        int i =1;
        while(true) {
        	String message = userService.getMessage();
        	System.out.println("第"+(i++)+"消息,"+"value="+message);
        }
 		
 	}
@Override
	public String getMessage() throws Exception {
		//把消息取出来
		String messageKey = "message:queue";
		return redisDao.rpop(messageKey);
	}
@Override
	public String rpop(String messageKey) throws Exception {
		
		return jedisPool.getResource().rpop(messageKey);
	}

结果太多瞬间过去了(这提醒一下,把最大连接写大点,要不会报错,水池耗尽Pool exhausted)
给大家看一个靠后的
redis实现消息队列(发布/订阅模式)
竟然有value是null的情况可能是我没加判定的原因,现在我加上

@Test
 	public void getMessage() throws Exception {
 		ctx = new ClassPathXmlApplicationContext("spring-service.xml");
        userService = ctx.getBean(UserService.class);
        int i =1;
        while(true) {
        	String message = userService.getMessage();
        	if(message == null){
        		break;
        	}
        	System.out.println("第"+(i++)+"消息,"+"value="+message);
        	
        }
 		
 	}

测试是没问题了,但是不能所有的都这样啊,长久肯定有问题啊,当生产者和消费者一直插入和读取呢,如果读取到还好,读取不到不就造成多余连接,浪费资源啊,除非你Thread.sleep()来让消费者休息一下,这样是不会造成不必要的浪费但是有两个问题

  1. 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间
  2. 如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销

下面介绍brpop和blpop实现阻塞读取(非常重要)

命令参数

 brpop key [key …] timeout    //右端弹出
 blpop key [key …] timeout    //左端弹出

区别就是第一个是从右端弹出一个元素,第二个是从左端弹出一个元素
r和l就应该知道,这不说头部(R)和尾部(L),这样好理解
实例:

127.0.0.1:6379> lpush queue2 3 4
(integer) 2
127.0.0.1:6379> brpop queue2 5
1) "queue2"                     //返回值第一个是要返回的列表的key
2) "3"							//因为是brpop,所以返回的value是 3,这是一个一个返回的
127.0.0.1:6379> brpop queue2 5
1) "queue2"
2) "4"
127.0.0.1:6379> brpop queue2 5
(nil)
(5.04s)

弹出了两次后queue2列表里就没有value了,那就会消失,
  timeout也就是上面命令行的5,代表是当列表里没有此key的时候(当value没有了,可以也就销毁了),会等待5S,这5s里是阻塞的,
  当时间到了,而还没出现此key就会返回nil,相反,如果在等待时间里此key出现了,那么就会弹出指定的key,和value
实例
redis实现消息队列(发布/订阅模式)
我稍微定的时间长一些,两个窗口,在timeout内添加上queue2的列表就没问题了
对于命令行 key [key …] 说明可以多个key
当key都存在

127.0.0.1:6379> lpush queue1 1  //添加测试用例
(integer) 1
127.0.0.1:6379> lpush queue2 2
(integer) 1
127.0.0.1:6379> brpop qeueu1 queue2 5  //当brpop,当多个key时
1) "queue2"   //右边的列表queue2弹出
2) "2"
127.0.0.1:6379> lpush queue2 2
(integer) 1
127.0.0.1:6379> blpop queue1 queue2 5  //改成blpop,当多个key时
1) "queue1"     //左边的列表queue1弹出
2) "1"

从上面的例子得出当有多个key(都存在),根据 brpop和blpop来决定哪个列表,再在列表里根据brpop和blpop来决定弹出哪边
当key有不存在的

127.0.0.1:6379> del queue2  //删除列表 queue2
(integer) 1
127.0.0.1:6379> brpop queue2 queue1 5  //第一个可以是不存在的,第二个是存在的
1) "queue1"   			//返回的是第二个的key
2) "4"
127.0.0.1:6379> brpop queue1 queue2 5   //调换位置
1) "queue1"  					//还是列表queue1的key
2) "5"
127.0.0.1:6379> del queue1    //删除列表queue1
(integer) 1
127.0.0.1:6379> brpop queue1 queue2 5  //等待5s,失败
(nil)
(5.04s)

从上面的例子能看出当多个key时,当有不存在时,会把不存在的剔除(或者忽略),之后再存在的列表里决定弹出哪一边,当都不存在时就等待timeout了
借此特性可以区分优先级的任务队列,根据key的顺序读取每一个数据
程序我就不写了,跟据上面的改就行,
在java程序里,timeout写在前面,key写在后面,返回的是一个List集合

发布/订阅模式(一个消息可以被多个订阅者消费)

1 客户端发布/订阅

1.1普通的发布/订阅

除了实现任务队列外,redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式。"发布/订阅"模式同样可以实现进程间的消息传递,其原理如下:
  "发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。
(1)发布消息1

127.0.0.1:6379> PUBLISH channel:1 hi   //如向 channel1:1说一声hi
(integer) 0

一开始没有订阅者返回的是0,发出去的消息不会被持久化,也就是有客户端订阅channel:1后只能接收到后续发布到该频道的消息,之前的就接收不到了。
(2)订阅频道
  订阅频道的命令是 subscribe,可以同时订阅多个频道,用法是 subscribe channel1 [channel2 …],例如新开一个客户端订阅上面频道:(不会收到消息,因为不会收到订阅之前就发布到该频道的消息)

127.0.0.1:6379> SUBSCRIBE channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"    //表示订阅成功的反馈信息
2) "channel:1"   // 订阅成功的频道名称
3) (integer) 1    //当前客户端订阅的频道数量

订阅后就不能进行新的操作,所以建议开两个cmd窗口,只能接收发布者的消息
(3)发布者(第一个客户端重新往channel:1发送一条消息)

127.0.0.1:6379> PUBLISH channel:1 hi
(integer) 1   //返回的是订阅者的数量

订阅的客户端显示的内容

127.0.0.1:6379> SUBSCRIBE channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1   //上面的是重复的
1) "message"   //表示接收到的消息
2) "channel:1" //示产生消息的频道名称,因为可以接受多个频道
3) "hi"  //消息的内容

取消订阅

redis 127.0.0.1:6379> unsubscribe channel:1
1) "unsubscribe"  //成功取消订阅某个频道
2) "channel:1"  //频道名称
3) (integer) 0  //当前客户端订阅的频道数量

在此过程中,发布消息的客户端可以随意操作,但是订阅消息的客户端只能接收订阅的消息,不能进行其他的操作,再开一个新客户端,端口号一样,那新客户端可以进行任意操作

1.2按照规则发布/订阅

除了可以使用subscribe命令来订阅指定的频道外,还可以使用psubscribe命令来订阅频道
它们两个的区别命令是在开头加了一个p另一个区别就是前面的是制定频道订阅,后面的是模糊频道订阅
如果SQL语句的模糊查询就更好理解了
命令

psubscribe pattern [pattern ...]  //订阅多个模式的频道

通配符中?表示1个占位符,*表示任意个占位符(包括0),?*表示1个以上占位符
下面来看一下这三个通配符的实例吧

127.0.0.1:6379> psubscribe c? b* d?*  //订阅多个频道,并且都有通配符
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"			//下面这些就不说了,和subscribe订阅一样的
2) "c?" 
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3
1) "pmessage"
2) "c?"

新开个客户端发送频道信息到指定频道

127.0.0.1:6379> publish c m1  //频道是c,但是没有订阅者,上面的c后面跟的?
(integer) 0
127.0.0.1:6379> publish c1 m1 //有订阅者
(integer) 1
127.0.0.1:6379> publish c11 m1 //没有订阅者,因为?代表的是一个占位符,这离的c后面有两个
(integer) 0
127.0.0.1:6379> publish b m1 //有,*代表0到无限的占位符,下同
(integer) 1
127.0.0.1:6379> publish b1 m1 
(integer) 1
127.0.0.1:6379> publish b11 m1 
(integer) 1
127.0.0.1:6379> publish d m1 //没有订阅者,?*代表至少1和占位符,下面就有了
(integer) 0
127.0.0.1:6379> publish d1 m1
(integer) 1
127.0.0.1:6379> publish d11 m1
(integer) 1

都是发送的m1的消息
上面返回值为1表示被订阅者所接受,可以匹配上面的通配符。
现在看看订阅者的客户端的信息吧

redis实现消息队列(发布/订阅模式)
注意:
  (1)使用psubscribe命令可以重复订阅同一个频道,如客户端执行了psubscribe c? c?*。这时向c1发布消息客户端会接受到两条消息,而同时publish命令的返回值是2而不是1。.同样的,如果有另一个客户端执行了subscribe c1 和psubscribe c?*的话,向c1发送一条消息该客户顿也会受到两条消息(但是是两种类型:message和pmessage),同时publish命令也返回2.

(2)punsubscribe命令可以退订指定的规则,用法是: punsubscribe [pattern [pattern …]],如果没有参数则会退订所有规则

(3)使用punsubscribe只能退订通过psubscribe命令订阅的规则不会影响直接通过subscribe命令订阅的频道,同样unsubscribe命令不会影响通过psubscribe命令订阅的规则。
另外需要注意punsubscribe命令退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以punsubscribe * 无法退订c规则,而是必须使用punsubscribe c*才可以退订。

2 java程序实现发布者订阅者模式

首先强调一点,订阅者在订阅模式下要一直开着,或者阻塞,因为它的特性决定当你退出后发布者发布什么就和你无关了,要想接收就需要一直开着,或者一直阻塞,在java程序时,消费者需要一直开着等待生产者发送消息,消费者才能接收到
因为Eclipse的Junit只能一个测试,我就分开了,测试用Eclipse,另一个用cmd
目录结构参考:https://blog.csdn.net/weixin_43113679/article/details/90413124
生产者

/**
 	 * 生产者
 	 * */
 	@Test
 	public void MessageProducer()throws Exception{
 		ctx = new ClassPathXmlApplicationContext("spring-service.xml");
        userService = ctx.getBean(UserService.class);
        for(int i = 0; i < 10; i++) {
        	userService.publish(i);
        	
        }    		
 	};

service

@Override
	public void publish(int i) throws Exception {
		//频道号
		String channel = "channel:1";
		redisDao.publish(channel, String.valueOf(i));
		
	}

redisDao

@Override
	public void publish(String channel, String message) throws Exception {
		jedisPool.getResource().publish(channel, message);
		
	}

结果
redis实现消息队列(发布/订阅模式)
(1)subscribe实现订阅者

/**
 	 *  用subscribe订阅的订阅者
 	 * */
 	@Test
 	public void MessageConsumerSubscribe()throws Exception{
 		ctx = new ClassPathXmlApplicationContext("spring-service.xml");
        userService = ctx.getBean(UserService.class);  
        	userService.subscribe();        
 	}

上面的userService.subscribe(); 不用while循环,你只要订阅了,除非退出,要不会一直阻塞在那等待发布者发布指定频道的消息,有消息就会处理,再等待,所以不需要自己添加while循环
service

@Override
	public void subscribe() throws Exception {
		String channel = "channel:1";
		redisDao.subscribe(channel);
	}

redisDao

/**
	 * 处理订阅/发布的订阅者根据频道接收的信息
	 * */
private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();

@Override
	public void subscribe(String channel) throws Exception {
		jedisPool.getResource().subscribe(myJedisPubSub, channel);
		//下面就直接交给myJedisPubSub来处理返回的信息了
	}
	
}
/**
 * 继承JedisPubSub,重写接收消息的方法
 */
class MyJedisPubSub extends JedisPubSub {
	/**
	 * 结束程序的消息
	 * */
   private static final String EXIT_COMMAND = "exit";
    /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
     * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
     * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
     * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
     **/
	 @Override
    public void onMessage(String channel, String message) {
    	 //接收到exit消息后退出
        if (EXIT_COMMAND.equals(message)) {
            System.exit(0);
        }
        System.out.println("-接收到消息:channel=" + channel + ",message=" + message);
       
    }
}

redis实现消息队列(发布/订阅模式)
结果
redis实现消息队列(发布/订阅模式)
(2)psubscribe实现订阅者
这个就不写了,和上面一样,把channel 改成模糊频道,再把所有的subscribe改成psubscribe,还有重写JedisPubSub(这个和上边的有点不一样),

/**
 1. 继承JedisPubSub,重写接收消息的方法
 */
class MyJedisPubSub extends JedisPubSub {
	/**
	 * 结束程序的消息
	 * */
   private static final String EXIT_COMMAND = "exit";
   
    @Override
    public void onPMessage(String pattern, String channel, String message) {
      //接收到exit消息后退出,程序停住
        if (EXIT_COMMAND.equals(message)) {
            System.exit(0);
        }
        System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message);
      
    }
}

到此我们实现了两种消息队列,

  1. redis自带的list类型(lpushrpop或者brpoprpushlpop或者blpop),带b的是阻塞读取
  2. 发布/订阅模式发布者(publish channel message),订阅者(subscribe channel [channel …]或者psubscribe pattern [pattern …])前者指定频道后者模糊频道

补充

(1)取消订阅

就像上面一样,订阅了此线程就会在这继续等待,不会停止也不会返回主线程,就在这等待发布者发布消息,接收,除非像上面一样 System.exit(0)结束程序 ,在测试时没问题,但是在做项目是就不行了,要不一直在这等待,怎么返回结果呢,所以就需要取消订阅,调用JedisPubSubunsubscribe() 方法
程序和上面的基本没变

@Override
	public void subscribe(String channel) throws Exception {
		jedisPool.getResource().subscribe(myJedisPubSub, channel);
		
		System.out.println("取消订阅后返回主线程");
	}
	
}
/**
 * 继承JedisPubSub,重写接收消息的方法
 */
class MyJedisPubSub extends JedisPubSub {
	/**
	 * 结束程序的消息
	 * */
   private static final String EXIT_COMMAND = "exit";
    /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
     * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
     * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
     * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
     **/
	 @Override
    public void onMessage(String channel, String message) {
        System.out.println("-接收到消息:channel=" + channel + ",message=" + message);
        //接收到exit消息后退出返回主线程
        if (EXIT_COMMAND.equals(message)) {
            unsubscribe(channel);
        }
    }
	 @Override
	public void unsubscribe(String... channels) {
		
		super.unsubscribe(channels);
	}
}
127.0.0.1:6379> PUBLISH channel:1 5
(integer) 1
127.0.0.1:6379> PUBLISH channel:1 6
(integer) 1
127.0.0.1:6379> PUBLISH channel:1 exit
(integer) 1

结果
redis实现消息队列(发布/订阅模式)
 返回redisDao里了,这样就成功了,但是一想这还是发布方决定的啊,
 一想邮件什么的不是有取消订阅吗?
 这个就会有概念混淆的问题,邮件里的订阅和取消订阅和这个的订阅还是有区别的,这里的订阅更像是实时动态群发,群接受
看源码
redis实现消息队列(发布/订阅模式)
  当客户端为null时你可以这么理解,就是你自己退出时,订阅就结束了,当你登陆后开始新的订阅
  这时候你会想根据redis的订阅的特性,当你不在的时候发布者的发布消息你都接收不到了,那我再登录时这些我还有吗?答案是肯定有的
  现在有可能根据测试例子你都忘了redis到底是干什么的它是缓存啊,缓存里的数据肯定要在数据库里备份啊,就算你不在,你登陆后完全可以去数据库中查找啊
  psubscribe也有punsubscribe,和上面差不多

(2)BRPOP:当给定列表内没有任何元素可供弹出的时候,连接将被BRPOP命令阻塞,直到等待超时或发现可弹出元素为止。(每次只弹出一个元素,当没有元素的时候处于阻塞,当弹出一个元素之后就会解除阻塞)


    @Override
	public List<String> brpop(String messageKey) throws Exception {
		List<String> list = new ArrayList<String>();
		System.out.println("准备接受列表的全部数据");
		list = jedisPool.getResource().brpop(5, messageKey);
		System.out.println("接收完成,返回");
		return list;
	}
	
    

上面的就是brpop的过程,brpop返回的是一个List<String>集合,
第一个返回的是列表的名称,也可以说是列表的key
第二个就是弹出的信息
这个也是弹出一个就结束返回主线程
redis实现消息队列(发布/订阅模式)
当redis中没有此列表时,timeout就要发挥作用了
redis实现消息队列(发布/订阅模式)
brpop阻塞也是有时间限制的,不像发布/订阅,只要超过timeout还没有消息就会结束(在此期间还是阻塞等待的过程),返回主线程