Tomcat 7.0.84 WebSocket 对接消息队列
程序员文章站
2024-01-13 11:30:10
...
前端监控需要和消息队列对接,使用 WebSocket 和 ActiveMQ,整个系统采用的是 Spring + Struts
结构
按部就班:
1. 已定义好消息队列连接器等配置。针对消息监控,消息不需要转换为对象,直接透传;以前监听的是主题队列,现在再增加一个监听器,使用 SimpleMessageConverter
由于 Tomcat 7.0.84 的 WebSocket 是基于 Servlet 的,没有办法直接交给 Spring 管理:Spring 容器实例化完成之后,才会初始化 Servlet。
所以在监听器里使用了延迟加载,后面介绍如何将 WebSocket server 加到 Spring 容器里去
代码如下
使用 ConfigurableListableBeanFactory 将 server 实例注册到 Spring 容器内
前端采用标准的websocket代码
结构
按部就班:
1. 已定义好消息队列连接器等配置。针对消息监控,消息不需要转换为对象,直接透传;以前监听的是主题队列,现在再增加一个监听器,使用 SimpleMessageConverter
<!-- Message converter -->
<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>

<!-- Front-end WebSocket message listener: the delegate whose handleMessage method receives each converted message -->
<bean id="webSocketConsumerListener" class="WebSocketConsumerListener"></bean>

<!-- Adapter that routes incoming JMS messages to webSocketConsumerListener.handleMessage, using simpleMessageConverter -->
<bean id="taskMessageWebSocketListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="webSocketConsumerListener"/>
    <property name="defaultListenerMethod" value="handleMessage" />
    <property name="messageConverter" ref="simpleMessageConverter"/>
</bean>

<!-- Listener container subscribing the adapter to the task topic.
     taskConnectionFactory, taskMessageTopic and messageExceptionListener are referenced here but defined elsewhere. -->
<bean id="taskMessageWebSocketListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="taskConnectionFactory" />
    <property name="destination" ref="taskMessageTopic" />
    <property name="messageListener" ref="taskMessageWebSocketListenerAdapter" />
    <property name="exceptionListener" ref="messageExceptionListener"/>
    <property name="pubSubDomain" value="true" /> <!-- publish/subscribe mode -->
    <!-- <property name="receiveTimeout" value="10000" /> message receive timeout -->
    <property name="subscriptionDurable" value="true" /> <!-- durable subscriber -->
    <!-- NOTE(review): recoveryInterval is presumably in milliseconds (300000 = 5 minutes); confirm against the Spring version in use -->
    <property name="recoveryInterval" value="300000" />
    <property name="durableSubscriptionName" value="durableSubscriptionName_webSocket_task" />
