欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java 高并发八:NIO和AIO详解

程序员文章站 2024-03-13 16:52:09
io感觉上和多线程并没有多大关系,但是nio改变了线程在应用层面使用的方式,也解决了一些实际的困难。而aio是异步io和前面的系列也有点关系。在此,为了学习和记录,也写一篇...

io感觉上和多线程并没有多大关系,但是nio改变了线程在应用层面使用的方式,也解决了一些实际的困难。而aio是异步io和前面的系列也有点关系。在此,为了学习和记录,也写一篇文章来介绍nio和aio。

1. 什么是nio

nio是new i/o的简称,与旧式的基于流的i/o方法相对,从名字看,它表示新的一套java i/o标 准。它是在java 1.4中被纳入到jdk中的,并具有以下特性:

  1. nio是基于块(block)的,它以块为基本单位处理数据 (硬盘上存储的单位也是按block来存储,这样性能上比基于流的方式要好一些)
  2. 为所有的原始类型提供(buffer)缓存支持
  3. 增加通道(channel)对象,作为新的原始 i/o 抽象
  4. 支持锁(我们在平时使用时经常能看到会出现一些.lock的文件,这说明有线程正在使用这把锁,当线程释放锁时,会把这个文件删除掉,这样其他线程才能继续拿到这把锁)和内存映射文件的文件访问接口
  5. 提供了基于selector的异步网络i/o

Java 高并发八:NIO和AIO详解

所有的从通道中的读写操作,都要经过buffer,而通道就是io的抽象,通道的另一端就是操纵的文件。

2. buffer

Java 高并发八:NIO和AIO详解

java中buffer的实现。基本的数据类型都有它对应的buffer

buffer的简单使用例子:

package test;
 
import java.io.file;
import java.io.fileinputstream;
import java.nio.bytebuffer;
import java.nio.channels.filechannel;
 
public class test {
 public static void main(string[] args) throws exception {
  fileinputstream fin = new fileinputstream(new file(
    "d:\\temp_buffer.tmp"));
  filechannel fc = fin.getchannel();
  bytebuffer bytebuffer = bytebuffer.allocate(1024);
  fc.read(bytebuffer);
  fc.close();
  bytebuffer.flip();//读写转换
 }
}

总结下使用的步骤是:

1. 得到channel

2. 申请buffer

3. 建立channel和buffer的读/写关系

4. 关闭

下面的例子是使用nio来复制文件:

public static void niocopyfile(string resource, string destination)
   throws ioexception {
  fileinputstream fis = new fileinputstream(resource);
  fileoutputstream fos = new fileoutputstream(destination);
  filechannel readchannel = fis.getchannel(); // 读文件通道
  filechannel writechannel = fos.getchannel(); // 写文件通道
  bytebuffer buffer = bytebuffer.allocate(1024); // 读入数据缓存
  while (true) {
   buffer.clear();
   int len = readchannel.read(buffer); // 读入数据
   if (len == -1) {
    break; // 读取完毕
   }
   buffer.flip();
   writechannel.write(buffer); // 写入文件
  }
  readchannel.close();
  writechannel.close();
 }

buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)

这里要区别下容量和上限,比如一个buffer有10kb,那么10kb就是容量,我将5kb的文件读到buffer中,那么上限就是5kb。

下面举个例子来理解下这3个重要的参数:

public static void main(string[] args) throws exception {
  bytebuffer b = bytebuffer.allocate(15); // 15个字节大小的缓冲区
  system.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  for (int i = 0; i < 10; i++) {
   // 存入10个字节数据
   b.put((byte) i);
  }
  system.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  b.flip(); // 重置position
  system.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  for (int i = 0; i < 5; i++) {
   system.out.print(b.get());
  }
  system.out.println();
  system.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
  b.flip();
  system.out.println("limit=" + b.limit() + " capacity=" + b.capacity()
    + " position=" + b.position());
 
 }

整个过程如图:

Java 高并发八:NIO和AIO详解

此时position从0到10,capactiy和limit不变。

Java 高并发八:NIO和AIO详解

该操作会重置position,通常,将buffer从写模式转换为读 模式时需要执行此方法 flip()操作不仅重置了当前的position为0,还将limit设置到当前position的位置 。

limit的意义在于,来确定哪些数据是有意义的,换句话说,从position到limit之间的数据才是有意义的数据,因为是上次操作的数据。所以flip操作往往是读写转换的意思。

Java 高并发八:NIO和AIO详解

意义同上。

而buffer中大多数的方法都是去改变这3个参数来达到某些功能的:

public final buffer rewind()

将position置零,并清除标志位(mark)

public final buffer clear()

将position置零,同时将limit设置为capacity的大小,并清除了标志mark

public final buffer flip()

先将limit设置到position所在位置,然后将position置零,并清除标志位mark,通常在读写转换时使用

文件映射到内存

public static void main(string[] args) throws exception {
  randomaccessfile raf = new randomaccessfile("c:\\mapfile.txt", "rw");
  filechannel fc = raf.getchannel();
  // 将文件映射到内存中
  mappedbytebuffer mbb = fc.map(filechannel.mapmode.read_write, 0,
    raf.length());
  while (mbb.hasremaining()) {
   system.out.print((char) mbb.get());
  }
  mbb.put(0, (byte) 98); // 修改文件
  raf.close();
 }

对mappedbytebuffer的修改就相当于修改文件本身,这样操作的速度是很快的。

3. channel

多线程网络服务器的一般结构:

Java 高并发八:NIO和AIO详解

简单的多线程服务器:

public static void main(string[] args) throws exception {
  serversocket echoserver = null;
  socket clientsocket = null;
  try {
   echoserver = new serversocket(8000);
  } catch (ioexception e) {
   system.out.println(e);
  }
  while (true) {
   try {
    clientsocket = echoserver.accept();
    system.out.println(clientsocket.getremotesocketaddress()
      + " connect!");
    tp.execute(new handlemsg(clientsocket));
   } catch (ioexception e) {
    system.out.println(e);
   }
  }
 }

功能就是服务器端读到什么数据,就向客户端回写什么数据。

这里的tp是一个线程池,handlemsg是处理消息的类。

static class handlemsg implements runnable{ 
   省略部分信息     
   public void run(){   
    try {   
     is = new bufferedreader(new inputstreamreader(clientsocket.getinputstream())); 
     os = new printwriter(clientsocket.getoutputstream(), true); 
     // 从inputstream当中读取客户端所发送的数据    
     string inputline = null;     
     long b=system. currenttimemillis ();     
     while ((inputline = is.readline()) != null)
     {   
      os.println(inputline);     
     }     
     long e=system. currenttimemillis ();     
     system. out.println ("spend:"+(e - b)+" ms ");    
   } catch (ioexception e) {     
    e.printstacktrace();    
   }finally
   { 
    关闭资源 
   }  
  } 
  }

客户端:

public static void main(string[] args) throws exception {
  socket client = null;
  printwriter writer = null;
  bufferedreader reader = null;
  try {
   client = new socket();
   client.connect(new inetsocketaddress("localhost", 8000));
   writer = new printwriter(client.getoutputstream(), true);
   writer.println("hello!");
   writer.flush();
   reader = new bufferedreader(new inputstreamreader(
     client.getinputstream()));
   system.out.println("from server: " + reader.readline());
  } catch (exception e) {
  } finally {
   // 省略资源关闭
  }
 }

以上的网络编程是很基本的,使用这种方式,会有一些问题:

为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。此时,如果客户端数量众多,可能会消耗大量的系统资源。

解决方案:

使用非阻塞的nio (读取数据不等待,数据准备好了再工作)

为了体现nio使用的高效。

这里先模拟一个低效的客户端来模拟因网络而延时的情况:

private static executorservice tp= executors.newcachedthreadpool(); 
  private static final int sleep_time=1000*1000*1000; 
  public static class echoclient implements runnable{ 
   public void run(){   
    try {    
     client = new socket();    
     client.connect(new inetsocketaddress("localhost", 8000)); 
     writer = new printwriter(client.getoutputstream(), true); 
     writer.print("h");    
     locksupport.parknanos(sleep_time);  
     writer.print("e");   
     locksupport.parknanos(sleep_time);  
     writer.print("l");  
     locksupport.parknanos(sleep_time); 
     writer.print("l");  
     locksupport.parknanos(sleep_time); 
     writer.print("o");  
     locksupport.parknanos(sleep_time); 
     writer.print("!");   
     locksupport.parknanos(sleep_time); 
     writer.println();  
     writer.flush(); 
    }catch(exception e)
    {
    }
   }
  }

服务器端输出:

spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms

因为

while ((inputline = is.readline()) != null)

