实时通信 socketio nio 总结
程序员文章站
2022-07-13 14:56:37
...
公司要求多一个实时通信的功能
解决思路如下
架构图:
后台管理页面实时显示在线的终端情况
终端服务器和后台服务器之间用NIO通信
当有终端登录登出,后台管理服务器(服务端)接收终端服务器(客户端) 接口请求
后台管理服务器(服务的)发送信息给实时监控页面(客户端)
附上代码:
终端的客户端
public class EmployeeSocketClient {
static SocketChannel sc =null;
final static InetSocketAddress address = new InetSocketAddress(
"127.0.0.1", 8085);
ByteBuffer buffer = ByteBuffer.allocate(1024);
static {
try {
//这里搞个单例好像也行
// 打开通道
sc = SocketChannel.open();
// 建立连接
sc.connect(address);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void sendMessage(String inputMsg) {
try {
// 把输入的数据放入buffer缓冲区
buffer.put(inputMsg.getBytes());
// 复位操作
// 重置capacity、position、limit位置,不用深入了解
buffer.flip();
// 将buffer的数据写入通道
sc.write(buffer);
System.out.println(sc.hashCode());
// 清空缓冲区中的数据
buffer.clear();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//这里注释掉是为了每次启动服务器之后做一个静态的管道,不用关闭后每次新建
// finally {
// if (sc != null) {
// try {
// sc.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
// }
}
}
后台管理服务器NIO服务器代码:
public class EmployeeSocketServer implements InitializingBean {//spring加载完类的构造方法之后,就会执行afterPropertiesSet方法
private Selector selector;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
@Value("${socket_server_port}")//springboot配置参数,如果不用InitializingBean 注入不进来
private String port;
@Autowired
private EmployeeMapper emploueeMapper;
public EmployeeSocketServer() {
try {
// 1 打开多复用器
selector = Selector.open();
// 2 打开服务器通道
// 这里可以选择netty的channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 3 设置服务器通道为非阻塞方式
ssc.configureBlocking(false);
// 4 绑定地址
if (port == null || "".equals(port)) {
port = "8085";
}
ssc.bind(new InetSocketAddress(Integer.valueOf(port)));
// 5 把服务器通道注册到多路复用选择器上,并监听阻塞状态
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void init() {
while (true) {
try {
// 1 必须让多路复用选择器开始监听
selector.select();
// 2 返回所有已经注册到多路复用选择器上的通道的SelectionKey
Iterator<SelectionKey> keys = selector.selectedKeys()
.iterator();
// 3 遍历keys,轮询开始
while (keys.hasNext()) {
SelectionKey key = keys.next();
// 目的,防止在遍历的时候对keys有插入删除操作
keys.remove();
if (key.isValid()) { // 如果key的状态是有效的
// 这部是在read之前注册管道
if (key.isAcceptable()) { // 如果key是阻塞状态,则调用accept()方法
accept(key);
}
if (key.isReadable()) { // 如果key是可读状态,则调用read()方法
read(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
try {
// 1 获取服务器通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 2 执行阻塞方法
SocketChannel sc = ssc.accept();
// 3 设置阻塞模式为非阻塞
sc.configureBlocking(false);
// 4 注册到多路复用选择器上,并设置读取标识
sc.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
// 1 清空缓冲区中的旧数据
buffer.clear();
// 2 获取之前注册的SocketChannel通道
SocketChannel sc = (SocketChannel) key.channel();
// 3 将sc中的数据放入buffer中
int count = sc.read(buffer);
if (count == -1) { // == -1表示通道中没有数据
key.channel().close();
key.cancel();
return;
}
// 读取到了数据,将buffer的position复位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 将buffer中的数据写入byte[]中
buffer.get(bytes);
String body = new String(bytes).trim();
if ("employeeLogin".equals(body) || "employeeLogout".equals(body)) {
for (SocketIOClient client : EmployeeCurrentTimeInfo.clients) {//socketio服务器配置的静态变量
Random random = new Random();
client.sendEvent("pushpoint", new Point(random.nextInt(100), random.nextInt(100)) );
}
}
System.out.println("Server:" + body);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
}
后台管理服务器socketio服务器代码:
public class EmployeeCurrentTimeInfo implements InitializingBean{
@Value("${socketio_ip}")
private String socketioIp;
@Value("${socketio_port}")
private String socketioPort;
@Value("${socketio_delay}")
private String socketioDelay;
@Autowired
private EmployeeService employeeService ;
// 用于保存所有客户端
public static List<SocketIOClient> clients = new ArrayList<SocketIOClient>();
public SocketIOServer server ;
public void init() throws InterruptedException {
Configuration configuration = new Configuration();
if(socketioIp == null || "".equals(socketioIp)){
socketioIp = "localhost";
}
if(socketioPort == null || "".equals(socketioPort)){
socketioPort = "8032";
}
if(socketioDelay == null || "".equals(socketioDelay)){
socketioDelay = "3000";
}
configuration.setHostname(socketioIp);// 设置主机名
configuration.setPort(Integer.valueOf(socketioPort));// 设置监听的端口号
server = new SocketIOServer(configuration);// 根据配置创建服务器对象
server.addConnectListener(new ConnectListener() {// 添加客户端连接监听器
@Override
public void onConnect(SocketIOClient client) {
clients.add(client);// 保存客户端
}
});
server.addDisconnectListener(new DisconnectListener() {
//方式刷新页面时候会导致list里有多个无效的客户端,当销毁时,从list里删除,保证每次先销毁再加入
@Override
public void onDisconnect(SocketIOClient client) {
clients.remove(client);
}
});
server.start();
System.out.println("server started");
// 预留,三秒一刷新功能
// ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// executorService.scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// Random random = new Random();
// for (SocketIOClient client : clients) {
// client.sendEvent("pushpoint", new Point(
// random.nextInt(100), random.nextInt(100)));// 每隔一秒推送一次
// // System.out.println(p.x +"____"+p.y);
// }
// }
// }, 1000, Integer.valueOf(socketioDelay), TimeUnit.MILLISECONDS);
//
//
// // 防止同步并发问题,需要保留
// Object object = new Object();
// synchronized (object) {
// object.wait();
// }
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
}
实时监控页面:
<!DOCTYPE html>
<html>
<head>
<title>netty-socketio测试</title>
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/>
<!-- 自己去下载 -->
<script src="js/socket.io/socket.io.js"></script>
<script src="js/moment.min.js"></script>
<script src="js/jquery-1.7.2.min.js"></script>
<script>
$(function(){
var socket = io.connect('http://192.168.1.122:8032');
//监听名为pushpoint的事件,这与服务端推送的那个事件名称必须一致
socket.on("pushpoint", function(data){
$('#x').text(data.x);
$('#y').text(data.y);
});
});
</script>
</head>
<body>
<div id="display" style="height:50px;background-color:grey;">
x=<span id="x">0</span>, y=<span id="y">0</span>
</div>
</body>
</html>
刚开始准备做成股票监控系统那样,3-3.5秒刷新一次信息,但发现实际业务没有那么复杂,就把实时刷新给注释掉了
socketio有2种方式,一种是上面写到的服务器自动发送请求的,另一种是客户端发送请求,服务器响应的方式。
以上2种方式结合使用,可以实现分页效果:
例如页面要显示的数据由10页,每页10个,但是当终端状态改变时页面客户端不会自动发送请求,则需要在页面客户端第一次请求时附带分页默认值,当页面客户端发送分页请求时,改变默认值,并作为入参去查询数据库。千万不要缓存所有的数据在前段自己来做分页处理,那样好蠢。
下一篇: Java基础(十七)——字节流与字符流
推荐阅读