欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina Io处理器抽象实现

程序员文章站 2024-01-13 09:30:34
...
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
在上面这篇文章中,当会话发送消息后,消息被过滤链上的过滤器过滤,从链尾到链头,过程如下:
//消息发送,Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,handler并不处理会话事件)
 
public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
        Entry tail = this.tail;
        callPreviousFilterWrite(tail, session, writeRequest);
    }
private void callPreviousFilterWrite(Entry entry, IoSession session,
            WriteRequest writeRequest) {
        try {
            entry.getFilter().filterWrite(entry.getNextFilter(), session,
                    writeRequest);
        } catch (Throwable e) {
            writeRequest.getFuture().setWritten(false);
            fireExceptionCaught(session, e);
        }
    }

再来看一下过滤链头HeadFilter
//HeadFilter
private class HeadFilter extends IoFilterAdapter {
       ...
        public void filterWrite(NextFilter nextFilter, IoSession session,
                WriteRequest writeRequest) throws Exception {
            if (session.getTransportType().getEnvelopeType().isAssignableFrom(
                    writeRequest.getMessage().getClass())) {
                doWrite(session, writeRequest);
            } else {
                throw new IllegalStateException(
                        "Write requests must be transformed to "
                                + session.getTransportType().getEnvelopeType()
                                + ": " + writeRequest);
            }
        }
       ...
}

从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//会话写操作
public void filterWrite(NextFilter nextFilter, IoSession session,
        WriteRequest writeRequest) throws Exception {
    if (session.getTransportType().getEnvelopeType().isAssignableFrom(
            writeRequest.getMessage().getClass())) {
        doWrite(session, writeRequest);
    } else {
        throw new IllegalStateException(
                "Write requests must be transformed to "
                        + session.getTransportType().getEnvelopeType()
                        + ": " + writeRequest);
    }
}

//AbstractIoFilterChain,待子类扩展
protected abstract void doWrite(IoSession session, WriteRequest writeRequest)
            throws Exception;

再来看SocketFilterChain
class SocketFilterChain extends AbstractIoFilterChain {

    SocketFilterChain(IoSession parent) {
        super(parent);
    }

    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        SocketSessionImpl s = (SocketSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
	//这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	    //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	   //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        //如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作,这个在以后在具体详说
        if (session.getTrafficMask().isWritable()) {
            s.getIoProcessor().flush(s);
        }
    }
    //关闭会话
    protected void doClose(IoSession session) throws IOException {
        SocketSessionImpl s = (SocketSessionImpl) session;
        s.getIoProcessor().remove(s);//委托给session关联的IoProcessor
    }
}

从上面可以看出会话发送消息最后由会话IoProcessor处理,下面来看一下IoProcessor接口的定义:
/**
 * An internal interface to represent an 'I/O processor' that performs
 * actual I/O operations for {@link IoSession}s.  It abstracts existing
 * reactor frameworks such as Java NIO once again to simplify transport
 * implementations.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * 
 * @param <S> the type of the {@link IoSession} this processor can handle
 */
public interface IoProcessor<S extends IoSession> {

    /**
     * Releases any resources allocated by this processor.  Please note that 
     * the resources might not be released as long as there are any sessions
     * managed by this processor.  Most implementations will close all sessions
     * immediately and release the related resources.
     释放所有分配给IO处理器的资源。只要有任何会话占用资源,资源也许不会被Io处理器释放。
     在大部分的实现版本中,是立刻关闭所有会话,释放相关资源。
     */
    void dispose();
      /**
     * @return <tt>true</tt> if and if only {@link #dispose()} method has
     * been called.  Please note that this method will return <tt>true</tt>
     * even after all the related resources are released.
     如果dispose已经被调用,则返回true。即使在所有相关资源释放后,此方法仍返回true
     */
    boolean isDisposing();

    /**
     * @return <tt>true</tt> if and if only all resources of this processor
     * have been disposed.
     Io处理器所有资源释放完,则返回true。
     */
    boolean isDisposed();


    /**
     * Adds the specified {@code session} to the I/O processor so that
     * the I/O processor starts to perform any I/O operations related
     * with the {@code session}.
     * 添加会话到Io处理器,以便Io处理器启动时,执行会话相关的操作。
     * @param session The added session
     */
    void add(S session);

    /**
     * Flushes the internal write request queue of the specified
     * {@code session}.
     * 刷新会话内部的写请求队列
     * @param session The session we want the message to be written
     */
    void flush(S session);

    /**
     * Writes the WriteRequest for the specified {@code session}.
     * 发送写请求到会话
     * @param session The session we want the message to be written
     * @param writeRequest the WriteRequest to write
     */
    void write(S session, WriteRequest writeRequest);

    /**
     * Controls the traffic of the specified {@code session} depending of the
     * {@link IoSession#isReadSuspended()} and {@link IoSession#isWriteSuspended()}
     * flags
     * 依赖于会话的IoSession#isReadSuspended/isWriteSuspended标志控制session读写请求的次序
     * @param session The session to be updated
     */
    void updateTrafficControl(S session);

    /**
     * Removes and closes the specified {@code session} from the I/O
     * processor so that the I/O processor closes the connection
     * associated with the {@code session} and releases any other related
     * resources.
     * 从Io处理器移除和关闭会话,以便Io处理器关闭连接关联的会话,释放相关的资源。
     * @param session The session to be removed
     */
    void remove(S session);
}

下面来看Io处理器的抽象实现AbstractPollingIoProcessor
/**
 * An abstract implementation of {@link IoProcessor} which helps transport
 * developers to write an {@link IoProcessor} easily. This class is in charge of
 * active polling a set of {@link IoSession} and trigger events when some I/O
 * operation is possible.
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @param <S>
 *            the type of the {@link IoSession} this processor can handle
 */
