SpringBoot-SSE开发实践(线程池发送心跳、Session管理)
程序员文章站
2022-06-21 13:42:22
文章目录SpringBoot-SSE开发实践SSE开发实践项目框架项目依赖Session管理业务接口业务实现任务异常控制器测试SpringBoot-SSE开发实践SSESSE(Server-SentEvents,即服务器发送事件)是围绕只读Comet交互推出的API或者模式。SSE API用于创建到服务器的单向连接,服务器通过这个连接可以发送任意数量的数据。服务器响应的MIME类型必须是text/event-stream,而且是浏览器中的JavaScript API能解析格式输出。SSE支持短轮询、长...
SpringBoot-SSE开发实践
SSE介绍
SSE(Server-SentEvents,即服务器发送事件)是围绕只读Comet交互推出的API或者模式。SSE API用于创建到服务器的单向连接,服务器通过这个连接可以发送任意数量的数据。服务器响应的MIME类型必须是text/event-stream,而且是浏览器中的JavaScript API能解析格式输出。SSE支持短轮询、长轮询和HTTP流,而且能在断开连接时自动确定何时重新连接。
- SSE特点:实现简单、 单向通信、自动重连、···
- 业务场景:客户端与服务端建立连接后,只需要服务端给客户端发送数据,客户端无需要给服务端发送数据
开发实践
项目框架
.
├── README.md
├── pom.xml
└── src
└── main
├── java
│ └── cn
│ └── zuster
│ └── sse
│ ├── SseApplication.java【启动类】
│ ├── controller 【控制器】
│ ├── exception 【异常】
│ ├── service 【服务接口】
│ │ └── impl 【服务实现】
│ ├── session 【SESSION管理】
│ └── task 【任务管理】
└── resources
└── application.properties 【配置文件】
项目依赖
SpringBoot中已经有SseEmitter了,所以不需要额外引入其他包。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.zuster</groupId>
<artifactId>my-demo-springboot-sse</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>my-demo-springboot-sse</name>
<description>Spring Boot And SSE Demo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Session管理
服务端和客户端建立连接时,往往会保持很多个SSE会话,为此,我们需要统一的对会话进行管理,此处我们使用 ConcurrentHashMap进行会话管理,其中 key 为客户端ID,value 为 SseEmitter 对象。当然 value 也可以按照业务进行封装。
主要方法说明:
- boolean exist(String id):检测指定的客户端Session是否存在。
- boolean add(String id, SseEmitter emitter):添加Session,如果有相同的客户端ID,则先结束掉之前的Session,重新建立新的Session。
- boolean del(String id):删除指定客户端Session。
- boolean send(String id, Object msg):给指定的客户端发送数据,注意,此处没有指定 MediaType ,即默认发送的就是 data,如果需要发送其他类型的数据,可进行*扩展。
- void onCompletion(String id, ScheduledFuture<?> future):当 SseEmitter 触发 onCompletion时业务中需要处理的逻辑,包括停止线程池中的线程执行(比如心跳),移除缓存的Session等。
- void onError(String id, SseException e):当 SseEmitter 触发 onError 和 onTimeout 时业务中需要处理的逻辑,这里我取到缓存的Session,然后继续触发 completeWithError(),最终还是会执行到上面的 onCompletion() 方法中。
package cn.zuster.sse.session;
// 省略 import
/**
* SSE Session
*
* @author zuster
* @date 2021/1/5
*/
public class SseSession {
private static final Logger logger = LoggerFactory.getLogger(SseSession.class);
/**
* Session维护Map
*/
private static Map<String, SseEmitter> SESSION = new ConcurrentHashMap<>();
/**
* 判断Session是否存在
*
* @param id 客户端ID
* @return
*/
public static boolean exist(String id) {
return SESSION.get(id) == null;
}
/**
* 增加Session
*
* @param id 客户端ID
* @param emitter SseEmitter
*/
public static void add(String id, SseEmitter emitter) {
final SseEmitter oldEmitter = SESSION.get(id);
if (oldEmitter != null) {
oldEmitter.completeWithError(new SseException("RepeatConnect(Id:" + id + ")"));
}
SESSION.put(id, emitter);
}
/**
* 删除Session
*
* @param id 客户端ID
* @return
*/
public static boolean del(String id) {
final SseEmitter emitter = SESSION.remove(id);
if (emitter != null) {
emitter.complete();
return true;
}
return false;
}
/**
* 发送消息
*
* @param id 客户端ID
* @param msg 发送的消息
* @return
*/
public static boolean send(String id, Object msg) {
final SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
try {
emitter.send(msg);
return true;
} catch (IOException e) {
logger.error("MSG: SendMessageError-IOException | ID: " + id + " | Date: " + new Date() + " |", e);
return false;
}
}
return false;
}
/**
* SseEmitter onCompletion 后执行的逻辑
*
* @param id 客户端ID
* @param future
*/
public static void onCompletion(String id, ScheduledFuture<?> future) {
SESSION.remove(id);
if (future != null) {
// SseEmitter断开后需要中断心跳发送
future.cancel(true);
}
}
/**
* SseEmitter onTimeout 或 onError 后执行的逻辑
*
* @param id
* @param e
*/
public static void onError(String id, SseException e) {
final SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
emitter.completeWithError(e);
}
}
}
业务接口
一般使用 SSE 时的业务包括:客户端建立连接、给客户端发送数据、客户端终端连接,接口如下:
package cn.zuster.sse.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* SSE 相关业务接口
*
* @author zuster
* @date 2021/1/5
*/
public interface SseService {
/**
* 新建连接
*
* @param clientId 客户端ID
* @return
*/
SseEmitter start(String clientId);
/**
* 发送数据
*
* @param clientId 客户端ID
* @return
*/
String send(String clientId);
/**
* 关闭连接
*
* @param clientId 客户端ID
* @return
*/
String close(String clientId);
}
业务实现
业务实现简要介绍:
- ScheduledExecutorService heartbeatExecutors :使用线程池来管理客户端连接后给客户端发送心跳,我们的业务场景是建立连接后服务端需每隔10秒给客户单发送一个消息,若连续3次未收到心跳,则客户端中断连接,重新进行连接。很多地方使用while(true)…Thread.sleep()方式来实现此业务,但是在真实业务中问题很多,没有用线程池优雅和高效。
- SseEmitter start(String clientId) :客户端建立连接,建立连接后,需要将缓存Session,同时设置心跳(如果有其他业务也可以在这里设置),另外在onCompletion、onTimeout、onError回调事件中处理相关的业务。强调:一定要在回调中处理掉Session和之前设置的Task,否则很容易OOM!
- String send(String clientId):向指定客户端发送消息。
- String close(String clientId):关闭连接。
package cn.zuster.sse.service.impl;
// 省略 import
/**
* SSE 相关业务实现
*
* @author zuster
* @date 2021/1/5
*/
@Service
public class SseServiceImpl implements SseService {
private static final Logger logger = LoggerFactory.getLogger(SseServiceImpl.class);
/**
* 发送心跳线程池
*/
private static ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(8);
/**
* 新建连接
*
* @param clientId 客户端ID
* @return
*/
@Override
public SseEmitter start(String clientId) {
// 设置为0L为永不超时
// 次数设置30秒超时,方便测试 timeout 事件
SseEmitter emitter = new SseEmitter(30_000L);
logger.info("MSG: SseConnect | EmitterHash: {} | ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.add(clientId, emitter);
final ScheduledFuture<?> future = heartbeatExecutors.scheduleAtFixedRate(new HeartBeatTask(clientId), 0, 10, TimeUnit.SECONDS);
emitter.onCompletion(() -> {
logger.info("MSG: SseConnectCompletion | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onCompletion(clientId, future);
});
emitter.onTimeout(() -> {
logger.error("MSG: SseConnectTimeout | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onError(clientId, new SseException("TimeOut(clientId: " + clientId + ")"));
});
emitter.onError(t -> {
logger.error("MSG: SseConnectError | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onError(clientId, new SseException("Error(clientId: " + clientId + ")"));
});
return emitter;
}
/**
* 发送数据
*
* @param clientId 客户端ID
* @return
*/
@Override
public String send(String clientId) {
if (SseSession.send(clientId, System.currentTimeMillis())) {
return "Succeed!";
}
return "error";
}
/**
* 关闭连接
*
* @param clientId 客户端ID
* @return
*/
@Override
public String close(String clientId) {
logger.info("MSG: SseConnectClose | ID: {} | Date: {}", clientId, new Date());
if (SseSession.del(clientId)) return "Succeed!";
return "Error!";
}
}
任务
我们的业务为建立连接后发送心跳数据,此处我只设置了客户端ID,如果业务中有其他数据可以扩充。
package cn.zuster.sse.task;
// 省略 import
/**
* 心跳任务
*
* @author zuster
* @date 2021/1/5
*/
public class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private final String clientId;
public HeartBeatTask(String clientId) {
// 这里可以按照业务传入需要的数据
this.clientId = clientId;
}
@Override
public void run() {
logger.info("MSG: SseHeartbeat | ID: {} | Date: {}", clientId, new Date());
SseSession.send(clientId, "ping");
}
}
异常
package cn.zuster.sse.exception;
/**
* SSE异常信息
*
* @author zuster
* @date 2021/1/5
*/
public class SseException extends RuntimeException {
public SseException() {
}
public SseException(String message) {
super(message);
}
public SseException(String message, Throwable cause) {
super(message, cause);
}
public SseException(Throwable cause) {
super(cause);
}
public SseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
控制器
package cn.zuster.sse.controller;
// 省略 import
/**
* SSE测试控制器
*
* @author songyh
* @date 2021/1/5
*/
@RestController
@RequestMapping("sse")
public class SseTestController {
private static final Logger logger = LoggerFactory.getLogger(SseTestController.class);
@Autowired
private SseService sseService;
@RequestMapping("start")
public SseEmitter start(@RequestParam String clientId) {
return sseService.start(clientId);
}
/**
* 将SseEmitter对象设置成完成
*
* @param clientId
* @return
*/
@RequestMapping("/end")
public String close(String clientId) {
return sseService.close(clientId);
}
}
测试
代码就上面这么多了,启动起来测试一下吧。
- 建立连接:http://localhost:8080/sse/start?clientId=888
- 关闭连接:http://localhost:8080/sse/end?clientId=111
需要测试的点包括:
- 同时开启多个连接
- 启动两个相同的连接
- 启动后直接关掉
- 启动后等待30秒超时
- 发送消息(我controller中删了发送消息的,可以自行加上试试)
- 通过关闭连接接口关闭连接
好了,敬请的玩吧????????????
如果还有其他更加优雅的方式,欢迎留言????????????
本文地址:https://blog.csdn.net/williamanos/article/details/112289846
下一篇: yum安装配置jdk