Java版WebSocket消息推送系统搭建
程序员文章站
2022-04-28 10:28:34
...
Java版WebSocket消息推送系统搭建
最近在做消息推送,网上查了一些资料,开始想的是用MQ来做,后面发现用WebSocket来做的话感觉应该要简单点,话不多说,准备撸代码。
后端核心代码
/**
* 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去
* @author Monkey
* @date 2020-05-23
*/
@Component
public class RequestListener implements ServletRequestListener {
@Override
public void requestInitialized(ServletRequestEvent sre) {
//将所有request请求都携带上httpSession
HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession();
String uid = sre.getServletRequest().getParameter("uid");
if (!StringUtils.isEmpty(uid)) {
httpSession.setAttribute("uid", uid);
SysContent.setUserLocal(uid);
}
}
/**
* Type: WebSocketConfig
* Description: WebSocket配置类
* @author Monkey
* @date 2020-05-23
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
/**
* Type: WebSocketServer
* Description: WebSocketServer,实现服务器客户端平等交流,达到服务器可以主动向客户端发送消息
*
* @author Monkey
* @date 2020-05-23
*/
@ServerEndpoint(value = "/websocket")
@Component
public class WebSocketServer {
//日志记录器
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
//高效,弱一致性,放的是WebSocketServer而非session是为了复用自身的方法
private static transient volatile Set<WebSocketServer> webSocketSet = ConcurrentHashMap.newKeySet();
private static transient volatile Set<WebSocketServer> tempWebSocketSet = ConcurrentHashMap.newKeySet();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private static transient ConcurrentHashMap<String, Session> map = new ConcurrentHashMap();
/**
* Title: sendInfo
* Description: 群发消息
* @param message
*/
public static void sendInfo(String message, String sid) throws IOException {
LOGGER.info("webSocket-sendInfo群发消息:" + message);
RecordLogUtil.info("在线人数:" + getOnlineCount());
if (!StringUtils.isEmpty(sid)) {
Set<Map.Entry<String, Session>> entries = map.entrySet();
for(Map.Entry<String, Session> m : entries){
if (m.getKey().equals(sid)) {
Session s2 = m.getValue();
webSocketSet.forEach(ws -> {
if (ws.session.getId() == s2.getId()) {
ws.sendMessage(message);
return;
}
});
map.remove(m);
break;
}
}
} else {
tempWebSocketSet = ConcurrentHashMap.newKeySet();
for (Map.Entry<String, Session> m : map.entrySet()) {
Session s2 = m.getValue();
webSocketSet.forEach(ws -> {
if (ws.session.getId() == s2.getId()) {
ws.sendMessage(message);
tempWebSocketSet.add(ws);
return;
}
});
}
//过滤完已经挂断的session
webSocketSet = tempWebSocketSet;
}
}
/**
* Title: getOnlineCount
* Description: 获取连接数
* @return
*/
public static int getOnlineCount() {
return map.size();
}
/* *********************以下为非static方法************************** */
/**
* Title: sendMessage
* Description: 向客户端发送消息
* @param message
* @throws IOException
*/
public boolean sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
return true;
} catch (IOException error) {
LOGGER.error("webSocket-sendMessage发生错误:" + error.getClass() + error.getMessage());
return false;
}
}
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session) throws IOException {
String uid = SysContent.getUserLocal();
RecordLogUtil.info("uid=" + uid);
this.session = session;
if (StringUtils.isEmpty(uid)){
sendMessage("连接失败");
session.close();
return;
} else {
map.put(uid, this.session);
webSocketSet.add(this); //加入set中
sendMessage("连接成功-" + uid);
RecordLogUtil.info("当前在线人数: " + getOnlineCount());
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
//这里要删除map里面对象
for(Map.Entry<String, Session> m : map.entrySet()){
if (m.getValue() == this.session) {
map.remove(m);
RecordLogUtil.info("用户" + m.getKey() + "已关闭连接!");
break;
}
}
RecordLogUtil.info("在线人数:" + getOnlineCount() + ", 关联在线人数:" + map.size());
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
LOGGER.info("来自客户端(" + session.getId() + ")的消息:" + message);
sendMessage("Hello, nice to hear you! There are " + webSocketSet.size() + " users like you in total here!");
}
/**
* Title: onError
* Description: 发生错误时候回调函数
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
LOGGER.error("webSocket发生错误:" + error.getClass() + error.getMessage());
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
}
/**
* 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去
* @author Monkey
* @date 2020-05-23
*/
public class SysContent {
private static ThreadLocal<HttpServletRequest> requestLocal = new ThreadLocal<HttpServletRequest>();
private static ThreadLocal<HttpServletResponse> responseLocal = new ThreadLocal<HttpServletResponse>();
private static ThreadLocal<String> userLocal = new ThreadLocal<String>();
public static String getUserLocal() {
return userLocal.get();
}
public static void setUserLocal(String userLocal) {
SysContent.userLocal.set(userLocal);
}
public static HttpServletRequest getRequest() {
return (HttpServletRequest) requestLocal.get();
}
public static void setRequest(HttpServletRequest request) {
requestLocal.set(request);
}
public static HttpServletResponse getResponse() {
return (HttpServletResponse) responseLocal.get();
}
public static void setResponse(HttpServletResponse response) {
responseLocal.set(response);
}
public static HttpSession getSession() {
return (HttpSession) ((HttpServletRequest) requestLocal.get()).getSession();
}
}
前端代码
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<title>WebSocket测试</title>
<meta charset="utf-8">
<script src="/socket/js/jquery-3.3.1.min.js"></script>
<script src="/socket/js/sockjs.min.js"></script>
</head>
<body>
<!-----start-main---->
<div class="main">
<h2>socketTest</h2>
<input type="button" id="send" value="点击向服务器发送消息">
<p id="recive"></p>
</div>
<!-----//end-main---->
</body>
<script type="text/javascript">
var ws = null;
var ws_status = false;
function openWebSocket(){
//这里为了模拟不同的模拟器。sid是随机数,开多个浏览器窗口的话,就用随机值测试连接
var sid = Math.random()*10000;
//如果是正式使用,则这个就是绑定用户的唯一值,一般为id固定值
//sid = 1;
console.log("sid=" + sid);
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
console.log("window...");
ws = new WebSocket("ws://"+window.location.host+"/websocket?uid=" + sid);
} else if ('MozWebSocket' in window) {
console.log("MozWebSocket...");
websocket = new MozWebSocket("ws://"+window.location.host+"/websocket?uid=" + sid);
} else {
console.log("SockJS...");
ws = new SockJS("http://"+window.location.host+"/websocket?uid=" + sid);
}
//这个事件是接受后端传过来的数据
ws.onmessage = function (event) {
//根据业务逻辑解析数据
console.log("Server:");
console.log(event);
};
ws.onclose = function (event) {
console.log("Connection closed!");
ws_status = false;
};
ws.onopen = function (event){
ws_status = true;
console.log("Connected!");
};
ws.onerror = function (event){
console.log("Connect error!");
};
}
//如果连接失败,每隔两秒尝试重新连接
setInterval(function(){
if(!ws_status){
openWebSocket();
}
}, 2000);
$("#send").click(function(){
ws.send("Hello, server, I am browser.");
$.ajax({
url: "/test/send",
data: {uid: null},
type: "get",
dataType: "json",
success: function(data) {
// data = jQuery.parseJSON(data); //dataType指明了返回数据为json类型,故不需要再反序列化
console.log("开始发送广播啦!")
}
});
});
</script>
</html>
前端示意图
demo 测试代码已经上传到csdn,喜欢的话可以前往下载。