深入理解Java NIO
何谓reactor模式?它是实现高性能io的一种设计模式。网上资料有很多,有些写的也很好,但大多不知其所以然。这里博主按自己的思路简单介绍下,有不对的地方敬请指正。
bio
java1.4(2002年)以前,io都是blocking的,也就是常说的bio,它在等待请求、读、写(返回)三个环节都是阻塞的。在等待请求阶段,系统无法知道请求何时到达,因此需要一个主线程一直守着,当有请求进来时,将请求分发给读写线程。如图:
代码如下:
executorservice executor = excutors.newfixedthreadpollexecutor(100);//线程池 serversocket serversocket = new serversocket(); serversocket.bind(8088); while(!thread.currentthread.isinturrupted()){//主线程死循环等待新连接到来 socket socket = serversocket.accept(); executor.submit(new connectionhandler(socket));//为新的连接创建新的线程 }
class connectionhandler extends thread{ private socket socket; public connectionhandler(socket socket){ this.socket = socket; } public void run(){ while(!thread.currentthread.isinturrupted()&&!socket.isclosed()){//死循环处理读写事件 string something = socket.read()....//读取数据 if(something!=null){
......//处理数据
socket.write()....//写数据 } } }
需知,请求进来(accept),并不表示数据马上达到了,可能隔一段时间才会传进来,这个时候socket.read()也是一直阻塞的状态。socket.write()也同理,当向磁盘或其它socket写数据时,也要等对方准备好才能写入,在对方准备阶段,socket.write()也是阻塞的。这两个环节可能的无效阻塞导致读写线程的低效。
nio
java1.4开始,引入了nio。nio有三个概念:selector、buffer、channel。与bio的区别是,请求进来后,并不会马上分派io线程,而是依靠操作系统底层的多路复用机制(select/poll/epoll等),在监听到socket读写就绪之后,再分配io线程(实际可由当前线程[使用buffer和channel]直接读写,因为读写本身的效率很高),这就避免了线程等待。且与bio多线程方式相比,使用i/o多路复用技术,系统不必创建和维护庞大的线程池,从而大大减小了开销。这部分工作是nio的核心,由selector负责,本质上是多路复用的java封装。而buffer和channel又封装了一层socket的读写,应该为的是将io与业务代码彻底分离。以下图示为本人理解:
如图示,与bio中监听线程职责不同,selector监听的不只是连接请求,还有读写就绪事件,当某个事件发生时,即通知注册了该事件的channel,由channel操作socket读写buffer。虚线表示需要具体的nio框架或业务代码自己处理,比如channel如何注册以及注册何种事件,channel处理io的方式(如在当前线程处理还是新开线程,若新开线程,则可看作是aio模式)等。nio只是提供了一套机制,具体使用还是需要编程实现(reactor模式就是oo的一种实现)。
示例代码(摘自java nio详解)
服务端:
1 package cn.blog.test.niotest; 2 3 4 import java.io.ioexception; 5 import java.net.inetsocketaddress; 6 import java.nio.bytebuffer; 7 import java.nio.channels.*; 8 import java.nio.charset.charset; 9 import java.util.iterator; 10 import java.util.set; 11 12 13 public class mynioserver { 14 private selector selector; //创建一个选择器 15 private final static int port = 8686; 16 private final static int buf_size = 10240; 17 18 private void initserver() throws ioexception { 19 //创建通道管理器对象selector 20 this.selector=selector.open(); 21 22 //创建一个通道对象channel 23 serversocketchannel channel = serversocketchannel.open(); 24 channel.configureblocking(false); //将通道设置为非阻塞 25 channel.socket().bind(new inetsocketaddress(port)); //将通道绑定在8686端口 26 27 //将上述的通道管理器和通道绑定,并为该通道注册op_accept事件 28 //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞 29 selectionkey selectionkey = channel.register(selector,selectionkey.op_accept); 30 31 while (true){ //轮询 32 selector.select(); //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个) 33 set keys = selector.selectedkeys(); //如果channel有数据了,将生成的key访入keys集合中 34 iterator iterator = keys.iterator(); //得到这个keys集合的迭代器 35 while (iterator.hasnext()){ //使用迭代器遍历集合 36 selectionkey key = (selectionkey) iterator.next(); //得到集合中的一个key实例 37 iterator.remove(); //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错 38 if (key.isacceptable()){ //判断当前key所代表的channel是否在acceptable状态,如果是就进行接收 39 doaccept(key); 40 }else if (key.isreadable()){ 41 doread(key); 42 }else if (key.iswritable() && key.isvalid()){ 43 dowrite(key); 44 }else if (key.isconnectable()){ 45 system.out.println("连接成功!"); 46 } 47 } 48 } 49 } 50 51 public void doaccept(selectionkey key) throws ioexception { 52 serversocketchannel serverchannel = (serversocketchannel) key.channel(); 53 system.out.println("serversocketchannel正在循环监听"); 54 socketchannel clientchannel = serverchannel.accept(); 55 clientchannel.configureblocking(false); 56 clientchannel.register(key.selector(),selectionkey.op_read); 57 } 58 59 public void doread(selectionkey key) throws ioexception { 60 socketchannel clientchannel = (socketchannel) key.channel(); 61 bytebuffer bytebuffer = bytebuffer.allocate(buf_size); 62 long bytesread = clientchannel.read(bytebuffer); 63 while (bytesread>0){ 64 bytebuffer.flip(); 65 byte[] data = bytebuffer.array(); 66 string info = new string(data).trim(); 67 system.out.println("从客户端发送过来的消息是:"+info); 68 bytebuffer.clear(); 69 bytesread = clientchannel.read(bytebuffer); 70 } 71 if (bytesread==-1){ 72 clientchannel.close(); 73 } 74 } 75 76 public void dowrite(selectionkey key) throws ioexception { 77 bytebuffer bytebuffer = bytebuffer.allocate(buf_size); 78 bytebuffer.flip(); 79 socketchannel clientchannel = (socketchannel) key.channel(); 80 while (bytebuffer.hasremaining()){ 81 clientchannel.write(bytebuffer); 82 } 83 bytebuffer.compact(); 84 } 85 86 public static void main(string[] args) throws ioexception { 87 mynioserver mynioserver = new mynioserver(); 88 mynioserver.initserver(); 89 } 90 }
客户端:
1 package cn.blog.test.niotest; 2 3 4 import java.io.ioexception; 5 import java.net.inetsocketaddress; 6 import java.nio.bytebuffer; 7 import java.nio.channels.selectionkey; 8 import java.nio.channels.selector; 9 import java.nio.channels.socketchannel; 10 import java.util.iterator; 11 12 public class mynioclient { 13 private selector selector; //创建一个选择器 14 private final static int port = 8686; 15 private final static int buf_size = 10240; 16 private static bytebuffer bytebuffer = bytebuffer.allocate(buf_size); 17 18 private void initclient() throws ioexception { 19 this.selector = selector.open(); 20 socketchannel clientchannel = socketchannel.open(); 21 clientchannel.configureblocking(false); 22 clientchannel.connect(new inetsocketaddress(port)); 23 clientchannel.register(selector, selectionkey.op_connect); 24 while (true){ 25 selector.select(); 26 iterator<selectionkey> iterator = selector.selectedkeys().iterator(); 27 while (iterator.hasnext()){ 28 selectionkey key = iterator.next(); 29 iterator.remove(); 30 if (key.isconnectable()){ 31 doconnect(key); 32 }else if (key.isreadable()){ 33 doread(key); 34 } 35 } 36 } 37 } 38 39 public void doconnect(selectionkey key) throws ioexception { 40 socketchannel clientchannel = (socketchannel) key.channel(); 41 if (clientchannel.isconnectionpending()){ 42 clientchannel.finishconnect(); 43 } 44 clientchannel.configureblocking(false); 45 string info = "服务端你好!!"; 46 bytebuffer.clear(); 47 bytebuffer.put(info.getbytes("utf-8")); 48 bytebuffer.flip(); 49 clientchannel.write(bytebuffer); 50 //clientchannel.register(key.selector(),selectionkey.op_read); 51 clientchannel.close(); 52 } 53 54 public void doread(selectionkey key) throws ioexception { 55 socketchannel clientchannel = (socketchannel) key.channel(); 56 clientchannel.read(bytebuffer); 57 byte[] data = bytebuffer.array(); 58 string msg = new string(data).trim(); 59 system.out.println("服务端发送消息:"+msg); 60 clientchannel.close(); 61 key.selector().close(); 62 } 63 64 public static void main(string[] args) throws ioexception { 65 mynioclient mynioclient = new mynioclient(); 66 mynioclient.initclient(); 67 } 68 }
在早期的jdk1.4和1.5 update10版本之前,selector基于select/poll模型实现,是基于io复用技术的非阻塞io,不是异步io。在jdk1.5 update10和linux core2.6以上版本,sun优化了selctor的实现,底层使用epoll替换了select/poll。另据说buffer指向的并非堆内内存,nio使用 native 函数库直接分配堆外内存,然后通过一个存储在 java 堆的 directbytebuffer 对象作为这块内存的引用进行操作,避免了在 java 堆和 native 堆中来回复制数据。
nio的实现解析可参看:深入浅出nio socket实现机制
reactor模式
nio为实现reactor模式提供了基础,上面的nio图示其实就是reactor模式的雏形,只是reactor以oo的方式抽象出了几个概念,使得职责划分更加明确。
- reactor:reactor是io事件的派发者,对应nio的selector;
- acceptor:acceptor接受client连接,建立对应client的handler,并向reactor注册此handler,对应nio中注册channel和事件触发时的判断分支(上述nio服务端示例代码的38-46行);
- handler:io处理类,对应nio中channel[使用socket]操作buffer的过程。
基于上述三个角色画出reactor模式图如下:
如此,reactor模式便非常清晰地展现在我们眼前。那么业务线程如何与reactor交互呢?由前文所知,数据存取于buffer,具体操作由handler负责。socket.read()将数据读入buffer,需要一种机制将buffer引用推送给业务线程;同样,业务线程返回的数据需要写入buffer,按reactor模式,写入后还需要注册write事件,socket可写后write()。如果直接调用的话,至少handler和业务代码会耦合在一起,常见的解耦方式是定义接口,或使用消息中间件。
其它
话说回来,由于相对短暂的历史以及相对封闭的环境,.net社区缺少很多概念的演化、探究和讨论,这也导致了.neter们这些概念的缺失。虽然从语言层面上来说,c#和java大同小异,前者甚至一定程度的有语法上的便利,然而只有认识到了其背后的思想和模式,才能真正用好这门语言,这就是.neter需要了解java及其历史的原因,毕竟.net一开始就是参照着java来的。
比如.net里的堆栈概念,就算一些经典书籍都没有非常深入的说明,而java方面的资料就很多了,参看深入理解jvm—jvm内存模型
其它参考资料: