欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java版WebSocket消息推送系统搭建

程序员文章站 2022-04-28 10:28:34
...

Java版WebSocket消息推送系统搭建

        最近在做消息推送,网上查了一些资料,开始想的是用MQ来做,后面发现用WebSocket来做的话感觉应该要简单点,话不多说,准备撸代码。


后端核心代码



/**
 * 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去
 * @author Monkey
 * @date 2020-05-23
 */
@Component
public class RequestListener implements ServletRequestListener {
  @Override
  public void requestInitialized(ServletRequestEvent sre) {
    //将所有request请求都携带上httpSession
      HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession();
      String uid = sre.getServletRequest().getParameter("uid");
      if (!StringUtils.isEmpty(uid)) {
          httpSession.setAttribute("uid", uid);
          SysContent.setUserLocal(uid);
      }

  }
/**
 * Type: WebSocketConfig
 * Description: WebSocket配置类
 * @author Monkey
 * @date 2020-05-23
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
  
}

/**
 * Type: WebSocketServer
 * Description: WebSocketServer,实现服务器客户端平等交流,达到服务器可以主动向客户端发送消息
 *
 * @author Monkey
 * @date 2020-05-23
 */
@ServerEndpoint(value = "/websocket")
@Component
public class WebSocketServer {
	
	//日志记录器
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
	
    //高效,弱一致性,放的是WebSocketServer而非session是为了复用自身的方法
    private static transient volatile Set<WebSocketServer> webSocketSet = ConcurrentHashMap.newKeySet();

    private static transient volatile Set<WebSocketServer> tempWebSocketSet = ConcurrentHashMap.newKeySet();
 
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    private static transient ConcurrentHashMap<String, Session> map = new ConcurrentHashMap();
 
    /**
     * Title: sendInfo
     * Description: 群发消息
     * @param message
     */
    public static void sendInfo(String message, String sid) throws IOException {
    	LOGGER.info("webSocket-sendInfo群发消息:" + message);
        RecordLogUtil.info("在线人数:" + getOnlineCount());
        if (!StringUtils.isEmpty(sid)) {
            Set<Map.Entry<String, Session>> entries = map.entrySet();
            for(Map.Entry<String, Session> m : entries){
                if (m.getKey().equals(sid)) {
                    Session s2 = m.getValue();
                    webSocketSet.forEach(ws -> {
                        if (ws.session.getId() == s2.getId()) {
                            ws.sendMessage(message);
                            return;
                        }
                    });
                    map.remove(m);
                    break;
                }
            }
        } else {
            tempWebSocketSet = ConcurrentHashMap.newKeySet();
            for (Map.Entry<String, Session> m : map.entrySet()) {
                Session s2 = m.getValue();
                webSocketSet.forEach(ws -> {
                    if (ws.session.getId() == s2.getId()) {
                        ws.sendMessage(message);
                        tempWebSocketSet.add(ws);
                        return;
                    }
                });
            }
            //过滤完已经挂断的session
            webSocketSet = tempWebSocketSet;
        }

    }
 
    /**
     * Title: getOnlineCount
     * Description: 获取连接数
     * @return
     */
    public static int getOnlineCount() {
        return map.size();
    }
    /* *********************以下为非static方法************************** */
    /**
     * Title: sendMessage
     * Description: 向客户端发送消息
     * @param message
     * @throws IOException
     */
    public boolean sendMessage(String message) {
        try {
			this.session.getBasicRemote().sendText(message);
			return true;
		} catch (IOException error) {
			LOGGER.error("webSocket-sendMessage发生错误:" + error.getClass() + error.getMessage());
			return false;
		}
    }
    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session) throws IOException {
        String uid = SysContent.getUserLocal();
        RecordLogUtil.info("uid=" + uid);
        this.session = session;
        if (StringUtils.isEmpty(uid)){
            sendMessage("连接失败");
            session.close();
            return;
        } else {
            map.put(uid, this.session);
            webSocketSet.add(this);     //加入set中
            sendMessage("连接成功-" + uid);
            RecordLogUtil.info("当前在线人数: " + getOnlineCount());
        }
    }
 
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        //这里要删除map里面对象
        for(Map.Entry<String, Session> m : map.entrySet()){
            if (m.getValue() == this.session) {
                map.remove(m);
                RecordLogUtil.info("用户" + m.getKey() + "已关闭连接!");
                break;
            }
        }
        RecordLogUtil.info("在线人数:" + getOnlineCount() + ", 关联在线人数:" + map.size());
    }
 
    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
    	LOGGER.info("来自客户端(" + session.getId() + ")的消息:" + message);
    	sendMessage("Hello, nice to hear you! There are " + webSocketSet.size() + " users like you in total here!");
    }
 
	/**
	 * Title: onError
	 * Description: 发生错误时候回调函数
	 * @param session
	 * @param error
	 */
    @OnError
    public void onError(Session session, Throwable error) {
        LOGGER.error("webSocket发生错误:" + error.getClass() + error.getMessage());
    }
 
    @Override
    public int hashCode() {
    	return super.hashCode();
    }
    
    @Override
    public boolean equals(Object obj) {
    	return super.equals(obj);
    }
}

