Springboot集成WebSocket,实现后台向前端推送信息
程序员文章站
2022-05-21 21:28:30
...
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
Ajax传值每次都要传送大量的头文件,对于项目运行十分不方便,所以使用Websockets。
sendSocket.js
var socket;
var recdata = [];
// 创建链接
function openSocket(func, param) {
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
console.log("您的浏览器支持WebSocket");
//实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
//var socketUrl="${request.contextPath}/im/"+$("#userId").val();
var socketUrl = "http://localhost:8081/server/" + func + "/" + param;
console.log("url = " + socketUrl);
socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
console.log(socketUrl);
if (socket != null) {
socket.close();
socket = null;
}
socket = new WebSocket(socketUrl);
//打开事件
socket.onopen = function () {
console.log("websocket已打开");
//socket.send("这是来自客户端的消息" + location.href + new Date());
};
//获得消息事件
socket.onmessage = function (msg) {
// console.log(msg.data);
recdata = msg.data;
};
//关闭事件
socket.onclose = function () {
console.log("websocket已关闭");
};
//发生了错误事件
socket.onerror = function () {
console.log("websocket发生了错误");
}
}
}
// 发送socket
function sendMessage(str) {
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
console.log("您的浏览器支持WebSocket");
// console.log("发送的消息为: " + str);
socket.send(str);
}
}
WebSocketServer
package com.ademos.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.ademos.config.TestWebSocketServer;
import com.ademos.pojo.DataSateModel;
import com.ademos.pojo.SoluResModel;
import com.ademos.pojo.StationModel;
import com.ademos.pojo.WarnModel;
import com.ademos.services.*;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.springframework.stereotype.Controller;
@ServerEndpoint(value = "/server/{func}/{param}")
@Controller
public class WebSocketServer {
// 注入spring
private static DataMapServices dataMapServices;
private static DataResServices dataResServices;
private static DataSeqServices dataSeqServices;
private static DataSateServices dataSateServices;
private static DataWarnServices dataWarnServices;
@Autowired
public void setDataMapServices(DataMapServices dataMapServices) {
WebSocketServer.dataMapServices = dataMapServices;
}
@Autowired
public void setDataResServices(DataResServices dataResServices) {
WebSocketServer.dataResServices = dataResServices;
}
@Autowired
public void setDataSeqServices(DataSeqServices dataSeqServices) {
WebSocketServer.dataSeqServices = dataSeqServices;
}
@Autowired
public void setDataSateServices(DataSateServices dataSateServices) {
WebSocketServer.dataSateServices = dataSateServices;
}
@Autowired
public void setDataWarnServices(DataWarnServices dataWarnServices) {
WebSocketServer.dataWarnServices = dataWarnServices;
}
static Log log = LogFactory.get(TestWebSocketServer.class);
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
private String func = "";
private String param = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("func") String func,
@PathParam("param") String param) {
this.session = session;
this.func = func;
this.param = param;
if (webSocketMap.containsKey(func)) {
webSocketMap.remove(func);
webSocketMap.put(func, this);
//加入set中
} else {
webSocketMap.put(func, this);
//加入set中
addOnlineCount();
//在线数加1
}
log.info("用户连接:" + func + ",当前在线人数为:" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("用户:" + func + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(func)) {
webSocketMap.remove(func);
//从set中删除
subOnlineCount();
}
log.info("用户退出:" + func + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
// log.info("用户消息:" + func + ",报文:" + message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) {
try {
//解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
//追加发送人(防止串改)
jsonObject.put("fromFunc", this.func);
String para = jsonObject.getString("station");
//传送给对应toUserId用户的websocket
if (StringUtils.isNotBlank(func) && webSocketMap.containsKey(func)) {
switch (func) {
// 展示地图,实时网图
case "showmap": {
ArrayList<StationModel> stationModels = dataMapServices.getStationMsg();
webSocketMap.get(func).sendMessage(JSON.toJSONString(stationModels));
break;
}
// 解算结果
case "res":
ArrayList<SoluResModel> soluResModels = dataResServices.getSoluResModel();
webSocketMap.get(func).sendMessage(JSON.toJSONString(soluResModels));
break;
// 坐标序列
case "seq": {
ArrayList<Double> stationModels = dataSeqServices.getStationModelNeu(param);
webSocketMap.get(func).sendMessage(JSON.toJSONString(stationModels));
break;
}
// 卫星视图
case "satellite": {
ArrayList<DataSateModel> dataSateModels = dataSateServices.getDataSateMsg();
webSocketMap.get(func).sendMessage(JSON.toJSONString(dataSateModels));
break;
}
// 预警-实时
case "warn-real": {
ArrayList<Double> warnReal = dataWarnServices.getWarnReal(param);
webSocketMap.get(func).sendMessage(JSON.toJSONString(warnReal));
break;
}
// 预警-表格
case "warn-table": {
ArrayList<WarnModel> warnModels = dataWarnServices.getWarning(para);
webSocketMap.get(func).sendMessage(JSON.toJSONString(warnModels));
break;
}
}
} else {
log.error("请求的userId:" + func + "不在该服务器上");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.func + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
WebSocketConfig
package com.ademos.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* 开启WebSocket支持
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
.html
<script src="websocket/sendSocket.js"></script>
// 开启websocket链接
openSocket("res", "res");
// 是否更新数据
var showResFlag = 0;
function flashData() {
// 判断websocket链接是否已经建立
if (socket.readyState === 1) {
showResFlag = 0;
sendMessage(msg);
mydata = recdata;
mydata = JSON.parse(mydata);
if (mydata != "连接成功") {
for (let i = 0; i < mydata.length; i++) {
if (mydata[i].stationGpsTime != null &&
mydata[i].stationGpsTime != "undefined") {
$('#list2').jqGrid('clearGridData');
$("#list2").jqGrid('setGridParam', {
datatype: "local",
data: mydata
}).trigger("reloadGrid");
}
}
}
}
}