欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java NIO (二) 通道(Channels)

程序员文章站 2022-04-24 11:41:07
...

Channels

channels与buffers配合去实现高性能的I/O操作。本章向你介绍NIO的channel类型。

Channels 介绍

channel是一个对象,这个对象代表一个与硬盘,文件,网络socket,程序组件的打开链接,或者另一个可以去执行写,读和其他I/O操作的实体。channel可以在字节buffer和基于操作系统的I/O服务源或目的之间高效地传输数据。

注意 Channels是访问I/O服务的门卫。Channels使用字节buffers作为发送和接受数据的终点。

操作系统的文件处理器或文件描述符和channel之间经常出现一对一的响应。当你在一个文件中用channels,这个channel将经常连接一个打开的文件操作符。尽管channels比文件操作符更加抽象,他们仍然能够对操作系统的I/O实施进行建模。

Channel与其子类

Java通过包java.nio.channels和java.nio.channels.spi来支持channels,应用程序与位于前一个包的类型进行交互;正在定义新的selector提供者的开发人员使用后者的包(下一章会讨论selectors)。

所有的channels都是实现java.nio.channels.Channel接口的实现类。Channel声明了以下方法:

  • void close():关闭此channel,当此channel已经关闭,调用close()没有任何影响。当另一个线程已经调用了close()方法,再调用close()会阻塞直到第一次调用完成。此后的close()调用没有任何影响。
  • boolean isOpen():返回此channel的打开状态,channel打开时返回true,反之返回false。

这些方法指明了所有的channel只有这两个操作是通用的:关闭channel和判断channel是关闭或打开。为了支持I/O,Channel也由两个接口java.nio.channels.WritableByteChannel和java.nio.channels.ReadableByteChannel来继承:

  • WritableByteChannel 声明了一个抽象方法 int write(ByteBuffer buffer) 从buffer写出一个字节序列到当前channel。这个方法返回已经写的字节的数量。
  • ReadableByteChannel 声明了一个抽象方法 int read(ByteBuffer buffer) 从当前channel读取字节到buffer中。这个方法返回实际读的字节数量(当没有更多要读的字节时返回-1)。

java.nio.channels.InterruptibleChannel接口也继承了Channel。InterruptibleChannel描述可以异步关闭和中断的channel。这个接口重写了父接口Channel的close()方法头,为Channel的这个方法提供了以下附加约定:当前在此channel的I/O操作中被阻塞的任何线程将收到AsynchronousCloseException异常(一个IOException的派生类)。

实现了此接口的channel是同步可关闭的:在可中断的channel中,当一个线程阻塞在I/O操作时,另一个线程可以调用此channel的close()方法。这就导致以阻塞线程会接收到抛出的异常AsynchronousCloseException实例。

实现了此接口的channel也是可中断的:在可中断的channel中,当一个线程阻塞与I/O操作时,另一个线程可调用已阻塞线程的interrupte()方法。这样做会导致此channel关闭。已阻塞线程会接收到已抛出的ClosedByInterruptException实例,已阻塞线程重置自身的中断状态。

当阻塞线程被中断时,NIO的设计者选择去关闭channel,因为他们无法找到一种方式在各个操作系统中以相同的方式可靠地处理中断的I/O操作。保证确定行为的唯一方法就是关闭channel。

在上篇中,你已经知道必须调用java.nio.Buffer一个子类的方法去获取buffer,对于channels,有两个方式去获取channel:

  • 包 java.nio.channels提供了一个Channels工具类,这个类提供了两个方法从流中获取channels,对于以下方法,当channel关闭时,基础流也将关闭,并且channel不会被缓存。

    • WritableByteChannel newChannel(OutputStream outputStream) 从给定的outputStream中返回一个可写的字节channel
    • ReadableByteChannel newChannel(InputStream inputStream) 从给定的inputStream中返回一个可读的字节channel
  • 各个经典的I/O类已经改进去支持channel的创建。举例,java.io.RandomAccessFile 声明了FileChannel getChannel()方法返回一个文件channel,java.net.Socket 声明了 SocketChannel getChannel()方法返回一个socket channel。