/**
 * 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去
 * @author Monkey
 * @date 2020-05-23
 */
public class SysContent {
    private static ThreadLocal<HttpServletRequest> requestLocal = new ThreadLocal<HttpServletRequest>();
    private static ThreadLocal<HttpServletResponse> responseLocal = new ThreadLocal<HttpServletResponse>();
    private static ThreadLocal<String> userLocal = new ThreadLocal<String>();

    public static String getUserLocal() {
        return userLocal.get();
    }

    public static void setUserLocal(String userLocal) {
        SysContent.userLocal.set(userLocal);
    }

    public static HttpServletRequest getRequest() {
        return (HttpServletRequest) requestLocal.get();
    }

    public static void setRequest(HttpServletRequest request) {
        requestLocal.set(request);
    }

    public static HttpServletResponse getResponse() {
        return (HttpServletResponse) responseLocal.get();
    }

    public static void setResponse(HttpServletResponse response) {
        responseLocal.set(response);
    }

    public static HttpSession getSession() {
        return (HttpSession) ((HttpServletRequest) requestLocal.get()).getSession();
    }
}

前端代码


<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <title>WebSocket测试</title>
    <meta charset="utf-8">
    <script src="/socket/js/jquery-3.3.1.min.js"></script>
    <script src="/socket/js/sockjs.min.js"></script>
</head>
<body>
<!-----start-main---->
<div class="main">
    <h2>socketTest</h2>
    <input type="button" id="send" value="点击向服务器发送消息">
    <p id="recive"></p>

</div>
<!-----//end-main---->
</body>
<script type="text/javascript">
    var ws = null;
    var ws_status = false;
    function openWebSocket(){
        //这里为了模拟不同的模拟器。sid是随机数,开多个浏览器窗口的话,就用随机值测试连接
        var sid = Math.random()*10000;
        //如果是正式使用,则这个就是绑定用户的唯一值,一般为id固定值
        //sid = 1;
        console.log("sid=" + sid);
        //判断当前浏览器是否支持WebSocket
        if ('WebSocket' in window) {
            console.log("window...");
            ws = new WebSocket("ws://"+window.location.host+"/websocket?uid=" + sid);
        } else if ('MozWebSocket' in window) {
            console.log("MozWebSocket...");
            websocket = new MozWebSocket("ws://"+window.location.host+"/websocket?uid=" + sid);
        } else {
            console.log("SockJS...");
            ws = new SockJS("http://"+window.location.host+"/websocket?uid=" + sid);
        }
 
        //这个事件是接受后端传过来的数据
        ws.onmessage = function (event) {
            //根据业务逻辑解析数据
            console.log("Server:");
            console.log(event);
        };
        ws.onclose = function (event) {
            console.log("Connection closed!");
            ws_status = false;
        };
        ws.onopen = function (event){
            ws_status = true;
            console.log("Connected!");
        };
        ws.onerror = function (event){
            console.log("Connect error!");
        };
    }
    //如果连接失败,每隔两秒尝试重新连接
    setInterval(function(){
        if(!ws_status){
            openWebSocket();
        }
    }, 2000);
    $("#send").click(function(){
        ws.send("Hello, server, I am browser.");

        $.ajax({
            url: "/test/send",
            data: {uid: null},
            type: "get",
            dataType: "json",
            success: function(data) {
                // data = jQuery.parseJSON(data);  //dataType指明了返回数据为json类型,故不需要再反序列化
                console.log("开始发送广播啦!")
            }
        });

    });
</script>
</html>

前端示意图


Java版WebSocket消息推送系统搭建 


demo 测试代码已经上传到csdn,喜欢的话可以前往下载。

https://download.csdn.net/download/lj88811498/12453985