是阻塞的,所以时间都花在等待中。

如果用nio来处理这个问题会怎么做呢?

nio有一个很大的特点就是:把数据准备好了再通知我

而channel有点类似于流,一个channel可以和文件或者网络socket对应 。

Java 高并发八:NIO和AIO详解

selector是一个选择器,它可以选择某一个channel,然后做些事情。

一个线程可以对应一个selector,而一个selector可以轮询多个channel,而每个channel对应了一个socket。

与上面一个线程对应一个socket相比,使用nio后,一个线程可以轮询多个socket。

当selector调用select()时,会查看是否有客户端准备好了数据。当没有数据被准备好时,select()会阻塞。平时都说nio是非阻塞的,但是如果没有数据被准备好还是会有阻塞现象。

当有数据被准备好时,调用完select()后,会返回一个selectionkey,selectionkey表示在某个selector上的某个channel的数据已经被准备好了。

只有在数据准备好时,这个channel才会被选择。

这样nio实现了一个线程来监控多个客户端。

而刚刚模拟的网络延迟的客户端将不会影响nio下的线程,因为某个socket网络延迟时,数据还未被准备好,selector是不会选择它的,而会选择其他准备好的客户端。

selectnow()与select()的区别在于,selectnow()是不阻塞的,当没有客户端准备好数据时,selectnow()不会阻塞,将返回0,有客户端准备好数据时,selectnow()返回准备好的客户端的个数。

主要代码:

package test;
 
import java.net.inetaddress;
import java.net.inetsocketaddress;
import java.net.socket;
import java.nio.bytebuffer;
import java.nio.channels.selectionkey;
import java.nio.channels.selector;
import java.nio.channels.serversocketchannel;
import java.nio.channels.socketchannel;
import java.nio.channels.spi.abstractselector;
import java.nio.channels.spi.selectorprovider;
import java.util.hashmap;
import java.util.iterator;
import java.util.linkedlist;
import java.util.map;
import java.util.set;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
 
public class multithreadnioechoserver {
 public static map<socket, long> geym_time_stat = new hashmap<socket, long>();
 
 class echoclient {
  private linkedlist<bytebuffer> outq;
 
  echoclient() {
   outq = new linkedlist<bytebuffer>();
  }
 
  public linkedlist<bytebuffer> getoutputqueue() {
   return outq;
  }
 
  public void enqueue(bytebuffer bb) {
   outq.addfirst(bb);
  }
 }
 
 class handlemsg implements runnable {
  selectionkey sk;
  bytebuffer bb;
 
  public handlemsg(selectionkey sk, bytebuffer bb) {
   super();
   this.sk = sk;
   this.bb = bb;
  }
 
  @override
  public void run() {
   // todo auto-generated method stub
   echoclient echoclient = (echoclient) sk.attachment();
   echoclient.enqueue(bb);
   sk.interestops(selectionkey.op_read | selectionkey.op_write);
   selector.wakeup();
  }
 
 }
 
 private selector selector;
 private executorservice tp = executors.newcachedthreadpool();
 
 private void startserver() throws exception {
  selector = selectorprovider.provider().openselector();
  serversocketchannel ssc = serversocketchannel.open();
  ssc.configureblocking(false);
  inetsocketaddress isa = new inetsocketaddress(8000);
  ssc.socket().bind(isa);
  // 注册感兴趣的事件,此处对accpet事件感兴趣
  selectionkey acceptkey = ssc.register(selector, selectionkey.op_accept);
  for (;;) {
   selector.select();
   set readykeys = selector.selectedkeys();
   iterator i = readykeys.iterator();
   long e = 0;
   while (i.hasnext()) {
    selectionkey sk = (selectionkey) i.next();
    i.remove();
    if (sk.isacceptable()) {
     doaccept(sk);
    } else if (sk.isvalid() && sk.isreadable()) {
     if (!geym_time_stat.containskey(((socketchannel) sk
       .channel()).socket())) {
      geym_time_stat.put(
        ((socketchannel) sk.channel()).socket(),
        system.currenttimemillis());
     }
     doread(sk);
    } else if (sk.isvalid() && sk.iswritable()) {
     dowrite(sk);
     e = system.currenttimemillis();
     long b = geym_time_stat.remove(((socketchannel) sk
       .channel()).socket());
     system.out.println("spend:" + (e - b) + "ms");
    }
   }
  }
 }
 
