欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Tomcat7.84 Websocket接消息队列

程序员文章站 2024-01-13 11:30:10
...
   前端监控需要和消息队列对接,使用websockt和activeMQ,整个系统采用的spring strusts
结构
   按部就班:
1.以定义好消息队列连接器等配置,针对消息监控不需要转换为对象,直接透传,以前队列监听的是主题队列,增加一个监听,使用simplemessage convertl
 
  <!-- 消息转换器 -->
	<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>

  <!-- 前端Websocket消息监听器 -->
	<bean id="webSocketConsumerListener" class="WebSocketConsumerListener"></bean>
	<bean id="taskMessageWebSocketListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<property name="delegate" ref="webSocketConsumerListener"/> 
		<property name="defaultListenerMethod" value="handleMessage" />
		<property name="messageConverter" ref="simpleMessageConverter"/>
	</bean>
	<bean id="taskMessageWebSocketListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="taskConnectionFactory" />
        <property name="destination" ref="taskMessageTopic" />
        <property name="messageListener" ref="taskMessageWebSocketListenerAdapter" />
        <property name="exceptionListener" ref="messageExceptionListener"/>
        <property name="pubSubDomain" value="true" />     <!-- 发布订阅模式 -->
        <!-- <property name="receiveTimeout" value="10000" />  消息接收超时 -->
        <property name="subscriptionDurable" value="true" /> <!-- 持久化订阅者 -->
        <property name="recoveryInterval" value="300000" />
        <property name="durableSubscriptionName" value="durableSubscriptionName_webSocket_task" />
	</bean>


import javax.annotation.Resource;

import org.springframework.context.annotation.Lazy;

/**
 * 任务mom topic 监听器
 * 
 * @author
 *
 */
public class WebSocketConsumerListener {
	
	@Lazy(value=true)
	@Resource
	private SocketServer socketServer;

//	public void setSocketServer(SocketServer socketServer) {
//		this.socketServer = socketServer;
//	}

	public void handleMessage(String message) {
		if (message != null )
			socketServer.send(message);
	}

}



由于tomcat7.84的websocket是基于servlet的,没有办法使用spring管理,在spring容器实例化完成后,才会初始化servlet.
所以在监听里使用了延迟加载,后面介绍如何将websocket server加到spring容器里去

代码如下

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;

import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import org.apache.catalina.websocket.WsOutbound;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.context.support.XmlWebApplicationContext;

public class SocketServer extends WebSocketServlet {
	private static final long serialVersionUID = 1L;
	public final Set<ChatWebSocket> sessions = new CopyOnWriteArraySet<ChatWebSocket>();

	public static int USERNUMBER = 1;

	@Override
	public void init() throws ServletException {
		super.init();
		// 绑定监听和websocket
		// WebMonitorConsumerListener bean =
		// WebApplicationContextUtils.getWebApplicationContext(getServletContext())
		// .getBean("webMonitorConsumerListener", WebMonitorConsumerListener.class);
		// bean.setSocketServer(this);

		XmlWebApplicationContext webApplicationContext = (XmlWebApplicationContext) WebApplicationContextUtils
				.getWebApplicationContext(getServletContext());
		ConfigurableListableBeanFactory beanFactory = webApplicationContext.getBeanFactory();
		beanFactory.registerSingleton("socketServer", this);

		WebApplicationContextUtils.getWebApplicationContext(getServletContext()).getAutowireCapableBeanFactory()
				.autowireBean(this);
	}

	@Override
	protected StreamInbound createWebSocketInbound(String arg0, HttpServletRequest arg1) {
		return new ChatWebSocket(sessions);
	}

	public void send(String message) {
		for (ChatWebSocket session : sessions) {
			try {
				CharBuffer temp = CharBuffer.wrap(message);
				session.getWsOutbound().writeTextMessage(temp);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public class ChatWebSocket extends MessageInbound {

		private Set<ChatWebSocket> sessions;

		public ChatWebSocket() {
			sessions = new CopyOnWriteArraySet<ChatWebSocket>();
		}

		public ChatWebSocket(Set<ChatWebSocket> sessions) {
			this.sessions = sessions;
		}

		@Override
		protected void onTextMessage(CharBuffer message) throws IOException {
			// 这里处理的是文本数据
			onMessage(message.toString());
		}

		public void onMessage(String data) {

		}

		@Override
		protected void onOpen(WsOutbound outbound) {
			USERNUMBER++;
			sessions.add(this);
		}

		@Override
		protected void onClose(int status) {
			sessions.remove(this);

		}

		@Override
		protected void onBinaryMessage(ByteBuffer arg0) throws IOException {

		}
	}

}


使用ConfigurableListableBeanFactory 将sever实例注册到spring容器内
ConfigurableListableBeanFactory beanFactory = webApplicationContext.getBeanFactory();
		beanFactory.registerSingleton("socketServer", this);



前端采用标准的websocket代码
if (!window.WebSocket && window.MozWebSocket)
	window.WebSocket=window.MozWebSocket;
if (!window.WebSocket)
	alert("No Support ");
var ws;

$(document).ready(function(){
	startWebSocket();
})

function startWebSocket()
{	
	ws = new WebSocket("ws://" + location.host + "/SocketServer");
    ws.onopen = function(){
    	console.log("success open"); 
    };
	ws.onmessage = function(event)
	{
		console.log("RECEIVE:"+event.data);
		handleData(event.data); 
	};
	ws.onclose = function(event) { 
		console.log('Client notified socket has closed',event); 
	};
}

function handleData(data)
{	
	if(data){
		var obj = JSON.parse(data);
		updateStatus&&updateStatus(obj);//更新方法
	}
}