JAVA中的NIO (New IO)
简介
标准的io是基于字节流和字符流进行操作的,而java中的nio是基于channel和buffer进行操作的。
传统io
nio
核心模块
nio主要有三个核心部分:selector、channel、buffer
数据总是从channel读取到buffer或者从buffer写入到channel中。
selector可以监听多个channel的多个事件。
传统的io与channel的区别
1.传统的io是bio的,而channel是nio的。
*当流调用了read()、write()方法后会一直阻塞线程直到数据被读取或写入完毕。
2.传统io流是单向的,而channel是双向的。
channel
filechannel:从文件中进行读取 datagramchannel:可以通过udp协议在网络中进行数据的传输 socketchannel:可以通过tcp协议在网络中进行数据的传输 serversocketchannel:可以作为一个服务器监听连接
channel通用api:
read(buffer):将数据从channel读取到buffer中,读取完毕返回-1。 read(buffer []):将数据从channel读取到多个buffer中,仅当第一个buffer被写满后往第二个buffer中进行写入。 write(buffer):将buffer中的数据写入到channel中。 write(buffer[]):将多个buffer中的数据写入到channel中,仅当第一个buffer中的数据被读取完毕后再从第二个buffer中进行读取。 register(selector,interest):将channel注册到selector中,同时需要向selector传递要监听此channel的事件类型(注册到selector中的channel一定要非阻塞的) configureblocking(boolean):设置channel是否为阻塞。 transferfrom(position,count,channel):将其他channel中的数据传输到当前channel中。 transferto(position,count,channel):将当前channel中的数据传输到其他channel中。
socketchannel api
open()静态方法:创建socketchannel。 connect(new inetsocketaddress(port))方法:连接服务器。 finishconnect()方法:判断是否已经与服务器建立连接。
serversocketchannel api
open()静态方法:创建serversocketchannel。 accept()方法:该方法会一直阻塞线程直到有新连接到达。
阻塞式与非阻塞式channel
正常情况下channel都是阻塞的,只有当调用了configureblocking(false)方法时channel才为非阻塞。
阻塞式channel的connect()、accept()、read()、write()方法都会阻塞线程,直到处理完毕。
非阻塞式channel的connect()、accept()、read()、write()方法都是异步的。
*当调用了非阻塞式channel的connect()方法后,需要使用finishconnect()方法判断是否已经与服务器建立连接。
*当调用了非阻塞式channel的accept()方法后,需要根据方法的返回值是否为null判断是否接收到新的连接。
*当调用了非阻塞式channel的read()方法后,需要根据方法的返回值是否大于0判断是否有读取到数据。
*在使用非阻塞式channel的write()方法时,需要借助while循环与hasremaining()方法保证buffer中的内容被全部写入。
*filechannel一定是阻塞的。
示例
public void testfilechannel() throws ioexception { randomaccessfile randomaccessfile = new randomaccessfile(new file("f:\\笔记\\nginx.txt"), "rw"); filechannel filechannel = randomaccessfile.getchannel(); bytebuffer bytebuffer = bytebuffer.allocate(64); int count = filechannel.read(bytebuffer); while (count != -1) { bytebuffer.flip(); system.out.println(new string(arrays.copyofrange(bytebuffer.array(),0,bytebuffer.limit()),charset.forname("utf-8"))); bytebuffer.clear(); count = filechannel.read(bytebuffer); } }
buffer
buffer是一块可以进行读写操作的内存(顺序存储结构)
bytebuffer:基于byte类型进行存储 charbuffer:基于char类型进行存储 doublebuffer:基于double类型进行存储 floatbuffer:基于float类型进行存储 intbuffer:基于int类型进行存储 longbuffer:基于long类型进行存储 shortbuffer:基于short类型进行存储
buffer的内部结构
1.capacity:表示buffer的容量
2.position:表示当前的位置(从0开始,最大值为capacity-1)
3.limit:在写模式中表示可以写入的个数(与capacity一样),在读模式中表示可以读取的个数。
从写模式转换成读模式
limit设置为position+1,position设置为0。
从读模式转换成写模式
limit设置为capacity,position设置为0。
往buffer中写数据
1.将数据从channel读取到buffer中。
2.使用buffer的put()方法。
从buffer中读数据
1.将buffer中的数据写入到channel中。
2.使用buffer的get()方法
buffer通用api:
allocate(size)静态静态:初始化一个buffer。 flip():将buffer从写模式转换成读模式。 array():将buffer中的内容转换成数组(不受limit控制) get():获取buffer中的内容。 hasremaining():判断buffer中是否还有未读的元素(limit - (postion+1) ) rewind():将positon设置为0。 clear():将limit设置为capacity,position设置为0。 compact():将所有未读的元素移动到buffer的起始处,position指向最后一个未读的元素的下一位,limit设置为capacity。 *clear()和compact()方法都可以理解成将buffer从读模式转换成写模式,区别在于compact()方法会保留未读取的元素。 mark():在当前position处打一个标记。 reset():将position恢复到标记处。
selector
selector用于监听多个channel的多个事件(单线程)
channel的事件类型
1.连接就绪:当socketchannel、datagramchannel成功与服务器建立连接时将会触发连接就绪事件。
2.接收就绪:当有连接到达服务器时将会触发接收就绪事件。
3.读就绪:当socketchannel、datagramchannel有数据可读时将会触发读就绪事件。
4.写就绪:当socketchannel、datagramchannel可以进行数据写入时将会触发写就绪事件。
selectionkey
selectionkey用于存储selector与channel之间的相关信息。
selectionkey中提供了四个常量分别代表channel的事件类型。
selectionkey.op_connect selectionkey.op_accept selectionkey.op_read selectionkey.op_write
selectablechannel提供的register(selector,interest)方法用于将channel注册到selector中,同时需要向selector传递要监听此channel的事件类型,当要监听的事件类型不止一个时可以使用或运算,当将channel注册到selector后会返回selectionkey实例,用于存储selector与此channel之间的相关信息。
selectionkey api:
interestops()方法:返回selector监听此channel的事件类型。 readyops()方法:返回此channel目前就绪的事件。 isacceptable():判断channel是否接收就绪。 isconnectable():判断channel是否连接就绪。 isreadable():判断channel是否读就绪。 iswriteable():判断channel是否写就绪。 channel():返回具体的channel实例。 selector():返回selector实例。 attach():往selectionkey中添加一个附加对象。 attachment():返回保存在selectionkey中的附加对象。
selector api:
open()静态方法:创建一个selector。 select()方法:该方法会一直阻塞线程直到所监听的channel有事件就绪,返回就绪的channel个数(只会返回新就绪的channel个数) selectedkeys()方法:返回就绪的channel对应的selectionkey。 *当channel就绪的事件处理完毕后,需要手动删除selectionkey集合中该channel对应的selectionkey,当该channel再次有事件就绪时会自动加入到selectionkey集合中。
非阻塞式channel与selector
非阻塞式channel一般与selector配合使用
当selector监听到serversocketchannel接收就绪时,那么此时可以立即调用serversocketchannel的accept()方法获取新连接。
当selector监听到socketchannel读就绪时,那么此时可以立即调用socketchannel的read()方法进行数据的读取。
非阻塞式服务器
/** * @author: zhuang haotang * @date: 2019/10/26 16:35 * @description: */ public class server { public void start() throws ioexception { selector selector = selector.open(); serversocketchannel serversocketchannel = createnioserversocketchannel(); system.out.println("start nio server and bind port 8888"); serversocketchannel.register(selector, selectionkey.op_accept); int ready = selector.select(); while (ready > 0) { system.out.println("ready channel count " + ready); set<selectionkey> selectionkeyset = selector.selectedkeys(); for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) { selectionkey selectionkey = iterator.next(); if (selectionkey.isacceptable()) { system.out.println("acceptable"); accepthandler(selectionkey); } else if (selectionkey.isreadable()) { system.out.println("readable"); readhandler(selectionkey); } iterator.remove(); } ready = selector.select(); } } private serversocketchannel createnioserversocketchannel() throws ioexception { serversocketchannel serversocketchannel = serversocketchannel.open(); serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888)); serversocketchannel.configureblocking(false); return serversocketchannel; } private void accepthandler(selectionkey selectionkey) throws ioexception { selector selector = selectionkey.selector(); serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel(); socketchannel socketchannel = serversocketchannel.accept(); socketchannel.configureblocking(false); socketchannel.register(selector, selectionkey.op_read); system.out.println("accept client connection " + socketchannel.getlocaladdress()); } private void readhandler(selectionkey selectionkey) throws ioexception { socketchannel socketchannel = (socketchannel) selectionkey.channel(); bytebuffer bytebuffer = bytebuffer.allocate(100); int num = socketchannel.read(bytebuffer); if(num == -1){ // 连接已断开 system.out.println("client "+socketchannel.getlocaladdress() + " disconnection"); socketchannel.close(); return; } bytebuffer.flip(); while (bytebuffer.hasremaining()) { byte b = bytebuffer.get(); system.out.println((char) b); } } public static void main(string[] args) throws ioexception { server server = new server(); server.start(); } }
*一个channel不会同时有多个事件就绪,以事件为单位。
*当客户端断开连接,那么将会触发读就绪,并且channel的read()方法返回-1,表示连接已断开,服务器应该要做出处理,关闭这个连接。
客户端
/** * @auther: zhuang haotang * @date: 2019/10/26 16:36 * @description: */ public class client { public static void main(string[] args) throws ioexception, interruptedexception { socketchannel socketchannel = socketchannel.open(); socketchannel.connect(new inetsocketaddress(inetaddress.getlocalhost(),8888)); string message = "today is sunday"; bytebuffer bytebuffer = bytebuffer.allocate(message.getbytes().length); bytebuffer.put(message.getbytes()); bytebuffer.flip(); socketchannel.write(bytebuffer); thread.sleep(5000); } }
运行结果
reactor模式
reactor有三种模式
1.reactor单线程模式 2.reactor多线程模式 3.主从reactor多线程模式
*reactor模式是在nio下实现的。
reactor单线程模式
1.单线程的事件分化器,同时这个线程需要处理接收、读、写就绪事件。
/** * @author: zhuang haotang * @date: 2019/10/26 16:35 * @description: */ public class reactorsinglethreadserver { private void start() throws ioexception { selector selector = selector.open(); serversocketchannel serversocketchannel = createnioserversocketchannel(); system.out.println("start nio server and bind port 8888"); serversocketchannel.register(selector, selectionkey.op_accept); int ready = selector.select(); while (ready > 0) { system.out.println("ready channel count " + ready); set<selectionkey> selectionkeyset = selector.selectedkeys(); for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) { selectionkey selectionkey = iterator.next(); if (selectionkey.isacceptable()) { system.out.println("acceptable"); accepthandler(selectionkey); } else if (selectionkey.isreadable()) { system.out.println("readable"); readhandler(selectionkey); } iterator.remove(); } ready = selector.select(); } } private serversocketchannel createnioserversocketchannel() throws ioexception { serversocketchannel serversocketchannel = serversocketchannel.open(); serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888)); serversocketchannel.configureblocking(false); return serversocketchannel; } private void accepthandler(selectionkey selectionkey) throws ioexception { selector selector = selectionkey.selector(); serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel(); socketchannel socketchannel = serversocketchannel.accept(); socketchannel.configureblocking(false); socketchannel.register(selector, selectionkey.op_read); system.out.println("accept client connection " + socketchannel.getlocaladdress()); } private void readhandler(selectionkey selectionkey) throws ioexception { socketchannel socketchannel = (socketchannel) selectionkey.channel(); bytebuffer bytebuffer = bytebuffer.allocate(100); int num = socketchannel.read(bytebuffer); if (num == -1) { system.out.println("client " + socketchannel.getlocaladdress() + " disconnection"); socketchannel.close(); return; } bytebuffer.flip(); while (bytebuffer.hasremaining()) { byte b = bytebuffer.get(); system.out.println((char) b); } } public static void main(string[] args) throws ioexception { reactorsinglethreadserver server = new reactorsinglethreadserver(); server.start(); } }
reactor多线程模式
1.单线程的事件分发器。
2.具体事件类型的handler线程池。
3.业务线程池。
/** * @author: zhuang haotang * @date: 2019-10-28 17:00 * @description: */ public class reactormultithreadserver { private threadpoolexecutor eventhandlerpool = new threadpoolexecutor(10, 50, 2, timeunit.minutes, new arrayblockingqueue<runnable>(200), new threadpoolexecutor.callerrunspolicy()); private void start() throws ioexception { selector selector = selector.open(); serversocketchannel serversocketchannel = createnioserversocketchannel(); system.out.println("start nio server and bind port 8888"); serversocketchannel.register(selector, selectionkey.op_accept); selector.select(); for (;;) { set<selectionkey> selectionkeyset = selector.selectedkeys(); for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) { final selectionkey selectionkey = iterator.next(); if (selectionkey.isacceptable()) { system.out.println("acceptable"); eventhandlerpool.submit(new runnable() { @override public void run() { try { accepthandler(selectionkey); } catch (ioexception e) { e.printstacktrace(); } } }); } else if (selectionkey.isreadable()) { system.out.println("readable"); eventhandlerpool.submit(new runnable() { @override public void run() { readhandler(selectionkey); } }); } iterator.remove(); } // thread.sleep(10); // 没找到好方案,留一些时间给register selector.select(); } } private serversocketchannel createnioserversocketchannel() throws ioexception { serversocketchannel serversocketchannel = serversocketchannel.open(); serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888)); serversocketchannel.configureblocking(false); return serversocketchannel; } private void accepthandler(selectionkey selectionkey) throws ioexception { selector selector = selectionkey.selector(); serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel(); socketchannel socketchannel = serversocketchannel.accept(); if (socketchannel != null) { socketchannel.configureblocking(false); selector.wakeup(); // 往selector注册channel时,selector要处于非阻塞状态 socketchannel.register(selector, selectionkey.op_read); system.out.println("accept client connection " + socketchannel.getlocaladdress()); } } private void readhandler(selectionkey selectionkey) { socketchannel socketchannel = (socketchannel) selectionkey.channel(); bytebuffer bytebuffer = bytebuffer.allocate(100); try { int num = socketchannel.read(bytebuffer); if (num == -1) { system.out.println("client " + socketchannel.getlocaladdress() + " disconnection"); socketchannel.close(); // 底层有些逻辑 return; } bytebuffer.flip(); while (bytebuffer.hasremaining()) { byte b = bytebuffer.get(); system.out.println((char) b); } } catch (exception e) { system.out.println("由于连接关闭导致并发线程读取异常"); } } public static void main(string[] args) throws ioexception { reactormultithreadserver reactorserver = new reactormultithreadserver(); reactorserver.start(); } }
主从reactor多线程模式
1.使用两个单线程的事件分发器。
第一个事件分发器只负责监听serversocketchannel的接收就绪事件,同时serversocketchannel接收到的连接要注册到第二个事件分发器中。 第二个事件分发器只负责监听socketchannel的读、写就绪事件。
2.具体事件类型的handler线程池。
3.业务线程池。
/** * @author: zhuang haotang * @date: 2019-10-28 17:00 * @description: */ public class mainsubreactormultithreadserver { private threadpoolexecutor eventhandlerpool = new threadpoolexecutor(10, 50, 2, timeunit.minutes, new arrayblockingqueue<runnable>(200), new threadpoolexecutor.callerrunspolicy()); private void start() throws ioexception { final selector mainselector = selector.open(); final selector subselector = selector.open(); new thread(new runnable() { @override public void run() { try { startmainselector(mainselector, subselector); } catch (ioexception e) { e.printstacktrace(); } } }).start(); new thread(new runnable() { @override public void run() { try { startsubselector(subselector); } catch (ioexception e) { e.printstacktrace(); } } }).start(); } /** * 第一个事件分发器,用于监听serversocketchannel的接收就绪事件 */ private void startmainselector(selector mainselector, final selector subselector) throws ioexception { serversocketchannel serversocketchannel = createnioserversocketchannel(); system.out.println("start nio server and bind port 8888"); serversocketchannel.register(mainselector, selectionkey.op_accept); mainselector.select(); for (; ; ) { final set<selectionkey> selectionkeyset = mainselector.selectedkeys(); final selectionkey selectionkey = iterables.getonlyelement(selectionkeyset); if (selectionkey.isacceptable()) { system.out.println("acceptable"); eventhandlerpool.submit(new runnable() { @override public void run() { try { accepthandler(selectionkey, subselector); } catch (ioexception e) { e.printstacktrace(); } } }); selectionkeyset.clear(); } mainselector.select(); } } /** * 第二个事件分发器,用于监听sockchannel的读写就绪事件 */ private void startsubselector(selector subselector) throws ioexception { subselector.select(); for (; ; ) { set<selectionkey> selectionkeyset = subselector.selectedkeys(); for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) { final selectionkey selectionkey = iterator.next(); if (selectionkey.isreadable()) { system.out.println("readable"); eventhandlerpool.submit(new runnable() { @override public void run() { readhandler(selectionkey); } }); iterator.remove(); } } // thread.sleep(10); // 没找到好方案,留一些时间给register subselector.select(); } } private serversocketchannel createnioserversocketchannel() throws ioexception { serversocketchannel serversocketchannel = serversocketchannel.open(); serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888)); serversocketchannel.configureblocking(false); return serversocketchannel; } private void accepthandler(selectionkey selectionkey, selector subselector) throws ioexception { serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel(); socketchannel socketchannel = serversocketchannel.accept(); if (socketchannel != null) { socketchannel.configureblocking(false); subselector.wakeup(); // 往selector注册channel时,selector要处于非阻塞状态 socketchannel.register(subselector, selectionkey.op_read); system.out.println("accept client connection " + socketchannel.getlocaladdress() + " and register to subselector"); } } private void readhandler(selectionkey selectionkey) { socketchannel socketchannel = (socketchannel) selectionkey.channel(); bytebuffer bytebuffer = bytebuffer.allocate(100); try { int num = socketchannel.read(bytebuffer); if (num == -1) { system.out.println("client " + socketchannel.getlocaladdress() + " disconnection"); socketchannel.close(); // 底层有些逻辑 return; } bytebuffer.flip(); while (bytebuffer.hasremaining()) { byte b = bytebuffer.get(); system.out.println((char) b); } } catch (exception e) { system.out.println("由于连接关闭导致并发线程读取异常"); } } public static void main(string[] args) throws ioexception { mainsubreactormultithreadserver reactorserver = new mainsubreactormultithreadserver(); reactorserver.start(); } }
通用客户端
/** * @author: zhuang haotang * @date: 2019/10/26 16:36 * @description: */ public class client { public static void main(string[] args) throws ioexception, interruptedexception { socketchannel socketchannel = socketchannel.open(); socketchannel.connect(new inetsocketaddress(inetaddress.getlocalhost(), 8888)); string message = "today is sunday"; bytebuffer bytebuffer = bytebuffer.allocate(message.getbytes().length); bytebuffer.put(message.getbytes()); bytebuffer.flip(); socketchannel.write(bytebuffer); thread.sleep(5000); bytebuffer bytebuffer1 = bytebuffer.allocate("wo".getbytes().length).put("wo".getbytes()); bytebuffer1.flip(); socketchannel.write(bytebuffer1); bytebuffer receivebuffer = bytebuffer.allocate(1024); while (true) { socketchannel.read(receivebuffer); receivebuffer.flip(); while (receivebuffer.hasremaining()) { system.out.println((char)receivebuffer.get()); } receivebuffer.clear(); } } }
*主线程不需要等待具体事件类型的handler处理完毕,直接异步返回,那么将会导致事件重复就绪,程序做出相应的控制即可。
*当有连接到达服务器时,将会触发接收就绪事件,那么主线程将会不停的向线程池中提交任务,直到某个线程接收了连接,此时将会停止接收就绪,其他线程接收到的连接为null。
*当channel有数据可读时,将会触发读就绪,那么主线程将会不停的向线程池提交任务,直到某个线程读取完毕,此时将会停止读就绪,其他线程读取到的个数为0。
*当客户端断开连接时,将会触发读就绪,那么主线程将会不停的向线程池提交任务,直到某个线程关闭连接,此时将会停止读就绪
一般不会直接去使用java nio,只是通过java nio学习他的设计思想,如果要想搭建nio服务器那么应该使用netty等nio框架。
关于bio和nio的选择
bio即同步并阻塞,线程会进入阻塞状态,如果并发连接数只有几百,那么创建几百个线程去处理是没有任何问题的,这种方式更加简单高效。
但是如果并发连接数达到几万,那么显然创建几万个线程去处理是不可行的,系统承受不了这个负荷,此时应该使用nio,即同步非阻塞,利用更少的线程去做更多的事情。
java nio就是使用nio(同步非阻塞),使用io多路复用的select模型。
*不管客户端有多少个并发连接和请求,服务端总是可以利用更少的线程去处理(单线程事件分发器 和 具体事件类型的handler线程池)