public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
    /** A logger for this class */
    private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions.选择操作超时时间,我们需要处理空闲的会话
     */
    private static final long SELECT_TIMEOUT = 1000L;
    /** A map containing the last Thread ID for each class */每个class的最后一个线程id
    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
    /** This IoProcessor instance name处理器实例名 */
    private final String threadName;
    /** The executor to use when we need to start the inner Processor */
    private final Executor executor;//处理器内部执行器,用于运行内部处理器Processor
    /** A Session queue containing the newly created sessions */
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();//创建会话队列
    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();//移除会话队列
    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();//刷新会话队列
    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();//次序控制会话队列
    /** The processor thread : it handles the incoming messages 处理器线程,用于处理进来的消息*/
    private final AtomicReference<Processor> processorRef = new AtomicReference<>();
    private long lastIdleCheckTime;//上次空闲检查时间
    private final Object disposalLock = new Object();//关闭锁
    private volatile boolean disposing;//是否正在关闭
    private volatile boolean disposed;//是否已关闭
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);//关闭结果
    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);//这个暂时清楚,后面遇见时再说
 }

从上面可以看出抽象Io处理器,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话;executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话暂定读写的会话;;正在处理进来消息的处理器引用processorRef;这些变量暂时这么理解,后面如果发现错误,再更正。
来看构造:
 /**
     * Create an {@link AbstractPollingIoProcessor} with the given
     * {@link Executor} for handling I/Os events.
     * 根据给定的执行器,创建抽象Io处理器用于处理IO事件
     * @param executor
     *            the {@link Executor} for handling I/O events
     */
    protected AbstractPollingIoProcessor(Executor executor) {
        if (executor == null) {
            throw new IllegalArgumentException("executor");
        }
        this.threadName = nextThreadName();//获取处理器线程名
        this.executor = executor;
    }
    /**
     * Compute the thread ID for this class instance. As we may have different
     * classes, we store the last ID number into a Map associating the class
     * name to the last assigned ID.
     * 计算类型实例最大线程id。因为我们有不同的类型,所以我们存在类型与类型实例最后一个id的映射
     关系放在Map中管理。
     * @return a name for the current thread, based on the class name and an
     *         incremental value, starting at 1.
     */
    private String nextThreadName() {
        Class<?> cls = getClass();
        int newThreadId;
	//从类型最大线程id映射Map,获取Io处理器线程的最大线程id
        AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
        if (threadId == null) {
            newThreadId = 1;
        } else {
            // Just increment the last ID, and get it.
            newThreadId = threadId.incrementAndGet();
        }
        // Now we can compute the name for this thread
        return cls.getSimpleName() + '-' + newThreadId;
    }

