nio:Selector示例解析
程序员文章站
2022-04-24 14:33:55
...
title: nio:Selector示例解析
date: 2019-05-30 10:45:09
categories:
- Java基础
tags: - nio
Nio三大核心概念
Nio
中有三大核心概念:Buffer
、Channel
、Selector
。
-
Buffer
本身是一块内存,底层实现上,是一个数组。数据的读写都是通过Buffer
实现的。所有的数据的读写都是通过Buffer
来进行的,永远不会出现直接向Channel
读写的情况。 -
Channel
指的是可以向其读写数据的对象,类似于java.io
中的Stream
。Stream
只能是InputStream
或者OutputStream
,Channel
所不同的是,Channel
是双向的,Channel
打开后,则可以进行读取、写入。 -
Selector
可以管理着多条Channel
通道,并且可以知晓这些通道是否为Accept
、Connect
、Read
、Write
做好了准备。通过Selector
可以管理多个网络连接,也就是说,在一个线程中就可以管理多个Channel
,这也是跟传统io
的区别,这样单线程的话,减少了线程上下文切换的开销。
示例
服务端程序:
public class NioServer {
//保存通道的map,以通道的地址作为key
private static Map<String,SocketChannel> map = new HashMap<>();
public static void main(String[] args) throws Exception{
//创建ServerSocketChannel实例,并绑定端口
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//使用Selector,必须处于非阻塞模式
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));
//创建一个Selector
Selector selector = Selector.open();
//将channel注册到selector上,并绑定selector对于此通道的感兴趣的事件,当此通道`接受就绪`时,selector就可以知道此通道`接受就绪`。这样就实现了通道和Selector的关联关系。共有四种常量状态
//SelectionKey.OP_CONNECT
//SelectionKey.OP_ACCEPT
//SelectionKey.OP_READ
//SelectionKey.OP_WRITE
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//
while (true){
//返回keys的数量,若没有,在阻塞一段时间后,返回0
int select = selector.select();
if (select == 0) continue;
//获取SelectionKey集合,通过这些SelectionKey可以获取到相应的就绪通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//这个循环遍历已选择键集中的每个键,并检测各个键所对应的通道的就绪事件。
selectionKeys.forEach(selectionKey -> {
final SocketChannel client;
try {
if(selectionKey.isAcceptable()) {
//如果有客户端连接服务,触发accept事件
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
client = channel.accept();
client.configureBlocking(false);
String s = client.getRemoteAddress().toString();
System.out.println(s);
//在selector上注册socketChannel的OP_READ事件。
client.register(selector,SelectionKey.OP_READ);
//将通道放入map中保存
map.put(s,client);
//从 selectionKeys删除掉已处理的selectionKey,不然会一直循环
selectionKeys.remove(selectionKey);
}else if(selectionKey.isReadable()){
//如果有客户端发送数据,触发read事件
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将数据读到Buffer中
int write = socketChannel.read(byteBuffer);
if (write > 0) {
//将position移到0处
byteBuffer.flip();
Charset charset = Charset.forName("utf-8");
//解码出字符串
String valueOf = String.valueOf(charset.decode(byteBuffer));
System.out.println(valueOf);
SocketAddress remoteAddress = socketChannel.getRemoteAddress();
//将服务端接收到的数据,发送给其他的客户端,通过保存在map中CHannel
//比较
for (Map.Entry<String,SocketChannel> entry:
map.entrySet()) {
SocketChannel value = entry.getValue();
//如果map中的Channel等于当前Channel,就不发送给这个Channel,这样就只有别的客户端嫩收到消息
if (value != socketChannel) {
byteBuffer.clear();
byteBuffer.put( valueOf.getBytes());
byteBuffer.flip();
value.write(byteBuffer);
}
}
}
selectionKeys.remove(selectionKey);
}
}catch (Exception e){
e.printStackTrace();
}
});
}
}
}
客户端程序:
public class NioClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost",8899));
Selector selector = Selector.open();
//将channel注册到selector上,并绑定selector对于此通道的感兴趣的事件,当此通道`接收连接`时,selector就可以知道此通道`接受连接`。这样就实现了通道和Selector的关联关系。
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true){
int select = selector.select();
if(select == 0) continue;
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey ->
if(selectionKey.isConnectable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
//判断连接是否完成
if(channel.isConnectionPending()){
try {
channel.finishConnect();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put((LocalDateTime.now() + " connect successed!").getBytes());
byteBuffer.flip();
channel.write(byteBuffer);
//通过单线程池创建获取键盘输入的线程
ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
executorService.submit(() -> {
while (true){
byteBuffer.clear();
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String s = bufferedReader.readLine();
byteBuffer.put((channel.getLocalAddress().toString() + " : "+ s).getBytes());
byteBuffer.flip();
channel.write(byteBuffer);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
try {
//将通道绑定为读事件
channel.register(selector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}else if(selectionKey.isReadable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true){
byteBuffer.clear();
try {
int read = channel.read(byteBuffer);
if(read <= 0) break;
String s = new String(byteBuffer.array(), 0, read);
System.out.println(s);
} catch (IOException e) {
e.printStackTrace();
}
}
selectionKeys.remove(selectionKey);
}
});
}
}
}