探索Java I/O 模型的演进
相关概念
同步和异步
描述的是用户线程与内核的交互方式:
- 同步是指用户线程发起 i/o 请求后需要等待或者轮询内核 i/o 操作完成后才能继续执行;
- 异步是指用户线程发起 i/o 请求后仍继续执行,当内核 i/o 操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
阻塞和非阻塞
描述的是用户线程调用内核 i/o 操作的方式:
- 阻塞是指 i/o 操作需要彻底完成后才返回到用户空间;
- 非阻塞是指 i/o 操作被调用后立即返回给用户一个状态值,无需等到 i/o 操作彻底完成。
一个 i/o 操作其实分成了两个步骤:发起 i/o 请求和实际的 i/o 操作。 阻塞 i/o 和非阻塞 i/o 的区别在于第一步,发起 i/o 请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞 i/o ,如果不阻塞,那么就是非阻塞 i/o 。 同步 i/o 和异步 i/o 的区别就在于第二个步骤是否阻塞,如果实际的 i/o 读写阻塞请求进程,那么就是同步 i/o 。
unix i/o 模型
unix 下共有五种 i/o 模型:
- 阻塞 i/o
- 非阻塞 i/o
- i/o 复用(select 和 poll)
- 信号驱动 i/o(sigio)
- 异步 i/o(posix 的 aio_系列函数)
阻塞 i/o
请求无法立即完成则保持阻塞。
阶段1:等待数据就绪。网络 i/o 的情况就是等待远端数据陆续抵达;磁盘i/o的情况就是等待磁盘数据从磁盘上读取到内核态内存中。
阶段2:数据从内核拷贝到进程。出于系统安全,用户态的程序没有权限直接读取内核态内存,因此内核负责把内核态内存中的数据拷贝一份到用户态内存中。
非阻塞 i/o
- socket 设置为 nonblock(非阻塞)就是告诉内核,当所请求的 i/o 操作无法完成时,不要将进程睡眠,而是返回一个错误码(ewouldblock) ,这样请求就不会阻塞
- i/o 操作函数将不断的测试数据是否已经准备好,如果没有准备好,继续测试,直到数据准备好为止。整个 i/o 请求的过程中,虽然用户线程每次发起 i/o 请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的 cpu 的资源
- 数据准备好了,从内核拷贝到用户空间。
一般很少直接使用这种模型,而是在其他 i/o 模型中使用非阻塞 i/o 这一特性。这种方式对单个 i/o 请求意义不大,但给 i/o 多路复用铺平了道路.
i/o 复用(异步阻塞 i/o)
i/o 多路复用会用到 select 或者 poll 函数,这两个函数也会使进程阻塞,但是和阻塞 i/o 所不同的的,这两个函数可以同时阻塞多个 i/o 操作。而且可以同时对多个读操作,多个写操作的 i/o 函数进行检测,直到有数据可读或可写时,才真正调用 i/o 操作函数。
从流程上来看,使用 select 函数进行 i/o 请求和同步阻塞模型没有太大的区别,甚至还多了添加监视 socket,以及调用 select 函数的额外操作,效率更差。但是,使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 i/o 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 i/o 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
i/o 多路复用模型使用了 reactor 设计模式实现了这一机制。
调用 select / poll 该方法由一个用户态线程负责轮询多个 socket,直到某个阶段1的数据就绪,再通知实际的用户线程执行阶段2的拷贝。 通过一个专职的用户态线程执行非阻塞i/o轮询,模拟实现了阶段一的异步化
信号驱动 i/o(sigio)
首先我们允许 socket 进行信号驱动 i/o,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 sigio 信号,可以在信号处理函数中调用 i/o 操作函数处理数据。
异步 i/o
调用 aio_read 函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,然后立即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。
异步 i/o 模型使用了 proactor 设计模式实现了这一机制。
告知内核,当整个过程(包括阶段1和阶段2)全部完成时,通知应用程序来读数据.
几种 i/o 模型的比较
前四种模型的区别是阶段1不相同,阶段2基本相同,都是将数据从内核拷贝到调用者的缓冲区。而异步 i/o 的两个阶段都不同于前四个模型。
同步 i/o 操作引起请求进程阻塞,直到 i/o 操作完成。异步 i/o 操作不引起请求进程阻塞。
常见 java i/o 模型
在了解了 unix 的 i/o 模型之后,其实 java 的 i/o 模型也是类似。
“阻塞i/o”模式
在上一节 socket 章节中的 echoserver 就是一个简单的阻塞 i/o 例子,服务器启动后,等待客户端连接。在客户端连接服务器后,服务器就阻塞读写取数据流。
echoserver 代码:
public class echoserver { public static int default_port = 7; public static void main(string[] args) throws ioexception { int port; try { port = integer.parseint(args[0]); } catch (runtimeexception ex) { port = default_port; } try ( serversocket serversocket = new serversocket(port); socket clientsocket = serversocket.accept(); printwriter out = new printwriter(clientsocket.getoutputstream(), true); bufferedreader in = new bufferedreader( new inputstreamreader(clientsocket.getinputstream())); ) { string inputline; while ((inputline = in.readline()) != null) { out.println(inputline); } } catch (ioexception e) { system.out.println("exception caught when trying to listen on port " + port + " or listening for a connection"); system.out.println(e.getmessage()); } } }
改进为“阻塞i/o+多线程”模式
使用多线程来支持多个客户端来访问服务器。
主线程 multithreadechoserver.java
public class multithreadechoserver { public static int default_port = 7; public static void main(string[] args) throws ioexception { int port; try { port = integer.parseint(args[0]); } catch (runtimeexception ex) { port = default_port; } socket clientsocket = null; try (serversocket serversocket = new serversocket(port);) { while (true) { clientsocket = serversocket.accept(); // multithread new thread(new echoserverhandler(clientsocket)).start(); } } catch (ioexception e) { system.out.println( "exception caught when trying to listen on port " + port + " or listening for a connection"); system.out.println(e.getmessage()); } } }
处理器类 echoserverhandler.java
public class echoserverhandler implements runnable { private socket clientsocket; public echoserverhandler(socket clientsocket) { this.clientsocket = clientsocket; } @override public void run() { try (printwriter out = new printwriter(clientsocket.getoutputstream(), true); bufferedreader in = new bufferedreader(new inputstreamreader(clientsocket.getinputstream()));) { string inputline; while ((inputline = in.readline()) != null) { out.println(inputline); } } catch (ioexception e) { system.out.println(e.getmessage()); } } }
存在问题:每次接收到新的连接都要新建一个线程,处理完成后销毁线程,代价大。当有大量地短连接出现时,性能比较低。
改进为“阻塞i/o+线程池”模式
针对上面多线程的模型中,出现的线程重复创建、销毁带来的开销,可以采用线程池来优化。每次接收到新连接后从池中取一个空闲线程进行处理,处理完成后再放回池中,重用线程避免了频率地创建和销毁线程带来的开销。
主线程 threadpoolechoserver.java
public class threadpoolechoserver { public static int default_port = 7; public static void main(string[] args) throws ioexception { int port; try { port = integer.parseint(args[0]); } catch (runtimeexception ex) { port = default_port; } executorservice threadpool = executors.newfixedthreadpool(5); socket clientsocket = null; try (serversocket serversocket = new serversocket(port);) { while (true) { clientsocket = serversocket.accept(); // thread pool threadpool.submit(new thread(new echoserverhandler(clientsocket))); } } catch (ioexception e) { system.out.println( "exception caught when trying to listen on port " + port + " or listening for a connection"); system.out.println(e.getmessage()); } } }
存在问题:在大量短连接的场景中性能会有提升,因为不用每次都创建和销毁线程,而是重用连接池中的线程。但在大量长连接的场景中,因为线程被连接长期占用,不需要频繁地创建和销毁线程,因而没有什么优势。
虽然这种方法可以适用于小到中度规模的客户端的并发数,如果连接数超过 100,000或更多,那么性能将很不理想。
改进为“非阻塞i/o”模式
“阻塞i/o+线程池”网络模型虽然比”阻塞i/o+多线程”网络模型在性能方面有提升,但这两种模型都存在一个共同的问题:读和写操作都是同步阻塞的,面对大并发(持续大量连接同时请求)的场景,需要消耗大量的线程来维持连接。cpu 在大量的线程之间频繁切换,性能损耗很大。一旦单机的连接超过1万,甚至达到几万的时候,服务器的性能会急剧下降。
而 nio 的 selector 却很好地解决了这个问题,用主线程(一个线程或者是 cpu 个数的线程)保持住所有的连接,管理和读取客户端连接的数据,将读取的数据交给后面的线程池处理,线程池处理完业务逻辑后,将结果交给主线程发送响应给客户端,少量的线程就可以处理大量连接的请求。
java nio 由以下几个核心部分组成:
- channel
- buffer
- selector
要使用 selector,得向 selector 注册 channel,然后调用它的 select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。
主线程 nonblokingechoserver.java
public class nonblokingechoserver { public static int default_port = 7; public static void main(string[] args) throws ioexception { int port; try { port = integer.parseint(args[0]); } catch (runtimeexception ex) { port = default_port; } system.out.println("listening for connections on port " + port); serversocketchannel serverchannel; selector selector; try { serverchannel = serversocketchannel.open(); inetsocketaddress address = new inetsocketaddress(port); serverchannel.bind(address); serverchannel.configureblocking(false); selector = selector.open(); serverchannel.register(selector, selectionkey.op_accept); } catch (ioexception ex) { ex.printstacktrace(); return; } while (true) { try { selector.select(); } catch (ioexception ex) { ex.printstacktrace(); break; } set<selectionkey> readykeys = selector.selectedkeys(); iterator<selectionkey> iterator = readykeys.iterator(); while (iterator.hasnext()) { selectionkey key = iterator.next(); iterator.remove(); try { if (key.isacceptable()) { serversocketchannel server = (serversocketchannel) key.channel(); socketchannel client = server.accept(); system.out.println("accepted connection from " + client); client.configureblocking(false); selectionkey clientkey = client.register(selector, selectionkey.op_write | selectionkey.op_read); bytebuffer buffer = bytebuffer.allocate(100); clientkey.attach(buffer); } if (key.isreadable()) { socketchannel client = (socketchannel) key.channel(); bytebuffer output = (bytebuffer) key.attachment(); client.read(output); } if (key.iswritable()) { socketchannel client = (socketchannel) key.channel(); bytebuffer output = (bytebuffer) key.attachment(); output.flip(); client.write(output); output.compact(); } } catch (ioexception ex) { key.cancel(); try { key.channel().close(); } catch (ioexception cex) { } } } } } }
改进为“异步i/o”模式
java se 7 版本之后,引入了异步 i/o (nio.2) 的支持,为构建高性能的网络应用提供了一个利器。
主线程 asyncechoserver.java
public class asyncechoserver { public static int default_port = 7; public static void main(string[] args) throws ioexception { int port; try { port = integer.parseint(args[0]); } catch (runtimeexception ex) { port = default_port; } executorservice taskexecutor = executors.newcachedthreadpool(executors.defaultthreadfactory()); // create asynchronous server socket channel bound to the default group try (asynchronousserversocketchannel asynchronousserversocketchannel = asynchronousserversocketchannel.open()) { if (asynchronousserversocketchannel.isopen()) { // set some options asynchronousserversocketchannel.setoption(standardsocketoptions.so_rcvbuf, 4 * 1024); asynchronousserversocketchannel.setoption(standardsocketoptions.so_reuseaddr, true); // bind the server socket channel to local address asynchronousserversocketchannel.bind(new inetsocketaddress(port)); // display a waiting message while ... waiting clients system.out.println("waiting for connections ..."); while (true) { future<asynchronoussocketchannel> asynchronoussocketchannelfuture = asynchronousserversocketchannel .accept(); try { final asynchronoussocketchannel asynchronoussocketchannel = asynchronoussocketchannelfuture .get(); callable<string> worker = new callable<string>() { @override public string call() throws exception { string host = asynchronoussocketchannel.getremoteaddress().tostring(); system.out.println("incoming connection from: " + host); final bytebuffer buffer = bytebuffer.allocatedirect(1024); // transmitting data while (asynchronoussocketchannel.read(buffer).get() != -1) { buffer.flip(); asynchronoussocketchannel.write(buffer).get(); if (buffer.hasremaining()) { buffer.compact(); } else { buffer.clear(); } } asynchronoussocketchannel.close(); system.out.println(host + " was successfully served!"); return host; } }; taskexecutor.submit(worker); } catch (interruptedexception | executionexception ex) { system.err.println(ex); system.err.println("\n server is shutting down ..."); // this will make the executor accept no new threads // and finish all existing threads in the queue taskexecutor.shutdown(); // wait until all threads are finished while (!taskexecutor.isterminated()) { } break; } } } else { system.out.println("the asynchronous server-socket channel cannot be opened!"); } } catch (ioexception ex) { system.err.println(ex); } } }
源码
本章例子的源码,可以在 中 com.waylau.essentialjava.net.echo 包下找到。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。