来看其他方法定义
/**
     * poll those sessions for the given timeout
     * 超时选择
     * @param timeout
     *            milliseconds before the call timeout if no event appear
     * @return The number of session ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select(long timeout) throws Exception;

    /**
     * poll those sessions forever
     * 选择操作
     * @return The number of session ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select() throws Exception;

    /**
     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
     * is empty
     * 判断处理器的会话集合是否为空
     * @return <tt>true</tt> if at least a session is managed by this
     *         {@link IoProcessor}
     */
    protected abstract boolean isSelectorEmpty();


    /**
     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
     * {@link IoProcessor}
     * 返回处理器选择的所有会话
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<S> allSessions();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link #select(long)}
     * 获取上次调用超时选择后的会话集
     * @return {@link Iterator} of {@link IoSession} read for I/Os operation
     */
    protected abstract Iterator<S> selectedSessions();

    /**
     * Get the state of a session (One of OPENING, OPEN, CLOSING)
     * 获取会话状态
     * @param session
     *            the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState getState(S session);

    /**
     * Tells if the session ready for writing
     * 判断会话是否可写
     * @param session
     *            the queried session
     * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
     */
    protected abstract boolean isWritable(S session);

    /**
     * Tells if the session ready for reading
     * 判断会话是否准备好读操作
     * @param session
     *            the queried session
     * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
     */
    protected abstract boolean isReadable(S session);

    /**
     * Set the session to be informed when a write event should be processed
     * 当有一个写事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in write events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;

    /**
     * Set the session to be informed when a read event should be processed
     * 当有一个读事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in read events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;

    /**
     * Tells if this session is registered for reading
     * 判断会话是否注册读事件
     * @param session
     *            the queried session
     * @return <tt>true</tt> is registered for reading
     */
    protected abstract boolean isInterestedInRead(S session);

    /**
     * Tells if this session is registered for writing
     * 判断会话是否注册写事件
     * @param session
     *            the queried session
     * @return <tt>true</tt> is registered for writing
     */
    protected abstract boolean isInterestedInWrite(S session);

    /**
     * Initialize the polling of a session. Add it to the polling process.
     * 初始化会话,添加到处理器
     * @param session
     *            the {@link IoSession} to add to the polling
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void init(S session) throws Exception;

    /**
     * Destroy the underlying client socket handle
     * 关闭底层客户端socket
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given
     * {@link IoBuffer}. Is called when the session was found ready for reading.
     * 当会话准备好读操作是,从会话读字节序列
     * @param session
     *            the session to read
     * @param buf
     *            the buffer to fill
     * @return the number of bytes read
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int read(S session, IoBuffer buf) throws Exception;

    /**
     * Write a sequence of bytes to a {@link IoSession}, means to be called when
     * a session was found ready for writing.
     * 当会话准备好写操作是,写字节序列到会话
     * @param session
     *            the session to write
     * @param buf
     *            the buffer to write
     * @param length
     *            the number of bytes to write can be superior to the number of
     *            bytes remaining in the buffer
     * @return the number of byte written
     * @throws IOException
     *             any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws IOException;

    /**
     * Write a part of a file to a {@link IoSession}, if the underlying API
     * isn't supporting system calls like sendfile(), you can throw a
     * {@link UnsupportedOperationException} so the file will be send using
     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     * 写文件的某个Region到会话,如果底层API不支持sendfile方法,你可以抛出一个UnsupportedOperationException,
     那么将调用#write(AbstractIoSession, IoBuffer, int)发送文件。
     * @param session
     *            the session to write
     * @param region
     *            the file region to write
     * @param length
     *            the length of the portion to send
     * @return the number of written bytes
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;

来看添加会话到处理器
  
 /**
     * {@inheritDoc}
     */
   //会话创建时,添加回到到处理器,
    @Override
    public final void add(S session) {
        if (disposed || disposing) {//如果处理器已关闭,则抛出非法状态异常
            throw new IllegalStateException("Already disposed.");
        }
        // Adds the session to the newSession queue and starts the worker
	//添加会话到Io处理器的创建会话队列中
        newSessions.add(session);
         //启动一个Io处理器线程
        startupProcessor();
    }

    
 /**
     * Starts the inner Processor, asking the executor to pick a thread in its
     * pool. The Runnable will be renamed
     */
    private void startupProcessor() {
        //从处理器引用获取处理器
        Processor processor = processorRef.get();
        if (processor == null) {
	   //处理器为空,则创建一个
            processor = new Processor();
            if (processorRef.compareAndSet(null, processor)) {
	        //执行处理器
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }
        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
	//暂时停止选择操作,待处理器线程启动
        wakeup();
    }

/**
     * Interrupt the {@link #select(long)} call.
     中断选择操作
     */
    protected abstract void wakeup();

这个过程有两点要看
1.
 //处理器为空,则创建一个
 processor = new Processor();

2.
//执行处理器                                                         
executor.execute(new NamePreservingRunnable(processor, threadName));  


先来看处理器线程
1.
 //处理器为空,则创建一个
 processor = new Processor();

//Processor
  /**
  * The main loop. This is the place in charge to poll the Selector, and to
  * process the active sessions. It's done in - handle the newly created
  * sessions -
  */
 private class Processor implements Runnable {
     /**
      * {@inheritDoc}
      */
     @Override
     public void run() {
         //断言Io处理器实际处理线程是否为当前Processor
         assert processorRef.get() == this;
         int nSessions = 0;
         lastIdleCheckTime = System.currentTimeMillis();
         int nbTries = 10;
         for (;;) {
             try {
                 // This select has a timeout so that we can manage
                 // idle session when we get out of the select every
                 // second. (note : this is a hack to avoid creating
                 // a dedicated thread).
		 //选择操作有一个超时时间,以便当选择超时时,处理空闲会话
                 long t0 = System.currentTimeMillis();
		 //超时选择,select方法待子类实现
                 int selected = select(SELECT_TIMEOUT);
                 long t1 = System.currentTimeMillis();
                 long delta = t1 - t0;
                 //当前这次选择操作,没有SELECTKey相关事件,没有中断,且此次选择操作耗时
		 //小于SELECT_TIMEOUT(1000)/nbTries(10)
                 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
                     // Last chance : the select() may have been
                     // interrupted because we have had an closed channel.
		     //在上次尝试选择操作时,可能通道关闭,选择操作可能被中断
                     if (isBrokenConnection()) {
		         //通道关闭的话,仅仅输出日志
                         LOG.warn("Broken connection");
                     } else {
                         // Ok, we are hit by the nasty epoll
                         // spinning.
                         // Basically, there is a race condition
                         // which causes a closing file descriptor not to be
                         // considered as available as a selected channel,
                         // but
                         // it stopped the select. The next time we will
                         // call select(), it will exit immediately for the
                         // same
                         // reason, and do so forever, consuming 100%
                         // CPU.
                         // We have to destroy the selector, and
                         // register all the socket on a new one.
                         if (nbTries == 0) {
			     //如果尝试次数用完我们,注册新的选择器
                             LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
                             registerNewSelector();
                             nbTries = 10;//恢复尝试次数
                         } else {
			     //否则尝试次数自减
                             nbTries--;
                         }
                     }
                 } else {
                     nbTries = 10;
                 }

                 // Manage newly created session first
		 //处理新会话
                 nSessions += handleNewSessions();
                 //更新会话状态
                 updateTrafficMask();

                 // Now, if we have had some incoming or outgoing events,
                 // deal with them
                 if (selected > 0) {
                     // LOG.debug("Processing ..."); // This log hurts one of
                     // the MDCFilter test...
		     //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件
		     //读写事件,委托为process方法处理
                     process();
                 }

                 // Write the pending requests
		 //处理有些请求的会话
                 long currentTime = System.currentTimeMillis();
                 flush(currentTime);

                 // And manage removed sessions
		 //移除已关闭的会话
                 nSessions -= removeSessions();

                 // Last, not least, send Idle events to the idle sessions
		 //通知会话空闲事件
                 notifyIdleSessions(currentTime);

                 // Get a chance to exit the infinite loop if there are no
                 // more sessions on this Processor
		 //如果在这个过程中,激活会话最后为0,则清除处理器引用
                 if (nSessions == 0) {
                     processorRef.set(null);

                     if (newSessions.isEmpty() && isSelectorEmpty()) {
                         // newSessions.add() precedes startupProcessor
                         assert processorRef.get() != this;
                         break;
                     }

                     assert processorRef.get() != this;

                     if (!processorRef.compareAndSet(null, this)) {
                         // startupProcessor won race, so must exit processor
                         assert processorRef.get() != this;
                         break;
                     }

                     assert processorRef.get() == this;
                 }

                 // Disconnect all sessions immediately if disposal has been
                 // requested so that we exit this loop eventually.
		 //判断Io处理器是否正在关闭,如果正在关闭断开所有会话
                 if (isDisposing()) {
                     boolean hasKeys = false;
                     //获取当前处理器管理的会话,移除会话
                     for (Iterator<S> i = allSessions(); i.hasNext();) {
                         IoSession session = i.next();

                         if (session.isActive()) {
                             scheduleRemove((S) session);
                             hasKeys = true;
                         }
                     }

                     if (hasKeys) {
                         wakeup();
                     }
                 }
             } catch (ClosedSelectorException cse) {
                 // If the selector has been closed, we can exit the loop
                 // But first, dump a stack trace
                 ExceptionMonitor.getInstance().exceptionCaught(cse);
                 break;
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);

                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e1) {
                     ExceptionMonitor.getInstance().exceptionCaught(e1);
                 }
             }
         }
         try {
             synchronized (disposalLock) {
                 if (disposing) {
		     //如果正在关闭则,完成实际关闭工作
                     doDispose();
                 }
             }
         } catch (Exception e) {
	     //捕捉异常
             ExceptionMonitor.getInstance().exceptionCaught(e);
         } finally {
	     //已关闭
             disposalFuture.setValue(true);
         }
}

