Java 高并发八:NIO和AIO详解
io感觉上和多线程并没有多大关系,但是nio改变了线程在应用层面使用的方式,也解决了一些实际的困难。而aio是异步io和前面的系列也有点关系。在此,为了学习和记录,也写一篇文章来介绍nio和aio。
1. 什么是nio
nio是new i/o的简称,与旧式的基于流的i/o方法相对,从名字看,它表示新的一套java i/o标 准。它是在java 1.4中被纳入到jdk中的,并具有以下特性:
- nio是基于块(block)的,它以块为基本单位处理数据 (硬盘上存储的单位也是按block来存储,这样性能上比基于流的方式要好一些)
- 为所有的原始类型提供(buffer)缓存支持
- 增加通道(channel)对象,作为新的原始 i/o 抽象
- 支持锁(我们在平时使用时经常能看到会出现一些.lock的文件,这说明有线程正在使用这把锁,当线程释放锁时,会把这个文件删除掉,这样其他线程才能继续拿到这把锁)和内存映射文件的文件访问接口
- 提供了基于selector的异步网络i/o
所有的从通道中的读写操作,都要经过buffer,而通道就是io的抽象,通道的另一端就是操纵的文件。
2. buffer
java中buffer的实现。基本的数据类型都有它对应的buffer
buffer的简单使用例子:
package test; import java.io.file; import java.io.fileinputstream; import java.nio.bytebuffer; import java.nio.channels.filechannel; public class test { public static void main(string[] args) throws exception { fileinputstream fin = new fileinputstream(new file( "d:\\temp_buffer.tmp")); filechannel fc = fin.getchannel(); bytebuffer bytebuffer = bytebuffer.allocate(1024); fc.read(bytebuffer); fc.close(); bytebuffer.flip();//读写转换 } }
总结下使用的步骤是:
1. 得到channel
2. 申请buffer
3. 建立channel和buffer的读/写关系
4. 关闭
下面的例子是使用nio来复制文件:
public static void niocopyfile(string resource, string destination) throws ioexception { fileinputstream fis = new fileinputstream(resource); fileoutputstream fos = new fileoutputstream(destination); filechannel readchannel = fis.getchannel(); // 读文件通道 filechannel writechannel = fos.getchannel(); // 写文件通道 bytebuffer buffer = bytebuffer.allocate(1024); // 读入数据缓存 while (true) { buffer.clear(); int len = readchannel.read(buffer); // 读入数据 if (len == -1) { break; // 读取完毕 } buffer.flip(); writechannel.write(buffer); // 写入文件 } readchannel.close(); writechannel.close(); }
buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)
这里要区别下容量和上限,比如一个buffer有10kb,那么10kb就是容量,我将5kb的文件读到buffer中,那么上限就是5kb。
下面举个例子来理解下这3个重要的参数:
public static void main(string[] args) throws exception { bytebuffer b = bytebuffer.allocate(15); // 15个字节大小的缓冲区 system.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); for (int i = 0; i < 10; i++) { // 存入10个字节数据 b.put((byte) i); } system.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); b.flip(); // 重置position system.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); for (int i = 0; i < 5; i++) { system.out.print(b.get()); } system.out.println(); system.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); b.flip(); system.out.println("limit=" + b.limit() + " capacity=" + b.capacity() + " position=" + b.position()); }
整个过程如图:
此时position从0到10,capactiy和limit不变。
该操作会重置position,通常,将buffer从写模式转换为读 模式时需要执行此方法 flip()操作不仅重置了当前的position为0,还将limit设置到当前position的位置 。
limit的意义在于,来确定哪些数据是有意义的,换句话说,从position到limit之间的数据才是有意义的数据,因为是上次操作的数据。所以flip操作往往是读写转换的意思。
意义同上。
而buffer中大多数的方法都是去改变这3个参数来达到某些功能的:
public final buffer rewind()
将position置零,并清除标志位(mark)
public final buffer clear()
将position置零,同时将limit设置为capacity的大小,并清除了标志mark
public final buffer flip()
先将limit设置到position所在位置,然后将position置零,并清除标志位mark,通常在读写转换时使用
文件映射到内存
public static void main(string[] args) throws exception { randomaccessfile raf = new randomaccessfile("c:\\mapfile.txt", "rw"); filechannel fc = raf.getchannel(); // 将文件映射到内存中 mappedbytebuffer mbb = fc.map(filechannel.mapmode.read_write, 0, raf.length()); while (mbb.hasremaining()) { system.out.print((char) mbb.get()); } mbb.put(0, (byte) 98); // 修改文件 raf.close(); }
对mappedbytebuffer的修改就相当于修改文件本身,这样操作的速度是很快的。
3. channel
多线程网络服务器的一般结构:
简单的多线程服务器:
public static void main(string[] args) throws exception { serversocket echoserver = null; socket clientsocket = null; try { echoserver = new serversocket(8000); } catch (ioexception e) { system.out.println(e); } while (true) { try { clientsocket = echoserver.accept(); system.out.println(clientsocket.getremotesocketaddress() + " connect!"); tp.execute(new handlemsg(clientsocket)); } catch (ioexception e) { system.out.println(e); } } }
功能就是服务器端读到什么数据,就向客户端回写什么数据。
这里的tp是一个线程池,handlemsg是处理消息的类。
static class handlemsg implements runnable{ 省略部分信息 public void run(){ try { is = new bufferedreader(new inputstreamreader(clientsocket.getinputstream())); os = new printwriter(clientsocket.getoutputstream(), true); // 从inputstream当中读取客户端所发送的数据 string inputline = null; long b=system. currenttimemillis (); while ((inputline = is.readline()) != null) { os.println(inputline); } long e=system. currenttimemillis (); system. out.println ("spend:"+(e - b)+" ms "); } catch (ioexception e) { e.printstacktrace(); }finally { 关闭资源 } } }
客户端:
public static void main(string[] args) throws exception { socket client = null; printwriter writer = null; bufferedreader reader = null; try { client = new socket(); client.connect(new inetsocketaddress("localhost", 8000)); writer = new printwriter(client.getoutputstream(), true); writer.println("hello!"); writer.flush(); reader = new bufferedreader(new inputstreamreader( client.getinputstream())); system.out.println("from server: " + reader.readline()); } catch (exception e) { } finally { // 省略资源关闭 } }
以上的网络编程是很基本的,使用这种方式,会有一些问题:
为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。此时,如果客户端数量众多,可能会消耗大量的系统资源。
解决方案:
使用非阻塞的nio (读取数据不等待,数据准备好了再工作)
为了体现nio使用的高效。
这里先模拟一个低效的客户端来模拟因网络而延时的情况:
private static executorservice tp= executors.newcachedthreadpool(); private static final int sleep_time=1000*1000*1000; public static class echoclient implements runnable{ public void run(){ try { client = new socket(); client.connect(new inetsocketaddress("localhost", 8000)); writer = new printwriter(client.getoutputstream(), true); writer.print("h"); locksupport.parknanos(sleep_time); writer.print("e"); locksupport.parknanos(sleep_time); writer.print("l"); locksupport.parknanos(sleep_time); writer.print("l"); locksupport.parknanos(sleep_time); writer.print("o"); locksupport.parknanos(sleep_time); writer.print("!"); locksupport.parknanos(sleep_time); writer.println(); writer.flush(); }catch(exception e) { } } }
服务器端输出:
spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms
因为
while ((inputline = is.readline()) != null)
是阻塞的,所以时间都花在等待中。
如果用nio来处理这个问题会怎么做呢?
nio有一个很大的特点就是:把数据准备好了再通知我
而channel有点类似于流,一个channel可以和文件或者网络socket对应 。
selector是一个选择器,它可以选择某一个channel,然后做些事情。
一个线程可以对应一个selector,而一个selector可以轮询多个channel,而每个channel对应了一个socket。
与上面一个线程对应一个socket相比,使用nio后,一个线程可以轮询多个socket。
当selector调用select()时,会查看是否有客户端准备好了数据。当没有数据被准备好时,select()会阻塞。平时都说nio是非阻塞的,但是如果没有数据被准备好还是会有阻塞现象。
当有数据被准备好时,调用完select()后,会返回一个selectionkey,selectionkey表示在某个selector上的某个channel的数据已经被准备好了。
只有在数据准备好时,这个channel才会被选择。
这样nio实现了一个线程来监控多个客户端。
而刚刚模拟的网络延迟的客户端将不会影响nio下的线程,因为某个socket网络延迟时,数据还未被准备好,selector是不会选择它的,而会选择其他准备好的客户端。
selectnow()与select()的区别在于,selectnow()是不阻塞的,当没有客户端准备好数据时,selectnow()不会阻塞,将返回0,有客户端准备好数据时,selectnow()返回准备好的客户端的个数。
主要代码:
package test; import java.net.inetaddress; import java.net.inetsocketaddress; import java.net.socket; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.nio.channels.spi.abstractselector; import java.nio.channels.spi.selectorprovider; import java.util.hashmap; import java.util.iterator; import java.util.linkedlist; import java.util.map; import java.util.set; import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class multithreadnioechoserver { public static map<socket, long> geym_time_stat = new hashmap<socket, long>(); class echoclient { private linkedlist<bytebuffer> outq; echoclient() { outq = new linkedlist<bytebuffer>(); } public linkedlist<bytebuffer> getoutputqueue() { return outq; } public void enqueue(bytebuffer bb) { outq.addfirst(bb); } } class handlemsg implements runnable { selectionkey sk; bytebuffer bb; public handlemsg(selectionkey sk, bytebuffer bb) { super(); this.sk = sk; this.bb = bb; } @override public void run() { // todo auto-generated method stub echoclient echoclient = (echoclient) sk.attachment(); echoclient.enqueue(bb); sk.interestops(selectionkey.op_read | selectionkey.op_write); selector.wakeup(); } } private selector selector; private executorservice tp = executors.newcachedthreadpool(); private void startserver() throws exception { selector = selectorprovider.provider().openselector(); serversocketchannel ssc = serversocketchannel.open(); ssc.configureblocking(false); inetsocketaddress isa = new inetsocketaddress(8000); ssc.socket().bind(isa); // 注册感兴趣的事件,此处对accpet事件感兴趣 selectionkey acceptkey = ssc.register(selector, selectionkey.op_accept); for (;;) { selector.select(); set readykeys = selector.selectedkeys(); iterator i = readykeys.iterator(); long e = 0; while (i.hasnext()) { selectionkey sk = (selectionkey) i.next(); i.remove(); if (sk.isacceptable()) { doaccept(sk); } else if (sk.isvalid() && sk.isreadable()) { if (!geym_time_stat.containskey(((socketchannel) sk .channel()).socket())) { geym_time_stat.put( ((socketchannel) sk.channel()).socket(), system.currenttimemillis()); } doread(sk); } else if (sk.isvalid() && sk.iswritable()) { dowrite(sk); e = system.currenttimemillis(); long b = geym_time_stat.remove(((socketchannel) sk .channel()).socket()); system.out.println("spend:" + (e - b) + "ms"); } } } } private void dowrite(selectionkey sk) { // todo auto-generated method stub socketchannel channel = (socketchannel) sk.channel(); echoclient echoclient = (echoclient) sk.attachment(); linkedlist<bytebuffer> outq = echoclient.getoutputqueue(); bytebuffer bb = outq.getlast(); try { int len = channel.write(bb); if (len == -1) { disconnect(sk); return; } if (bb.remaining() == 0) { outq.removelast(); } } catch (exception e) { // todo: handle exception disconnect(sk); } if (outq.size() == 0) { sk.interestops(selectionkey.op_read); } } private void doread(selectionkey sk) { // todo auto-generated method stub socketchannel channel = (socketchannel) sk.channel(); bytebuffer bb = bytebuffer.allocate(8192); int len; try { len = channel.read(bb); if (len < 0) { disconnect(sk); return; } } catch (exception e) { // todo: handle exception disconnect(sk); return; } bb.flip(); tp.execute(new handlemsg(sk, bb)); } private void disconnect(selectionkey sk) { // todo auto-generated method stub //省略略干关闭操作 } private void doaccept(selectionkey sk) { // todo auto-generated method stub serversocketchannel server = (serversocketchannel) sk.channel(); socketchannel clientchannel; try { clientchannel = server.accept(); clientchannel.configureblocking(false); selectionkey clientkey = clientchannel.register(selector, selectionkey.op_read); echoclient echoclinet = new echoclient(); clientkey.attach(echoclinet); inetaddress clientaddress = clientchannel.socket().getinetaddress(); system.out.println("accepted connection from " + clientaddress.gethostaddress()); } catch (exception e) { // todo: handle exception } } public static void main(string[] args) { // todo auto-generated method stub multithreadnioechoserver echoserver = new multithreadnioechoserver(); try { echoserver.startserver(); } catch (exception e) { // todo: handle exception } } }
代码仅作参考,主要的特点是,对不同事件的感兴趣来做不同的事。
当用之前模拟的那个延迟的客户端时,这次的时间消耗就在2ms到11ms之间了。性能提升是很明显的。
总结:
1. nio会将数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去。
2. 节省数据准备时间(因为selector可以复用)
5. aio
aio的特点:
1. 读完了再通知我
2. 不会加快io,只是在读完后进行通知
3. 使用回调函数,进行业务处理
aio的相关代码:
asynchronousserversocketchannel
server = asynchronousserversocketchannel.open().bind( new inetsocketaddress (port));
使用server上的accept方法
public abstract <a> void accept(a attachment,completionhandler<asynchronoussocketchannel,? super a> handler);
completionhandler为回调接口,当有客户端accept之后,就做handler中的事情。
示例代码:
server.accept(null, new completionhandler<asynchronoussocketchannel, object>() { final bytebuffer buffer = bytebuffer.allocate(1024); public void completed(asynchronoussocketchannel result, object attachment) { system.out.println(thread.currentthread().getname()); future<integer> writeresult = null; try { buffer.clear(); result.read(buffer).get(100, timeunit.seconds); buffer.flip(); writeresult = result.write(buffer); } catch (interruptedexception | executionexception e) { e.printstacktrace(); } catch (timeoutexception e) { e.printstacktrace(); } finally { try { server.accept(null, this); writeresult.get(); result.close(); } catch (exception e) { system.out.println(e.tostring()); } } } @override public void failed(throwable exc, object attachment) { system.out.println("failed: " + exc); } });
这里使用了future来实现即时返回,关于future请参考上一篇
在理解了nio的基础上,看aio,区别在于aio是等读写过程完成后再去调用回调函数。
nio是同步非阻塞的
aio是异步非阻塞的
由于nio的读写过程依然在应用线程里完成,所以对于那些读写过程时间长的,nio就不太适合。
而aio的读写过程完成后才被通知,所以aio能够胜任那些重量级,读写过程长的任务。