欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

深入理解Java NIO

程序员文章站 2022-03-26 09:26:25
何谓Reactor模式?它是实现高性能IO的一种设计模式。网上资料有很多,有些写的也很好,但大多不知其所以然。这里博主按自己的思路简单介绍下,有不对的地方敬请指正。 BIO Java1.4(2002年)以前,IO都是Blocking的,也就是常说的BIO,它在等待请求、读、写(返回)三个环节都是阻塞 ......

何谓reactor模式?它是实现高性能io的一种设计模式。网上资料有很多,有些写的也很好,但大多不知其所以然。这里博主按自己的思路简单介绍下,有不对的地方敬请指正。


bio

java1.4(2002年)以前,io都是blocking的,也就是常说的bio,它在等待请求、读、写(返回)三个环节都是阻塞的。在等待请求阶段,系统无法知道请求何时到达,因此需要一个主线程一直守着,当有请求进来时,将请求分发给读写线程。如图:

深入理解Java NIO

代码如下:

    executorservice executor = excutors.newfixedthreadpollexecutor(100);//线程池
    serversocket serversocket = new serversocket();
    serversocket.bind(8088); 
    while(!thread.currentthread.isinturrupted()){//主线程死循环等待新连接到来        
        socket socket = serversocket.accept();
        executor.submit(new connectionhandler(socket));//为新的连接创建新的线程 
    }