 private void dowrite(selectionkey sk) {
  // todo auto-generated method stub
  socketchannel channel = (socketchannel) sk.channel();
  echoclient echoclient = (echoclient) sk.attachment();
  linkedlist<bytebuffer> outq = echoclient.getoutputqueue();
  bytebuffer bb = outq.getlast();
  try {
   int len = channel.write(bb);
   if (len == -1) {
    disconnect(sk);
    return;
   }
   if (bb.remaining() == 0) {
    outq.removelast();
   }
  } catch (exception e) {
   // todo: handle exception
   disconnect(sk);
  }
  if (outq.size() == 0) {
   sk.interestops(selectionkey.op_read);
  }
 }
 
 private void doread(selectionkey sk) {
  // todo auto-generated method stub
  socketchannel channel = (socketchannel) sk.channel();
  bytebuffer bb = bytebuffer.allocate(8192);
  int len;
  try {
   len = channel.read(bb);
   if (len < 0) {
    disconnect(sk);
    return;
   }
  } catch (exception e) {
   // todo: handle exception
   disconnect(sk);
   return;
  }
  bb.flip();
  tp.execute(new handlemsg(sk, bb));
 }
 
 private void disconnect(selectionkey sk) {
  // todo auto-generated method stub
  //省略略干关闭操作
 }
 
 private void doaccept(selectionkey sk) {
  // todo auto-generated method stub
  serversocketchannel server = (serversocketchannel) sk.channel();
  socketchannel clientchannel;
  try {
   clientchannel = server.accept();
   clientchannel.configureblocking(false);
   selectionkey clientkey = clientchannel.register(selector,
     selectionkey.op_read);
   echoclient echoclinet = new echoclient();
   clientkey.attach(echoclinet);
   inetaddress clientaddress = clientchannel.socket().getinetaddress();
   system.out.println("accepted connection from "
     + clientaddress.gethostaddress());
  } catch (exception e) {
   // todo: handle exception
  }
 }
 
 public static void main(string[] args) {
  // todo auto-generated method stub
  multithreadnioechoserver echoserver = new multithreadnioechoserver();
  try {
   echoserver.startserver();
  } catch (exception e) {
   // todo: handle exception
  }
 
 }
 
}

代码仅作参考,主要的特点是,对不同事件的感兴趣来做不同的事。

当用之前模拟的那个延迟的客户端时,这次的时间消耗就在2ms到11ms之间了。性能提升是很明显的。

总结:

1. nio会将数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去。

2. 节省数据准备时间(因为selector可以复用)

5. aio

aio的特点:

1. 读完了再通知我

2. 不会加快io,只是在读完后进行通知

3. 使用回调函数,进行业务处理

aio的相关代码:

asynchronousserversocketchannel

server = asynchronousserversocketchannel.open().bind( new inetsocketaddress (port));
使用server上的accept方法

public abstract <a> void accept(a attachment,completionhandler<asynchronoussocketchannel,? super a> handler);
completionhandler为回调接口,当有客户端accept之后,就做handler中的事情。

示例代码:

server.accept(null,
    new completionhandler<asynchronoussocketchannel, object>() {
     final bytebuffer buffer = bytebuffer.allocate(1024);
 
     public void completed(asynchronoussocketchannel result,
       object attachment) {
      system.out.println(thread.currentthread().getname());
      future<integer> writeresult = null;
      try {
       buffer.clear();
       result.read(buffer).get(100, timeunit.seconds);
       buffer.flip();
       writeresult = result.write(buffer);
      } catch (interruptedexception | executionexception e) {
       e.printstacktrace();
      } catch (timeoutexception e) {
       e.printstacktrace();
      } finally {
       try {
        server.accept(null, this);
        writeresult.get();
        result.close();
       } catch (exception e) {
        system.out.println(e.tostring());
       }
      }
     }
 
     @override
     public void failed(throwable exc, object attachment) {
      system.out.println("failed: " + exc);
     }
    });

这里使用了future来实现即时返回,关于future请参考上一篇

在理解了nio的基础上,看aio,区别在于aio是等读写过程完成后再去调用回调函数。

nio是同步非阻塞的

aio是异步非阻塞的

由于nio的读写过程依然在应用线程里完成,所以对于那些读写过程时间长的,nio就不太适合。

而aio的读写过程完成后才被通知,所以aio能够胜任那些重量级,读写过程长的任务。