欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

网络IO之NIO

程序员文章站 2022-06-28 17:54:48
网络IO之NIONIO库是在JDK1.4中才引入,弥补了原来的I/O(BIO)的不足,它是一个高速的、面向块的I/O。NIO有两层含义:在java层面:nio称为new io,是一套全新的操作io的api。在OS层面:nio称为no-blocking io,系统调用socket函数可以设置一个NONBLOCK的参数,实现调用方法时不会阻塞线程。与BIO的主要区别面向流与面向缓冲BIO是面向流的,面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能...

网络IO之NIO

NIO库是在JDK1.4中才引入,弥补了原来的I/O(BIO)的不足,它是一个高速的、面向块的I/O。

NIO有两层含义:

  1. 在java层面:nio称为new io,是一套全新的操作io的api。
  2. 在OS层面:nio称为no-blocking io,系统调用socket函数可以设置一个NONBLOCK的参数,实现调用方法时不会阻塞线程。

与BIO的主要区别

面向流与面向缓冲

BIO是面向流的,面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据,如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。

NIO是面向缓冲区(Buffer)的,数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

阻塞与非阻塞IO

NIO是阻塞的,这意味着当一个线程调用read()或write()时,该线程会被阻塞,直到有一些数据可以读取读取,或数据完全写入,该线程在此期间不能再干任何事情了。

NIO是一种非阻塞模式,当一个线程调用read()时,如果没有数据可以读取,就直接返回,不会阻塞,在数据变得可以读之前,该线程可以去处理其他事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道(Channel),但不需要等待它完全写入,这个线程同时可以去做别的事情。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程可以管理多个输入和输出通道(Channel)。

NIO三大核心组件

NIO是一种同步非阻塞的IO模型。同步是指线程不断轮询IO事件是否就绪,非阻塞是指线程在等待IO的时候,可以同时做其他任务。

NIO有三大核心组件:Selector选择器、Channel管道、Buffer缓冲区。

Selector

Selector(选择器):选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器(Selectors),然后使用一个单独的线程来操作这个选择器,进而“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的 Channel”的容器。

SelectionKey

SelectionKey是一个抽象类,表示selectableChannel在Selector中注册的标识。每个Channel向Selector注册时,都将会创建一个SelectionKey。SelectionKey将Channel与Selector建立了关系,并维护了channel事件。

可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它。所以在调用某个key时,需要使用isValid进行校验。

SelectionKey类型和就绪条件

SelectionKey有如下四种类型,定义在java.nio.channels.SelectionKey中,分别如下表:

操作类型 就绪条件及说明
OP_READ 当操作系统读缓冲区有数据可读时就绪。并非时刻都有数据可读,所以一般需要注册该操作,仅当有就绪时才发起读操作,有的放矢,避免浪费CPU。
OP_WRITE 当操作系统写缓冲区有空闲空间时就绪。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。
OP_CONNECT 当SocketChannel.connect()请求连接成功后就绪。该操作只给客户端使用。
OP_ACCEPT 当接收到一个客户端连接请求时就绪。该操作只给服务器使用。

服务端和客户端分别感兴趣的类型

ServerSocketChannel和SocketChannel可以注册自己感兴趣的操作类型,当对应操作类型的就绪条件满足时OS会通知channel,下表描述各种Channel允许注册的操作类型,Y表示允许注册,N表示不允许注册,其中服务器SocketChannel指由服务器ServerSocketChannel.accept()返回的对象。

OP_READ OP_WRITE OP_CONNECT OP_ACCEPT
服务器ServerSocketChannel Y
服务器SocketChannel Y Y
客户端SocketChannel Y Y Y
  • 服务器启动ServerSocketChannel,关注OP_ACCEPT事件。
  • 客户端启动SocketChannel,连接服务器,关注OP_CONNECT事件。
  • 服务器接受连接,启动一个服务器的SocketChannel,这个SocketChannel可以关注OP_READ、OP_WRITE事件,一般连接建立后会直接关注OP_READ事件。
  • 客户端这边的客户端SocketChannel发现连接建立后,可以关注OP_READ、OP_WRITE事件,一般是需要客户端需要发送数据了才关注OP_READ事件。
  • 连接建立后客户端与服务器端开始相互发送消息(读写),根据实际情况来关注OP_READ、OP_WRITE事件。

Channel

Channel(通道):被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据,而且可以同时进行读写。

  • 所有被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。
  • ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用 IO”的端口监听,同时支持UDP协议和TCP协议。
  • ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP+端口到服务器IP+端口的通信连接。
  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

Buffer缓冲区

Buffer其实就是一个数组,在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据,都是将它放到缓冲区中。而在面向流I/O系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。

所有的缓冲区类型都继承于抽象类Buffer,最常用的就是ByteBuffer。

NIO的使用

服务器端代码:

