Java实现redis消息订阅/发布(PubSub)
程序员文章站
2024-02-03 15:44:34
...
引入依赖
首先在你的java项目中引入
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.2</version>
</dependency>
监听代码
订阅主题所需的类
import redis.clients.jedis.JedisPubSub;
public class ChannelListener extends JedisPubSub {
public ChannelListener() {
}
@Override
public void onMessage(String channel, String message) { // 收到消息会调用
// 下面可以写你的业务处理代码
System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message));
if (message.equals("close"))
this.unsubscribe("可填,不填就全部主题关闭订阅了");
}
@Override
public void onSubscribe(String channel, int subscribedChannels) { // 订阅频道会调用
System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d", channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) { // 取消订阅会调用
System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d", channel, subscribedChannels));
}
}
测试代码
public static void main(String[] args) {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(1000);
config.setMaxIdle(300);
config.setMaxWaitMillis(1000);
JedisPool jedisPool = new JedisPool(config, "ip", 端口, 可填, 密码==有?填:null);
// 会阻塞,所以使用线程打开
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
Jedis jedis = jedisPool.getResource();
ChannelListener c= new ChannelListener();
try {
System.out.println("打开订阅线程,执行下句代码后此线程会阻塞");
jedis.subscribe(c, "test");
System.out.println("关闭订阅时才会打印");
} finally {
// TODO: handle finally clause
System.out.println("线程关闭");
//取消订阅
jedis.close();
}
}
});
service.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
Jedis jedis = jedisPool.getResource();
try {
// 每秒发送一条消息到对应主题
for (int i = 0; i < 10; i++) {
jedis.publish("test", String.valueOf(i));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} finally {
// TODO: handle finally clause
System.out.println("关闭连接");
jedis.close();
}
}
});
try {
Thread.sleep(11000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 关闭线程,即使你换成now来强制关闭,如果没有下面的通知订阅关闭代码,jvm是无法关闭的
service.shutdown();
//通知订阅关闭
jedisPool.getResource().publish("test", "close");
// 关闭线程池
jedisPool.close();
}
输出结果
控制台:
服务器:
redis-cli后有密码就auth你的密码,没有就 subscribe 你需要订阅的主题
总结
订阅很愉快,写码需谨慎.里面有阻塞代码块在,需要做好此代码的处理,不然是无法优雅的关闭线程的.