Io处理器,处理线程Processor的实际工作有一下几点要看
a.
if (isBrokenConnection()) {
       //通道关闭的话,仅仅输出日志
       LOG.warn("Broken connection");
   } 

b.
else {
    // Ok, we are hit by the nasty epoll
    // spinning.
    // Basically, there is a race condition
    // which causes a closing file descriptor not to be
    // considered as available as a selected channel,
    // but
    // it stopped the select. The next time we will
    // call select(), it will exit immediately for the
    // same
    // reason, and do so forever, consuming 100%
    // CPU.
    // We have to destroy the selector, and
    // register all the socket on a new one.
    if (nbTries == 0) {
     //如果尝试次数用完我们,注册新的选择器
        LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
        registerNewSelector();
        nbTries = 10;//恢复尝试次数
    } else {
     //否则尝试次数自减
        nbTries--;
    }
}

c.
 // Manage newly created session first
 //处理新会话
 nSessions += handleNewSessions();

d.
//更新会话状态
updateTrafficMask();

e.
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
    // LOG.debug("Processing ..."); // This log hurts one of
    // the MDCFilter test...
    //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件
    //读写事件,委托为process方法处理
    process();
}

f.
 // Write the pending requests
 //处理有些请求的会话
 long currentTime = System.currentTimeMillis();
 flush(currentTime);

g.
 // And manage removed sessions
//移除已关闭的会话
nSessions -= removeSessions();

h.
// Last, not least, send Idle events to the idle sessions
//通知会话空闲事件
notifyIdleSessions(currentTime);

i.
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
//判断Io处理器是否正在关闭,如果正在关闭断开所有会话
if (isDisposing()) {
    boolean hasKeys = false;
    //获取当前处理器管理的会话,移除会话
    for (Iterator<S> i = allSessions(); i.hasNext();) {
        IoSession session = i.next();

        if (session.isActive()) {
            scheduleRemove((S) session);
            hasKeys = true;
        }
    }
    if (hasKeys) {
        wakeup();
    }
}

j.
t
ry {
       synchronized (disposalLock) {
           if (disposing) {
   	     //如果正在关闭,完成实际关闭工作
               doDispose();
           }
       }
   } catch (Exception e) {
       //捕捉异常
       ExceptionMonitor.getInstance().exceptionCaught(e);
   } finally {
	     //已关闭
             disposalFuture.setValue(true);
}

下面我们分别来看这几点
a.
 if (isBrokenConnection()) {
       //通道关闭的话,仅仅输出日志
       LOG.warn("Broken connection");
   } 

  /**
     * Check that the select() has not exited immediately just because of a
     * broken connection. In this case, this is a standard case, and we just
     * have to loop.
     * 检查选择是否由于Io处理器连接断开,选择操作还没有退出
     * @return <tt>true</tt> if a connection has been brutally closed.
     * @throws IOException
     *             If we got an exception
     */
    protected abstract boolean isBrokenConnection() throws IOException;


b.
else {
    // Ok, we are hit by the nasty epoll
    // spinning.
    // Basically, there is a race condition
    // which causes a closing file descriptor not to be
    // considered as available as a selected channel,
    // but
    // it stopped the select. The next time we will
    // call select(), it will exit immediately for the
    // same
    // reason, and do so forever, consuming 100%
    // CPU.
    // We have to destroy the selector, and
    // register all the socket on a new one.
    if (nbTries == 0) {
     //如果尝试次数用完我们,注册新的选择器
        LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
        registerNewSelector();
        nbTries = 10;//恢复尝试次数
    } else {
     //否则尝试次数自减
        nbTries--;
    }
}

 /**
     * In the case we are using the java select() method, this method is used to
     * trash the buggy selector and create a new one, registring all the sockets
     * on it.
     * 丢弃旧的选择器,将所有socket注册到新的选择器上
     * @throws IOException
     *             If we got an exception
     */
    protected abstract void registerNewSelector() throws IOException;

c.
// Manage newly created session first
 //处理新会话
 nSessions += handleNewSessions();

 /**
    * Loops over the new sessions blocking queue and returns the number of
    * sessions which are effectively created
    * 遍历创建会话队列,返回新建会话的数量
    * @return The number of new sessions
    */
   private int handleNewSessions() {
       int addedSessions = 0;
       for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
           if (addNow(session)) {
               // A new session has been created
               addedSessions++;
           }
       }
       return addedSessions;
   }


/**
  * Process a new session : - initialize it - create its chain - fire the
  * CREATED listeners if any
  * 处理新会话,初始化会话,创建会话过滤链,触发监听器会话创建事件
  * @param session
  *            The session to create
  * @return <tt>true</tt> if the session has been registered
  */
 private boolean addNow(S session) {
     boolean registered = false;

     try {
        //初始化会话
         init(session);
         registered = true;
         // Build the filter chain of this session.
	 //获取会话service过滤链构建器
         IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
	 //构建会话过滤链
         chainBuilder.buildFilterChain(session.getFilterChain());
         // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
         // in AbstractIoFilterChain.fireSessionOpened().
         // Propagate the SESSION_CREATED event up to the chain
         IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
	 //触发会话事件
         listeners.fireSessionCreated(session);
     } catch (Exception e) {
         ExceptionMonitor.getInstance().exceptionCaught(e);

         try {
             destroy(session);
         } catch (Exception e1) {
             ExceptionMonitor.getInstance().exceptionCaught(e1);
         } finally {
             registered = false;
         }
     }
     return registered;
 }

添加会话有一下几点要看
c.1
//初始化会话
init(session);

protected abstract void init(AbstractIoSession abstractiosession)
        throws Exception;

c.2
//触发会话事件
listeners.fireSessionCreated(session);

