欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Nio框架需要注意的两个问题(2)

程序员文章站 2022-05-06 14:37:32
...

    书接上回,上次说到了selector的register和select会有锁冲突,这次再来考虑write的问题。

1. channel.write(Bytebuffer)是不是总是可写,当socket的writeBuffer满的时候会返回0,说明不能再写进任何字节。假设要写入一个很大的ByteBuffer,有可能需要分多次写。

2. channel上执行write操作需要获得锁保证同步,如果用户在应用代码中多处同时执行则会有锁竞争。

     我们知道NIO是无阻塞同步的IO,无阻塞说的是读或者写的就绪状态。一般来说写的流程是这样的,使channel对write感兴趣,然后轮询selector的select方法,遍历selectionKey,如果可写则对给channel执行写操作

 

 while (true) {

                try {
                    handlerRegister();
                    keyCount = selector.select(selectTimeout);
                    if(keyCount > 0)
                        debug(Thread.currentThread().getName() + " selected:" + keyCount);
                    Iterator<SelectionKey> iterator =
                            keyCount > 0 ? selector.selectedKeys().iterator() : null;
                        // Walk through the collection of ready keys and dispatch
                        // any active event.
                        while (iterator != null && iterator.hasNext()) {
                            SelectionKey sk = iterator.next();
                            NioSession session = (NioSession)sk.attachment();
                            // Attachment may be null if another thread has called
                            // cancelledKey()
                            if (session == null) {
                                iterator.remove();
                            } else {
                                iterator.remove();
                                if(sk.isValid() && sk.isWritable()){
                                    debug("Session write:" + session);
                                    session.write();//可写时 执行写操作
                                }else if(sk.isValid() && sk.isReadable()){
                                    debug("Session read:" + session);
                                    session.read();
                                }else{
                                	debug("Session close from select");
                                    session.close();
                                }
                            }
                        }//while
                } catch (IOException e) {
                   debug("I'm here");
                    e.printStackTrace();
                }
            }

        }

 从上面的代码我们很快就发现一个问题,这个写操作是在Poller的SelectorLoop,假设我们需要在程序代码中执行write操作的时候,根本不知道什么时候可写。那我们不管什么selector的可读可写。直接执行channel.write()是否可以。可以不总是可行,就像前面提到的。writeBuffer已满是不能写入任何byte的。

     对于这个问题,Tomcat和Netty采用不同的解决方案,先说一个tomcat的方案:

     Tomcat采用阻塞写的方案,首先循环执行channle.write(buffer)直到不可写,当buffer没有写完即hasRemaining为true,则注册到一个blockingSelector执行不断轮询阻塞写。一般来说tomcat返回的html没有那么大就不需要注册给阻塞的selector,tomcat这样处理而只在Poller中执行读操作,还有另外一个好处。减少了每个SelectorLoop的时间把所有的时间都交给了读事件的处理大大提交了吞吐量。

 

 try {
            while ( (!timedout) && buf.hasRemaining()) {
                if (keycount > 0) { //only write if we were registered for a write
                    int cnt = socket.write(buf); //write the data
                    if (lastWrite != null) lastWrite.set(cnt);
                    if (cnt == -1)
                        throw new EOFException();
                    written += cnt;
                    if (cnt > 0) {
                        time = System.currentTimeMillis(); //reset our timeout timer 首先直接执行channel的write直到不能写入任何byte,这时候就需要交给另外一个selector处理
                        continue; //we successfully wrote, try again without a selector
                    }
                }
                try {
                    if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
                    poller.add(att,SelectionKey.OP_WRITE,reference);//注册给阻塞的selector
//同时加了一个CountDownLatch,如果Selector的select一直没有检测到该channel可写而执行countDown()则会后续根据count是否超时                    att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                }catch (InterruptedException ignore) {
                    Thread.interrupted();
                }
                if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) {
                    //we got interrupted, but we haven't received notification from the poller.
                    keycount = 0;
                }else {
                    //latch countdown has happened
                    keycount = 1;
                    att.resetWriteLatch();
                }

                if (writeTimeout > 0 && (keycount == 0))
                    timedout = (System.currentTimeMillis() - time) >= writeTimeout;
            } //while
            if (timedout)
                throw new SocketTimeoutException();
        } finally {
            poller.remove(att,SelectionKey.OP_WRITE);
            if (timedout && reference.key!=null) {
                poller.cancelKey(reference.key);
            }
            reference.key = null;
        }

  blog编辑器有点不太好用,后续再介绍一个Netty的解决方案。