Java IO 再理解
上一次写对Java IO的理解,主要是各种查资料进行汇总,觉得自己对IO理解的差不多了。
直到最近参加面试之后,我感觉自己对IO的理解还是有些偏差,于是重新查阅一些资料,有了一些新的见解。
同步与异步
通过上一篇博客Java IO 深入理解的介绍,我们已经知道对于一个IO操作,它会涉及到两个系统对象,一个是调用这个IO的process(or Thread),另一个就是系统内核(Kernel)。当一个read操作发生时,它会经历两个阶段:
- 等待数据准备
- 将数据从内核拷贝到用户空间
同步和异步的区别就是: 做IO操作时,进程是否没阻塞。只要有一个阶段被阻塞,那就是同步。
所以对于 blocking IO(BIO),nonblocking IO(NIO)以及IO复用都是同步的,因为无论哪一个,在数据准备完成之后,第二阶段数据从内核拷贝到用户空间都是阻塞进程的。
真正的异步是指两个阶段都不会被阻塞,当进程发起IO操作之后,就直接返回再也不关心,直到kernel发送一个信号,告诉进程说IO完成。由于IO具体完成的时间是不确定的,进程不必关心IO的结果,可以理解为异步IO操作直接没有依赖相关性,后续的操作不依赖这个IO,只要最终知道是否执行成功就行了~
这样子也就更容易理解上篇博客中最后的钓鱼例子了:
有A,B,C,D四个人在钓鱼:
同步阻塞:A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;
同步非阻塞:B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆;
IO复用:C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;
异步IO: D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信。
异步:直接把事件交给别人来办,我接下来的操作并不依赖这个事件,只需要别人办完之后发送短信(回调函数)告知具体情况即可!如果你需要获得异步操作的结果,还是得在线程里构造一个监控内存的循环进行异步读取,直到完成。
异步必然会引起系统调用,只有系统调用才能将内核态的数据复制到用户态
Java下AIO的实现:
异步channel API提供了两种方式监控/控制异步操作(connect,accept, read,write等)。
1. 返回java.util.concurrent.Future对象, 检查Future的状态可以得到操作是否完成还是失败,还是进行中, future.get阻塞当前进程。*这种就是上面说的,你需要获得结果,那必然会循环获取。
2. 为操作提供一个回调参数java.nio.channels.CompletionHandler,这个回调类包含completed,failed两个方法
channel的每个I/O操作都为这两种方式提供了相应的方法, 你可以根据自己的需要选择合适的方式编程。
//Java AIO例子Server端 accept采用回调函数 而发送数据使用了future的方式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Server {
private static Charset charset = Charset.forName("US-ASCII");
private static CharsetEncoder encoder = charset.newEncoder();
public static void main(String[] args) throws Exception {
AsynchronousChannelGroup group =
AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
AsynchronousServerSocketChannel.open(group).
bind(new InetSocketAddress("0.0.0.0", 8013));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
server.accept(null, this); // 接受下一个连接
try {
String now = new Date().toString();
ByteBuffer buffer = encoder.encode(CharBuffer.wrap(now + "\r\n"));
//result.write(buffer, null, new CompletionHandler<Integer,Void>(){...});
//callback or
Future<Integer> f = result.write(buffer);
f.get();
System.out.println("sent to client: " + now);
result.close();
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
//Client端实现,同样也使用了两种方式, connect使用了future方式,而接收数据使用了回调的方式。
public class Client {
public static void main(String[] args) throws Exception {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 8013));
future.get();
ByteBuffer buffer = ByteBuffer.allocate(100);
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("client received: " + new String(buffer.array()));
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10000);
}
}
Linux下AIO的实现:
可以参考下面这篇博客,linux下aio异步读写详解与实例。
这里就附上其中一些的核心代码。
其中aio_error用来查询当前异步操作的状态,当其状态处于EINPROGRESS则I/O还没完成,当处于ECANCELLED则操作已被取消,发生错误返回-1。
aio_write和aio_read函数类似,当该函数返回成功时,说明该写请求以进行排队(成功0,失败-1)
//进行异步读操作
//aio_write和aio_read函数类似,当该函数返回成功时,说明该写请求以进行排队(成功0,失败-1)
ret = aio_read(&rd);
if(ret < 0)
{
perror("aio_read");
exit(1);
}
couter = 0;
// 循环等待异步读操作结束
while(aio_error(&rd) == EINPROGRESS)
{
printf("第%d次\n",++couter);
}
//获取异步读返回值
ret = aio_return(&rd);
//异步写操作
ret = aio_write(&wr);
if(ret < 0)
{
perror("aio_write");
}
//等待异步写完成
while(aio_error(&wr) == EINPROGRESS)
{
printf("hello,world\n");
}
//获得异步写的返回值
ret = aio_return(&wr);
printf("\n\n\n返回值为:%d\n",ret);
//AIO I/O完成时进行异步通知的代码供理解
#define BUFFER_SIZE 1025
//AIO 操作完成回调函数
void aio_completion_handler(sigval_t sigval)
{
//用来获取读aiocb结构的指针
struct aiocb *prd;
int ret;
prd = (struct aiocb *)sigval.sival_ptr;
printf("hello\n");
//判断请求是否成功
if(aio_error(prd) == 0)
{
//获取返回值
ret = aio_return(prd);
printf("读返回值为:%d\n",ret);
}
}
int main(int argc,char **argv)
{
int fd,ret;
struct aiocb rd;
fd = open("test.txt",O_RDONLY);
if(fd < 0)
{
perror("test.txt");
}
//填充aiocb的基本内容
bzero(&rd,sizeof(rd));
rd.aio_fildes = fd;
rd.aio_buf = (char *)malloc(sizeof(BUFFER_SIZE + 1));
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;
//填充aiocb中有关回调通知的结构体sigevent
rd.aio_sigevent.sigev_notify = SIGEV_THREAD;//使用线程回调通知
rd.aio_sigevent.sigev_notify_function = aio_completion_handler;//设置回调函数
rd.aio_sigevent.sigev_notify_attributes = NULL;//使用默认属性
rd.aio_sigevent.sigev_value.sival_ptr = &rd;//在aiocb控制块中加入自己的引用
//异步读取文件
ret = aio_read(&rd);
if(ret < 0)
{
perror("aio_read");
}
printf("异步读以开始\n");
//这里sleep 是为了等待 AIO完成,但是AIO具体的完成时间是不确定的
sleep(1);
printf("异步读结束\n");
return 0;
}
通过分析,我们可以发现AIO好像并没什么卵用,确实如此!!!
但需要注意异步,不等于AIO(asynchronous IO),linux的AIO和java的AIO都是实现异步的一种方式,都是渣~~
具体原因在下一篇博客里分析 高性能网络服务器编程