//IoServiceListenerSupport
public void fireSessionCreated(IoSession session)
    {
        boolean firstSession = false;
        if(session.getService() instanceof IoConnector)
            synchronized(managedSessions)
            {
                firstSession = managedSessions.isEmpty();
            }
        if(managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null)
            return;
        if(firstSession)
            fireServiceActivated();
	//触发会话过滤链会话创建和会话打开事件
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireSessionCreated();
        filterChain.fireSessionOpened();
        ...
}

c.3
 destroy(session);

/**
     * Destroy the underlying client socket handle
     * 关闭底层客户端socket
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

d.
//更新会话状态
updateTrafficMask()
;


 /**
  * Update the trafficControl for all the session.
  */
 private void updateTrafficMask() {
     int queueSize = trafficControllingSessions.size();
     while (queueSize > 0) {
         S session = trafficControllingSessions.poll();
         if (session == null) {
             // We are done with this queue.
             return;
         }
	 //获取会话状态
         SessionState state = getState(session);
         switch (state) {
         case OPENED:
	     //更新会话状态
             updateTrafficControl(session);
             break;
         case CLOSING:
             break;
         case OPENING:
             // Retry later if session is not yet fully initialized.
             // (In case that Session.suspend??() or session.resume??() is
             // called before addSession() is processed)
             // We just put back the session at the end of the queue.
	     //如果正在打开,则添加到次序控制会话队列
             trafficControllingSessions.add(session);
             break;
         default:
             throw new IllegalStateException(String.valueOf(state));
         }

         // As we have handled one session, decrement the number of
         // remaining sessions. The OPENING session will be processed
         // with the next select(), as the queue size has been decreased,
         // even
         // if the session has been pushed at the end of the queue
         queueSize--;
     }
 }

/**
 * {@inheritDoc}
 */
@Override
public void updateTrafficControl(S session) {
    //
    try {
        //通知读操作事件
        setInterestedInRead(session, !session.isReadSuspended());
    } catch (Exception e) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
    try {
       //通知写操作事件
        setInterestedInWrite(session,
                !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
    } catch (Exception e) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
}

 /**
     * Set the session to be informed when a write event should be processed
     * 当有一个写事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in write events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;


 
  /**
     * Set the session to be informed when a read event should be processed
     * 当有一个读事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in read events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;

e.
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
    // LOG.debug("Processing ..."); // This log hurts one of
    // the MDCFilter test...
    //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件
    //读写事件,委托为process方法处理
    process();
}

 private void process() throws Exception {
     for (Iterator<S> i = selectedSessions(); i.hasNext();) {
         S session = i.next();
	 //处理会话
         process(session);
         i.remove();
     }
 }

/**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link #select(long)}
     * 获取上次调用超时选择后,准备就绪会话集
     * @return {@link Iterator} of {@link IoSession} read for I/Os operation
     */
    protected abstract Iterator<S> selectedSessions();


/**
     * Deal with session ready for the read or write operations, or both.
     */
    private void process(S session) {
        // Process Reads
        if (isReadable(session) && !session.isReadSuspended()) {
	    //如果会话可读,则读会话接收到的数据
            read(session);
        }

        // Process writes
        if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
            // add the session to the queue, if it's not already there
	    //如果会话有数据要发送,则将会话添加到刷新会话队列
            flushingSessions.add(session);
        }
    }
}
}

处理会话有两点要关注,
e.1
// Process Reads
  if (isReadable(session) && !session.isReadSuspended()) {
    //如果会话可读,则读会话接收到的数据
      read(session);
  }

private void read(S session) {
        //获取会话配置,会话配置读缓存size
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);
        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
                    //从会话读取字节序列到buffer
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
	       //获取会话过滤链,触发过滤链消息接收事件MessageReceive
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            } else {
                // release temporary buffer when read nothing
                buf.free(); 
            }
            //如果会话socket关闭,则触发过滤链fireInputClosed
            if (ret < 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireInputClosed();
            }
        } catch (Exception e) {
            if ((e instanceof IOException) &&
                (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) {
                scheduleRemove(session);
            }
	    //触发过滤链异常事件ExceptionCaught
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

e.2
// Process writes                                                                                   
if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {     
    // add the session to the queue, if it's not already there                                      
    //如果会话有数据要发送,则将会话添加到刷新会话队列                                              
    flushingSessions.add(session);  
}   

f.
 // Write the pending requests
 //处理有写请求的会话
 long currentTime = System.currentTimeMillis();
 flush(currentTime);


 
/**
  * Write all the pending messages
  */
 private void flush(long currentTime) {
     if (flushingSessions.isEmpty()) {
         return;
     }
     //遍历刷新会话队列
     do {
         S session = flushingSessions.poll(); // the same one with
                                              // firstSession
         if (session == null) {
             // Just in case ... It should not happen.
             break;
         }
         // Reset the Schedule for flush flag for this session,
         // as we are flushing it now
	 //设置会话刷新状态为未刷新
         session.unscheduledForFlush();
	 //获取会话状态
         SessionState state = getState(session);
         switch (state) {
         case OPENED:
             try {
	         //会话已已打开,则委托给flushNow
                 boolean flushedAll = flushNow(session, currentTime);
                 
                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
                         && !session.isScheduledForFlush()) {
		     //调度刷新会话
                     scheduleFlush(session);
                 }
             } catch (Exception e) {
                 scheduleRemove(session);//移除会话调度
                 session.closeNow();//异常立刻关闭会话
                 IoFilterChain filterChain = session.getFilterChain();
                 filterChain.fireExceptionCaught(e);
             }

             break;

         case CLOSING:
             // Skip if the channel is already closed.
             break;

         case OPENING:
             // Retry later if session is not yet fully initialized.
             // (In case that Session.write() is called before addSession()
             // is processed)
	     //如果正在会话正在打开,则调度刷新会话
             scheduleFlush(session);
             return;

         default:
             throw new IllegalStateException(String.valueOf(state));
         }

     } while (!flushingSessions.isEmpty());
 }

方法有以下几点要关注
f.1
 //会话已已打开,则委托给flushNow                                   
 boolean flushedAll = flushNow(session, currentTime); 



private boolean flushNow(S session, long currentTime) {
    //如果会话失去连接,则添加会话到移除会话队列
    if (!session.isConnected()) {
        scheduleRemove(session);
        return false;
    }
    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
   //获取会话写请求队列
    final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
    // Set limitation for the number of written bytes for read-write
    // fairness. I used maxReadBufferSize * 3 / 2, which yields best
    // performance in my experience while not breaking fairness much.
    //写buffer最大size,经验值为maxReadBufferSize * 3 / 2
    final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
            + (session.getConfig().getMaxReadBufferSize() >>> 1);
    int writtenBytes = 0;
    WriteRequest req = null;
    try {
        // Clear OP_WRITE,清除会话写事件OP_WRITE标志
        setInterestedInWrite(session, false);

        do {
            // Check for pending writes.
	    //获取会话当前写情趣
            req = session.getCurrentWriteRequest();
            if (req == null) {
                req = writeRequestQueue.poll(session);

                if (req == null) {
                    break;
                }

                session.setCurrentWriteRequest(req);
            }
            int localWrittenBytes;
	    //获取写请求消息
            Object message = req.getMessage();
            if (message instanceof IoBuffer) {
	        //写会话buffer
                localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);
                if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
                    // the buffer isn't empty, we re-interest it in writing
                    setInterestedInWrite(session, true);

                    return false;
                }
            } else if (message instanceof FileRegion) {
	        写文件
                localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);

                // Fix for Java bug on Linux
                // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                // If there's still data to be written in the FileRegion,
                // return 0 indicating that we need
                // to pause until writing may resume.
                if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else {
                throw new IllegalStateException("Don't know how to handle message of type '"
                        + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
            }
            if (localWrittenBytes == 0) {

                // Kernel buffer is full.
                if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else {
                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much
                    scheduleFlush(session);
                    return false;
                }
            }

            if (message instanceof IoBuffer) {
                ((IoBuffer) message).free();
            }
        } while (writtenBytes < maxWrittenBytes);
    } catch (Exception e) {
       //写请求结果异常
        if (req != null) {
            req.getFuture().setException(e);
        }
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
        return false;
    }

    return true;
}


