网络IO之NIO
网络IO之NIO
NIO库是在JDK1.4中才引入,弥补了原来的I/O(BIO)的不足,它是一个高速的、面向块的I/O。
NIO有两层含义:
- 在java层面:nio称为new io,是一套全新的操作io的api。
- 在OS层面:nio称为no-blocking io,系统调用socket函数可以设置一个NONBLOCK的参数,实现调用方法时不会阻塞线程。
与BIO的主要区别
面向流与面向缓冲
BIO是面向流的,面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据,如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
NIO是面向缓冲区(Buffer)的,数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
阻塞与非阻塞IO
NIO是阻塞的,这意味着当一个线程调用read()或write()时,该线程会被阻塞,直到有一些数据可以读取读取,或数据完全写入,该线程在此期间不能再干任何事情了。
NIO是一种非阻塞模式,当一个线程调用read()时,如果没有数据可以读取,就直接返回,不会阻塞,在数据变得可以读之前,该线程可以去处理其他事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道(Channel),但不需要等待它完全写入,这个线程同时可以去做别的事情。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程可以管理多个输入和输出通道(Channel)。
NIO三大核心组件
NIO是一种同步非阻塞的IO模型。同步是指线程不断轮询IO事件是否就绪,非阻塞是指线程在等待IO的时候,可以同时做其他任务。
NIO有三大核心组件:Selector选择器、Channel管道、Buffer缓冲区。
Selector
Selector(选择器):选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器(Selectors),然后使用一个单独的线程来操作这个选择器,进而“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的 Channel”的容器。
SelectionKey
SelectionKey是一个抽象类,表示selectableChannel在Selector中注册的标识。每个Channel向Selector注册时,都将会创建一个SelectionKey。SelectionKey将Channel与Selector建立了关系,并维护了channel事件。
可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它。所以在调用某个key时,需要使用isValid进行校验。
SelectionKey类型和就绪条件
SelectionKey有如下四种类型,定义在java.nio.channels.SelectionKey中,分别如下表:
操作类型 | 就绪条件及说明 |
---|---|
OP_READ | 当操作系统读缓冲区有数据可读时就绪。并非时刻都有数据可读,所以一般需要注册该操作,仅当有就绪时才发起读操作,有的放矢,避免浪费CPU。 |
OP_WRITE | 当操作系统写缓冲区有空闲空间时就绪。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。 |
OP_CONNECT | 当SocketChannel.connect()请求连接成功后就绪。该操作只给客户端使用。 |
OP_ACCEPT | 当接收到一个客户端连接请求时就绪。该操作只给服务器使用。 |
服务端和客户端分别感兴趣的类型
ServerSocketChannel和SocketChannel可以注册自己感兴趣的操作类型,当对应操作类型的就绪条件满足时OS会通知channel,下表描述各种Channel允许注册的操作类型,Y表示允许注册,N表示不允许注册,其中服务器SocketChannel指由服务器ServerSocketChannel.accept()返回的对象。
OP_READ | OP_WRITE | OP_CONNECT | OP_ACCEPT | |
---|---|---|---|---|
服务器ServerSocketChannel | Y | |||
服务器SocketChannel | Y | Y | ||
客户端SocketChannel | Y | Y | Y |
- 服务器启动ServerSocketChannel,关注OP_ACCEPT事件。
- 客户端启动SocketChannel,连接服务器,关注OP_CONNECT事件。
- 服务器接受连接,启动一个服务器的SocketChannel,这个SocketChannel可以关注OP_READ、OP_WRITE事件,一般连接建立后会直接关注OP_READ事件。
- 客户端这边的客户端SocketChannel发现连接建立后,可以关注OP_READ、OP_WRITE事件,一般是需要客户端需要发送数据了才关注OP_READ事件。
- 连接建立后客户端与服务器端开始相互发送消息(读写),根据实际情况来关注OP_READ、OP_WRITE事件。
Channel
Channel(通道):被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据,而且可以同时进行读写。
- 所有被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。
- ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用 IO”的端口监听,同时支持UDP协议和TCP协议。
- ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP+端口到服务器IP+端口的通信连接。
- 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
Buffer缓冲区
Buffer其实就是一个数组,在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据,都是将它放到缓冲区中。而在面向流I/O系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。
所有的缓冲区类型都继承于抽象类Buffer,最常用的就是ByteBuffer。
NIO的使用
服务器端代码:
package com.morris.nio.use;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static final int PORT = 8899;
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 设置为非阻塞方式
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册接收连接的事件
System.out.println("server is start on port: " + PORT);
while (true) {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey key = selectionKeyIterator.next();
selectionKeyIterator.remove();
if(key.isValid()) {
if(key.isAcceptable()){
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ);
}
if(key.isReadable()){
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buf);
if(bytesRead > 0){
buf.flip();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
String body = new String(bytes);
System.out.println("receive from client: " + body);
String response = "hello client";
bytes = response.getBytes();
buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.flip();
sc.write(buf);
key.cancel();
sc.close();
}
}
}
}
}
}
}
客户端代码:
package com.morris.nio.use;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Client {
public static int PORT = 8899;
private static boolean stop = false;
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel serverSocketChannel = SocketChannel.open();
serverSocketChannel.configureBlocking(false); // 设置为非阻塞方式
boolean connect = serverSocketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));
if (connect) { // 连接建立成功,注册监听读的事件
serverSocketChannel.register(selector, SelectionKey.OP_READ);
} else {
// 连接还没建立成功,注册监听建立连接的事件
serverSocketChannel.register(selector, SelectionKey.OP_CONNECT);
}
while (!stop) {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey key = selectionKeyIterator.next();
selectionKeyIterator.remove(); // 移除key
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
String response = "hello server";
byte[] bytes = response.getBytes();
ByteBuffer buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.flip();
sc.write(buf);
}
}
if (key.isReadable()) {
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buf);
if (bytesRead > 0) {
buf.flip();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
String body = new String(bytes);
System.out.println("receive from server: " + body);
key.cancel();
sc.close();
stop = true;
}
}
}
}
}
}
}
key为什么要移除?如果不移除,下次调用selector.select
还会把这个事件选择出来,这个移除只是移除本次监听到的事件,而不是不再监听这个事件了。
key为什么会失效?SelectionKey.isValid()方法检测此key是否有效。当key被取消(cancel),或者通道被关闭,或者selector被关闭,都将导致此key无效。SelectionKey.cancel()方法请求将此键取消注册,一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中。cancel操作将key的valid属性置为false,并执行selector.cancel(key)(即将key加入cancelledkey集合),在下一次select操作时移除它。所以在调用某个key时,需要使用isValid进行校验。
为什么不要监听写事件?先来看下面的例子:
package com.morris.nio.use;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class WriteableServer {
public static final int PORT = 8899;
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 设置为非阻塞方式
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册接收连接的事件
System.out.println("server is start on port: " + PORT);
while (true) {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey key = selectionKeyIterator.next();
System.out.println("当前通道的事件:" + key.interestOps());
selectionKeyIterator.remove();
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buf);
if (bytesRead > 0) {
buf.flip();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
String body = new String(bytes);
System.out.println("receive from client: " + body);
String response = "hello client";
bytes = response.getBytes();
buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.flip();
// 这里不直接写,而是监听写事件,还得监听读时间
sc.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, buf);
} else if (bytesRead < 0) {
key.cancel();
sc.close();
}
} else if (key.isWritable()) {
System.out.println("write");
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
sc.write(byteBuffer);
}
}
}
}
}
}
这段代码一运行,会监听到多次写事件,写操作的就绪条件是内核发送缓冲区有空的情况,OP_WRITE写操作是在内核发送缓冲区满且有数据可写的情况下使用,并且在写事件触发后需要立刻清除写操作注册,否则可能到只无限循环。
所以要修改为下面的代码:
} else if (key.isWritable()) {
System.out.println("write");
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
if (byteBuffer.hasRemaining()) {
sc.write(byteBuffer);
} else {
// 取消对写事件的注册
key.interestOps(SelectionKey.OP_READ);
}
}
本文地址:https://blog.csdn.net/u022812849/article/details/109579503