以下清单使用Channels类去获取标准输入流和输出流的channel,然后使用这些channels去完成从输入channel到输出channel的字节复制。

package com.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

public class ChannelDemo {
    public static void main(String[] args) {
        ReadableByteChannel src = Channels.newChannel(System.in);
        WritableByteChannel dest = Channels.newChannel(System.out);

        try {
            copy(src, dest);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                src.close();
                dest.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void copy(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        while (src.read(buffer) != -1) {
            buffer.flip();
            dest.write(buffer);
            buffer.compact();
        }
        buffer.flip();
        while (buffer.hasRemaining()) {
            dest.write(buffer);
        }
    }

    public static void copyAlt(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        while (src.read(buffer) != -1) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                dest.write(buffer);
            }
            buffer.clear();
        }
    }
}

以上代码清单展示了两种方法从标准输入流复制字节到标准输出流。第一种方法中,简单的示例了copy()方法,目标是最小化操作系统的I/O调用,compact()方法的调用致使更多的数据被复制。第二种方法中,即copyAlt(),目标是消除数据复制,虽然更多的操作系统I/O会被调用。

copy()和copyAlt()方法首先分配了直接字节buffer,然后进入了while循环从数据源channel读取字节直到输入结束(read()返回-1)。继续往下读,buffer被翻转去输出。

  • copy()方法的while循环对write()进行了一次调用。因为write()方法不会完全分散buffer,所以下次读之前调用compact()方法去压缩此buffer。压缩确保未写的buffer内容在下次读操作时不会被覆盖。在while循环之后,copy()翻转buffer准备输出剩下的内容然后使用hasRemaining()方法和write()方法去完全输出此buffer。
  • copyAlt() 方法while循环包含了嵌套while循环以hasRemaining()和write()方法去输出此buffer直到buffer为空。接下来调用clear()方法以清空buffer以便下次调用read()方法时被填满。

编译清单代码: javac ChannelDemo.jara
运行代码如下:
java ChannelDemo
java ChannelDemo

深入理解Channels

上节的Channel接口与其直接派生类讨论让你对Channel有了一些了解。然而,此节还有更多要去探索。此节将通过探索 scatter/gather I/O,文件channel,socket Channel和pipe管道来更加深入理解channels。

Scatter/Gather I/O

Channels提供了一种能力在多个buffers之间执行一个单一的I/O操作。这种能力被称为scatter/gather I/O(也被称为 向量I/O)。

在写操作的情况下,几个buffers的内容按顺序收集(排空),然后通过channel发送到目的地。这些buffers不需要有相同的capacities。在读操作的情况下,一个channel的内容依次分散(填充)到多个buffers中。每个buffer被填充到它的limit直到channel为空或者直到所有buffer的空间被使用完。

注意 现代操作系统提供支持向量 I/O的API去消除(或 至少减少)系统调用或buffer的拷贝,以此提升性能。例如,Win32和Win64的API提供了ReadFileScatter() 和 WriteFileGather()方法来完成这个目的。

Java 提供了java.nio.channels.ScatteringByteChannel接口去支持散列和java.nio.channels.GatheringByteChannel接口去支持收集。

ScatteringByteChannel 提供了以下方法:

  • long read(ByteBuffer[] buffers, int offset, int length)
  • long read(ByteBuffer[] buffers)

GatheringByteChannel 提供了以下方法:

  • long write(ByteBuffer[] buffers, int offset, int length)
  • long write(ByteBuffer[] buffers)

第一个read()方法和第一个write()方法通过传递一个基于零的偏移量来识别要读取/写入的第一个缓冲区,以及通过将值的传递长度来读取/写入的buffer数量。第二个read()方法和第二个write()方法将依次读取/写入所有的buffers。

以下代码清单展示了read(ByteBuffer[] buffers)和write(ByteBuffer[] buffers)方法。

package com.nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;

