...
redis的发布/订阅模式是消息机制之一,另外一个叫生成者消费者模式。Redis发布订阅模式讲解可以参考菜鸟教程的这篇文章http://www.runoob.com/redis/redis-pub-sub.html。
1、Redis发布订阅模式客户端实现。在打开Redis服务器后,再打开两个客户端,客户端1用来接收消息,客户端2用来发布消息。
客户端1订阅bar频道。格式:SUBSCRIBE name1 name2。
成功订阅回复,分别对应订阅类型、订阅频道、订阅数量。
127.0.0.1:6379> SUBSCRIBE bar
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "bar"3) (integer) 1
客户端2,发送消息。格式:publish channelName Message。
127.0.0.1:6379> publish bar val
(integer) 1
客户端1订阅回复,分别对应消息类型,频道,消息。
1) "message"2) "bar"3) "val"
效果图如下:
Redis支持模式匹配订阅,*为模糊匹配符。
订阅所有频道的消息
PSUBSCRIBE *
订阅以news.开头的所有频道。
PSUBSCRIBE news.*
其他操作这里不在赘述。
2、Java实现Redis的发布订阅。
Java实现Redis的功能大部分是使用jedis的jar包来进行操作。个人感觉,jedis封装了操作redis的常用命令,写多了就会发现知道redis命令怎么写的,就可以猜出来jedis中怎么写的。
首先在我们封装的JedisUtils中加入发布和订阅操作的方法。大家没有JedisUtils的可以参考JeeSite中的JedisUtils怎么写的。
public static void publishMsg(String channel,String message){
Jedis jedis = null;
try {
jedis = getResource();
jedis.publish(channel, message);
logger.debug("publishMsg {} = {}", channel, message);
} catch (Exception e) {
logger.warn("publishMsg {} = {}", channel, message, e);
} finally {
returnResource(jedis);
}
}
public static void publishMsg(byte[] channel,byte[] message){
Jedis jedis = null;
try {
jedis = getResource();
jedis.publish(channel, message);
logger.debug("publishMsg {} = {}", channel, message);
} catch (Exception e) {
logger.warn("publishMsg {} = {}", channel, message, e);
} finally {
returnResource(jedis);
}
}
|
上面的两个方法的核心处理就是jedis.publish(channel, message);参数channel是消息的频道,message是消息的内容。在Junit测试或者其他的地方,使用工具类的此方法即可发布一个消息。
接收消息代码多一些。首先定义一个类继承JedisPubSub,然后实现其中的未实现的方法,最后在工具类JedisUtils中定义一个操作的方法即可。代码如下:
public static void subscribeMsg(JedisPubSub jedisPubSub,String channels){
Jedis jedis = null;
try {
jedis = getResource();
jedis.subscribe(jedisPubSub, channels);
logger.debug("subscribeMsg {} = {}", jedisPubSub, channels);
} catch (Exception e) {
logger.warn("subscribeMsg {} = {}", jedisPubSub, channels, e);
} finally {
returnResource(jedis);
}
}
|
JedisPubSub类的定义如下:
package com.aq.web.shiro.redis.msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;
import com.aq.dao.info.RedisMsgAuditInfo;
import com.aq.web.shiro.redis.SelfObjectUtils;
public class RedisMsgPubSubListener extends JedisPubSub{
private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
@Override
public void onMessage(String channel, String message) {
logger.debug("onMessage: channel["+channel+"], message["+message+"]");
if(RedisMsgJedisUtils.getRedisMsgChannelString().equalsIgnoreCase(channel)){
this.AuditMsgHandler(message);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
logger.debug("onPMessage: channel["+channel+"], message["+message+"]");
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
logger.debug("onSubscribe: channel["+channel+"],"+
"subscribedChannels["+subscribedChannels+"]");
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
logger.debug("onUnsubscribe: channel["+channel+"], "+
"subscribedChannels["+subscribedChannels+"]");
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
logger.debug("onPUnsubscribe: pattern["+pattern+"],"+
"subscribedChannels["+subscribedChannels+"]");
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
logger.debug("onPSubscribe: pattern["+pattern+"], "+
"subscribedChannels["+subscribedChannels+"]");
}
private void AuditMsgHandler(String message){
RedisMsgAuditInfo msg3 = (RedisMsgAuditInfo) SelfObjectUtils.unserialize(message.getBytes());
logger.debug(msg3.toString());
}
}
审计日志反序列化部分代码可以省略掉。
|
最后Junit中测试如下:
package ap.shiro.redis.msg;
import org.junit.Test;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import com.aq.web.shiro.redis.JedisUtils;
import com.aq.web.shiro.redis.msg.RedisMsgJedisUtils;
import com.aq.web.shiro.redis.msg.RedisMsgPubSubListener;
@ContextConfiguration(locations = "/mysql-test.xml")
public class RedisMsgPubSubTester extends AbstractJUnit4SpringContextTests{
@Test
public void testMsg_pub(){
RedisMsgJedisUtils.publishMsg("news.share", "2016年3月28日 15:34:37");
}
@Test
public void testMsg_sub(){
RedisMsgPubSubListener pubsub = new RedisMsgPubSubListener();
RedisMsgJedisUtils.subscribeMsg(pubsub, "news.share");
}
}
建议测试时,分开方法进行测试。其中testMsg_pub()方法是用来测试发布消息的,testMsg_sub()是用来测试接收订阅消息的。这里只是使用了普通订阅,大家还可以使用模式订阅。执行testMsg_sub()方法后,客户端会一直开启着,不会关闭。另外,在其他的redis客户端中发布一条消息,控制台就会立刻输出该消息。
|
|
这样Redis的订阅发布的Java实现就完成了。总体不太难,我今天搜资料的时候发现资料很多也很乱,当然没在开源中国搜。
另外,上面例子中使用的JedisPubSub类的子类接收消息。第二天测试时发现,JedisPubSub类的子类接收字符串类的消息没问题,但是接收对象转byte[]的消息后不能正确地转换回对象。这里的对象是自定义的,通过ByteArray流和Object流完成Object与byte[]之间的转换。
经过查资料后发现,使用BinaryJedisPubSub类的子类接收消息可以正确地转换对象,不会出现上述问题。大家自己试下。继承BinaryJedisPubSub类的监听器跟JedisPubSub类的监听器类似。