class connectionhandler extends thread{ private socket socket; public connectionhandler(socket socket){ this.socket = socket; } public void run(){ while(!thread.currentthread.isinturrupted()&&!socket.isclosed()){//死循环处理读写事件 string something = socket.read()....//读取数据 if(something!=null){
           ......//处理数据
           socket.write()....//写数据 } } }

需知,请求进来(accept),并不表示数据马上达到了,可能隔一段时间才会传进来,这个时候socket.read()也是一直阻塞的状态。socket.write()也同理,当向磁盘或其它socket写数据时,也要等对方准备好才能写入,在对方准备阶段,socket.write()也是阻塞的。这两个环节可能的无效阻塞导致读写线程的低效。


nio

java1.4开始,引入了nio。nio有三个概念:selector、buffer、channel。与bio的区别是,请求进来后,并不会马上分派io线程,而是依靠操作系统底层的多路复用机制(select/poll/epoll等),在监听到socket读写就绪之后,再分配io线程(实际可由当前线程[使用buffer和channel]直接读写,因为读写本身的效率很高),这就避免了线程等待。且与bio多线程方式相比,使用i/o多路复用技术,系统不必创建和维护庞大的线程池,从而大大减小了开销。这部分工作是nio的核心,由selector负责,本质上是多路复用的java封装。而buffer和channel又封装了一层socket的读写,应该为的是将io与业务代码彻底分离。以下图示为本人理解:

深入理解Java NIO

如图示,与bio中监听线程职责不同,selector监听的不只是连接请求,还有读写就绪事件,当某个事件发生时,即通知注册了该事件的channel,由channel操作socket读写buffer。虚线表示需要具体的nio框架或业务代码自己处理,比如channel如何注册以及注册何种事件,channel处理io的方式(如在当前线程处理还是新开线程,若新开线程,则可看作是aio模式)等。nio只是提供了一套机制,具体使用还是需要编程实现(reactor模式就是oo的一种实现)。

示例代码(摘自java nio详解

服务端:

深入理解Java NIO
 1 package cn.blog.test.niotest;
 2 
 3 
 4 import java.io.ioexception;
 5 import java.net.inetsocketaddress;
 6 import java.nio.bytebuffer;
 7 import java.nio.channels.*;
 8 import java.nio.charset.charset;
 9 import java.util.iterator;
10 import java.util.set;
11 
12 
13 public class mynioserver {
14     private selector selector;          //创建一个选择器
15     private final static int port = 8686;
16     private final static int buf_size = 10240;
17 
18     private void initserver() throws ioexception {
19         //创建通道管理器对象selector
20         this.selector=selector.open();
21 
22         //创建一个通道对象channel
23         serversocketchannel channel = serversocketchannel.open();
24         channel.configureblocking(false);       //将通道设置为非阻塞
25         channel.socket().bind(new inetsocketaddress(port));       //将通道绑定在8686端口
26 
27         //将上述的通道管理器和通道绑定,并为该通道注册op_accept事件
28         //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞
29         selectionkey selectionkey = channel.register(selector,selectionkey.op_accept);
30 
31         while (true){       //轮询
32             selector.select();          //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个)
33             set keys = selector.selectedkeys();         //如果channel有数据了,将生成的key访入keys集合中
34             iterator iterator = keys.iterator();        //得到这个keys集合的迭代器
35             while (iterator.hasnext()){             //使用迭代器遍历集合
36                 selectionkey key = (selectionkey) iterator.next();       //得到集合中的一个key实例
37                 iterator.remove();          //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错
38                 if (key.isacceptable()){         //判断当前key所代表的channel是否在acceptable状态,如果是就进行接收
39                     doaccept(key);
40                 }else if (key.isreadable()){
41                     doread(key);
42                 }else if (key.iswritable() && key.isvalid()){
43                     dowrite(key);
44                 }else if (key.isconnectable()){
45                     system.out.println("连接成功!");
46                 }
47             }
48         }
49     }
50 
51     public void doaccept(selectionkey key) throws ioexception {
52         serversocketchannel serverchannel = (serversocketchannel) key.channel();
53         system.out.println("serversocketchannel正在循环监听");
54         socketchannel clientchannel = serverchannel.accept();
55         clientchannel.configureblocking(false);
56         clientchannel.register(key.selector(),selectionkey.op_read);
57     }
58 
59     public void doread(selectionkey key) throws ioexception {
60         socketchannel clientchannel = (socketchannel) key.channel();
61         bytebuffer bytebuffer = bytebuffer.allocate(buf_size);
62         long bytesread = clientchannel.read(bytebuffer);
63         while (bytesread>0){
64             bytebuffer.flip();
65             byte[] data = bytebuffer.array();
66             string info = new string(data).trim();
67             system.out.println("从客户端发送过来的消息是:"+info);
68             bytebuffer.clear();
69             bytesread = clientchannel.read(bytebuffer);
70         }
71         if (bytesread==-1){
72             clientchannel.close();
73         }
74     }
75 
76     public void dowrite(selectionkey key) throws ioexception {
77         bytebuffer bytebuffer = bytebuffer.allocate(buf_size);
78         bytebuffer.flip();
79         socketchannel clientchannel = (socketchannel) key.channel();
80         while (bytebuffer.hasremaining()){
81             clientchannel.write(bytebuffer);
82         }
83         bytebuffer.compact();
84     }
85 
86     public static void main(string[] args) throws ioexception {
87         mynioserver mynioserver = new mynioserver();
88         mynioserver.initserver();
89     }
90 }
view code

客户端:

深入理解Java NIO
 1 package cn.blog.test.niotest;
 2 
 3 
 4 import java.io.ioexception;
 5 import java.net.inetsocketaddress;
 6 import java.nio.bytebuffer;
 7 import java.nio.channels.selectionkey;
 8 import java.nio.channels.selector;
 9 import java.nio.channels.socketchannel;
10 import java.util.iterator;
11 
12 public class mynioclient {
13     private selector selector;          //创建一个选择器
14     private final static int port = 8686;
15     private final static int buf_size = 10240;
16     private static bytebuffer bytebuffer = bytebuffer.allocate(buf_size);
17 
18     private void  initclient() throws ioexception {
19         this.selector = selector.open();
20         socketchannel clientchannel = socketchannel.open();
21         clientchannel.configureblocking(false);
22         clientchannel.connect(new inetsocketaddress(port));
23         clientchannel.register(selector, selectionkey.op_connect);
24         while (true){
25             selector.select();
26             iterator<selectionkey> iterator = selector.selectedkeys().iterator();
27             while (iterator.hasnext()){
28                 selectionkey key = iterator.next();
29                 iterator.remove();
30                 if (key.isconnectable()){
31                     doconnect(key);
32                 }else if (key.isreadable()){
33                     doread(key);
34                 }
35             }
36         }
37     }
38 
39     public void doconnect(selectionkey key) throws ioexception {
40         socketchannel clientchannel = (socketchannel) key.channel();
41         if (clientchannel.isconnectionpending()){
42             clientchannel.finishconnect();
43         }
44         clientchannel.configureblocking(false);
45         string info = "服务端你好!!";
46         bytebuffer.clear();
47         bytebuffer.put(info.getbytes("utf-8"));
48         bytebuffer.flip();
49         clientchannel.write(bytebuffer);
50         //clientchannel.register(key.selector(),selectionkey.op_read);
51         clientchannel.close();
52     }
53 
54     public void doread(selectionkey key) throws ioexception {
55         socketchannel clientchannel = (socketchannel) key.channel();
56         clientchannel.read(bytebuffer);
57         byte[] data = bytebuffer.array();
58         string msg = new string(data).trim();
59         system.out.println("服务端发送消息:"+msg);
60         clientchannel.close();
61         key.selector().close();
62     }
63 
64     public static void main(string[] args) throws ioexception {
65         mynioclient mynioclient = new mynioclient();
66         mynioclient.initclient();
67     }
68 }
view code

在早期的jdk1.4和1.5 update10版本之前,selector基于select/poll模型实现,是基于io复用技术的非阻塞io,不是异步io。在jdk1.5 update10和linux core2.6以上版本,sun优化了selctor的实现,底层使用epoll替换了select/poll。另据说buffer指向的并非堆内内存,nio使用 native 函数库直接分配堆外内存,然后通过一个存储在 java 堆的 directbytebuffer 对象作为这块内存的引用进行操作,避免了在 java 堆和 native 堆中来回复制数据。

nio的实现解析可参看:深入浅出nio socket实现机制


reactor模式

nio为实现reactor模式提供了基础,上面的nio图示其实就是reactor模式的雏形,只是reactor以oo的方式抽象出了几个概念,使得职责划分更加明确。

  • reactor:reactor是io事件的派发者,对应nio的selector;
  • acceptor:acceptor接受client连接,建立对应client的handler,并向reactor注册此handler,对应nio中注册channel和事件触发时的判断分支(上述nio服务端示例代码的38-46行);
  • handler:io处理类,对应nio中channel[使用socket]操作buffer的过程。

基于上述三个角色画出reactor模式图如下:

深入理解Java NIO

如此,reactor模式便非常清晰地展现在我们眼前。那么业务线程如何与reactor交互呢?由前文所知,数据存取于buffer,具体操作由handler负责。socket.read()将数据读入buffer,需要一种机制将buffer引用推送给业务线程;同样,业务线程返回的数据需要写入buffer,按reactor模式,写入后还需要注册write事件,socket可写后write()。如果直接调用的话,至少handler和业务代码会耦合在一起,常见的解耦方式是定义接口,或使用消息中间件。


其它

话说回来,由于相对短暂的历史以及相对封闭的环境,.net社区缺少很多概念的演化、探究和讨论,这也导致了.neter们这些概念的缺失。虽然从语言层面上来说,c#和java大同小异,前者甚至一定程度的有语法上的便利,然而只有认识到了其背后的思想和模式,才能真正用好这门语言,这就是.neter需要了解java及其历史的原因,毕竟.net一开始就是参照着java来的。

比如.net里的堆栈概念,就算一些经典书籍都没有非常深入的说明,而java方面的资料就很多了,参看深入理解jvm—jvm内存模型

 

其它参考资料:

nio浅析

深入理解java nio

 

转载请注明本文出处:https://www.cnblogs.com/newton/p/9776821.html