Nio框架需要注意的两个问题(2)
书接上回,上次说到了selector的register和select会有锁冲突,这次再来考虑write的问题。
1. channel.write(Bytebuffer)是不是总是可写,当socket的writeBuffer满的时候会返回0,说明不能再写进任何字节。假设要写入一个很大的ByteBuffer,有可能需要分多次写。
2. channel上执行write操作需要获得锁保证同步,如果用户在应用代码中多处同时执行则会有锁竞争。
我们知道NIO是无阻塞同步的IO,无阻塞说的是读或者写的就绪状态。一般来说写的流程是这样的,使channel对write感兴趣,然后轮询selector的select方法,遍历selectionKey,如果可写则对给channel执行写操作
while (true) { try { handlerRegister(); keyCount = selector.select(selectTimeout); if(keyCount > 0) debug(Thread.currentThread().getName() + " selected:" + keyCount); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSession session = (NioSession)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (session == null) { iterator.remove(); } else { iterator.remove(); if(sk.isValid() && sk.isWritable()){ debug("Session write:" + session); session.write();//可写时 执行写操作 }else if(sk.isValid() && sk.isReadable()){ debug("Session read:" + session); session.read(); }else{ debug("Session close from select"); session.close(); } } }//while } catch (IOException e) { debug("I'm here"); e.printStackTrace(); } } }
从上面的代码我们很快就发现一个问题,这个写操作是在Poller的SelectorLoop,假设我们需要在程序代码中执行write操作的时候,根本不知道什么时候可写。那我们不管什么selector的可读可写。直接执行channel.write()是否可以。可以不总是可行,就像前面提到的。writeBuffer已满是不能写入任何byte的。
对于这个问题,Tomcat和Netty采用不同的解决方案,先说一个tomcat的方案:
Tomcat采用阻塞写的方案,首先循环执行channle.write(buffer)直到不可写,当buffer没有写完即hasRemaining为true,则注册到一个blockingSelector执行不断轮询阻塞写。一般来说tomcat返回的html没有那么大就不需要注册给阻塞的selector,tomcat这样处理而只在Poller中执行读操作,还有另外一个好处。减少了每个SelectorLoop的时间把所有的时间都交给了读事件的处理大大提交了吞吐量。
try { while ( (!timedout) && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write int cnt = socket.write(buf); //write the data if (lastWrite != null) lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer 首先直接执行channel的write直到不能写入任何byte,这时候就需要交给另外一个selector处理 continue; //we successfully wrote, try again without a selector } } try { if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); poller.add(att,SelectionKey.OP_WRITE,reference);//注册给阻塞的selector //同时加了一个CountDownLatch,如果Selector的select一直没有检测到该channel可写而执行countDown()则会后续根据count是否超时 att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); } if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) { //we got interrupted, but we haven't received notification from the poller. keycount = 0; }else { //latch countdown has happened keycount = 1; att.resetWriteLatch(); } if (writeTimeout > 0 && (keycount == 0)) timedout = (System.currentTimeMillis() - time) >= writeTimeout; } //while if (timedout) throw new SocketTimeoutException(); } finally { poller.remove(att,SelectionKey.OP_WRITE); if (timedout && reference.key!=null) { poller.cancelKey(reference.key); } reference.key = null; }
blog编辑器有点不太好用,后续再介绍一个Netty的解决方案。
上一篇: 从3张图了解Pinterest Feed算法与架构设计
下一篇: comet4j开发指南