欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java IO 再理解

程序员文章站 2022-05-24 20:29:20
...

上一次写对Java IO的理解,主要是各种查资料进行汇总,觉得自己对IO理解的差不多了。
直到最近参加面试之后,我感觉自己对IO的理解还是有些偏差,于是重新查阅一些资料,有了一些新的见解。

同步与异步

通过上一篇博客Java IO 深入理解的介绍,我们已经知道对于一个IO操作,它会涉及到两个系统对象,一个是调用这个IO的process(or Thread),另一个就是系统内核(Kernel)。当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备
  2. 将数据从内核拷贝到用户空间

Java IO 再理解

同步和异步的区别就是: 做IO操作时,进程是否没阻塞。只要有一个阶段被阻塞,那就是同步。
所以对于 blocking IO(BIO),nonblocking IO(NIO)以及IO复用都是同步的,因为无论哪一个,在数据准备完成之后,第二阶段数据从内核拷贝到用户空间都是阻塞进程的。
真正的异步是指两个阶段都不会被阻塞,当进程发起IO操作之后,就直接返回再也不关心,直到kernel发送一个信号,告诉进程说IO完成。由于IO具体完成的时间是不确定的,进程不必关心IO的结果,可以理解为异步IO操作直接没有依赖相关性,后续的操作不依赖这个IO,只要最终知道是否执行成功就行了~

这样子也就更容易理解上篇博客中最后的钓鱼例子了:

有A,B,C,D四个人在钓鱼:
同步阻塞:A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;
同步非阻塞:B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆;
IO复用:C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;
异步IO: D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信。

异步:直接把事件交给别人来办,我接下来的操作并不依赖这个事件,只需要别人办完之后发送短信(回调函数)告知具体情况即可!如果你需要获得异步操作的结果,还是得在线程里构造一个监控内存的循环进行异步读取,直到完成。

异步必然会引起系统调用,只有系统调用才能将内核态的数据复制到用户态


Java下AIO的实现:

异步channel API提供了两种方式监控/控制异步操作(connect,accept, read,write等)。
1. 返回java.util.concurrent.Future对象, 检查Future的状态可以得到操作是否完成还是失败,还是进行中, future.get阻塞当前进程*这种就是上面说的,你需要获得结果,那必然会循环获取。
2. 为操作提供一个回调参数java.nio.channels.CompletionHandler,这个回调类包含completed,failed两个方法

channel的每个I/O操作都为这两种方式提供了相应的方法, 你可以根据自己的需要选择合适的方式编程。

//Java AIO例子Server端 accept采用回调函数 而发送数据使用了future的方式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Server {
    private static Charset charset = Charset.forName("US-ASCII");
    private static CharsetEncoder encoder = charset.newEncoder();

    public static void main(String[] args) throws Exception {
        AsynchronousChannelGroup group = 
                AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
        AsynchronousServerSocketChannel.open(group).
                bind(new InetSocketAddress("0.0.0.0", 8013));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Void attachment) {
                server.accept(null, this); // 接受下一个连接
                try {
                     String now = new Date().toString();
                     ByteBuffer buffer = encoder.encode(CharBuffer.wrap(now + "\r\n"));
                    //result.write(buffer, null, new CompletionHandler<Integer,Void>(){...}); 
                    //callback or
                    Future<Integer> f = result.write(buffer);
                    f.get();
                    System.out.println("sent to client: " + now);
                    result.close();
                } catch (IOException | InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });
        group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
}
//Client端实现,同样也使用了两种方式, connect使用了future方式,而接收数据使用了回调的方式。
public class Client {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 8013));
        future.get();

        ByteBuffer buffer = ByteBuffer.allocate(100);
        client.read(buffer, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
                System.out.println("client received: " + new String(buffer.array()));

            }
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        });

        Thread.sleep(10000);
    }
}

Linux下AIO的实现:

可以参考下面这篇博客,linux下aio异步读写详解与实例
这里就附上其中一些的核心代码。
其中aio_error用来查询当前异步操作的状态,当其状态处于EINPROGRESS则I/O还没完成,当处于ECANCELLED则操作已被取消,发生错误返回-1。
aio_write和aio_read函数类似,当该函数返回成功时,说明该写请求以进行排队(成功0,失败-1)

 //进行异步读操作
 //aio_write和aio_read函数类似,当该函数返回成功时,说明该写请求以进行排队(成功0,失败-1) 
 ret = aio_read(&rd);
 if(ret < 0)
 {
     perror("aio_read");
     exit(1);
 }

 couter = 0;
//  循环等待异步读操作结束
 while(aio_error(&rd) == EINPROGRESS)
 {
     printf("第%d次\n",++couter);
 }
 //获取异步读返回值
 ret = aio_return(&rd);
 //异步写操作
 ret = aio_write(&wr);
 if(ret < 0)
 {
     perror("aio_write");
 }

 //等待异步写完成
 while(aio_error(&wr) == EINPROGRESS)
 {
     printf("hello,world\n");
 }

 //获得异步写的返回值
 ret = aio_return(&wr);
 printf("\n\n\n返回值为:%d\n",ret);
//AIO I/O完成时进行异步通知的代码供理解

#define BUFFER_SIZE 1025

//AIO 操作完成回调函数
void aio_completion_handler(sigval_t sigval)
{
    //用来获取读aiocb结构的指针
    struct aiocb *prd;
    int ret;

    prd = (struct aiocb *)sigval.sival_ptr;

    printf("hello\n");

    //判断请求是否成功
    if(aio_error(prd) == 0)
    {
        //获取返回值
        ret = aio_return(prd);
        printf("读返回值为:%d\n",ret);
    }
}

int main(int argc,char **argv)
{
    int fd,ret;
    struct aiocb rd;

    fd = open("test.txt",O_RDONLY);
    if(fd < 0)
    {
        perror("test.txt");
    }



    //填充aiocb的基本内容
    bzero(&rd,sizeof(rd));

    rd.aio_fildes = fd;
    rd.aio_buf = (char *)malloc(sizeof(BUFFER_SIZE + 1));
    rd.aio_nbytes = BUFFER_SIZE;
    rd.aio_offset = 0;

    //填充aiocb中有关回调通知的结构体sigevent
    rd.aio_sigevent.sigev_notify = SIGEV_THREAD;//使用线程回调通知
    rd.aio_sigevent.sigev_notify_function = aio_completion_handler;//设置回调函数
    rd.aio_sigevent.sigev_notify_attributes = NULL;//使用默认属性
    rd.aio_sigevent.sigev_value.sival_ptr = &rd;//在aiocb控制块中加入自己的引用

    //异步读取文件
    ret = aio_read(&rd);
    if(ret < 0)
    {
        perror("aio_read");
    }

    printf("异步读以开始\n");
    //这里sleep 是为了等待 AIO完成,但是AIO具体的完成时间是不确定的
    sleep(1);
    printf("异步读结束\n");



    return 0;
}

通过分析,我们可以发现AIO好像并没什么卵用,确实如此!!!
但需要注意异步,不等于AIO(asynchronous IO),linux的AIO和java的AIO都是实现异步的一种方式,都是渣~~
具体原因在下一篇博客里分析 高性能网络服务器编程

相关标签: IO Linux epoll