运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)
记得我之前写过 redis主动向页面push数据 的文章,但文中所描述的方法要应用到J2EE的项目中还是比较困难的(还需用到nodejs什么的)。于是本文来探究下比较适合web项目的主动推技术。
Comet是一种用于web的推送技术,能使服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求,目前有两种实现方式:长轮询(long-polling)和iframe流(streaming)。下面就用iframe流的实现方式来实现服务端主动向客户端(这里客户端指的是jsp页面)推送的效果,并且结合了redis的发布订阅,算是比较典型的例子了。
客户端(页面):
<script type="text/javascript"> $(function() { setCometUrl(); bindLinstener(); }); function bindLinstener() { if (window.addEventListener) { window.addEventListener("load", comet.initialize, false); window.addEventListener("unload", comet.onUnload, false); } else if (window.attachEvent) { window.attachEvent("onload", comet.initialize); window.attachEvent("onunload", comet.onUnload); } } function setCometUrl(){ comet.cometUrl = "pubsub/push.json"; } //服务器推送代码 var comet = { connection : false, iframediv : false, initialize : function() { if (navigator.appVersion.indexOf("MSIE") != -1) { // For IE browsers comet.connection = new ActiveXObject("htmlfile"); comet.connection.open(); comet.connection.write("<html>"); comet.connection.write("<script>document.domain = '" + document.domain + "'"); comet.connection.write("</html>"); comet.connection.close(); comet.iframediv = comet.connection.createElement("div"); comet.connection.appendChild(comet.iframediv); comet.connection.parentWindow.comet = comet; comet.iframediv.innerHTML = "<iframe id='comet_iframe' src='"+comet.cometUrl+"'></iframe>"; } else if (navigator.appVersion.indexOf("KHTML") != -1) { // for KHTML browsers comet.connection = document.createElement('iframe'); comet.connection.setAttribute('id', 'comet_iframe'); comet.connection.setAttribute('src', comet.cometUrl); with (comet.connection.style) { position = "absolute"; left = top = "-100px"; height = width = "1px"; visibility = "hidden"; } document.body.appendChild(comet.connection); } else { // For other browser (Firefox...) comet.connection = document.createElement('iframe'); comet.connection.setAttribute('id', 'comet_iframe'); comet.iframediv = document.createElement('iframe'); comet.iframediv.setAttribute('src', comet.cometUrl); comet.connection.appendChild(comet.iframediv); document.body.appendChild(comet.connection); } }, onUnload : function() { if (comet.connection) { comet.connection = false; // release the iframe to prevent problems with IE when reloading the page closePage(); } }, receiveMsg : function(msg) { $("#content").append(msg + "<br/>"); } } function closePage() { $.ajax({ async : true, cache : false, type : "POST", //data:{objId:objId}, dataType:"json", url :"pubsub/close.json", success : function(data) { }, error: function(){ } }); } </script> </head> <body > <div id="content" class="show"></div> </body>
这个客户端页面是利用浏览器支持的Comet,仅发起一次ajax请求,打通后台后,后台就会源源不断主动往这个页面发送数据。
后台较为复杂,并且还结合了redis的发布订阅。数据来源则是订阅redis的一个channel而得到。
Action:
@Controller public class PubSubAction { LinkedList<String> queue = new LinkedList<String>(); PrintWriter out; //线程 MsgSubHandler subT = null; CheckQueueHandler checkT = null; @RequestMapping("/pubsub/push.json") @ResponseBody public void pushMsg(HttpServletResponse response) { System.out.println("这儿进几次........."); //订阅 subT = new MsgSubHandler("pubsub_channel", queue); subT.start(); //检查 checkT = new CheckQueueHandler(queue); checkT.start(); //创建Comet Iframe sendHtmlScript(response, "<script type=\"text/javascript\"> var comet = window.parent.comet;</script>"); while (true) { try { Thread.sleep(1000);//每隔1s从队列取数 if(queue.size() > 0) { String msg = queue.pop(); System.out.println("从队列里取到的信息:" + msg); sendHtmlScript(response, "<script type=\"text/javascript\"> comet.receiveMsg('"+msg+"');</script>"); } }catch(InterruptedException e) { e.printStackTrace(); } } } @RequestMapping("/pubsub/close.json") @ResponseBody public void shutdownServer() throws InterruptedException { System.out.println("开始关闭操作.."); //关闭流 out.flush(); out.close(); //队列情空 queue.clear(); //消息的关闭处理 subT.shut(); checkT.shut(); //线程停止 if(checkT.isAlive()) { checkT.interrupt(); checkT.join(); } if(subT.isAlive()) { subT.interrupt(); subT.join(); } } private void sendHtmlScript(HttpServletResponse response,String script){ response.setCharacterEncoding("UTF-8"); response.setContentType("text/html"); response.setDateHeader("Expires", 0); response.setHeader("Pragma", "No-cache"); response.setHeader("Cache-Control", "no-cache,no-store,max-age=0"); try { out = response.getWriter(); out.write(script); out.flush(); } catch (IOException e) { e.printStackTrace(); log.error(e.getMessage(), e); } } }
其中,订阅消息的线程类和检查消息队列大小的线程类分别如下:
1:定时检查队列大小的线程类,目的是避免消息队列大小过大
public class CheckQueueHandler extends Thread { private LinkedList<String> queue; private boolean runFlag = true; public CheckQueueHandler(LinkedList<String> queue) { this.queue = queue; } @Override public void run() { try { while (runFlag && queue.size()>0) { Thread.sleep(60 * 1000);//每隔1分钟检查指定队列的大小 if (queue.size() >= 500) { queue.clear(); } } } catch (InterruptedException e) { e.printStackTrace(); } } public void shut() { runFlag = false; } }
2:订阅相应的channel的线程类:
public class MsgSubHandler extends Thread{ private LinkedList<String> queue; private String channel; JedisPool pool; Jedis jedis; PubSubListener listener; public MsgSubHandler(String channel, LinkedList<String> queue) { this.channel = channel; this.queue = queue; //redis资源初始化 pool = SysBeans.getBean("jedisPool"); jedis = pool.getResource(); //发布/订阅监听初始化 listener = new PubSubListener(queue); } @Override public void run() { //订阅指定的渠道信息 jedis.subscribe(listener, channel); } public void shut() { //归还redis资源 if(pool !=null && jedis != null) { pool.returnResource(jedis); } //取消渠道订阅 listener.unsubscribe(); } }
3:redis的发布/订阅监听类
public class PubSubListener extends JedisPubSub { private LinkedList<String> queue =null; public PubSubListener(LinkedList<String> queue) { this.queue = queue; } //取得订阅后消息的处理 @Override public void onMessage(String channel, String message) { //System.out.print("onMessage:取得订阅后消息的处理 "); queue.add(message); } //取得按表达式的方式订阅的消息后的处理 @Override public void onPMessage(String pattern, String channel, String message) { System.out.print("onPMessage:取得按表达式的方式订阅的消息后的处理 "); System.out.println(pattern + "=" + channel + "=" + message); } //初始化按表达式的方式订阅时候的处理 @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.print("onPSubscribe:初始化按表达式的方式订阅时候的处理 "); System.out.println(pattern + "=" + subscribedChannels); } //取消化按表达式的方式订阅时候的处理 @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.print("onPUnsubscribe:取消化按表达式的方式订阅时候的处理 "); System.out.println(pattern + "=" + subscribedChannels); } //初始化订阅时候的处理 @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.print("onSubscribe:初始化订阅时候的处理 "); System.out.println(channel + "=" + subscribedChannels); } //取消订阅时候的处理 @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.print("onUnsubscribe:取消订阅时候的处理 "); System.out.println(channel + "=" + subscribedChannels); } }
启动工程,打开客户端页面,最初始的div:
同时控制台打印:
这儿进几次.........
onSubscribe:初始化订阅时候的处理 pubsub_channel=1
这说明:一打开客户端,就实现了订阅对应channel的目的。
接下来,为了让这个div中有数据,我们开始来对这个channel进行publish一些数据,模拟:
public static void main(String[] args) { Jedis jedis = new Jedis("localhost"); while(true) { try { Thread.sleep(2000); jedis.publish("pubsub_channel", "I like " + Math.random()*100 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
然后你再观察这个div,会发现如下现象(某一时刻):
由此说明:我们达到了如题所想要的目的!——结合了redis的发布/订阅 并且客户端只请求服务端一次,服务端主动向客户端推送了数据。
最后,我们再试着关闭客户端页面,会发现控制台打印:
onUnsubscribe:取消订阅时候的处理 pubsub_channel=0
说明,客户端一关闭,就取消了对channel的订阅了。并且queue队列也会被清空。
其实Comet并不是新兴的技术,关于【反ajax】技术,最新的有WebSocket,以后有机会再研究。