这一点有一下几点要看
f.1.1
 //写会话buffer
localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
        currentTime);

  private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
                throws Exception {
            IoBuffer buf = (IoBuffer) req.getMessage();
            int localWrittenBytes = 0;
            if (buf.hasRemaining()) {
                int length;

                if (hasFragmentation) {
                    length = Math.min(buf.remaining(), maxLength);
                } else {
                    length = buf.remaining();
                }
                try {
		    //发送会话数据
                    localWrittenBytes = write(session, buf, length);
                } catch (IOException ioe) {
                    // We have had an issue while trying to send data to the
                    // peer : let's close the session.
                    buf.free();
                    session.closeNow();
                    removeNow(session);

                    return 0;
                }
            }
            session.increaseWrittenBytes(localWrittenBytes, currentTime);
            // Now, forward the original message
            if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
                // Buffer has been sent, clear the current request.
                Object originalMessage = req.getOriginalRequest().getMessage();

                if (originalMessage instanceof IoBuffer) {
                    buf = (IoBuffer) req.getOriginalRequest().getMessage();

                    int pos = buf.position();
                    buf.reset();
                    fireMessageSent(session, req);
                    // And set it back to its position
                    buf.position(pos);
                } else {
                    fireMessageSent(session, req);
                }
            }

            return localWrittenBytes;
}

 /**
     * Write a sequence of bytes to a {@link IoSession}, means to be called when
     * a session was found ready for writing.
     * 当会话准备好写操作是,写字节序列到会话
     * @param session
     *            the session to write
     * @param buf
     *            the buffer to write
     * @param length
     *            the number of bytes to write can be superior to the number of
     *            bytes remaining in the buffer
     * @return the number of byte written
     * @throws IOException
     *             any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws IOException;


f.1.2
//写文件                                                                                        
localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,  
        currentTime); 
   

//写文件
private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
        throws Exception {
    int localWrittenBytes;
    //获取写请求文件FileRegion
    FileRegion region = (FileRegion) req.getMessage();
    if (region.getRemainingBytes() > 0) {
        int length;

        if (hasFragmentation) {
            length = (int) Math.min(region.getRemainingBytes(), maxLength);
        } else {
            length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
        }
        //委托给transferFile
        localWrittenBytes = transferFile(session, region, length);
        region.update(localWrittenBytes);
    } else {
        localWrittenBytes = 0;
    }
    session.increaseWrittenBytes(localWrittenBytes, currentTime);
    if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
       //触发会话消息发送事件
        fireMessageSent(session, req);
    }
    return localWrittenBytes;
}

  
 /**
     * Write a part of a file to a {@link IoSession}, if the underlying API
     * isn't supporting system calls like sendfile(), you can throw a
     * {@link UnsupportedOperationException} so the file will be send using
     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     * 写文件的某个Region到会话,如果底层API不支持sendfile方法,你可以抛出一个UnsupportedOperationException,
     那么将调用#write(AbstractIoSession, IoBuffer, int)发送文件。
     * @param session
     *            the session to write
     * @param region
     *            the file region to write
     * @param length
     *            the length of the portion to send
     * @return the number of written bytes
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;

//触发会话消息发送事件
 private void fireMessageSent(AbstractIoSession session, WriteRequest req)
    {
        session.setCurrentWriteRequest(null);
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireMessageSent(req);
    }

f.2   
//如果刷新完成,且会话写请求队列不为空,会话待调度
 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) 
         && !session.isScheduledForFlush()) {                       
     //调度刷新会话                                                 
     scheduleFlush(session);        
 }  

 //调度写请求会话,及添加到刷新队列
 private void scheduleFlush(S session) {
            // add the session to the queue if it's not already
            // in the queue
            if (session.setScheduledForFlush(true)) {
                flushingSessions.add(session);
            }
 }

f.3
 scheduleRemove(session);//移除会话调度
session.closeNow();//异常立刻关闭会话
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e); 

//添加回到到移除队列
private void scheduleRemove(S session) {
    if (!removingSessions.contains(session)) {
        removingSessions.add(session);
    }
}

g.
 // And manage removed sessions
//移除已关闭的会话
nSessions -= removeSessions();

//移除会话
private int removeSessions() {
    int removedSessions = 0;
    //遍历移除会话队列,如果poll的会话不为空,则获取会话状态,
    for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
        SessionState state = getState(session);

        // Now deal with the removal accordingly to the session's state
        switch (state) {
        case OPENED:
            // Try to remove this session
	    //尝试移除会话
            if (removeNow(session)) {
                removedSessions++;
            }
            break;
        case CLOSING:
            // Skip if channel is already closed
            // In any case, remove the session from the queue
	    //会话关闭,则更新会话移除计数器
            removedSessions++;
            break;
        case OPENING:
            // Remove session from the newSessions queue and
            // remove it
	    //正在打开从新创建会话对垒移除会话
            newSessions.remove(session);
            if (removeNow(session)) {
                removedSessions++;
            }
            break;
        default:
            throw new IllegalStateException(String.valueOf(state));
        }
    }
    return removedSessions;
}

//尝试移除会话
private boolean removeNow(S session) {
   //清除会话写请求队列
    clearWriteRequestQueue(session);
    try {
        //销毁会话
        destroy(session);
        return true;
    } catch (Exception e) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    } finally {
        try {
            clearWriteRequestQueue(session);
            ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
        } catch (Exception e) {
            // The session was either destroyed or not at this point.
            // We do not want any exception thrown from this "cleanup" code
            // to change
            // the return value by bubbling up.
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
    return false;
}

//清除会话写请求队列
 private void clearWriteRequestQueue(S session) {
          WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
          WriteRequest req;
          List<WriteRequest> failedRequests = new ArrayList<>();
          if ((req = writeRequestQueue.poll(session)) != null) {
              Object message = req.getMessage();
              if (message instanceof IoBuffer) {
                  IoBuffer buf = (IoBuffer) message;

                  // The first unwritten empty buffer must be
                  // forwarded to the filter chain.
		  //如果会话写请求buffer还有数据,添加写请求到失败写请求集合
                  if (buf.hasRemaining()) {
                      buf.reset();
                      failedRequests.add(req);
                  } else {
                      IoFilterChain filterChain = session.getFilterChain();
		      //触发会话过滤链消息发送事件fireMessageSent
                      filterChain.fireMessageSent(req);
                  }
              } else {
                  failedRequests.add(req);
              }
              // Discard others.丢弃其余的会话写请求
              while ((req = writeRequestQueue.poll(session)) != null) {
                  failedRequests.add(req);
              }
          }
          // Create an exception and notify.
          if (!failedRequests.isEmpty()) {
              WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
	      //更新会话调度字节计数器
              for (WriteRequest r : failedRequests) {
                  session.decreaseScheduledBytesAndMessages(r);
                  r.getFuture().setException(cause);
              }
              IoFilterChain filterChain = session.getFilterChain();
              filterChain.fireExceptionCaught(cause);
          }
      }

 
/**
     * Destroy the underlying client socket handle
     * 关闭底层客户端socket
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

h.
// Last, not least, send Idle events to the idle sessions
//通知会话空闲事件
notifyIdleSessions(currentTime);


private void notifyIdleSessions(long currentTime) throws Exception {
           // process idle sessions
           if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
               lastIdleCheckTime = currentTime;
	       //通知会话空闲,
               AbstractIoSession.notifyIdleness(allSessions(), currentTime);
           }
       }

//AbstractIoSession
//遍历会话集,通知会话空闲
 public static void notifyIdleness(Iterator sessions, long currentTime)
    {
        do
        {
            if(!sessions.hasNext())
                break;
            IoSession session = (IoSession)sessions.next();
            if(!session.getCloseFuture().isClosed())
                notifyIdleSession(session, currentTime);
        } while(true);
    }


public static void notifyIdleSession(IoSession session, long currentTime)
    {
       //通知会话读写空闲
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
        notifyWriteTimeout(session, currentTime);
    }
    //触发会话空闲状态
    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime)
    {
        if(idleTime > 0L && lastIoTime != 0L && currentTime - lastIoTime >= idleTime)
            session.getFilterChain().fireSessionIdle(status);
    }
    //通知会话超时
    private static void notifyWriteTimeout(IoSession session, long currentTime)
    {
        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
        if(writeTimeout > 0L && currentTime - session.getLastWriteTime() >= writeTimeout && !session.getWriteRequestQueue().isEmpty(session))
        {
            WriteRequest request = session.getCurrentWriteRequest();
            if(request != null)
            {
	       //设置会话写请求超时异常
                session.setCurrentWriteRequest(null);
                WriteTimeoutException cause = new WriteTimeoutException(request);
                request.getFuture().setException(cause);
                session.getFilterChain().fireExceptionCaught(cause);
                session.closeNow();
            }
        }
    }


i.
 // Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
//判断Io处理器是否正在关闭,如果正在关闭断开所有会话
if (isDisposing()) {
    boolean hasKeys = false;
    //获取当前处理器管理的会话,移除会话
    for (Iterator<S> i = allSessions(); i.hasNext();) {
        IoSession session = i.next();

        if (session.isActive()) {
            scheduleRemove((S) session);
            hasKeys = true;
        }
    }
    if (hasKeys) {
        wakeup();
    }
}

j.
t
ry {
       synchronized (disposalLock) {
           if (disposing) {
   	     //如果正在关闭,完成实际关闭工作
               doDispose();
           }
       }
   } catch (Exception e) {
       //捕捉异常
       ExceptionMonitor.getInstance().exceptionCaught(e);
   } finally {
	     //已关闭
             disposalFuture.setValue(true);
}

/**
 * Dispose the resources used by this {@link IoProcessor} for polling the
 * client connections. The implementing class doDispose method will be
 * called.
 * 释放IO处理器相关的资源
 * @throws Exception
 *             if some low level IO error occurs
 */