public class ByteChannelDemo {
    public static void main(String[] args) throws IOException {
        ScatteringByteChannel src;
        // 读取classpath下的文件x.dat
        FileInputStream fis = new FileInputStream(ByteChannelDemo.class.getClassLoader().getResource("x.dat").getFile());
        src = (ScatteringByteChannel) Channels.newChannel(fis);
        ByteBuffer buffer1 = ByteBuffer.allocate(5);
        ByteBuffer buffer2 = ByteBuffer.allocate(3);
        ByteBuffer[] buffers = {buffer1, buffer2};
        src.read(buffers);
        buffer1.flip();
        while (buffer1.hasRemaining()) {
            System.out.println(buffer1.get());
        }
        System.out.println();
        buffer2.flip();
        while (buffer2.hasRemaining()) {
            System.out.println(buffer2.get());
        }
        buffer1.rewind();
        buffer2.rewind();

        GatheringByteChannel dest;
        FileOutputStream fos = new FileOutputStream("y.dat");
        dest = (GatheringByteChannel) Channels.newChannel(fos);
        buffers[0] = buffer2;
        buffers[1] = buffer1;
        dest.write(buffer1);
    }
}

以上清单中main()方法首先通过初始化java.io.FileInputStream对象获得了一个收集字节channel然后传递这个实例到Channels类的ReadableByteChannel newChannel(InputStream inputStream) 方法中。返回的ReadableByteChannel 实例被强制转换为ScatteringByteChannel 因为这个实例实际上是一个实现ScatteringByteChannel接口的文件channel(稍后讨论)。

下一步,main方法创建了一对直接字节buffers;第一个buffer的capacity是5个字节,第二个buffer的capacity是3个字节。这些buffers按顺序存储到一个数组中,这个数组被传递到read(ByteBuffer[]) 方法去填满他们。

在填满这些buffers之后,main方法翻转了他们以便可以输出他们的内容到标准输出流中。这些内容被输出之后,buffers被倒带以准备通过聚集操作排空。

main()方法现在通过实例化java.io.FileOutputStream去获得一个聚集字节channel,然后传递这个实例到Channels 类方法WritableByteChannel newChannel(OutputStream outputStream) 中。返回的WritableByteChannel实例被强转为GatheringByteChannel,因为这个实例实际上是一个实现了GatheringByteChannel接口的文件channel(稍后讨论)。

最终,main()方法将这些buffers按照与最初分配方式相反的顺序分配给buffers 数组,然后通过这个数组的 write(ByteBuffer[]) 方法将其排空。

创建一个命名为x.dat的文件然后将一下内容存在这个文件中:

1234abcdefg

现在编译以上代码(javac ByteChannelDemo.java)和运行程序(java ByteChannelDemo),你应该观察到这8个字符的Unicode值。

49
50
51
52
53

97
98
99

另外,你应该看到一个新创建的y.dat文件以及其内容为:abc12345

File Channels

我之前提到了RandomAccessFile类声明了一个 FileChannel getChannel()方法来返回一个file channel实例,这个实例描述了一个文件的打开连接。也证明了FileInputStream 和FileOutputStream 也提供了同样的方法。相反,java.io.FileReader 和java.io.FileWriter没有提供一个方法去获取file channel。

警告 FileInputStream 的 getChannel() 方法返回的file channel是只读的,FileOutputStream 的 getChannel() 方法返回的file channel是只写的。尝试在只读的file channel中写操作或者在只写的file channel中读操作都会抛出异常。

java.nio.channels.FileChannel 抽象类 描述了一个file channel。因为这个类实现了接口InterruptibleChannel 所以 file channel 是可中断的。这个类也实现了接口ByteChannel,GatheringByteChannel和ScatteringByteChannel,所以你可以在文件上执行写入,读取和分散/剧集等操作。然而,这还有更多的操作。

注意 与线程不安全的buffers不一样,file channel则是线程安全的。

file channel 将当前位置保存在文件中,这个位置是FileChannel让你获取和修改的。也是让你请求将缓存数据被强制写入到磁盘,读取/写入文件内容,获取channel底层文件的大小,截断文件,尝试锁住整个文件或文件的某个区域,执行内存映射的文件I/O操作,并以操作系统优化的方式将数据直接转移到另一个channel中。

相关标签: java nio channel