欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java实现redis消息订阅/发布(PubSub)

程序员文章站 2024-02-03 15:44:34
...

引入依赖

首先在你的java项目中引入

		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.9.2</version>
		</dependency>

监听代码

订阅主题所需的类

import redis.clients.jedis.JedisPubSub;

public class ChannelListener extends JedisPubSub {
	public ChannelListener() {
	}

	@Override
	public void onMessage(String channel, String message) { // 收到消息会调用
		// 下面可以写你的业务处理代码
		System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message));
		if (message.equals("close"))
			this.unsubscribe("可填,不填就全部主题关闭订阅了");
	}

	@Override
	public void onSubscribe(String channel, int subscribedChannels) { // 订阅频道会调用
		System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d", channel, subscribedChannels));
	}

	@Override
	public void onUnsubscribe(String channel, int subscribedChannels) { // 取消订阅会调用
		System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d", channel, subscribedChannels));

	}

}

 测试代码

	public static void main(String[] args) {
		JedisPoolConfig config = new JedisPoolConfig();
		config.setMaxTotal(1000);
		config.setMaxIdle(300);
		config.setMaxWaitMillis(1000);
		JedisPool jedisPool = new JedisPool(config, "ip", 端口, 可填, 密码==有?填:null);

		// 会阻塞,所以使用线程打开
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				Jedis jedis = jedisPool.getResource();
				ChannelListener c=	new ChannelListener();
				try {
					System.out.println("打开订阅线程,执行下句代码后此线程会阻塞");
					jedis.subscribe(c, "test");
					System.out.println("关闭订阅时才会打印");
				} finally {
					// TODO: handle finally clause
					System.out.println("线程关闭");
					//取消订阅
					jedis.close();
				}

			}
		});
		service.execute(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				Jedis jedis = jedisPool.getResource();
				try {
					// 每秒发送一条消息到对应主题
					for (int i = 0; i < 10; i++) {
						jedis.publish("test", String.valueOf(i));
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}

				} finally {
					// TODO: handle finally clause
					System.out.println("关闭连接");
					jedis.close();
				}

			}
		});
		try {
			Thread.sleep(11000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		// 关闭线程,即使你换成now来强制关闭,如果没有下面的通知订阅关闭代码,jvm是无法关闭的
		service.shutdown();
        //通知订阅关闭
		jedisPool.getResource().publish("test", "close");
		// 关闭线程池
		jedisPool.close();
	}

输出结果

控制台:

Java实现redis消息订阅/发布(PubSub)

服务器: 

redis-cli后有密码就auth你的密码,没有就 subscribe 你需要订阅的主题

 Java实现redis消息订阅/发布(PubSub)

总结

订阅很愉快,写码需谨慎.里面有阻塞代码块在,需要做好此代码的处理,不然是无法优雅的关闭线程的.