package com.morris.nio.use;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {

    public static final int PORT = 8899;

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞方式
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册接收连接的事件

        System.out.println("server is start on port: " + PORT);

        while (true) {
            selector.select(1000);
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();

            while (selectionKeyIterator.hasNext()) {
                SelectionKey key = selectionKeyIterator.next();
                selectionKeyIterator.remove();

                if(key.isValid()) {
                    if(key.isAcceptable()){
                        ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
                        SocketChannel sc = ssChannel.accept();
                        sc.configureBlocking(false);
                        sc.register(key.selector(), SelectionKey.OP_READ);
                    }
                    if(key.isReadable()){
                        SocketChannel sc = (SocketChannel)key.channel();
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int bytesRead = sc.read(buf);
                        if(bytesRead > 0){
                            buf.flip();

                            byte[] bytes = new byte[buf.remaining()];
                            buf.get(bytes);

                            String body = new String(bytes);

                            System.out.println("receive from client: " + body);

                            String response = "hello client";

                            bytes = response.getBytes();

                            buf = ByteBuffer.allocate(bytes.length);
                            buf.put(bytes);
                            buf.flip();
                            sc.write(buf);
                            key.cancel();
                            sc.close();
                        }
                    }
                }
            }
        }
    }
}

客户端代码:

package com.morris.nio.use;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Client {

    public static int PORT = 8899;

    private static boolean stop = false;

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        SocketChannel serverSocketChannel = SocketChannel.open();

        serverSocketChannel.configureBlocking(false); // 设置为非阻塞方式

        boolean connect = serverSocketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));

        if (connect) { // 连接建立成功,注册监听读的事件
            serverSocketChannel.register(selector, SelectionKey.OP_READ);
        } else {
            // 连接还没建立成功,注册监听建立连接的事件
            serverSocketChannel.register(selector, SelectionKey.OP_CONNECT);
        }

        while (!stop) {
            selector.select(1000);
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
            while (selectionKeyIterator.hasNext()) {
                SelectionKey key = selectionKeyIterator.next();

                selectionKeyIterator.remove(); // 移除key

                if (key.isValid()) {
                    SocketChannel sc = (SocketChannel) key.channel();

                    if (key.isConnectable()) {

                        if (sc.finishConnect()) {
                            sc.register(selector, SelectionKey.OP_READ);

                            String response = "hello server";
                            byte[] bytes = response.getBytes();
                            ByteBuffer buf = ByteBuffer.allocate(bytes.length);
                            buf.put(bytes);
                            buf.flip();
                            sc.write(buf);
                        }

                    }

                    if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int bytesRead = sc.read(buf);
                        if (bytesRead > 0) {
                            buf.flip();

                            byte[] bytes = new byte[buf.remaining()];
                            buf.get(bytes);
                            String body = new String(bytes);
                            System.out.println("receive from server: " + body);

                            key.cancel();
                            sc.close();
                            stop = true;
                        }
                    }
                }
            }
        }
    }
}

key为什么要移除?如果不移除,下次调用selector.select还会把这个事件选择出来,这个移除只是移除本次监听到的事件,而不是不再监听这个事件了。

key为什么会失效?SelectionKey.isValid()方法检测此key是否有效。当key被取消(cancel),或者通道被关闭,或者selector被关闭,都将导致此key无效。SelectionKey.cancel()方法请求将此键取消注册,一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中。cancel操作将key的valid属性置为false,并执行selector.cancel(key)(即将key加入cancelledkey集合),在下一次select操作时移除它。所以在调用某个key时,需要使用isValid进行校验。

为什么不要监听写事件?先来看下面的例子:

package com.morris.nio.use;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class WriteableServer {

    public static final int PORT = 8899;

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 设置为非阻塞方式
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册接收连接的事件

        System.out.println("server is start on port: " + PORT);

        while (true) {
            selector.select(1000);
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();

            while (selectionKeyIterator.hasNext()) {
                SelectionKey key = selectionKeyIterator.next();
                System.out.println("当前通道的事件:" + key.interestOps());

                selectionKeyIterator.remove();

                if (key.isValid()) {
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                        SocketChannel sc = ssChannel.accept();
                        sc.configureBlocking(false);
                        sc.register(key.selector(), SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int bytesRead = sc.read(buf);
                        if (bytesRead > 0) {
                            buf.flip();

                            byte[] bytes = new byte[buf.remaining()];
                            buf.get(bytes);

                            String body = new String(bytes);

                            System.out.println("receive from client: " + body);

                            String response = "hello client";

                            bytes = response.getBytes();

                            buf = ByteBuffer.allocate(bytes.length);
                            buf.put(bytes);
                            buf.flip();

                            // 这里不直接写,而是监听写事件,还得监听读时间
                            sc.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, buf);
                        } else if (bytesRead < 0) {
                            key.cancel();
                            sc.close();
                        }
                    } else if (key.isWritable()) {
                        System.out.println("write");
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                        sc.write(byteBuffer);
                    }
                }
            }
        }
    }
}

这段代码一运行,会监听到多次写事件,写操作的就绪条件是内核发送缓冲区有空的情况,OP_WRITE写操作是在内核发送缓冲区满且有数据可写的情况下使用,并且在写事件触发后需要立刻清除写操作注册,否则可能到只无限循环。

所以要修改为下面的代码:

} else if (key.isWritable()) {
    System.out.println("write");
    SocketChannel sc = (SocketChannel) key.channel();
    ByteBuffer byteBuffer = (ByteBuffer) key.attachment();

    if (byteBuffer.hasRemaining()) {
        sc.write(byteBuffer);
    } else {
        // 取消对写事件的注册
        key.interestOps(SelectionKey.OP_READ);
    }
}

本文地址:https://blog.csdn.net/u022812849/article/details/109579503