欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)

程序员文章站 2022-06-12 12:33:08
...

记得我之前写过  redis主动向页面push数据  的文章,但文中所描述的方法要应用到J2EE的项目中还是比较困难的(还需用到nodejs什么的)。于是本文来探究下比较适合web项目的主动推技术。

Comet是一种用于web的推送技术,能使服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求,目前有两种实现方式:长轮询(long-polling)和iframe流(streaming)。下面就用iframe流的实现方式来实现服务端主动向客户端(这里客户端指的是jsp页面)推送的效果,并且结合了redis的发布订阅,算是比较典型的例子了。

 

客户端(页面):

<script type="text/javascript">
	$(function() {
		setCometUrl();
		bindLinstener();
	});
	
	function bindLinstener() {
		if (window.addEventListener) {  
		    window.addEventListener("load", comet.initialize, false);  
		    window.addEventListener("unload", comet.onUnload, false);  
		} else if (window.attachEvent) {  
		    window.attachEvent("onload", comet.initialize);  
		    window.attachEvent("onunload", comet.onUnload);  
		} 
	}
	
	function setCometUrl(){
		comet.cometUrl = "pubsub/push.json";
	}
	
	//服务器推送代码
	var comet = {
		connection : false,
		iframediv : false,

		initialize : function() {
			if (navigator.appVersion.indexOf("MSIE") != -1) {
				// For IE browsers
				comet.connection = new ActiveXObject("htmlfile");
				comet.connection.open();
				comet.connection.write("<html>");
				comet.connection.write("<script>document.domain = '" + document.domain + "'");
				comet.connection.write("</html>");
				comet.connection.close();
				comet.iframediv = comet.connection.createElement("div");
				comet.connection.appendChild(comet.iframediv);
				comet.connection.parentWindow.comet = comet;
				comet.iframediv.innerHTML = "<iframe id='comet_iframe' src='"+comet.cometUrl+"'></iframe>";
			
			} else if (navigator.appVersion.indexOf("KHTML") != -1) {
				// for KHTML browsers
				comet.connection = document.createElement('iframe');
				comet.connection.setAttribute('id', 'comet_iframe');
				comet.connection.setAttribute('src', comet.cometUrl);
				with (comet.connection.style) {
					position = "absolute";
					left = top = "-100px";
					height = width = "1px";
					visibility = "hidden";
				}
				document.body.appendChild(comet.connection);

			} else {
				// For other browser (Firefox...)
				comet.connection = document.createElement('iframe');
				comet.connection.setAttribute('id', 'comet_iframe');
				comet.iframediv = document.createElement('iframe');
				comet.iframediv.setAttribute('src', comet.cometUrl);
				
				comet.connection.appendChild(comet.iframediv);
				document.body.appendChild(comet.connection);
			}
		},

		onUnload : function() {
			if (comet.connection) {
				comet.connection = false; // release the iframe to prevent problems with IE when reloading the page
				closePage();
			}
		},
		
		receiveMsg : function(msg) {
			$("#content").append(msg + "<br/>");
		}
		
	}
	
	function closePage() {
		$.ajax({
			async : true,
			cache : false,
			type : "POST",
			//data:{objId:objId},
			dataType:"json",
			url :"pubsub/close.json",
			success : function(data) {
			},
			error: function(){
			}
		});
	}
</script>

</head>
<body >
	<div id="content" class="show"></div>
</body>

 这个客户端页面是利用浏览器支持的Comet,仅发起一次ajax请求,打通后台后,后台就会源源不断主动往这个页面发送数据。

 

后台较为复杂,并且还结合了redis的发布订阅。数据来源则是订阅redis的一个channel而得到。

Action:

@Controller
public class PubSubAction {
	
	LinkedList<String> queue = new LinkedList<String>();
	PrintWriter out;
	
	//线程
	MsgSubHandler subT = null;
	CheckQueueHandler checkT = null;
	
	@RequestMapping("/pubsub/push.json")
	@ResponseBody
	public void pushMsg(HttpServletResponse response) {
		System.out.println("这儿进几次.........");
		//订阅
		subT = new MsgSubHandler("pubsub_channel", queue);
		subT.start();
		//检查
		checkT = new CheckQueueHandler(queue);
		checkT.start();
		//创建Comet Iframe
		sendHtmlScript(response, "<script type=\"text/javascript\"> var comet = window.parent.comet;</script>");
		
		while (true) {
			try {
				Thread.sleep(1000);//每隔1s从队列取数
				if(queue.size() > 0) {
					String msg = queue.pop();
					System.out.println("从队列里取到的信息:" + msg);
					sendHtmlScript(response, "<script type=\"text/javascript\"> comet.receiveMsg('"+msg+"');</script>");
				}
			}catch(InterruptedException e) {
				e.printStackTrace();
			}	
		}
	}
	
	@RequestMapping("/pubsub/close.json")
	@ResponseBody
	public void shutdownServer() throws InterruptedException {
		System.out.println("开始关闭操作..");
		//关闭流
		out.flush();
		out.close();
		//队列情空
		queue.clear();
		//消息的关闭处理
		subT.shut();
		checkT.shut();
		//线程停止
		if(checkT.isAlive()) {
			checkT.interrupt();
			checkT.join();
		}
		if(subT.isAlive()) {
			subT.interrupt();
			subT.join();
		}
	}
	