</bean>
import javax.annotation.Resource; import org.springframework.context.annotation.Lazy; /** * 任务mom topic 监听器 * * @author * */ public class WebSocketConsumerListener { @Lazy(value=true) @Resource private SocketServer socketServer; // public void setSocketServer(SocketServer socketServer) { // this.socketServer = socketServer; // } public void handleMessage(String message) { if (message != null ) socketServer.send(message); } }
由于 Tomcat 7.0.84 的 WebSocket 是基于 Servlet 的,没有办法直接交给 Spring 管理:Spring 容器实例化完成之后,才会初始化 Servlet。
所以在监听器里使用了延迟加载,后面介绍如何将 WebSocket server 加到 Spring 容器里去
代码如下
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import org.apache.catalina.websocket.MessageInbound; import org.apache.catalina.websocket.StreamInbound; import org.apache.catalina.websocket.WebSocketServlet; import org.apache.catalina.websocket.WsOutbound; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.web.context.support.WebApplicationContextUtils; import org.springframework.web.context.support.XmlWebApplicationContext; public class SocketServer extends WebSocketServlet { private static final long serialVersionUID = 1L; public final Set<ChatWebSocket> sessions = new CopyOnWriteArraySet<ChatWebSocket>(); public static int USERNUMBER = 1; @Override public void init() throws ServletException { super.init(); // 绑定监听和websocket // WebMonitorConsumerListener bean = // WebApplicationContextUtils.getWebApplicationContext(getServletContext()) // .getBean("webMonitorConsumerListener", WebMonitorConsumerListener.class); // bean.setSocketServer(this); XmlWebApplicationContext webApplicationContext = (XmlWebApplicationContext) WebApplicationContextUtils .getWebApplicationContext(getServletContext()); ConfigurableListableBeanFactory beanFactory = webApplicationContext.getBeanFactory(); beanFactory.registerSingleton("socketServer", this); WebApplicationContextUtils.getWebApplicationContext(getServletContext()).getAutowireCapableBeanFactory() .autowireBean(this); } @Override protected StreamInbound createWebSocketInbound(String arg0, HttpServletRequest arg1) { return new ChatWebSocket(sessions); } public void send(String message) { for (ChatWebSocket session : sessions) { try { CharBuffer temp = CharBuffer.wrap(message); session.getWsOutbound().writeTextMessage(temp); } catch (IOException e) { e.printStackTrace(); } } } public class 
ChatWebSocket extends MessageInbound { private Set<ChatWebSocket> sessions; public ChatWebSocket() { sessions = new CopyOnWriteArraySet<ChatWebSocket>(); } public ChatWebSocket(Set<ChatWebSocket> sessions) { this.sessions = sessions; } @Override protected void onTextMessage(CharBuffer message) throws IOException { // 这里处理的是文本数据 onMessage(message.toString()); } public void onMessage(String data) { } @Override protected void onOpen(WsOutbound outbound) { USERNUMBER++; sessions.add(this); } @Override protected void onClose(int status) { sessions.remove(this); } @Override protected void onBinaryMessage(ByteBuffer arg0) throws IOException { } } }
使用 ConfigurableListableBeanFactory 将 server 实例注册到 Spring 容器内
// Excerpt: obtain the bean factory from the web application context and register the
// current instance (this) as a singleton under the bean name "socketServer".
ConfigurableListableBeanFactory beanFactory = webApplicationContext.getBeanFactory();
beanFactory.registerSingleton("socketServer", this);
前端采用标准的websocket代码
// Fall back to the vendor-prefixed constructor when only that one is available.
if (!window.WebSocket && window.MozWebSocket) {
    window.WebSocket = window.MozWebSocket;
}
if (!window.WebSocket) {
    alert("No Support ");
}

// The single connection to the server, created by startWebSocket().
var ws;

$(document).ready(function () {
    startWebSocket();
});

/**
 * Opens the WebSocket connection to /SocketServer on the current host and wires up the
 * handlers. Does nothing when the browser has no WebSocket support.
 */
function startWebSocket() {
    if (!window.WebSocket) {
        // Already reported by the alert above; constructing the socket would only throw.
        return;
    }

    // Use wss:// on https pages, because browsers block insecure ws:// connections there.
    var scheme = location.protocol === "https:" ? "wss://" : "ws://";
    ws = new WebSocket(scheme + location.host + "/SocketServer");

    ws.onopen = function () {
        console.log("success open");
    };
    ws.onmessage = function (event) {
        console.log("RECEIVE:" + event.data);
        handleData(event.data);
    };
    ws.onclose = function (event) {
        console.log('Client notified socket has closed', event);
    };
}

/**
 * Parses a message as JSON and passes the result to updateStatus, if the page defines it.
 * Empty and non-JSON messages are ignored.
 *
 * @param {string} data raw text received from the server
 */
function handleData(data) {
    if (!data) {
        return;
    }

    var obj;
    try {
        obj = JSON.parse(data);
    } catch (e) {
        // One malformed frame must not throw out of the onmessage handler.
        console.log("Ignoring message that is not valid JSON", e);
        return;
    }

    // typeof never throws, even when updateStatus is not declared on this page.
    if (typeof updateStatus === "function") {
        updateStatus(obj); // update hook supplied by the page
    }
}