protected abstract void doDispose() throws Exception;


从上面来看处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,
如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,
则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;
遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。

再来看启动处理器方法#startupProcessor的第二点
2.
//执行处理器                                                         
executor.execute(new NamePreservingRunnable(processor, threadName));

/**
 * A {@link Runnable} wrapper that preserves the name of the thread after the runnable is
 * complete (for {@link Runnable}s that change the name of the Thread they use.)
 * 将Runnable包装成一个新的线程,只是线程名不同,线程运行完,恢复原始线程名。
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev: 446581 $, $Date: 2006-09-15 11:36:12Z $,
 */
public class NamePreservingRunnable implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(NamePreservingRunnable.class);

    private final String newName;//新线程名
    private final Runnable runnable;//实际线程

    public NamePreservingRunnable(Runnable runnable, String newName) {
        this.runnable = runnable;
        this.newName = newName;
    }

    public void run() {
        Thread currentThread = Thread.currentThread();
        String oldName = currentThread.getName();
        if (newName != null) {
            setName(currentThread, newName);
        }
        try {
            runnable.run();
        } finally {
            setName(currentThread, oldName);
        }
    }
    
    /**
     * Wraps {@link Thread#setName(String)} to catch a possible {@link Exception}s such as
     * {@link SecurityException} in sandbox environments, such as applets
     设置线程名
     */
    private void setName(Thread thread, String name) {
        try {
            thread.setName(name);
        } catch (Exception e) {
            // Probably SecurityException.
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to set the thread name.", e);
            }
        }
    }
}