	private void sendHtmlScript(HttpServletResponse response,String script){
		response.setCharacterEncoding("UTF-8");
		response.setContentType("text/html");
		response.setDateHeader("Expires", 0);
		response.setHeader("Pragma", "No-cache");
		response.setHeader("Cache-Control", "no-cache,no-store,max-age=0");
		try {
			out = response.getWriter();
			out.write(script);
			out.flush();
		} catch (IOException e) {
			e.printStackTrace();
			log.error(e.getMessage(), e);
		}
   }
}

 

其中,订阅消息的线程类和检查消息队列大小的线程类分别如下:

 

1:定时检查队列大小的线程类,目的是避免消息队列大小过大

public class CheckQueueHandler extends Thread {
	
	private LinkedList<String> queue;
	private boolean runFlag = true;
	
	public CheckQueueHandler(LinkedList<String> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			while (runFlag && queue.size()>0) {
				Thread.sleep(60 * 1000);//每隔1分钟检查指定队列的大小
				if (queue.size() >= 500) {
					queue.clear();
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();  
		}
	}
	
	public void shut() {
		runFlag = false;
	}
}

 2:订阅相应的channel的线程类:

public class MsgSubHandler extends Thread{
	
	private LinkedList<String> queue;
	private String channel;
	
	JedisPool pool;
	Jedis jedis;
	PubSubListener listener;
	
	public MsgSubHandler(String channel, LinkedList<String> queue) {
		this.channel = channel;
		this.queue = queue;
		
		//redis资源初始化
		pool = SysBeans.getBean("jedisPool");
		jedis = pool.getResource();
		
		//发布/订阅监听初始化
		listener = new PubSubListener(queue);
	}
	
	@Override
	public void run() {
		//订阅指定的渠道信息
		jedis.subscribe(listener, channel);
	}
	
	public void shut() {
		//归还redis资源
		if(pool !=null && jedis != null) {
			pool.returnResource(jedis);
		}
		//取消渠道订阅
		listener.unsubscribe();
	}
}

 3:redis的发布/订阅监听类

public class PubSubListener extends JedisPubSub {
	
	private LinkedList<String> queue =null;
	
	public PubSubListener(LinkedList<String> queue) {
		this.queue  =  queue;
	}
	
	//取得订阅后消息的处理  
    @Override  
    public void onMessage(String channel, String message) {  
        //System.out.print("onMessage:取得订阅后消息的处理  ");  
        queue.add(message);   
    }  
      
    //取得按表达式的方式订阅的消息后的处理  
    @Override  
    public void onPMessage(String pattern, String channel, String message) {  
        System.out.print("onPMessage:取得按表达式的方式订阅的消息后的处理    ");  
        System.out.println(pattern + "=" + channel + "=" + message);  
    }  
      
    //初始化按表达式的方式订阅时候的处理  
    @Override  
    public void onPSubscribe(String pattern, int subscribedChannels) {  
        System.out.print("onPSubscribe:初始化按表达式的方式订阅时候的处理   ");  
        System.out.println(pattern + "=" + subscribedChannels);    
    }  
      
    //取消化按表达式的方式订阅时候的处理  
    @Override  
    public void onPUnsubscribe(String pattern, int subscribedChannels) {  
        System.out.print("onPUnsubscribe:取消化按表达式的方式订阅时候的处理   ");  
        System.out.println(pattern + "=" + subscribedChannels);   
    }  
      
    //初始化订阅时候的处理  
    @Override  
    public void onSubscribe(String channel, int subscribedChannels) {  
        System.out.print("onSubscribe:初始化订阅时候的处理   ");  
        System.out.println(channel + "=" + subscribedChannels);   
    }  
      
    //取消订阅时候的处理  
    @Override  
    public void onUnsubscribe(String channel, int subscribedChannels) {  
        System.out.print("onUnsubscribe:取消订阅时候的处理   ");  
        System.out.println(channel + "=" + subscribedChannels);  
    }  

}

 

启动工程,打开客户端页面,最初始的div:


运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)
            
    
    博客分类: Java EE ComelIframe主动推送redis的发布订阅 
 同时控制台打印:

这儿进几次.........

onSubscribe:初始化订阅时候的处理   pubsub_channel=1

这说明:一打开客户端,就实现了订阅对应channel的目的。

接下来,为了让这个div中有数据,我们开始来对这个channel进行publish一些数据,模拟:

public static void main(String[] args) {
		Jedis jedis = new Jedis("localhost");
		while(true) {
			try {
				Thread.sleep(2000);
				jedis.publish("pubsub_channel", "I like " + Math.random()*100 );
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
	}

 然后你再观察这个div,会发现如下现象(某一时刻):


运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)
            
    
    博客分类: Java EE ComelIframe主动推送redis的发布订阅 

由此说明:我们达到了如题所想要的目的!——结合了redis的发布/订阅  并且客户端只请求服务端一次,服务端主动向客户端推送了数据。

 

最后,我们再试着关闭客户端页面,会发现控制台打印:

onUnsubscribe:取消订阅时候的处理   pubsub_channel=0

说明,客户端一关闭,就取消了对channel的订阅了。并且queue队列也会被清空。


 其实Comet并不是新兴的技术,关于【反ajax】技术,最新的有WebSocket,以后有机会再研究。

  • 运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)
            
    
    博客分类: Java EE ComelIframe主动推送redis的发布订阅 
  • 大小: 3.2 KB
  • 运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)
            
    
    博客分类: Java EE ComelIframe主动推送redis的发布订阅 
  • 大小: 15.3 KB