tcp nio basics
程序员文章站
2022-07-07 10:18:27
...
nio(non-blocking io)是由操作系统实际执行阻塞的网络io:应用将要发送的数据写到某个缓冲区,由操作系统实际发送出去;操作系统接收到数据后放到某个缓冲区,供应用直接读取。由于阻塞操作都交给了操作系统,所以应用通常不会阻塞。nio不同于asynchronous io,后者是将阻塞的io操作交由应用内的某线程池去执行。
nio编程围绕着java.nio.channels.Selector:有连接过来了、连上了对方、有数据可读、有数据可写 - 所有期待被通知这四种事件的Socket/ServerSocket都注册到Selector,当事件发生时应用会知道并做相应处理。以下是一个简单例子NServer(接收input并响应hello input)。
首先创建一个Selector:
创建一个Server socket在8888端口监听:
在注册到selector之前要设置server socket为non-blocking的。注册时指定此server socket对OP_ACCEPT事件感兴趣。
以下是一个常见的无限循环,每次循环调用selector.select()方法得到所有感兴趣的事件发生了的Socket/ServerSocket,然后逐一处理。下面是示意结构代码(具体代码参考附件):
上面注意每次处理完后要从iteration里移除当前key(不移除会一直在)。
SelectionKey代表在Selector上的注册,SelectionKey.channel()方法返回注册的Socket/ServerSocket。OP_ACCEPT事件处理如下:
OP_ACCEPT事件一定发生在前面在8888端口监听的ServerSocket上,调用它的accept()方法直接获取到(已经由操作系统建立好的)连接socket。我们这里演示socket接收到input会响应hello input,所以马上把该socket注册到selector 并指定对OP_READ感兴趣。
SelectionKey允许attach任意对象。由于程序中会有很多socket并且每个socket发送的数据都不同,所以定制了一个特定的Attach类用来存储socket的名称和将发送的数据:
当socket的客户端发来数据时,操作系统会接收到然后程序会进入OP_READ分支:
因为是isReadable,读数据是不会阻塞的,但注意SocketChannel.read()方法可能返回-1或者抛出异常。经实验:
a)直接强关telnet客户端 会返回-1,无异常。
b)直接强关NClient(例子程序,见后) 会异常“远程主机强迫关闭了一个现有的连接”
所以凡是io读、写 都可能发生IOException,程序需要自行处理。
当-1代表此socket已不可读(每个socket都有互相独立的InputStream、OutputStream 2个流),程序取消对OP_READ的兴趣(需要关闭socket?)。如果读到实际的内容input(排除"close"和"exit")要准备响应hello input。由于是nio 需要等到操作系统提示可写时才能写,所以程序将待写内容存入Attach 并增加对OP_WRITE的兴趣。另,len=0这个分支在测试中没有遇到过。
下一步当提示可写时,程序从Attach取出数据、写出、然后取消OP_WRITE兴趣:
注意为了简化,示例程序调用SocketChannel.write()并没有检查内容是否全部写出。按照API说明,write方法甚至可能返回0 - 代表一个字节也没写出(到操作系统的某个缓存区)。
如上是一个简单的nio Server例子。运行该程序,然后启动一个telnet客户端“telnet localhost 8888”,输入"abc",看到程序输出:
输入"close"关闭当前连接,程序输出:
再次运行telnet,输入"def":
再运行一个telnet,输入"xyz":
输入"exit"结束。
测试客户端NClient
同样,建立一个唯一的Selector,然后注册任意多个socket,指定对OP_CONNECT和OP_READ感兴趣:
nio的客户端当到远端的连接就绪时:
通过SocketChannel.finishConnect()完成客户端的连接。连上后客户端准备主动发数据:
当操作系统提示可写时把数据写出,并取消OP_WRITE兴趣:
同样注意为了简化这里没有检查SocketChannel.write()的返回值。
当NServer 收到input、响应数据过来时,发送下一条数据:
留意一下if isWritable和else if isReadable是分支处理,但nio 实际上(估计)应该存在同时可写、可读的情况。
将socket数量改到1000,运行NServer、NClient,CPU迅速升到90%以上(NClient里每次循环sleep了10毫秒)。通过telnet 输入"exit"结束。简单实验可以看到NServer 一根线程可以处理1000个并发连接的连续读写,nio相对传统blocking io的优势就是在对线程的节省上。
(测试环境:Windows10+Java8、CentOS7+Java8)
nio编程围绕着java.nio.channels.Selector:有连接过来了、连上了对方、有数据可读、有数据可写 - 所有期待被通知这四种事件的Socket/ServerSocket都注册到Selector,当事件发生时应用会知道并做相应处理。以下是一个简单例子NServer(接收input并响应hello input)。
首先创建一个Selector:
Selector selector = Selector.open();
创建一个Server socket在8888端口监听:
ServerSocketChannel serverCh = ServerSocketChannel.open(); serverCh.bind(new InetSocketAddress(8888)); serverCh.configureBlocking(false); serverCh.register(selector, SelectionKey.OP_ACCEPT);
在注册到selector之前要设置server socket为non-blocking的。注册时指定此server socket对OP_ACCEPT事件感兴趣。
以下是一个常见的无限循环,每次循环调用selector.select()方法得到所有感兴趣的事件发生了的Socket/ServerSocket,然后逐一处理。下面是示意结构代码(具体代码参考附件):
for(;;) { selector.select(); Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); for (; ite.hasNext() ;) { SelectionKey key = ite.next(); if(key.isAcceptable()) { //有连接过来了 ite.remove(); } else if(key.isReadable()) { //有数据可读 ite.remove(); } else if(key.isWritable()) { //有数据可写 ite.remove(); } } }
上面注意每次处理完后要从iteration里移除当前key(不移除会一直在)。
SelectionKey代表在Selector上的注册,SelectionKey.channel()方法返回注册的Socket/ServerSocket。OP_ACCEPT事件处理如下:
if(key.isAcceptable()) { SocketChannel ch = ((ServerSocketChannel) key.channel()).accept(); ch.configureBlocking(false); SelectionKey key2 = ch.register(selector, SelectionKey.OP_READ); key2.attach(new Attach("Ch-" + ++chCount)); ite.remove(); }
OP_ACCEPT事件一定发生在前面在8888端口监听的ServerSocket上,调用它的accept()方法直接获取到(已经由操作系统建立好的)连接socket。我们这里演示socket接收到input会响应hello input,所以马上把该socket注册到selector 并指定对OP_READ感兴趣。
SelectionKey允许attach任意对象。由于程序中会有很多socket并且每个socket发送的数据都不同,所以定制了一个特定的Attach类用来存储socket的名称和将发送的数据:
static class Attach { String name; String output; Attach(String name) { this.name = name; } static String nameOf(SelectionKey key) { Attach a = (Attach) key.attachment(); return a.name; } static void output(SelectionKey key, String output) { Attach a = (Attach) key.attachment(); a.output = output; } static String outputOf(SelectionKey key) { Attach a = (Attach) key.attachment(); return a.output; } }
当socket的客户端发来数据时,操作系统会接收到然后程序会进入OP_READ分支:
else if(key.isReadable()) { SocketChannel ch = (SocketChannel) key.channel(); buf.position(0); int len; try {len = ch.read(buf); } catch (IOException e) { len = -1; } if(len == -1) { System.out.println(Attach.nameOf(key) + " Read: EOF"); key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); } else if(len > 0) { String input = new String(buf.array(), 0, len).trim(); // trailing \r\n System.out.println(Attach.nameOf(key) + " Read: " + input); if("close".equals(input)) { System.out.println(Attach.nameOf(key) + " Close"); ch.close(); } else if("exit".equals(input)) { System.out.println(Attach.nameOf(key) + " Exit"); ch.close(); exit = true; break; } else { System.out.println(Attach.nameOf(key) + " will Write: " + input); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); Attach.output(key, "Hello " + input); } } else if(len == 0) { System.out.println(Attach.nameOf(key) + " Read: len=0"); } ite.remove(); }
因为是isReadable,读数据是不会阻塞的,但注意SocketChannel.read()方法可能返回-1或者抛出异常。经实验:
a)直接强关telnet客户端 会返回-1,无异常。
b)直接强关NClient(例子程序,见后) 会异常“远程主机强迫关闭了一个现有的连接”
所以凡是io读、写 都可能发生IOException,程序需要自行处理。
当-1代表此socket已不可读(每个socket都有互相独立的InputStream、OutputStream 2个流),程序取消对OP_READ的兴趣(需要关闭socket?)。如果读到实际的内容input(排除"close"和"exit")要准备响应hello input。由于是nio 需要等到操作系统提示可写时才能写,所以程序将待写内容存入Attach 并增加对OP_WRITE的兴趣。另,len=0这个分支在测试中没有遇到过。
下一步当提示可写时,程序从Attach取出数据、写出、然后取消OP_WRITE兴趣:
else if(key.isWritable()) { SocketChannel ch = (SocketChannel) key.channel(); String output = Attach.outputOf(key); System.out.println(Attach.nameOf(key) + " Write: " + output); ch.write(ByteBuffer.wrap((output + "\n").getBytes())); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); ite.remove(); }
注意为了简化,示例程序调用SocketChannel.write()并没有检查内容是否全部写出。按照API说明,write方法甚至可能返回0 - 代表一个字节也没写出(到操作系统的某个缓存区)。
如上是一个简单的nio Server例子。运行该程序,然后启动一个telnet客户端“telnet localhost 8888”,输入"abc",看到程序输出:
Ch-1 Read: abc Ch-1 will Write: abc Ch-1 Write: Hello abc
输入"close"关闭当前连接,程序输出:
Ch-1 Read: close Ch-1 Close
再次运行telnet,输入"def":
Ch-2 Read: def Ch-2 will Write: def Ch-2 Write: Hello def
再运行一个telnet,输入"xyz":
Ch-3 Read: xyz Ch-3 will Write: xyz Ch-3 Write: Hello xyz
输入"exit"结束。
测试客户端NClient
同样,建立一个唯一的Selector,然后注册任意多个socket,指定对OP_CONNECT和OP_READ感兴趣:
Selector selector = Selector.open(); for(int i=0; i<1; i++) { SocketChannel ch = SocketChannel.open(); ch.configureBlocking(false); SelectionKey key = ch.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.attach(new Attach("Ch-" + (i+1))); ch.connect(new InetSocketAddress(8888)); }
nio的客户端当到远端的连接就绪时:
if (key.isConnectable()) { SocketChannel ch2 = (SocketChannel) key.channel(); ch2.finishConnect(); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); initWrite(key); ite.remove(); }
通过SocketChannel.finishConnect()完成客户端的连接。连上后客户端准备主动发数据:
static void initWrite(SelectionKey key) { String output = Attach.nameOf(key) + " " + Math.random(); System.out.println(Attach.nameOf(key) + " will Write: " + output); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); Attach.output(key, output); }
当操作系统提示可写时把数据写出,并取消OP_WRITE兴趣:
else if(key.isWritable()) { SocketChannel ch2 = (SocketChannel) key.channel(); String output = Attach.outputOf(key); System.out.println(Attach.nameOf(key) + " Write: " + output); ch2.write(ByteBuffer.wrap((output + "\n").getBytes())); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); ite.remove(); }
同样注意为了简化这里没有检查SocketChannel.write()的返回值。
当NServer 收到input、响应数据过来时,发送下一条数据:
else if(key.isReadable()) { SocketChannel ch2 = (SocketChannel) key.channel(); buf.position(0); int len = ch2.read(buf); if(len > 0) { String input = new String(buf.array(), 0, len).trim(); // trailing \n System.out.println(Attach.nameOf(key) + " Read: " + input); initWrite(key); ite.remove(); } }
留意一下if isWritable和else if isReadable是分支处理,但nio 实际上(估计)应该存在同时可写、可读的情况。
将socket数量改到1000,运行NServer、NClient,CPU迅速升到90%以上(NClient里每次循环sleep了10毫秒)。通过telnet 输入"exit"结束。简单实验可以看到NServer 一根线程可以处理1000个并发连接的连续读写,nio相对传统blocking io的优势就是在对线程的节省上。
(测试环境:Windows10+Java8、CentOS7+Java8)