传统的 IO 流有很多缺陷,它的阻塞性加上磁盘读写本来就慢,会导致 CPU 使用效率大大降低。
jdk 1.4 发布了 NIO 包,NIO 的文件读写设计颠覆了传统 IO 的设计,采用『通道』+『缓存区』使得新式的 IO 操作直接面向缓存区,并且是非阻塞的,对于效率的提升真不是一点两点。
缓存区 Buffer
一个 Buffer 本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据。
Buffer 是所有具体缓存区的基类,是一个抽象类,它的实现类有很多,包含各种类型数据的缓存。
其实核心是最后的 ByteBuffer,前面的一大串类只是包装了一下它而已,我们使用最多的通常也是 ByteBuffer。
我们应该将 Buffer 理解为一个数组,IntBuffer、CharBuffer、DoubleBuffer 等分别对应 int[]、char[]、double[] 等。
MappedByteBuffer 用于实现内存映射文件。
Buffer 中有几个重要的成员属性,我们了解一下:
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
long address;
复制代码
mark 属性我们已经不陌生了,用于重复读。capacity 描述缓存区容量,即整个缓存区最大能存储多少数据量。address 用于操作直接内存,区别于 jvm 内存,这一点待会说明。
而 position,limit,capacity 用一张图结合解释:
最好理解的当然是 capacity,它代表这个缓冲区的容量,一旦设定就不可以更改。比如 capacity 为 1024 的 IntBuffer,代表其一次可以存放 1024 个 int 类型的值。一旦 Buffer 的容量达到 capacity,需要清空 Buffer,才能重新写入值。
由于缓存区是读写共存的,所以不同的模式下,position,limit 变量的值也具有不同的意义。
position 的初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。读操作的时候也是类似的,每读一个值,position 就自动加 1。
从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。
Limit:写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。
初始化 Buffer
每个 Buffer 实现类都提供了一个静态方法 allocate(int capacity) 帮助我们快速实例化一个 Buffer。如:
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
IntBuffer intBuf = IntBuffer.allocate(1024);
LongBuffer longBuf = LongBuffer.allocate(1024);
复制代码
另外,我们经常使用 wrap 方法来初始化一个 Buffer。
public static ByteBuffer wrap(byte[] array) {
...
}
复制代码
填充 Buffer
各个 Buffer 类都提供了一些 put 方法用于将数据填充到 Buffer 中,如 ByteBuffer 中的几个 put 方法:
// 填充一个 byte 值
public abstract ByteBuffer put(byte b);
// 在指定位置填充一个 int 值
public abstract ByteBuffer put(int index, byte b);
// 将一个数组中的值填充进去
public final ByteBuffer put(byte[] src) {...}
public ByteBuffer put(byte[] src, int offset, int length) {...}
复制代码
上述这些方法需要自己控制 Buffer 大小,不能超过 capacity,超过会抛 java.nio.BufferOverflowException 异常。
对于 Buffer 来说,另一个常见的操作中就是,我们要将来自 Channel 的数据填充到 Buffer 中,在系统层面上,这个操作我们称为读操作,因为数据是从外部(文件或网络等)读到内存中。
int num = channel.read(buf);
复制代码
上述方法会返回从 Channel 中读入到 Buffer 的数据大小。
提取 Buffer 中的值
每写入一个值,position 的值都需要加 1,所以 position 最后会指向最后一次写入的位置的后面一个,如果 Buffer 写满了,那么 position 等于 capacity(position 从 0 开始)。
如果要读 Buffer 中的值,需要切换模式,从写入模式切换到读出模式。注意,通常在说 NIO 的读操作的时候,我们说的是从 Channel 中读数据到 Buffer 中,对应的是对 Buffer 的写入操作。
调用 Buffer 的 flip() 方法,可以从写入模式切换到读取模式。其实这个方法也就是设置了一下 position 和 limit 值罢了。
public final Buffer flip() {
limit = position; // 将 limit 设置为实际写入的数据数量
position = 0; // 重置 position 为 0
mark = -1; // mark 之后再说
return this;
}
复制代码
对应写入操作的一系列 put 方法,读操作提供了一系列的 get 方法:
// 根据 position 来获取数据
public abstract byte get();
// 获取指定位置的数据
public abstract byte get(int index);
// 将 Buffer 中的数据写入到数组中
public ByteBuffer get(byte[] dst)
复制代码
附一个经常使用的方法:
new String(buffer.array()).trim();
复制代码
当然了,除了将数据从 Buffer 取出来使用,更常见的操作是将我们写入的数据传输到 Channel 中,如通过 FileChannel 将数据写入到文件中,通过 SocketChannel 将数据写入网络发送到远程机器等。对应的,这种操作,我们称之为写操作。
int num = channel.write(buf);
复制代码
mark() & reset()
mark 用于临时保存 position 的值,每次调用 mark() 方法都会将 mark 设值为当前的 position,便于后续需要的时候使用。
public final Buffer mark() {
mark = position;
return this;
}
复制代码
考虑以下场景,我们在 position 为 5 的时候,先 mark() 一下,然后继续往下读,读到第 10 的时候,我想重新回到 position 为 5 的地方重新来一遍,那只要调一下 reset() 方法,position 就回到 5 了。
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
复制代码
rewind() & clear() & compact()
rewind():会重置 position 为 0,通常用于重新从头读写 Buffer。
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
复制代码
clear():有点重置 Buffer 的意思,相当于重新实例化了一样。
通常,我们会先填充 Buffer,然后从 Buffer 读取数据,之后我们再重新往里填充新的数据,我们一般在重新填充之前先调用 clear()。
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
复制代码
compact():和 clear() 一样的是,它们都是在准备往 Buffer 填充新的数据之前调用。
前面说的 clear() 方法会重置几个属性,但是我们要看到,clear() 方法并不会将 Buffer 中的数据清空,只不过后续的写入会覆盖掉原来的数据,也就相当于清空了数据了。
而 compact() 方法有点不一样,调用这个方法以后,会先处理还没有读取的数据,也就是 position 到 limit 之间的数据(还没有读过的数据),先将这些数据移到左边,然后在这个基础上再开始写入。很明显,此时 limit 还是等于 capacity,position 指向原来数据的右边。
通道 Channel
NIO 的核心就是通道和缓存区,所以它们的工作模式是这样的:
所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地。
通道有点类似 IO 中的流,但不同的是,同一个通道既允许读也允许写,而任意一个流要么是读流要么是写流。 但是你要明白一点,通道和流一样都是需要基于物理文件的,而每个流或者通道都通过文件指针操作文件,这里说的「通道是双向的」也是有前提的,那就是通道基于随机访问文件『RandomAccessFile』的可读可写文件指针。 『RandomAccessFile』是既可读又可写的,所以基于它的通道是双向的,所以,「通道是双向的」这句话是有前提的,不能断章取义。 基本的通道类型有如下一些:
FileChannel 是基于文件的通道,SocketChannel 和 ServerSocketChannel 用于网络 TCP 套接字数据报读写,DatagramChannel 是用于网络 UDP 套接字数据报读写。
通道不能单独存在,它永远需要绑定一个缓存区,所有的数据只会存在于缓存区中,无论你是写或是读,必然是缓存区通过通道到达磁盘文件,或是磁盘文件通过通道到达缓存区。 即缓存区是数据的「起点」,也是「终点」。
FileChannel
不过我们在说 NIO 的时候,其实 FileChannel 并不是关注的重点。而且后面我们说非阻塞的时候会看到,FileChannel 是不支持非阻塞的。
初始化:
FileInputStream inputStream = new FileInputStream(new File("/data.txt"));
FileChannel fileChannel = inputStream.getChannel();
复制代码
当然了,我们也可以从 RandomAccessFile#getChannel 来得到 FileChannel。
读取文件内容:
ByteBuffer buffer = ByteBuffer.allocate(1024);
int num = fileChannel.read(buffer);
复制代码
前面我们也说了,所有的 Channel 都是和 Buffer 打交道的。
写入文件内容:
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("随机写入一些内容到 Buffer 中".getBytes());
// Buffer 切换为读模式
buffer.flip();
while(buffer.hasRemaining()) {
// 将 Buffer 中的内容写入文件
fileChannel.write(buffer);
}
复制代码
SocketChannel
打开一个 TCP 连接:
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("https://www.javadoop.com", 80));
复制代码
上面的这行代码等价于下面的两行:
// 打开一个通道
SocketChannel socketChannel = SocketChannel.open();
// 发起连接
socketChannel.connect(new InetSocketAddress("https://www.javadoop.com", 80));
复制代码
SocketChannel 的读写和 FileChannel 没什么区别,就是操作缓冲区。
// 读取数据
socketChannel.read(buffer);
// 写入数据到网络连接中
while(buffer.hasRemaining()) {
socketChannel.write(buffer);
}
复制代码
ServerSocketChannel
之前说 SocketChannel 是 TCP 客户端,这里说的 ServerSocketChannel 就是对应的服务端。
ServerSocketChannel 用于监听机器端口,管理从这个端口进来的 TCP 连接。
// 实例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
// 一旦有一个 TCP 连接进来,就对应创建一个 SocketChannel 进行处理
SocketChannel socketChannel = serverSocketChannel.accept();
}
复制代码
SocketChannel 了,它不仅仅是 TCP 客户端,它代表的是一个网络通道,可读可写。
ServerSocketChannel 不和 Buffer 打交道了,因为它并不实际处理数据,它一旦接收到请求后,实例化 SocketChannel,之后在这个连接通道上的数据传递它就不管了,因为它需要继续监听端口,等待下一个连接。
DatagramChannel
UDP 和 TCP 不一样,DatagramChannel 一个类处理了服务端和客户端。
监听端口:
DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9090));
ByteBuffer buf = ByteBuffer.allocate(48);
channel.receive(buf);
复制代码
发送数据:
String newData = "New String to write to file..."
+ System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.put(newData.getBytes());
buf.flip();
int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));
复制代码
JVM 内存划分为栈和堆,这是大家深入脑海的知识,但是其实划分给 JVM 的还有一块堆外内存,也就是直接内存,很多人不知道这块内存是干什么用的。 这是一块物理内存,专门用于 JVM 和 IO 设备打交道,Java 底层使用 C 语言的 API 调用操作系统与 IO 设备进行交互。
例如,Java 内存中有一个字节数组,现在调用流将它写入磁盘文件,那么 JVM 首先会将这个字节数组先拷贝一份到堆外内存中,然后调用 C 语言 API 指明将某个连续地址范围的数据写入磁盘。
读操作也是类似,而 JVM 额外做的拷贝工作也是有意义的,因为 JVM 是基于自动垃圾回收机制运行的,所有内存中的数据会在 GC 时不停的被移动,如果你调用系统 API 告诉操作系统将内存某某位置的内存写入磁盘,而此时发生 GC 移动了该部分数据,GC 结束后操作系统是不是就写错数据了。
所以,JVM 对于与外围 IO 设备交互的情况下,都会将内存数据复制一份到堆外内存中,然后调用系统 API 间接的写入磁盘,读也是类似的。由于堆外内存不受 GC 管理,所以用完一定得记得释放。
理解这一个小知识是看懂源码实现的前提,不然你可能不知道代码实现者在做什么。好了,那我们就先来看看读操作的基本使用与源码实现。
RandomAccessFile file = new RandomAccessFile
("C:\\Users\\yanga\\Desktop\\note.txt","rw");
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
byte[] res = new byte[1024];
buffer.get(res,0,buffer.limit());
System.out.println(new String(res));
channel.close();
复制代码
我们看这么一段代码,这段代码我大致分成了四个部分,第一部分用于获取文件通道,第二部分用于分配缓存区并完成读操作,第三部分用于将缓存区中数据进行打印,第四部分为关闭通道连接。
getChannel 方法用于获取一个文件相关的通道实例,具体实现如下:
public final FileChannel getChannel() {
synchronized (this) {
if (channel == null) {
channel = FileChannelImpl.open(fd, path, true, rw, this);
}
return channel;
}
}
public static FileChannel open
(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {
return new FileChannelImpl(var0, var1, var2, var3, false, var4);
}
复制代码
getChannel 方法会调用 FileChannelImpl 的工厂方法构建一个 FileChannelImpl 实例,FileChannelImpl 是抽象类 FileChannel 的一个子类实现。 构成 FileChannelImpl 实例所需的必要参数有,该文件的文件指针,该文件的完整路径,读写权限等。
所谓的缓存区,本质上就是字节数组。
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
复制代码
ByteBuffer 实例的构建是通过工厂模式产生的,必须指定参数 capacity 作为内部字节数组的容量。HeapByteBuffer 是虚拟机的堆上内存,所有数据都将存储在堆空间,我们不久将会介绍它的一个兄弟,DirectByteBuffer,它被分配在堆外内存中。
HeapByteBuffer(int cap, int lim) {
super(-1, 0, lim, cap, new byte[cap], 0);
}
复制代码
调用父类的构造方法,初始化我们在 ByteBuffer 中提过的一些属性值,如 position,capacity,mark,limit,offset 以及字节数组 hb。
我们看看这个 read 方法的调用链。
这个 read 方法是子类 FileChannelImpl 对父类 FileChannel read 方法的重写。这个方法不是读操作的核心,我们简单概括一下,该方法首先会拿到当前通道实例的锁,如果没有被其他线程占有,那么占有该锁,并调用 IOUtil 的 read 方法。
首先判断我们的 ByteBuffer 实例是不是一个 DirectBuffer,也就是判断当前的 ByteBuffer 实例是不是被分配在直接内存中,如果是,那么将调用 readIntoNativeBuffer 方法从磁盘读取数据直接放入 ByteBuffer 实例所在的直接内存中。 否则,虚拟机将在直接内存区域分配一块内存,该内存区域的首地址存储在 var5 实例的 address 属性中。 接着从磁盘读取数据放入 var5 所代表的直接内存区域中。 最后,put 方法会将 var5 所代表的直接内存区域中的数据写入到 var1 所代表的堆内缓存区并释放临时创建的直接内存空间。 这样,我们传入的缓存区中就成功的被读入了数据。写操作是相反的,大家可以自行类比,反正堆内数据想要到达磁盘就必定要经过堆外内存的复制过程。
想要更好的使用这个通道和缓存区进行文件读写操作,你就一定得对缓存区的几个变量的值时刻把握住,position 和 limit 当前的值是什么,大致什么位置,一定得清晰,否则这个读写共存的缓存区可能会让你晕头转向。
选择器 Selector
Selector 是 Java NIO 的一个组件,Selector 建立在非阻塞的基础之上,大家经常听到的 多路复用 在 Java 世界中指的就是它,用于实现一个线程管理多个 Channel。但本质上由于 FileChannel 不支持注册选择器,所以 Selector 一般被认为是服务于网络套接字通道的。 而大家口中的「NIO 是非阻塞的」,准确来说,指的是网络编程中客户端与服务端连接交换数据的过程是非阻塞的。普通的文件读写依然是阻塞的,和 IO 是一样的。
创建一个选择器一般是通过 Selector 的工厂方法:
Selector selector = Selector.open();
复制代码
而一个通道想要注册到某个 Selector 中,必须调整模式为非阻塞模式,例如:
//创建一个 TCP 套接字通道
SocketChannel channel = SocketChannel.open();
//调整通道为非阻塞模式
channel.configureBlocking(false);
//向选择器注册一个通道
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
复制代码
以上代码是注册一个通道到选择器中的最简单版本,支持注册选择器的通道都有一个 register 方法,该方法就是用于注册当前实例通道到指定选择器的。 该方法的第一个参数就是目标选择器,第二个参数其实是一个二进制掩码,它指明当前选择器感兴趣当前通道的哪些事件。以枚举类型提供了以下几种取值:
- int OP_READ = 1 << 0;//对应 00000001,通道中有数据可以进行读取
- int OP_WRITE = 1 << 2;//对应 00000100,可以往通道中写入数据
- int OP_CONNECT = 1 << 3;//对应 00001000,成功建立 TCP 连接
- int OP_ACCEPT = 1 << 4;//对应 00010000,接受 TCP 连接
这种用二进制掩码来表示某些状态的机制,我们在讲述虚拟机类类文件结构的时候也遇到过,它就是用一个二进制位来描述一种状态。
我们可以同时监听一个 Channel 中的发生的多个事件,比如我们要监听 ACCEPT 和 READ 事件,那么指定参数为二进制的 00010001 即十进制数值 17 即可。
register 方法会返回一个 SelectionKey 实例,该实例代表的就是选择器与通道的一个关联关系。你可以调用它的 selector 方法返回当前相关联的选择器实例,也可以调用它的 channel 方法返回当前关联关系中的通道实例。 除此之外,SelectionKey 的 readyOps 方法将返回当前选择感兴趣当前通道中事件中准备就绪的事件集合,依然返回的一个整型数值,也就是一个二进制掩码。
int readySet = selectionKey.readyOps();
复制代码
假如 readySet 的值为 13,二进制 「0000 1101」,从后向前数,第一位为 1,第三位为 1,第四位为 1,那么说明选择器关联的通道,读就绪、写就绪,连接就绪。
所以,当我们注册一个通道到选择器之后,就可以通过返回的 SelectionKey 实例监听该通道的各种事件。 当然,一旦某个选择器中注册了多个通道,我们不可能一个一个的记录它们注册时返回的 SelectionKey 实例来监听通道事件,选择器应当有方法返回所有注册成功的通道相关的 SelectionKey 实例。
Set<SelectionKey> keys = selector.selectedKeys();
复制代码
selectedKeys 方法会返回选择器中注册成功的所有通道的 SelectionKey 实例集合。我们通过这个集合的 SelectionKey 实例,可以得到所有通道的事件就绪情况并进行相应的处理操作。
调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。
下面我们以一个简单的客户端服务端连接通讯的实例应用一下上述理论知识:
这段小程序的运行的实际效果是这样的,客户端建立请求到服务端,待请求完全建立,客户端会去检查服务端是否有数据写回,而服务端的任务就很简单了,接受任意客户端的请求连接并为它写回一段数据。 别看整个过程很简单,但只要你有一点模糊的地方,你这个功能就不可能实现,不信你试试,尤其是加了选择器的客户端代码,更值得大家一行一行分析。提醒一点的是,大家应更多的关注于哪些方法是阻塞的,哪些是非阻塞的,这会有助于分析代码。
想必你也能发现,加了选择器的代码会复杂很多,也并不一定高效于原来的代码,这其实是因为你的功能比较简单,并不涉及大量通道处理,逻辑一旦复杂起来,选择器给你带来的好处会非常明显。
对于 Selector,我们还需要非常熟悉以下几个方法:
- select()
调用此方法,会将上次 select 之后的准备好的 channel 对应的 SelectionKey 复制到 selected set 中。如果没有任何通道准备好,这个方法会阻塞,直到至少有一个通道准备好。
- selectNow()
功能和 select 一样,区别在于如果没有准备好的通道,那么此方法会立即返回 0。
- select(long timeout)
看了前面两个,这个应该很好理解了,如果没有通道准备好,此方法会等待一会
- wakeup()
这个方法是用来唤醒等待在 select() 和 select(timeout) 上的线程的。如果 wakeup() 先被调用,此时没有线程在 select 上阻塞,那么之后的一个 select() 或 select(timeout) 会立即返回,而不会阻塞,当然,它只会作用一次。
Java 非阻塞 IO 和异步 IO
阻塞模式 IO
客户端-服务端网络通讯所需要的 ServerSocketChannel、SocketChannel 和 Buffer
public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 8080 端口进来的 TCP 链接
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
// 这里会阻塞,直到有一个请求的连接进来
SocketChannel socketChannel = serverSocketChannel.accept();
// 开启一个新的线程来处理这个请求,然后在 while 循环中继续监听 8080 端口
SocketHandler handler = new SocketHandler(socketChannel);
new Thread(handler).start();
}
}
}
复制代码
SocketHandler
public class SocketHandler implements Runnable {
private SocketChannel socketChannel;
public SocketHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// 将请求数据读入 Buffer 中
int num;
while ((num = socketChannel.read(buffer)) > 0) {
// 读取 Buffer 内容之前先 flip 一下
buffer.flip();
// 提取 Buffer 中的数据
byte[] bytes = new byte[num];
buffer.get(bytes);
String re = new String(bytes, "UTF-8");
System.out.println("收到请求:" + re);
// 回应客户端
ByteBuffer writeBuffer = ByteBuffer.wrap(("我已经收到你的请求,你的请求内容是:" + re).getBytes());
socketChannel.write(writeBuffer);
buffer.clear();
}
} catch (IOException e) {
IOUtils.closeQuietly(socketChannel);
}
}
}
复制代码
客户端 SocketChannel
public class SocketChannelTest {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
// 发送请求
ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes());
socketChannel.write(buffer);
// 读取响应
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num;
if ((num = socketChannel.read(readBuffer)) > 0) {
readBuffer.flip();
byte[] re = new byte[num];
readBuffer.get(re);
String result = new String(re, "UTF-8");
System.out.println("返回值: " + result);
}
}
}
复制代码
这个模式下的性能瓶颈在哪里呢?
-
首先,每次来一个连接都开一个新的线程这肯定是不合适的。当活跃连接数在几十几百的时候当然是可以这样做的,但如果活跃连接数是几万几十万的时候,这么多线程明显就不行了。每个线程都需要一部分内存,内存会被迅速消耗,同时,线程切换的开销非常大。
-
其次,阻塞操作在这里也是一个问题。首先,accept() 是一个阻塞操作,当 accept() 返回的时候,代表有一个连接可以使用了,我们这里是马上就新建线程来处理这个 SocketChannel 了,但是,但是这里不代表对方就将数据传输过来了。所以,SocketChannel#read 方法将阻塞,等待数据,明显这个等待是不值得的。同理,write 方法也需要等待通道可写才能执行写入操作,这边的阻塞等待也是不值得的。
非阻塞 IO
非阻塞 IO 的核心在于使用一个 Selector 来管理多个通道,可以是 SocketChannel,也可以是 ServerSocketChannel,将各个通道注册到 Selector 上,指定监听的事件。
之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。我们就完全没有必要给每个通道都起一个线程。
NIO 中 Selector 是对底层操作系统实现的一个抽象,管理通道状态其实都是底层系统实现的,这里简单介绍下在不同系统下的实现。
select:上世纪 80 年代就实现了,它支持注册 FD_SETSIZE(1024) 个 socket,在那个年代肯定是够用的,不过现在嘛,肯定是不行了。
poll:1997 年,出现了 poll 作为 select 的替代者,最大的区别就是,poll 不再限制 socket 数量。
select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。所以,后来才催生了以下实现。
epoll:2002 年随 Linux 内核 2.5.44 发布,epoll 能直接返回具体的准备好的通道,时间复杂度 O(1)。
除了 Linux 中的 epoll,2000 年 FreeBSD 出现了 Kqueue,还有就是,Solaris 中有 /dev/poll。
前面说了那么多实现,但是没有出现 Windows,Windows 平台的非阻塞 IO 使用 select,我们也不必觉得 Windows 很落后,在 Windows 中 IOCP 提供的异步 IO 是比较强大的。
我们回到 Selector,毕竟 JVM 就是这么一个屏蔽底层实现的平台,我们面向 Selector 编程就可以了。
之前在介绍 Selector 的时候已经了解过了它的基本用法,这边来一个可运行的实例代码,大家不妨看看:
public class SelectorServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(8080));
// 将其注册到 Selector 中,监听 OP_ACCEPT 事件
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
// 遍历
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 有已经接受的新的到服务端的连接
SocketChannel socketChannel = server.accept();
// 有新的连接并不代表这个通道就有数据,
// 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 有数据可读
// 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num = socketChannel.read(readBuffer);
if (num > 0) {
// 处理进来的数据...
System.out.println("收到数据:" + new String(readBuffer.array()).trim());
ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
socketChannel.write(buffer);
} else if (num == -1) {
// -1 代表连接已经关闭
socketChannel.close();
}
}
}
}
}
}
复制代码
NIO.2 异步 IO
More New IO,或称 NIO.2,随 JDK 1.7 发布,包括了引入异步 IO 接口和 Paths 等文件访问接口。
通常,我们会有一个线程池用于执行异步任务,提交任务的线程将任务提交到线程池就可以立马返回,不必等到任务真正完成。如果想要知道任务的执行结果,通常是通过传递一个回调函数的方式,任务结束后去调用这个函数。
同样的原理,Java 中的异步 IO 也是一样的,都是由一个线程池来负责执行任务,然后使用回调或自己去查询结果。
异步 IO 主要是为了控制线程数量,减少过多的线程带来的内存消耗和 CPU 在线程调度上的开销。
在 Unix/Linux 等系统中,JDK 使用了并发包中的线程池来管理任务,具体可以查看 AsynchronousChannelGroup 的源码。
在 Windows 操作系统中,提供了一个叫做 I/O Completion Ports 的方案,通常简称为 IOCP,操作系统负责管理线程池,其性能非常优异,所以在 Windows 中 JDK 直接采用了 IOCP 的支持,使用系统支持,把更多的操作信息暴露给操作系统,也使得操作系统能够对我们的 IO 进行一定程度的优化。
总共有三个类需要我们关注,分别是 AsynchronousSocketChannel,AsynchronousServerSocketChannel 和 AsynchronousFileChannel,只不过是在之前介绍的 FileChannel、SocketChannel 和 ServerSocketChannel 的类名上加了个前缀 Asynchronous。
Java 异步 IO 提供了两种使用方式,分别是返回 Future 实例和使用回调函数。
1、返回 Future 实例
返回 java.util.concurrent.Future 实例的方式我们应该很熟悉,JDK 线程池就是这么使用的。Future 接口的几个方法语义在这里也是通用的,这里先做简单介绍。
future.isDone();
判断操作是否已经完成,包括了正常完成、异常抛出、取消
future.cancel(true);
取消操作,方式是中断。参数 true 说的是,即使这个任务正在执行,也会进行中断。
future.isCancelled();
是否被取消,只有在任务正常结束之前被取消,这个方法才会返回 true
future.get();
这是我们的老朋友,获取执行结果,阻塞。
future.get(10, TimeUnit.SECONDS);
如果上面的 get() 方法的阻塞你不满意,那就设置个超时时间。
2、提供 CompletionHandler 回调函数
java.nio.channels.CompletionHandler 接口定义:
public interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
}
复制代码
注意,参数上有个 attachment,虽然不常用,我们可以在各个支持的方法中传递这个参数值
AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);
// accept 方法的第一个参数可以传递 attachment
listener.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
public void completed(
AsynchronousSocketChannel client, Object attachment) {
//
}
public void failed(Throwable exc, Object attachment) {
//
}
});
复制代码
AsynchronousFileChannel
首先,我们就来关注异步的文件 IO,前面我们说了,文件 IO 在所有的操作系统中都不支持非阻塞模式,但是我们可以对文件 IO 采用异步的方式来提高性能。
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("/Users/hongjie/test.txt"));
复制代码
我们就可以着手准备将数据读入到 Buffer 中:
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> result = channel.read(buffer, 0);
复制代码
异步文件通道的读操作和写操作都需要提供一个文件的开始位置,文件开始位置为 0
除了使用返回 Future 实例的方式,也可以采用回调函数进行操作,接口如下:
public abstract <A> void read(ByteBuffer dst,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler);
复制代码
顺便也贴一下写操作的两个版本的接口:
public abstract Future<Integer> write(ByteBuffer src, long position);
public abstract <A> void write(ByteBuffer src,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler);
复制代码
AIO 的读写主要也还是与 Buffer 打交道,这个与 NIO 是一脉相承的。
另外,还提供了用于将内存中的数据刷入到磁盘的方法:
public abstract void force(boolean metaData) throws IOException;
复制代码
因为我们对文件的写操作,操作系统并不会直接针对文件操作,系统会缓存,然后周期性地刷入到磁盘。如果希望将数据及时写入到磁盘中,以免断电引发部分数据丢失,可以调用此方法。参数如果设置为 true,意味着同时也将文件属性信息更新到磁盘。
还有,还提供了对文件的锁定功能,我们可以锁定文件的部分数据,这样可以进行排他性的操作。
public abstract Future<FileLock> lock(long position, long size, boolean shared);
复制代码
position 是要锁定内容的开始位置,size 指示了要锁定的区域大小,shared 指示需要的是共享锁还是排他锁
当然,也可以使用回调函数的版本:
public abstract <A> void lock(long position,
long size,
boolean shared,
A attachment,
CompletionHandler<FileLock,? super A> handler);
复制代码
文件锁定功能上还提供了 tryLock 方法,此方法会快速返回结果:
public abstract FileLock tryLock(long position, long size, boolean shared)
throws IOException;
复制代码
这个方法很简单,就是尝试去获取锁,如果该区域已被其他线程或其他应用锁住,那么立刻返回 null,否则返回 FileLock 对象。
AsynchronousServerSocketChannel
public class Server {
public static void main(String[] args) throws IOException {
// 实例化,并监听端口
AsynchronousServerSocketChannel server =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
// 自己定义一个 Attachment 类,用于传递一些信息
Attachment att = new Attachment();
att.setServer(server);
server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
@Override
public void completed(AsynchronousSocketChannel client, Attachment att) {
try {
SocketAddress clientAddr = client.getRemoteAddress();
System.out.println("收到新的连接:" + clientAddr);
// 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
att.getServer().accept(att, this);
Attachment newAtt = new Attachment();
newAtt.setServer(server);
newAtt.setClient(client);
newAtt.setReadMode(true);
newAtt.setBuffer(ByteBuffer.allocate(2048));
// 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
} catch (IOException ex) {
ex.printStackTrace();
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("accept failed");
}
});
// 为了防止 main 线程退出
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
}
}
}
复制代码
public class ChannelHandler implements CompletionHandler<Integer, Attachment> {
@Override
public void completed(Integer result, Attachment att) {
if (att.isReadMode()) {
// 读取来自客户端的数据
ByteBuffer buffer = att.getBuffer();
buffer.flip();
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes);
String msg = new String(buffer.array()).toString().trim();
System.out.println("收到来自客户端的数据: " + msg);
// 响应客户端请求,返回数据
buffer.clear();
buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
att.setReadMode(false);
buffer.flip();
// 写数据到客户端也是异步
att.getClient().write(buffer, att, this);
} else {
// 到这里,说明往客户端写数据也结束了,有以下两种选择:
// 1. 继续等待客户端发送新的数据过来
// att.setReadMode(true);
// att.getBuffer().clear();
// att.getClient().read(att.getBuffer(), att, this);
// 2. 既然服务端已经返回数据给客户端,断开这次的连接
try {
att.getClient().close();
} catch (IOException e) {
}
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("连接断开");
}
}
复制代码
public class Attachment {
private AsynchronousServerSocketChannel server;
private AsynchronousSocketChannel client;
private boolean isReadMode;
private ByteBuffer buffer;
// getter & setter
}
复制代码
AsynchronousSocketChannel
public class Client {
public static void main(String[] args) throws Exception {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 来个 Future 形式的
Future<?> future = client.connect(new InetSocketAddress(8080));
// 阻塞一下,等待连接成功
future.get();
Attachment att = new Attachment();
att.setClient(client);
att.setReadMode(false);
att.setBuffer(ByteBuffer.allocate(2048));
byte[] data = "I am obot!".getBytes();
att.getBuffer().put(data);
att.getBuffer().flip();
// 异步发送数据到服务端
client.write(att.getBuffer(), att, new ClientChannelHandler());
// 这里休息一下再退出,给出足够的时间处理数据
Thread.sleep(2000);
}
}
复制代码
public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {
@Override
public void completed(Integer result, Attachment att) {
ByteBuffer buffer = att.getBuffer();
if (att.isReadMode()) {
// 读取来自服务端的数据
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String msg = new String(bytes, Charset.forName("UTF-8"));
System.out.println("收到来自服务端的响应数据: " + msg);
// 接下来,有以下两种选择:
// 1. 向服务端发送新的数据
// att.setReadMode(false);
// buffer.clear();
// String newMsg = "new message from client";
// byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
// buffer.put(data);
// buffer.flip();
// att.getClient().write(buffer, att, this);
// 2. 关闭连接
try {
att.getClient().close();
} catch (IOException e) {
}
} else {
// 写操作完成后,会进到这里
att.setReadMode(true);
buffer.clear();
att.getClient().read(buffer, att, this);
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("服务器无响应");
}
}
复制代码
Asynchronous Channel Groups
之前我们说过,异步 IO 一定存在一个线程池,这个线程池负责接收任务、处理 IO 事件、回调等。这个线程池就在 group 内部,group 一旦关闭,那么相应的线程池就会关闭。
AsynchronousServerSocketChannels 和 AsynchronousSocketChannels 是属于 group 的,当我们调用 AsynchronousServerSocketChannel 或 AsynchronousSocketChannel 的 open() 方法的时候,相应的 channel 就属于默认的 group,这个 group 由 JVM 自动构造并管理。
如果我们想要配置这个默认的 group,可以在 JVM 启动参数中指定以下系统变量:
- java.nio.channels.DefaultThreadPool.threadFactory
此系统变量用于设置 ThreadFactory,它应该是 java.util.concurrent.ThreadFactory 实现类的全限定类名。一旦我们指定了这个 ThreadFactory 以后,group 中的线程就会使用该类产生。
- java.nio.channels.DefaultThreadPool.initialSize
此系统变量也很好理解,用于设置线程池的初始大小。
可能你会想要使用自己定义的 group,这样可以对其中的线程进行更多的控制,使用以下几个方法即可:
- AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)
- AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
- AsynchronousChannelGroup.withThreadPool(ExecutorService executor)
熟悉线程池的读者对这些方法应该很好理解,它们都是 AsynchronousChannelGroup 中的静态方法。
至于 group 的使用就很简单了,代码一看就懂:
AsynchronousChannelGroup group = AsynchronousChannelGroup
.withFixedThreadPool(10, Executors.defaultThreadFactory());
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);
复制代码
AsynchronousFileChannels 不属于 group。但是它们也是关联到一个线程池的,如果不指定,会使用系统默认的线程池,如果想要使用指定的线程池,可以在实例化的时候使用以下方法:
public static AsynchronousFileChannel open(Path file,
Set<? extends OpenOption> options,
ExecutorService executor,
FileAttribute<?>... attrs) {
...
}
复制代码