欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

实时通信 socketio nio 总结

程序员文章站 2022-07-13 14:56:37
...

公司要求多一个实时通信的功能

解决思路如下

架构图:实时通信 socketio nio 总结

后台管理页面实时显示在线的终端情况

终端服务器和后台服务器之间用NIO通信

当有终端登录登出,后台管理服务器(服务端)接收终端服务器(客户端) 接口请求

后台管理服务器(服务的)发送信息给实时监控页面(客户端)

附上代码:

终端的客户端

public class EmployeeSocketClient {
	static SocketChannel sc =null;
	
	final static InetSocketAddress address = new InetSocketAddress(
			"127.0.0.1", 8085);
	
	ByteBuffer buffer = ByteBuffer.allocate(1024);
	static {
		try {
			//这里搞个单例好像也行
			// 打开通道
			sc = SocketChannel.open();
			// 建立连接
			sc.connect(address);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
	
	public void sendMessage(String inputMsg) {
		
		try {

			// 把输入的数据放入buffer缓冲区
			buffer.put(inputMsg.getBytes());
			// 复位操作
			// 重置capacity、position、limit位置,不用深入了解
			buffer.flip();
			// 将buffer的数据写入通道
			sc.write(buffer);
			System.out.println(sc.hashCode());
			// 清空缓冲区中的数据
			buffer.clear();

		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
		//这里注释掉是为了每次启动服务器之后做一个静态的管道,不用关闭后每次新建
//		finally {
//			if (sc != null) {
//				try {
//					sc.close();
//				} catch (IOException e) {
//					e.printStackTrace();
//				}
//			}
//		}

	}
}

后台管理服务器NIO服务器代码:

public class EmployeeSocketServer implements InitializingBean {//spring加载完类的构造方法之后,就会执行afterPropertiesSet方法


	private Selector selector;
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	@Value("${socket_server_port}")//springboot配置参数,如果不用InitializingBean 注入不进来
	private String port;
	@Autowired
	private EmployeeMapper emploueeMapper;

	public EmployeeSocketServer() {
		try {
			// 1 打开多复用器
			selector = Selector.open();
			// 2 打开服务器通道
			// 这里可以选择netty的channel
			ServerSocketChannel ssc = ServerSocketChannel.open();
			// 3 设置服务器通道为非阻塞方式
			ssc.configureBlocking(false);
			// 4 绑定地址
			if (port == null || "".equals(port)) {
				port = "8085";
			}
			ssc.bind(new InetSocketAddress(Integer.valueOf(port)));
			// 5 把服务器通道注册到多路复用选择器上,并监听阻塞状态
			ssc.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("Server start, port:" + port);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void init() {
		while (true) {
			try {
				// 1 必须让多路复用选择器开始监听
				selector.select();
				// 2 返回所有已经注册到多路复用选择器上的通道的SelectionKey
				Iterator<SelectionKey> keys = selector.selectedKeys()
						.iterator();
				// 3 遍历keys,轮询开始
				while (keys.hasNext()) {
					SelectionKey key = keys.next();
					// 目的,防止在遍历的时候对keys有插入删除操作
					keys.remove();
					if (key.isValid()) { // 如果key的状态是有效的
						// 这部是在read之前注册管道
						if (key.isAcceptable()) { // 如果key是阻塞状态,则调用accept()方法
							accept(key);
						}
						if (key.isReadable()) { // 如果key是可读状态,则调用read()方法
							read(key);
						}
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private void accept(SelectionKey key) {
		try {
			// 1 获取服务器通道
			ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
			// 2 执行阻塞方法
			SocketChannel sc = ssc.accept();
			// 3 设置阻塞模式为非阻塞
			sc.configureBlocking(false);
			// 4 注册到多路复用选择器上,并设置读取标识
			sc.register(selector, SelectionKey.OP_READ);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void read(SelectionKey key) {
		try {
			// 1 清空缓冲区中的旧数据
			buffer.clear();
			// 2 获取之前注册的SocketChannel通道
			SocketChannel sc = (SocketChannel) key.channel();
			// 3 将sc中的数据放入buffer中
			int count = sc.read(buffer);
			if (count == -1) { // == -1表示通道中没有数据
				key.channel().close();
				key.cancel();
				return;
			}
			// 读取到了数据,将buffer的position复位到0
			buffer.flip();
			byte[] bytes = new byte[buffer.remaining()];
			// 将buffer中的数据写入byte[]中
			buffer.get(bytes);
			String body = new String(bytes).trim();
			if ("employeeLogin".equals(body) || "employeeLogout".equals(body)) {

				for (SocketIOClient client : EmployeeCurrentTimeInfo.clients) {//socketio服务器配置的静态变量
					
					Random random = new Random(); 
					client.sendEvent("pushpoint", new Point(random.nextInt(100), random.nextInt(100)) );
				}
			}
			System.out.println("Server:" + body);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		init();
	}

}

后台管理服务器socketio服务器代码:

public class EmployeeCurrentTimeInfo implements InitializingBean{
	
	@Value("${socketio_ip}")
	private String socketioIp;
	@Value("${socketio_port}")
	private String socketioPort;
	@Value("${socketio_delay}")
	private String socketioDelay;
	
	
	
	@Autowired
	private EmployeeService employeeService ;
	// 用于保存所有客户端
	
	public static List<SocketIOClient> clients = new ArrayList<SocketIOClient>();
	public SocketIOServer server ;
	
	public void init() throws InterruptedException {
		Configuration configuration = new Configuration();
		
		if(socketioIp == null || "".equals(socketioIp)){
			socketioIp = "localhost";
		}
		if(socketioPort == null || "".equals(socketioPort)){
			socketioPort = "8032";
		}
		if(socketioDelay == null || "".equals(socketioDelay)){
			socketioDelay = "3000";
		}
		configuration.setHostname(socketioIp);// 设置主机名
		configuration.setPort(Integer.valueOf(socketioPort));// 设置监听的端口号
		server = new SocketIOServer(configuration);// 根据配置创建服务器对象

		server.addConnectListener(new ConnectListener() {// 添加客户端连接监听器
			@Override
			public void onConnect(SocketIOClient client) {
				clients.add(client);// 保存客户端
			}
		});
		server.addDisconnectListener(new DisconnectListener() {
			//方式刷新页面时候会导致list里有多个无效的客户端,当销毁时,从list里删除,保证每次先销毁再加入
			@Override
			public void onDisconnect(SocketIOClient client) {
				clients.remove(client);
			}
		});
		server.start();
		System.out.println("server started");

		
		// 预留,三秒一刷新功能
//		ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
		
//		executorService.scheduleWithFixedDelay(new Runnable() {
//			@Override
//			public void run() {
//				Random random = new Random();
//				for (SocketIOClient client : clients) {
//					client.sendEvent("pushpoint", new Point(
//							random.nextInt(100), random.nextInt(100)));// 每隔一秒推送一次
//					// System.out.println(p.x +"____"+p.y);
//				}
//			}
//		}, 1000, Integer.valueOf(socketioDelay), TimeUnit.MILLISECONDS);
//		
//		
//		// 防止同步并发问题,需要保留
//		Object object = new Object();
//		synchronized (object) {
//			object.wait();
//		}
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		init();
		
	}

}

实时监控页面:

<!DOCTYPE html>
<html>
<head>
    <title>netty-socketio测试</title>
    <meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/>
    <!-- 自己去下载 -->
    <script src="js/socket.io/socket.io.js"></script>
    <script src="js/moment.min.js"></script>
    <script src="js/jquery-1.7.2.min.js"></script>
    <script>
        $(function(){
            var socket =  io.connect('http://192.168.1.122:8032');
            //监听名为pushpoint的事件,这与服务端推送的那个事件名称必须一致
            socket.on("pushpoint", function(data){

                $('#x').text(data.x);
                $('#y').text(data.y);
            });
        });

    </script>
</head>

<body>

<div id="display" style="height:50px;background-color:grey;">
    x=<span id="x">0</span>, y=<span id="y">0</span>
</div>

</body>

</html>

刚开始准备做成股票监控系统那样,3-3.5秒刷新一次信息,但发现实际业务没有那么复杂,就把实时刷新给注释掉了

socketio有2种方式,一种是上面写到的服务器自动发送请求的,另一种是客户端发送请求,服务器响应的方式。

以上2种方式结合使用,可以实现分页效果:

例如页面要显示的数据由10页,每页10个,但是当终端状态改变时页面客户端不会自动发送请求,则需要在页面客户端第一次请求时附带分页默认值,当页面客户端发送分页请求时,改变默认值,并作为入参去查询数据库。千万不要缓存所有的数据在前段自己来做分页处理,那样好蠢。