回到添加会话方法:
 /**
  * {@inheritDoc}
  */
//会话创建时,添加回到到处理器,
 @Override
 public final void add(S session) {
     if (disposed || disposing) {//如果处理器已关闭,则抛出非法状态异常
         throw new IllegalStateException("Already disposed.");
     }
     // Adds the session to the newSession queue and starts the worker
	//添加会话到Io处理器的创建会话队列中
     newSessions.add(session);
      //启动一个Io处理器线程
     startupProcessor();
 }


小节,从上面来年,添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流
关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;
如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
再来看其他方法:
 /**
  * {@inheritDoc}
  将写请求添加到会话写请求队列
  */
 @Override
 public void write(S session, WriteRequest writeRequest) {
     WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

     writeRequestQueue.offer(session, writeRequest);

     if (!session.isWriteSuspended()) {
         //刷新会话
         this.flush(session);
     }
 }
   /**
     * {@inheritDoc}
     //添加会话到刷新会话队列
     */
    @Override
    public final void flush(S session) {
        // add the session to the queue if it's not already
        // in the queue, then wake up the select()
	//设置会话正在调度flush
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeup();
        }
    }
    /**
     * {@inheritDoc}
     移除会话,添加会话到移除会话队列,启动处理器线程
     */
    @Override
    public final void remove(S session) {
        scheduleRemove(session);
        startupProcessor();
    }

    /**
     * {@inheritDoc}
     //释放Io处理器资源
     */
    @Override
    public final void dispose() {
        if (disposed || disposing) {
            return;
        }
        synchronized (disposalLock) {
            disposing = true;
            startupProcessor();
        }
        disposalFuture.awaitUninterruptibly();
        disposed = true;
    }
   /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isDisposing() {
        return disposing;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isDisposed() {
        return disposed;
    }

总结:

     抽象Io处理器AbstractPollingIoProcessor,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话; executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话待读写的会话;Io处理器线程引用processorRef。
     添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
    抽象Io处理器AbstractPollingIoProcessor主要是处理IoProcessor关联会话message*事件,而所有的工作,都是通过处理器线程Processor完成。每当有会话添加到IoProcessor,则启动一个处理器线程Processor,处理会话的读写操作及相关事件。就连IoProcessor资源的释放,也是由处理器线程Processor处理。关闭IoProcessor时,现将处理器关联会话,添加移除会话队列,实际工作由IoProcessor的子类的doDispose方法完成。


附:
//SessionState会话状态
public final class SessionState extends Enum
{
    public static final SessionState OPENING;
    public static final SessionState OPENED;
    public static final SessionState CLOSING;
    private static final SessionState $VALUES[];
    private SessionState(String s, int i)
    {
        super(s, i);
    }
    static 
    {
        OPENING = new SessionState("OPENING", 0);
        OPENED = new SessionState("OPENED", 1);
        CLOSING = new SessionState("CLOSING", 2);
        $VALUES = (new SessionState[] {
            OPENING, OPENED, CLOSING
        });
    }
    public static SessionState[] values()
    {
        return (SessionState[])$VALUES.clone();
    }

    public static SessionState valueOf(String name)
    {
        return (SessionState)Enum.valueOf(org/apache/mina/core/session/SessionState, name);
    }
}

//原子引用AtomicReference
/**
 * An object reference that may be updated atomically. See the {@link
 * java.util.concurrent.atomic} package specification for description
 * of the properties of atomic variables.
 * @since 1.5
 * @author Doug Lea
 * @param <V> The type of object referred to by this reference
 */
public class AtomicReference<V>  implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicReference.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }
    private volatile V value;

    /**
     * Creates a new AtomicReference with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicReference(V initialValue) {
        value = initialValue;
    }
    /**
     * Creates a new AtomicReference with null initial value.
     */
    public AtomicReference() {
    }
    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final V get() {
        return value;
    }
    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(V newValue) {
        value = newValue;
    }
    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(V newValue) {
        unsafe.putOrderedObject(this, valueOffset, newValue);
    }
    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }
    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May [url=package-summary.html#Spurious]fail spuriously[/url]
     * and does not provide ordering guarantees, so is only rarely an
     * appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }
    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final V getAndSet(V newValue) {
        while (true) {
            V x = get();
            if (compareAndSet(x, newValue))
                return x;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return String.valueOf(get());
    }

}
相关标签: mina