欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JavaNIO基础

程序员文章站 2022-06-02 23:53:53
...

NIO主要包含两部分,Selector和Channel、Buffer。

为什么需要需要NIO

基本的Java套接字对于小规模系统可以很好地运行,但当涉及同时处理上千个客户端的服务器时,可能就会产生一些问题。由于创建、维护、切换线程需要的系统开销,一客户一线程的方式在系统扩展性方面受到了限制。使用线程池可以节省那种系统开销,同时允许实现者利用并行硬件的优势。

但是对于连接生存期比较长的协议来说,线程池的大小仍然限制了系统同时可以处理的客户端数量。

另外对于服务器需要由不同客户端同时访问和修改的信息时,对于多线程就得进行同步,这会变得更加复杂,即使用同步机制将增加更多的系统调度和上下文切换开销,而程序员对这些开销又无法控制。

由于多线程的同步的复杂性,一些程序员宁愿继续使用单线程方法,这类服务器只用一个线程来处理所有客户端——不是顺序处理,而是一次全部处理。这种服务器不能为任何客户端提供I/O操作的阻塞等待,而必须排他地使用非阻塞方式(nonblocking)I/O。

前面的while true,不断地轮询(poll)accept方法,这种忙等(busy waiting)方法会引入系统开销,因为程序需要反复循环地连接I/O源,却又发现什么都不用做。

我们需要一种方法来一次轮询一组客户端,以查找那个客户端需要服务,这正是NIO要介绍的Selector和Channel的抽象关键点。

一个Channel实例代表了一个可轮询(pollable)的I/O目标,如套接字(或一个文件、设备等)。Channel能够注册一个Selector类的实例。

Selector的select方法允许你询问在一组信道中,哪一个当前需要服务(被接受、读、写)。

Stream的抽象,好处是隐藏了底层缓冲区的有限性,提供了一个能够容纳任意长度数据的容器的假象。要实现这样一个假象,要么会产生大量的内存开销,要么会引入大量的上下文切换,不好控制。

使用Buffer抽象的原因是:Buffer抽象代表了一个有限容量(finite-capacity)的数据容器——其本质是一个数组,由指针指示了在哪存放数据和从哪读取数据。使用Buffer的好处是:
1)与读写缓冲区数据相关联的系统开销都暴露给了程序员。例如,如果想要向缓冲区存入数据,但是又没有足够的空间时,就必须采取一些措施来获得空间(即移出一些数据,或移开已经在那个位置的数据来获得空间,或者创建一个新的新的实例)。这意味着需要额外的工作,但是你可以控制它什么时候发生,如何发生,以及是否发生。
2)一些对Java对象的特殊Buffer映射操作能够直接操作底层平台的资源(例如操作系统的缓冲区),这些操作节省了在不同地址空间中复制数据的开销。

综上,Channel实例代表了一个与设备的连接,通过它可以进行输入输出操作。信道(channel)和套接字(socket)的不同之处在于:channel通常需要调用静态工厂方法来获取实例。channel使用的不是流,而是使用缓冲区来发送或读取数据。

Buffer有固定的、有限的容量,并由内部状态记录了有多少数据放入或取出,就像是一个有限容量的队列一样。

Selector

NIO的强大功能部分来自于channel的非阻塞特性。accept可能因为等待一个客户端连接而阻塞,read可能因为没有数据可读而阻塞,直到连接的另一端传来新数据。

总的来说,创建/接收连接或读写数据等I/O调用,都可能无限期地阻塞等待,直到底层的网络实现发生了什么。慢速的、有损耗的网络,或仅仅是简单的网络故障都可能导致任意时间的延迟。

而NIO则立即返回:

public class TCPEchoClientNonblocking {
    public static void main(String args[]) throws Exception {
        if ((args.length < 2) || (args.length > 3)) // Test for correct # of args
            throw new IllegalArgumentException("Parameter(s): <Server> <Word> [<Port>]");
        String server = args[0]; // Server name or IP address
        // Convert input String to bytes using the default charset
        byte[] argument = args[1].getBytes();
        int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7;
        // Create channel and set to nonblocking
        SocketChannel clntChan = SocketChannel.open();
        clntChan.configureBlocking(false);
        // Initiate connection to server and repeatedly poll until complete
        if (!clntChan.connect(new InetSocketAddress(server, servPort))) {
            while (!clntChan.finishConnect()) {
                System.out.print(".");  // Do something else
            }
        }
        ByteBuffer writeBuf = ByteBuffer.wrap(argument);
        ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
        int totalBytesRcvd = 0; // Total bytes received so far
        int bytesRcvd; // Bytes received in last read
        while (totalBytesRcvd < argument.length) {
            if (writeBuf.hasRemaining()) {
                clntChan.write(writeBuf);
            }
            if ((bytesRcvd = clntChan.read(readBuf)) == -1) {
                throw new SocketException("Connection closed prematurely");
            }
            totalBytesRcvd += bytesRcvd;
            System.out.print(".");   // Do something else
        }
        System.out.println("Received: " +  // convert to String per default charset
                new String(readBuf.array(), 0, totalBytesRcvd));
        clntChan.close();
    }
}

