java核心技术-NIO
1、reactor(反应器)模式
使用单线程模拟多线程,提高资源利用率和程序的效率,增加系统吞吐量。下面例子比较形象的说明了什么是反应器模式:
一个老板经营一个饭店,
传统模式 - 来一个客人安排一个服务员招呼,客人很满意;(相当于一个连接一个线程)
后来客人越来越多,需要的服务员越来越多,资源条件不足以再请更多的服务员了,传统模式已经不能满足需求。老板之所以为老板自然有过人之处,老板发现,服务员在为客人服务时,当客人点菜的时候,服务员基本处于等待状态,(阻塞线程,不做事)。
于是乎就让服务员在客人点菜的时候,去为其他客人服务,当客人菜点好后再招呼服务员即可。 --反应器(reactor)模式诞生了
饭店的生意红红火火,几个服务员就足以支撑大量的客流量,老板用有限的资源赚了更多的money~~~~_
通道:类似于流,但是可以异步读写数据(流只能同步读写),通道是双向的,(流是单向的),通道的数据总是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互。
通道类型:
- filechannel:从文件中读写数据。
- datagramchannel:能通过udp读写网络中的数据。
- socketchannel:能通过tcp读写网络中的数据。
- serversocketchannel:可以监听新进来的tcp连接,像web服务器那样。对每一个新进来的连接都会创建一个socketchannel。
- filechannel比较特殊,它可以与通道进行数据交互, 不能切换到非阻塞模式,套接字通道可以切换到非阻塞模式;
缓冲区 - 本质上是一块可以存储数据的内存,被封装成了buffer对象而已!
缓冲区类型:
- bytebuffer
- mappedbytebuffer
- charbuffer
- doublebuffer
- floatbuffer
- intbuffer
- longbuffer
- shortbuffer
常用方法:
- allocate() - 分配一块缓冲区
- put() - 向缓冲区写数据
- get() - 向缓冲区读数据
- filp() - 将缓冲区从写模式切换到读模式
- clear() - 从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;
- compact() - 从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
- mark() - 对position做出标记,配合reset使用
- reset() - 将position置为标记值
缓冲区的一些属性:
-
capacity - 缓冲区大小,无论是读模式还是写模式,此属性值不会变;
-
position - 写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1,切换到读模式时,position会被置为0,表示当前读的位置
-
limit - 写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。
非直接缓冲区:通过allocate() 方法 分配缓冲区,将缓冲区建立在jvm内存中
直接缓冲区:通过allocatedirect() 方法直接缓冲区 将缓冲区建立在物理内存中
2.1 关于缓冲区各个属性的测试
string str = "abcde"; //1. 分配一个指定大小的缓冲区 bytebuffer buf = bytebuffer.allocate(1024); system.out.println("--------------allocate()----------------"); system.out.println(buf.position());//0 system.out.println(buf.limit());//1024 system.out.println(buf.capacity());//1024 //2. 利用put存入数据到缓冲区中去 buf.put(str.getbytes()); system.out.println("----------------put()-------------------"); system.out.println(buf.position());//5 system.out.println(buf.limit());//1024 system.out.println(buf.capacity());//1024 //3. 切换到读取模式 buf.flip(); system.out.println("----------------flip()------------------"); system.out.println(buf.position());//0 system.out.println(buf.limit());//5 system.out.println(buf.capacity());//1024 //4. 利用get() 读取缓冲区中的数据 byte[] dst = new byte[buf.limit()]; buf.get(dst); system.out.println(new string(dst,0,dst.length)); system.out.println("----------------get()------------------"); system.out.println(buf.position());//5 system.out.println(buf.limit());//5 system.out.println(buf.capacity());//1024 //5.可重复读 buf.rewind(); system.out.println("----------------rewind()------------------"); system.out.println(buf.position());//0 system.out.println(buf.limit());//5 system.out.println(buf.capacity());//1024 //6.clear(): 清空缓冲区, 但是缓冲区的数据依然存在, 但是处于被遗忘的状态 buf.clear(); system.out.println("----------------clear()-------------------"); system.out.println(buf.position());//0 system.out.println(buf.limit());//1024 system.out.println(buf.capacity());//1024 byte[] newbyte = new byte[buf.limit()]; buf.get(newbyte); system.out.println(new string(newbyte,0,newbyte.length));
2.2 关于通道的使用
1.利用通道进行 文件的复制 非直接缓冲区
fileinputstream fis = null; fileoutputstream fos = null; filechannel inchannel = null; filechannel outchannel = null; try { fis = new fileinputstream("1.jpg"); fos = new fileoutputstream("2.jpg"); // ①获取通道 inchannel = fis.getchannel(); outchannel = fos.getchannel(); // ②将通道中的数据存入缓冲区 bytebuffer bytebuffer = bytebuffer.allocate(1024); // 将通道中的数据存入缓冲区 while (inchannel.read(bytebuffer) != -1) { bytebuffer.flip(); // 切换读取数据的模式 outchannel.write(bytebuffer); bytebuffer.clear(); } } catch (ioexception e) { e.printstacktrace(); } finally { if (inchannel != null) { try { inchannel.close(); } catch (ioexception e) { e.printstacktrace(); } } if (outchannel != null) { try { outchannel.close(); } catch (ioexception e) { e.printstacktrace(); } } if (fis != null) { try { fis.close(); } catch (ioexception e) { e.printstacktrace(); } } if (fos != null) { try { fos.close(); } catch (ioexception e) { e.printstacktrace(); } } }
2.通道之间的传输
create_new:如果文件不存在就创建,存在就报错
create:如果文件不存在就创建,存在创建(覆盖)
filechannel inchannel = null; filechannel outchannel = null; try { inchannel = filechannel.open(paths.get("hello.txt"), standardopenoption.read); outchannel = filechannel.open(paths.get("hello2.txt"), standardopenoption.read,standardopenoption.write,standardopenoption.create_new); inchannel.transferto(0, inchannel.size(), outchannel); } catch (exception e) { e.printstacktrace(); } finally { if(inchannel != null){ try { inchannel.close(); } catch (ioexception e) { e.printstacktrace(); } } if(outchannel != null){ try { outchannel.close(); } catch (ioexception e) { e.printstacktrace(); } } }
3. 使用直接缓冲区完成内存文件的复制
filechannel inchannel = null; filechannel outchannel = null; try { inchannel = filechannel.open(paths.get("1.jpg"), standardopenoption.read); outchannel = filechannel.open(paths.get("x.jpg"), standardopenoption.read,standardopenoption.write,standardopenoption.create_new); mappedbytebuffer inmappedbuffer = inchannel.map(mapmode.read_only, 0, inchannel.size()); mappedbytebuffer outmappedbuffer = outchannel.map(mapmode.read_write, 0, inchannel.size()); system.out.println(inmappedbuffer.limit()); byte[] b = new byte[inmappedbuffer.limit()];; inmappedbuffer.get(b); outmappedbuffer.put(b); } catch (exception e) { e.printstacktrace(); } finally { if(inchannel != null){ try { inchannel.close(); } catch (ioexception e) { e.printstacktrace(); } } if(outchannel != null){ try { outchannel.close(); } catch (ioexception e) { e.printstacktrace(); } } }
2.3 重点 nio-非阻塞io
个人认为 nio 最难的两点 一个是对于选择器和选择键的理解 其次是对于网络通信模型的理解
本章内容以防过长 只讲解 nio 的使用方法 上述两点参看下回分解
阻塞io示例:
//客户端 @test public void client() throws ioexception{ socketchannel schannel = socketchannel.open(new inetsocketaddress("127.0.0.1", 9898)); filechannel inchannel = filechannel.open(paths.get("1.jpg"), standardopenoption.read); bytebuffer buf = bytebuffer.allocate(1024); while(inchannel.read(buf) != -1){ buf.flip(); schannel.write(buf); buf.clear(); } schannel.shutdownoutput(); //接收服务端的反馈 int len = 0; while((len = schannel.read(buf)) != -1){ buf.flip(); system.out.println(new string(buf.array(), 0, len)); buf.clear(); } inchannel.close(); schannel.close(); } //服务端 @test public void server() throws ioexception{ serversocketchannel sschannel = serversocketchannel.open(); filechannel outchannel = filechannel.open(paths.get("2.jpg"), standardopenoption.write, standardopenoption.create); sschannel.bind(new inetsocketaddress(9898)); socketchannel schannel = sschannel.accept(); bytebuffer buf = bytebuffer.allocate(1024); while(schannel.read(buf) != -1){ buf.flip(); outchannel.write(buf); buf.clear(); } //发送反馈给客户端 buf.put("服务端接收数据成功".getbytes()); buf.flip(); schannel.write(buf); schannel.close(); outchannel.close(); sschannel.close(); }
非阻塞io示例-tcp:
//客户端 @test public void client() throws ioexception{ //1. 获取通道 socketchannel schannel = socketchannel.open(new inetsocketaddress("127.0.0.1", 9898)); //2. 切换非阻塞模式 schannel.configureblocking(false); //3. 分配指定大小的缓冲区 bytebuffer buf = bytebuffer.allocate(1024); //4. 发送数据给服务端 scanner scan = new scanner(system.in); while(scan.hasnext()){ string str = scan.next(); buf.put((new date().tostring() + "\n" + str).getbytes()); buf.flip(); schannel.write(buf); buf.clear(); } //5. 关闭通道 schannel.close(); } //服务端 @test public void server() throws ioexception{ //1. 获取通道 serversocketchannel sschannel = serversocketchannel.open(); //2. 切换非阻塞模式 sschannel.configureblocking(false); //3. 绑定连接 sschannel.bind(new inetsocketaddress(9898)); //4. 获取选择器 selector selector = selector.open(); //5. 将通道注册到选择器上, 并且指定“监听接收事件” sschannel.register(selector, selectionkey.op_accept); //6. 轮询式的获取选择器上已经“准备就绪”的事件 while(selector.select() > 0){ //7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)” iterator<selectionkey> it = selector.selectedkeys().iterator(); while(it.hasnext()){ //8. 获取准备“就绪”的是事件 selectionkey sk = it.next(); //9. 判断具体是什么事件准备就绪 if(sk.isacceptable()){ //10. 若“接收就绪”,获取客户端连接 socketchannel schannel = sschannel.accept(); //11. 切换非阻塞模式 schannel.configureblocking(false); //12. 将该通道注册到选择器上 schannel.register(selector, selectionkey.op_read); }else if(sk.isreadable()){ //13. 获取当前选择器上“读就绪”状态的通道 socketchannel schannel = (socketchannel) sk.channel(); //14. 读取数据 bytebuffer buf = bytebuffer.allocate(1024); int len = 0; while((len = schannel.read(buf)) > 0 ){ buf.flip(); system.out.println(new string(buf.array(), 0, len)); buf.clear(); } } //15. 取消选择键 selectionkey it.remove(); } } }
非阻塞io示例-udp:
@test public void send() throws ioexception{ datagramchannel dc = datagramchannel.open(); dc.configureblocking(false); bytebuffer buf = bytebuffer.allocate(1024); scanner scan = new scanner(system.in); while(scan.hasnext()){ string str = scan.next(); buf.put((new date().tostring() + ":\n" + str).getbytes()); buf.flip(); dc.send(buf, new inetsocketaddress("127.0.0.1", 9898)); buf.clear(); } dc.close(); } @test public void receive() throws ioexception{ datagramchannel dc = datagramchannel.open(); dc.configureblocking(false); dc.bind(new inetsocketaddress(9898)); selector selector = selector.open(); dc.register(selector, selectionkey.op_read); while(selector.select() > 0){ iterator<selectionkey> it = selector.selectedkeys().iterator(); while(it.hasnext()){ selectionkey sk = it.next(); if(sk.isreadable()){ bytebuffer buf = bytebuffer.allocate(1024); dc.receive(buf); buf.flip(); system.out.println(new string(buf.array(), 0, buf.limit())); buf.clear(); } } it.remove(); } }