上面的轮询仅仅是演示用。

需要使用Selector类来避免忙等的轮询。考虑一个即时的消息服务器,可能有上千个客户端同时连接到了服务器,但任何时刻都只有非常少量的消息需要读取和分发。这就需要一种方法阻塞等待,直到至少有一个信道可以进行I/O操作,并指出是哪个信道。NIO的选择器就实现了这样的功能。一个Selector实例可以同时检查一组信道的I/O状态。用专业术语来说,选择器就是一个多路开关选择器,因为一个选择器能够管理多个信道上的I/O操作。

要使用选择器,需要创建一个Selector实例并将其注册到想要监控的信道上(注意,这要通过channel的方法实现,而不是使用selector的方法)。最后,调用选择器的select方法,该方法会阻塞等待,直到还有一个或更多的信道准备好了I/O操作或等待超时。select方法返回可进行I/O操作的信道数量。

public class TCPServerSelector {
    private static final int BUFSIZE = 256;  // Buffer size (bytes)
    private static final int TIMEOUT = 3000; // Wait timeout (milliseconds)
    public static void main(String[] args) throws IOException {
        if (args.length < 1) { // Test for correct # of args
            throw new IllegalArgumentException("Parameter(s): <Port> ...");
        }
        // Create a selector to multiplex listening sockets and connections
        Selector selector = Selector.open();
        // Create listening socket channel for each port and register selector
        for (String arg : args) {
            ServerSocketChannel listnChannel = ServerSocketChannel.open();
            listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));
            listnChannel.configureBlocking(false); // must be nonblocking to register
            // Register selector with channel. The returned key is ignored
            listnChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        // Create a handler that will implement the protocol
        TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
        while (true) { // Run forever, processing available I/O operations
            // Wait for some channel to be ready (or timeout)
            if (selector.select(TIMEOUT) == 0) { // returns # of ready chans
                System.out.print(".");
                continue;
            }
            // Get iterator on set of keys with I/O to process
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
            while (keyIter.hasNext()) {
                SelectionKey key = keyIter.next(); // Key is bit mask
                // Server socket channel has pending connection requests?
                if (key.isAcceptable()) {
                    protocol.handleAccept(key);
                }
                // Client socket channel has pending data?
                if (key.isReadable()) {
                    protocol.handleRead(key);
                }
                // Client socket channel is available for writing and
                // key is valid (i.e., channel not closed)?
                if (key.isValid() && key.isWritable()) {
                    protocol.handleWrite(key);
                }
                keyIter.remove(); // remove from set of selected keys
            }
        }
    }
}

由于select方法只是向selector所关联的键集合中添加元素,因此,如果不移除每个处理过的键,它就会在下次调用select方法时仍然保留在集合中,而且可能会有无用的操作来调用它。

具体的处理方法

public class EchoSelectorProtocol implements TCPProtocol {
    private int bufSize; // Size of I/O buffer
    public EchoSelectorProtocol(int bufSize) {
        this.bufSize = bufSize;
    }
    public void handleAccept(SelectionKey key) throws IOException {
        SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
        clntChan.configureBlocking(false); // Must be nonblocking to register
        // Register the selector with new channel for read and attach byte buffer
        clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer
                .allocate(bufSize));
    }
    public void handleRead(SelectionKey key) throws IOException {
        // Client socket channel has pending data
        SocketChannel clntChan = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = clntChan.read(buf);
        if (bytesRead == -1) { // Did the other end close?
            clntChan.close();
        } else if (bytesRead > 0) {
            // Indicate via key that reading/writing are both of interest now.
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    }
    public void handleWrite(SelectionKey key) throws IOException {
    /*
     * Channel is available for writing, and key is valid (i.e., client channel
     * not closed).
     */
        // Retrieve data read earlier
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip(); // Prepare buffer for writing
        SocketChannel clntChan = (SocketChannel) key.channel();
        clntChan.write(buf);
        if (!buf.hasRemaining()) { // Buffer completely written?
            // Nothing left, so no longer interested in writes
            key.interestOps(SelectionKey.OP_READ);
        }
        buf.compact(); // Make room for more data to be read in
    }
}