Mina Io处理器抽象实现
程序员文章站
2024-01-13 09:30:34
...
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
在上面这篇文章中,当会话发送消息后,消息被过滤链上的过滤器过滤,从链尾到链头,过程如下:
//消息发送,Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,handler并不处理会话事件)
再来看一下过滤链头HeadFilter
//HeadFilter
从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//AbstractIoFilterChain,待子类扩展
再来看SocketFilterChain
从上面可以看出会话发送消息最后由会话IoProcessor处理,下面来看一下IoProcessor接口的定义:
下面来看Io处理器的抽象实现AbstractPollingIoProcessor
从上面可以看出抽象Io处理器,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话;executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话暂定读写的会话;;正在处理进来消息的处理器引用processorRef;这些变量暂时这么理解,后面如果发现错误,再更正。
来看构造:
来看其他方法定义
来看添加会话到处理器
这个过程有两点要看
1.
2.
先来看处理器线程
1.
Io处理器,处理线程Processor的实际工作有一下几点要看
a.
b.
c.
d.
e.
f.
g.
h.
i.
j.
t
下面我们分别来看这几点
a.
b.
c.
添加会话有一下几点要看
c.1
c.2
//IoServiceListenerSupport
c.3
d.
e.
处理会话有两点要关注,
e.1
e.2
f.
方法有以下几点要关注
f.1
这一点有一下几点要看
f.1.1
f.1.2
//写文件
//写文件
f.2
f.3
g.
//移除会话
//尝试移除会话
//清除会话写请求队列
h.
//AbstractIoSession
i.
j.
t
从上面来看处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,
如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,
则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;
遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
再来看启动处理器方法#startupProcessor的第二点
2.
回到添加会话方法:
小节,从上面来年,添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流
关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;
如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
再来看其他方法:
总结:
抽象Io处理器AbstractPollingIoProcessor,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话; executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话待读写的会话;Io处理器线程引用processorRef。
添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
抽象Io处理器AbstractPollingIoProcessor主要是处理IoProcessor关联会话message*事件,而所有的工作,都是通过处理器线程Processor完成。每当有会话添加到IoProcessor,则启动一个处理器线程Processor,处理会话的读写操作及相关事件。就连IoProcessor资源的释放,也是由处理器线程Processor处理。关闭IoProcessor时,现将处理器关联会话,添加移除会话队列,实际工作由IoProcessor的子类的doDispose方法完成。
附:
//SessionState会话状态
//原子引用AtomicReference
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
在上面这篇文章中,当会话发送消息后,消息被过滤链上的过滤器过滤,从链尾到链头,过程如下:
//消息发送,Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,handler并不处理会话事件)
public void fireFilterWrite(IoSession session, WriteRequest writeRequest) { Entry tail = this.tail; callPreviousFilterWrite(tail, session, writeRequest); } private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) { try { entry.getFilter().filterWrite(entry.getNextFilter(), session, writeRequest); } catch (Throwable e) { writeRequest.getFuture().setWritten(false); fireExceptionCaught(session, e); } }
再来看一下过滤链头HeadFilter
//HeadFilter
private class HeadFilter extends IoFilterAdapter { ... public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (session.getTransportType().getEnvelopeType().isAssignableFrom( writeRequest.getMessage().getClass())) { doWrite(session, writeRequest); } else { throw new IllegalStateException( "Write requests must be transformed to " + session.getTransportType().getEnvelopeType() + ": " + writeRequest); } } ... }
从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//会话写操作 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (session.getTransportType().getEnvelopeType().isAssignableFrom( writeRequest.getMessage().getClass())) { doWrite(session, writeRequest); } else { throw new IllegalStateException( "Write requests must be transformed to " + session.getTransportType().getEnvelopeType() + ": " + writeRequest); } }
//AbstractIoFilterChain,待子类扩展
protected abstract void doWrite(IoSession session, WriteRequest writeRequest) throws Exception;
再来看SocketFilterChain
class SocketFilterChain extends AbstractIoFilterChain { SocketFilterChain(IoSession parent) { super(parent); } protected void doWrite(IoSession session, WriteRequest writeRequest) { SocketSessionImpl s = (SocketSessionImpl) session; //获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲 Queue writeRequestQueue = s.getWriteRequestQueue(); // SocketIoProcessor.doFlush() will reset it after write is finished // because the buffer will be passed with messageSent event. //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件, //待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置 ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage(); buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { //BaseIoSession // private final AtomicInteger scheduledWriteRequests = new AtomicInteger(); //更新调度请求计数器+1 s.increaseScheduledWriteRequests(); } else { //BaseIoSession //private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); //更新调度写字节计数器+buffer.remaining() s.increaseScheduledWriteBytes(buffer.remaining()); } synchronized (writeRequestQueue) { //将写请求添加到session写请求队列中 writeRequestQueue.push(writeRequest); } //如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作,这个在以后在具体详说 if (session.getTrafficMask().isWritable()) { s.getIoProcessor().flush(s); } } //关闭会话 protected void doClose(IoSession session) throws IOException { SocketSessionImpl s = (SocketSessionImpl) session; s.getIoProcessor().remove(s);//委托给session关联的IoProcessor } }
从上面可以看出会话发送消息最后由会话IoProcessor处理,下面来看一下IoProcessor接口的定义:
/** * An internal interface to represent an 'I/O processor' that performs * actual I/O operations for {@link IoSession}s. It abstracts existing * reactor frameworks such as Java NIO once again to simplify transport * implementations. * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * * @param <S> the type of the {@link IoSession} this processor can handle */ public interface IoProcessor<S extends IoSession> { /** * Releases any resources allocated by this processor. Please note that * the resources might not be released as long as there are any sessions * managed by this processor. Most implementations will close all sessions * immediately and release the related resources. 释放所有分配给IO处理器的资源。只要有任何会话占用资源,资源也许不会被Io处理器释放。 在大部分的实现版本中,是立刻关闭所有会话,释放相关资源。 */ void dispose(); /** * @return <tt>true</tt> if and if only {@link #dispose()} method has * been called. Please note that this method will return <tt>true</tt> * even after all the related resources are released. 如果dispose已经被调用,则返回true。即使在所有相关资源释放后,此方法仍返回true */ boolean isDisposing(); /** * @return <tt>true</tt> if and if only all resources of this processor * have been disposed. Io处理器所有资源释放完,则返回true。 */ boolean isDisposed(); /** * Adds the specified {@code session} to the I/O processor so that * the I/O processor starts to perform any I/O operations related * with the {@code session}. * 添加会话到Io处理器,以便Io处理器启动时,执行会话相关的操作。 * @param session The added session */ void add(S session); /** * Flushes the internal write request queue of the specified * {@code session}. * 刷新会话内部的写请求队列 * @param session The session we want the message to be written */ void flush(S session); /** * Writes the WriteRequest for the specified {@code session}. * 发送写请求到会话 * @param session The session we want the message to be written * @param writeRequest the WriteRequest to write */ void write(S session, WriteRequest writeRequest); /** * Controls the traffic of the specified {@code session} depending of the * {@link IoSession#isReadSuspended()} and {@link IoSession#isWriteSuspended()} * flags * 依赖于会话的IoSession#isReadSuspended/isWriteSuspended标志控制session读写请求的次序 * @param session The session to be updated */ void updateTrafficControl(S session); /** * Removes and closes the specified {@code session} from the I/O * processor so that the I/O processor closes the connection * associated with the {@code session} and releases any other related * resources. * 从Io处理器移除和关闭会话,以便Io处理器关闭连接关联的会话,释放相关的资源。 * @param session The session to be removed */ void remove(S session); }
下面来看Io处理器的抽象实现AbstractPollingIoProcessor
/** * An abstract implementation of {@link IoProcessor} which helps transport * developers to write an {@link IoProcessor} easily. This class is in charge of * active polling a set of {@link IoSession} and trigger events when some I/O * operation is possible. * @author [url=http://mina.apache.org]Apache MINA Project[/url] * @param <S> * the type of the {@link IoSession} this processor can handle */ public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> { /** A logger for this class */ private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class); /** * A timeout used for the select, as we need to get out to deal with idle * sessions.选择操作超时时间,我们需要处理空闲的会话 */ private static final long SELECT_TIMEOUT = 1000L; /** A map containing the last Thread ID for each class */每个class的最后一个线程id private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>(); /** This IoProcessor instance name处理器实例名 */ private final String threadName; /** The executor to use when we need to start the inner Processor */ private final Executor executor;//处理器内部执行器,用于运行内部处理器Processor /** A Session queue containing the newly created sessions */ private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();//创建会话队列 /** A queue used to store the sessions to be removed */ private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();//移除会话队列 /** A queue used to store the sessions to be flushed */ private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();//刷新会话队列 /** * A queue used to store the sessions which have a trafficControl to be * updated */ private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();//次序控制会话队列 /** The processor thread : it handles the incoming messages 处理器线程,用于处理进来的消息*/ private final AtomicReference<Processor> processorRef = new AtomicReference<>(); private long lastIdleCheckTime;//上次空闲检查时间 private final Object disposalLock = new Object();//关闭锁 private volatile boolean disposing;//是否正在关闭 private volatile boolean disposed;//是否已关闭 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);//关闭结果 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);//这个暂时清楚,后面遇见时再说 }
从上面可以看出抽象Io处理器,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话;executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话暂定读写的会话;;正在处理进来消息的处理器引用processorRef;这些变量暂时这么理解,后面如果发现错误,再更正。
来看构造:
/** * Create an {@link AbstractPollingIoProcessor} with the given * {@link Executor} for handling I/Os events. * 根据给定的执行器,创建抽象Io处理器用于处理IO事件 * @param executor * the {@link Executor} for handling I/O events */ protected AbstractPollingIoProcessor(Executor executor) { if (executor == null) { throw new IllegalArgumentException("executor"); } this.threadName = nextThreadName();//获取处理器线程名 this.executor = executor; } /** * Compute the thread ID for this class instance. As we may have different * classes, we store the last ID number into a Map associating the class * name to the last assigned ID. * 计算类型实例最大线程id。因为我们有不同的类型,所以我们存在类型与类型实例最后一个id的映射 关系放在Map中管理。 * @return a name for the current thread, based on the class name and an * incremental value, starting at 1. */ private String nextThreadName() { Class<?> cls = getClass(); int newThreadId; //从类型最大线程id映射Map,获取Io处理器线程的最大线程id AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1)); if (threadId == null) { newThreadId = 1; } else { // Just increment the last ID, and get it. newThreadId = threadId.incrementAndGet(); } // Now we can compute the name for this thread return cls.getSimpleName() + '-' + newThreadId; }
来看其他方法定义
/** * poll those sessions for the given timeout * 超时选择 * @param timeout * milliseconds before the call timeout if no event appear * @return The number of session ready for read or for write * @throws Exception * if some low level IO error occurs */ protected abstract int select(long timeout) throws Exception; /** * poll those sessions forever * 选择操作 * @return The number of session ready for read or for write * @throws Exception * if some low level IO error occurs */ protected abstract int select() throws Exception; /** * Say if the list of {@link IoSession} polled by this {@link IoProcessor} * is empty * 判断处理器的会话集合是否为空 * @return <tt>true</tt> if at least a session is managed by this * {@link IoProcessor} */ protected abstract boolean isSelectorEmpty(); /** * Get an {@link Iterator} for the list of {@link IoSession} polled by this * {@link IoProcessor} * 返回处理器选择的所有会话 * @return {@link Iterator} of {@link IoSession} */ protected abstract Iterator<S> allSessions(); /** * Get an {@link Iterator} for the list of {@link IoSession} found selected * by the last call of {@link #select(long)} * 获取上次调用超时选择后的会话集 * @return {@link Iterator} of {@link IoSession} read for I/Os operation */ protected abstract Iterator<S> selectedSessions(); /** * Get the state of a session (One of OPENING, OPEN, CLOSING) * 获取会话状态 * @param session * the {@link IoSession} to inspect * @return the state of the session */ protected abstract SessionState getState(S session); /** * Tells if the session ready for writing * 判断会话是否可写 * @param session * the queried session * @return <tt>true</tt> is ready, <tt>false</tt> if not ready */ protected abstract boolean isWritable(S session); /** * Tells if the session ready for reading * 判断会话是否准备好读操作 * @param session * the queried session * @return <tt>true</tt> is ready, <tt>false</tt> if not ready */ protected abstract boolean isReadable(S session); /** * Set the session to be informed when a write event should be processed * 当有一个写事件要处理时,是否通知会话 * @param session * the session for which we want to be interested in write events * @param isInterested * <tt>true</tt> for registering, <tt>false</tt> for removing * @throws Exception * If there was a problem while registering the session */ protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception; /** * Set the session to be informed when a read event should be processed * 当有一个读事件要处理时,是否通知会话 * @param session * the session for which we want to be interested in read events * @param isInterested * <tt>true</tt> for registering, <tt>false</tt> for removing * @throws Exception * If there was a problem while registering the session */ protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception; /** * Tells if this session is registered for reading * 判断会话是否注册读事件 * @param session * the queried session * @return <tt>true</tt> is registered for reading */ protected abstract boolean isInterestedInRead(S session); /** * Tells if this session is registered for writing * 判断会话是否注册写事件 * @param session * the queried session * @return <tt>true</tt> is registered for writing */ protected abstract boolean isInterestedInWrite(S session); /** * Initialize the polling of a session. Add it to the polling process. * 初始化会话,添加到处理器 * @param session * the {@link IoSession} to add to the polling * @throws Exception * any exception thrown by the underlying system calls */ protected abstract void init(S session) throws Exception; /** * Destroy the underlying client socket handle * 关闭底层客户端socket * @param session * the {@link IoSession} * @throws Exception * any exception thrown by the underlying system calls */ protected abstract void destroy(S session) throws Exception; /** * Reads a sequence of bytes from a {@link IoSession} into the given * {@link IoBuffer}. Is called when the session was found ready for reading. * 当会话准备好读操作是,从会话读字节序列 * @param session * the session to read * @param buf * the buffer to fill * @return the number of bytes read * @throws Exception * any exception thrown by the underlying system calls */ protected abstract int read(S session, IoBuffer buf) throws Exception; /** * Write a sequence of bytes to a {@link IoSession}, means to be called when * a session was found ready for writing. * 当会话准备好写操作是,写字节序列到会话 * @param session * the session to write * @param buf * the buffer to write * @param length * the number of bytes to write can be superior to the number of * bytes remaining in the buffer * @return the number of byte written * @throws IOException * any exception thrown by the underlying system calls */ protected abstract int write(S session, IoBuffer buf, int length) throws IOException; /** * Write a part of a file to a {@link IoSession}, if the underlying API * isn't supporting system calls like sendfile(), you can throw a * {@link UnsupportedOperationException} so the file will be send using * usual {@link #write(AbstractIoSession, IoBuffer, int)} call. * 写文件的某个Region到会话,如果底层API不支持sendfile方法,你可以抛出一个UnsupportedOperationException, 那么将调用#write(AbstractIoSession, IoBuffer, int)发送文件。 * @param session * the session to write * @param region * the file region to write * @param length * the length of the portion to send * @return the number of written bytes * @throws Exception * any exception thrown by the underlying system calls */ protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
来看添加会话到处理器
/** * {@inheritDoc} */ //会话创建时,添加回到到处理器, @Override public final void add(S session) { if (disposed || disposing) {//如果处理器已关闭,则抛出非法状态异常 throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker //添加会话到Io处理器的创建会话队列中 newSessions.add(session); //启动一个Io处理器线程 startupProcessor(); }
/** * Starts the inner Processor, asking the executor to pick a thread in its * pool. The Runnable will be renamed */ private void startupProcessor() { //从处理器引用获取处理器 Processor processor = processorRef.get(); if (processor == null) { //处理器为空,则创建一个 processor = new Processor(); if (processorRef.compareAndSet(null, processor)) { //执行处理器 executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. //暂时停止选择操作,待处理器线程启动 wakeup(); }
/** * Interrupt the {@link #select(long)} call. 中断选择操作 */ protected abstract void wakeup();
这个过程有两点要看
1.
//处理器为空,则创建一个 processor = new Processor();
2.
//执行处理器 executor.execute(new NamePreservingRunnable(processor, threadName));
先来看处理器线程
1.
//处理器为空,则创建一个 processor = new Processor();
//Processor /** * The main loop. This is the place in charge to poll the Selector, and to * process the active sessions. It's done in - handle the newly created * sessions - */ private class Processor implements Runnable { /** * {@inheritDoc} */ @Override public void run() { //断言Io处理器实际处理线程是否为当前Processor assert processorRef.get() == this; int nSessions = 0; lastIdleCheckTime = System.currentTimeMillis(); int nbTries = 10; for (;;) { try { // This select has a timeout so that we can manage // idle session when we get out of the select every // second. (note : this is a hack to avoid creating // a dedicated thread). //选择操作有一个超时时间,以便当选择超时时,处理空闲会话 long t0 = System.currentTimeMillis(); //超时选择,select方法待子类实现 int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = t1 - t0; //当前这次选择操作,没有SELECTKey相关事件,没有中断,且此次选择操作耗时 //小于SELECT_TIMEOUT(1000)/nbTries(10) if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { // Last chance : the select() may have been // interrupted because we have had an closed channel. //在上次尝试选择操作时,可能通道关闭,选择操作可能被中断 if (isBrokenConnection()) { //通道关闭的话,仅仅输出日志 LOG.warn("Broken connection"); } else { // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, // but // it stopped the select. The next time we will // call select(), it will exit immediately for the // same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. if (nbTries == 0) { //如果尝试次数用完我们,注册新的选择器 LOG.warn("Create a new selector. Selected is 0, delta = " + delta); registerNewSelector(); nbTries = 10;//恢复尝试次数 } else { //否则尝试次数自减 nbTries--; } } } else { nbTries = 10; } // Manage newly created session first //处理新会话 nSessions += handleNewSessions(); //更新会话状态 updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { // LOG.debug("Processing ..."); // This log hurts one of // the MDCFilter test... //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件 //读写事件,委托为process方法处理 process(); } // Write the pending requests //处理有些请求的会话 long currentTime = System.currentTimeMillis(); flush(currentTime); // And manage removed sessions //移除已关闭的会话 nSessions -= removeSessions(); // Last, not least, send Idle events to the idle sessions //通知会话空闲事件 notifyIdleSessions(currentTime); // Get a chance to exit the infinite loop if there are no // more sessions on this Processor //如果在这个过程中,激活会话最后为0,则清除处理器引用 if (nSessions == 0) { processorRef.set(null); if (newSessions.isEmpty() && isSelectorEmpty()) { // newSessions.add() precedes startupProcessor assert processorRef.get() != this; break; } assert processorRef.get() != this; if (!processorRef.compareAndSet(null, this)) { // startupProcessor won race, so must exit processor assert processorRef.get() != this; break; } assert processorRef.get() == this; } // Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. //判断Io处理器是否正在关闭,如果正在关闭断开所有会话 if (isDisposing()) { boolean hasKeys = false; //获取当前处理器管理的会话,移除会话 for (Iterator<S> i = allSessions(); i.hasNext();) { IoSession session = i.next(); if (session.isActive()) { scheduleRemove((S) session); hasKeys = true; } } if (hasKeys) { wakeup(); } } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop // But first, dump a stack trace ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } try { synchronized (disposalLock) { if (disposing) { //如果正在关闭则,完成实际关闭工作 doDispose(); } } } catch (Exception e) { //捕捉异常 ExceptionMonitor.getInstance().exceptionCaught(e); } finally { //已关闭 disposalFuture.setValue(true); } }
Io处理器,处理线程Processor的实际工作有一下几点要看
a.
if (isBrokenConnection()) { //通道关闭的话,仅仅输出日志 LOG.warn("Broken connection"); }
b.
else { // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, // but // it stopped the select. The next time we will // call select(), it will exit immediately for the // same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. if (nbTries == 0) { //如果尝试次数用完我们,注册新的选择器 LOG.warn("Create a new selector. Selected is 0, delta = " + delta); registerNewSelector(); nbTries = 10;//恢复尝试次数 } else { //否则尝试次数自减 nbTries--; } }
c.
// Manage newly created session first //处理新会话 nSessions += handleNewSessions();
d.
//更新会话状态 updateTrafficMask();
e.
// Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { // LOG.debug("Processing ..."); // This log hurts one of // the MDCFilter test... //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件 //读写事件,委托为process方法处理 process(); }
f.
// Write the pending requests //处理有些请求的会话 long currentTime = System.currentTimeMillis(); flush(currentTime);
g.
// And manage removed sessions //移除已关闭的会话 nSessions -= removeSessions();
h.
// Last, not least, send Idle events to the idle sessions //通知会话空闲事件 notifyIdleSessions(currentTime);
i.
// Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. //判断Io处理器是否正在关闭,如果正在关闭断开所有会话 if (isDisposing()) { boolean hasKeys = false; //获取当前处理器管理的会话,移除会话 for (Iterator<S> i = allSessions(); i.hasNext();) { IoSession session = i.next(); if (session.isActive()) { scheduleRemove((S) session); hasKeys = true; } } if (hasKeys) { wakeup(); } }
j.
t
ry { synchronized (disposalLock) { if (disposing) { //如果正在关闭,完成实际关闭工作 doDispose(); } } } catch (Exception e) { //捕捉异常 ExceptionMonitor.getInstance().exceptionCaught(e); } finally { //已关闭 disposalFuture.setValue(true); }
下面我们分别来看这几点
a.
if (isBrokenConnection()) { //通道关闭的话,仅仅输出日志 LOG.warn("Broken connection"); }
/** * Check that the select() has not exited immediately just because of a * broken connection. In this case, this is a standard case, and we just * have to loop. * 检查选择是否由于Io处理器连接断开,选择操作还没有退出 * @return <tt>true</tt> if a connection has been brutally closed. * @throws IOException * If we got an exception */ protected abstract boolean isBrokenConnection() throws IOException;
b.
else { // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, // but // it stopped the select. The next time we will // call select(), it will exit immediately for the // same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. if (nbTries == 0) { //如果尝试次数用完我们,注册新的选择器 LOG.warn("Create a new selector. Selected is 0, delta = " + delta); registerNewSelector(); nbTries = 10;//恢复尝试次数 } else { //否则尝试次数自减 nbTries--; } }
/** * In the case we are using the java select() method, this method is used to * trash the buggy selector and create a new one, registring all the sockets * on it. * 丢弃旧的选择器,将所有socket注册到新的选择器上 * @throws IOException * If we got an exception */ protected abstract void registerNewSelector() throws IOException;
c.
// Manage newly created session first //处理新会话 nSessions += handleNewSessions();
/** * Loops over the new sessions blocking queue and returns the number of * sessions which are effectively created * 遍历创建会话队列,返回新建会话的数量 * @return The number of new sessions */ private int handleNewSessions() { int addedSessions = 0; for (S session = newSessions.poll(); session != null; session = newSessions.poll()) { if (addNow(session)) { // A new session has been created addedSessions++; } } return addedSessions; }
/** * Process a new session : - initialize it - create its chain - fire the * CREATED listeners if any * 处理新会话,初始化会话,创建会话过滤链,触发监听器会话创建事件 * @param session * The session to create * @return <tt>true</tt> if the session has been registered */ private boolean addNow(S session) { boolean registered = false; try { //初始化会话 init(session); registered = true; // Build the filter chain of this session. //获取会话service过滤链构建器 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder(); //构建会话过滤链 chainBuilder.buildFilterChain(session.getFilterChain()); // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). // Propagate the SESSION_CREATED event up to the chain IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners(); //触发会话事件 listeners.fireSessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { destroy(session); } catch (Exception e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } finally { registered = false; } } return registered; }
添加会话有一下几点要看
c.1
//初始化会话 init(session);
protected abstract void init(AbstractIoSession abstractiosession) throws Exception;
c.2
//触发会话事件 listeners.fireSessionCreated(session);
//IoServiceListenerSupport
public void fireSessionCreated(IoSession session) { boolean firstSession = false; if(session.getService() instanceof IoConnector) synchronized(managedSessions) { firstSession = managedSessions.isEmpty(); } if(managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null) return; if(firstSession) fireServiceActivated(); //触发会话过滤链会话创建和会话打开事件 IoFilterChain filterChain = session.getFilterChain(); filterChain.fireSessionCreated(); filterChain.fireSessionOpened(); ... }
c.3
destroy(session);
/** * Destroy the underlying client socket handle * 关闭底层客户端socket * @param session * the {@link IoSession} * @throws Exception * any exception thrown by the underlying system calls */ protected abstract void destroy(S session) throws Exception;
d.
//更新会话状态 updateTrafficMask();
/** * Update the trafficControl for all the session. */ private void updateTrafficMask() { int queueSize = trafficControllingSessions.size(); while (queueSize > 0) { S session = trafficControllingSessions.poll(); if (session == null) { // We are done with this queue. return; } //获取会话状态 SessionState state = getState(session); switch (state) { case OPENED: //更新会话状态 updateTrafficControl(session); break; case CLOSING: break; case OPENING: // Retry later if session is not yet fully initialized. // (In case that Session.suspend??() or session.resume??() is // called before addSession() is processed) // We just put back the session at the end of the queue. //如果正在打开,则添加到次序控制会话队列 trafficControllingSessions.add(session); break; default: throw new IllegalStateException(String.valueOf(state)); } // As we have handled one session, decrement the number of // remaining sessions. The OPENING session will be processed // with the next select(), as the queue size has been decreased, // even // if the session has been pushed at the end of the queue queueSize--; } }
/** * {@inheritDoc} */ @Override public void updateTrafficControl(S session) { // try { //通知读操作事件 setInterestedInRead(session, !session.isReadSuspended()); } catch (Exception e) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } try { //通知写操作事件 setInterestedInWrite(session, !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended()); } catch (Exception e) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }
/** * Set the session to be informed when a write event should be processed * 当有一个写事件要处理时,是否通知会话 * @param session * the session for which we want to be interested in write events * @param isInterested * <tt>true</tt> for registering, <tt>false</tt> for removing * @throws Exception * If there was a problem while registering the session */ protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
/** * Set the session to be informed when a read event should be processed * 当有一个读事件要处理时,是否通知会话 * @param session * the session for which we want to be interested in read events * @param isInterested * <tt>true</tt> for registering, <tt>false</tt> for removing * @throws Exception * If there was a problem while registering the session */ protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
e.
// Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { // LOG.debug("Processing ..."); // This log hurts one of // the MDCFilter test... //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件 //读写事件,委托为process方法处理 process(); }
private void process() throws Exception { for (Iterator<S> i = selectedSessions(); i.hasNext();) { S session = i.next(); //处理会话 process(session); i.remove(); } }
/** * Get an {@link Iterator} for the list of {@link IoSession} found selected * by the last call of {@link #select(long)} * 获取上次调用超时选择后,准备就绪会话集 * @return {@link Iterator} of {@link IoSession} read for I/Os operation */ protected abstract Iterator<S> selectedSessions();
/** * Deal with session ready for the read or write operations, or both. */ private void process(S session) { // Process Reads if (isReadable(session) && !session.isReadSuspended()) { //如果会话可读,则读会话接收到的数据 read(session); } // Process writes if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) { // add the session to the queue, if it's not already there //如果会话有数据要发送,则将会话添加到刷新会话队列 flushingSessions.add(session); } } } }
处理会话有两点要关注,
e.1
// Process Reads if (isReadable(session) && !session.isReadSuspended()) { //如果会话可读,则读会话接收到的数据 read(session); }
private void read(S session) { //获取会话配置,会话配置读缓存size IoSessionConfig config = session.getConfig(); int bufferSize = config.getReadBufferSize(); IoBuffer buf = IoBuffer.allocate(bufferSize); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) { //从会话读取字节序列到buffer while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { //获取会话过滤链,触发过滤链消息接收事件MessageReceive IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } } } else { // release temporary buffer when read nothing buf.free(); } //如果会话socket关闭,则触发过滤链fireInputClosed if (ret < 0) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireInputClosed(); } } catch (Exception e) { if ((e instanceof IOException) && (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) { scheduleRemove(session); } //触发过滤链异常事件ExceptionCaught IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }
e.2
// Process writes if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) { // add the session to the queue, if it's not already there //如果会话有数据要发送,则将会话添加到刷新会话队列 flushingSessions.add(session); }
f.
// Write the pending requests //处理有写请求的会话 long currentTime = System.currentTimeMillis(); flush(currentTime);
/** * Write all the pending messages */ private void flush(long currentTime) { if (flushingSessions.isEmpty()) { return; } //遍历刷新会话队列 do { S session = flushingSessions.poll(); // the same one with // firstSession if (session == null) { // Just in case ... It should not happen. break; } // Reset the Schedule for flush flag for this session, // as we are flushing it now //设置会话刷新状态为未刷新 session.unscheduledForFlush(); //获取会话状态 SessionState state = getState(session); switch (state) { case OPENED: try { //会话已已打开,则委托给flushNow boolean flushedAll = flushNow(session, currentTime); if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) { //调度刷新会话 scheduleFlush(session); } } catch (Exception e) { scheduleRemove(session);//移除会话调度 session.closeNow();//异常立刻关闭会话 IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } break; case CLOSING: // Skip if the channel is already closed. break; case OPENING: // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() // is processed) //如果正在会话正在打开,则调度刷新会话 scheduleFlush(session); return; default: throw new IllegalStateException(String.valueOf(state)); } } while (!flushingSessions.isEmpty()); }
方法有以下几点要关注
f.1
//会话已已打开,则委托给flushNow boolean flushedAll = flushNow(session, currentTime);
private boolean flushNow(S session, long currentTime) { //如果会话失去连接,则添加会话到移除会话队列 if (!session.isConnected()) { scheduleRemove(session); return false; } final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); //获取会话写请求队列 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); // Set limitation for the number of written bytes for read-write // fairness. I used maxReadBufferSize * 3 / 2, which yields best // performance in my experience while not breaking fairness much. //写buffer最大size,经验值为maxReadBufferSize * 3 / 2 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() + (session.getConfig().getMaxReadBufferSize() >>> 1); int writtenBytes = 0; WriteRequest req = null; try { // Clear OP_WRITE,清除会话写事件OP_WRITE标志 setInterestedInWrite(session, false); do { // Check for pending writes. //获取会话当前写情趣 req = session.getCurrentWriteRequest(); if (req == null) { req = writeRequestQueue.poll(session); if (req == null) { break; } session.setCurrentWriteRequest(req); } int localWrittenBytes; //获取写请求消息 Object message = req.getMessage(); if (message instanceof IoBuffer) { //写会话buffer localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime); if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) { // the buffer isn't empty, we re-interest it in writing setInterestedInWrite(session, true); return false; } } else if (message instanceof FileRegion) { 写文件 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime); // Fix for Java bug on Linux // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988 // If there's still data to be written in the FileRegion, // return 0 indicating that we need // to pause until writing may resume. if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) { setInterestedInWrite(session, true); return false; } } else { throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'. Are you missing a protocol encoder?"); } if (localWrittenBytes == 0) { // Kernel buffer is full. if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) { setInterestedInWrite(session, true); return false; } } else { writtenBytes += localWrittenBytes; if (writtenBytes >= maxWrittenBytes) { // Wrote too much scheduleFlush(session); return false; } } if (message instanceof IoBuffer) { ((IoBuffer) message).free(); } } while (writtenBytes < maxWrittenBytes); } catch (Exception e) { //写请求结果异常 if (req != null) { req.getFuture().setException(e); } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); return false; } return true; }
这一点有一下几点要看
f.1.1
//写会话buffer localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime);
private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) throws Exception { IoBuffer buf = (IoBuffer) req.getMessage(); int localWrittenBytes = 0; if (buf.hasRemaining()) { int length; if (hasFragmentation) { length = Math.min(buf.remaining(), maxLength); } else { length = buf.remaining(); } try { //发送会话数据 localWrittenBytes = write(session, buf, length); } catch (IOException ioe) { // We have had an issue while trying to send data to the // peer : let's close the session. buf.free(); session.closeNow(); removeNow(session); return 0; } } session.increaseWrittenBytes(localWrittenBytes, currentTime); // Now, forward the original message if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) { // Buffer has been sent, clear the current request. Object originalMessage = req.getOriginalRequest().getMessage(); if (originalMessage instanceof IoBuffer) { buf = (IoBuffer) req.getOriginalRequest().getMessage(); int pos = buf.position(); buf.reset(); fireMessageSent(session, req); // And set it back to its position buf.position(pos); } else { fireMessageSent(session, req); } } return localWrittenBytes; }
/** * Write a sequence of bytes to a {@link IoSession}, means to be called when * a session was found ready for writing. * 当会话准备好写操作是,写字节序列到会话 * @param session * the session to write * @param buf * the buffer to write * @param length * the number of bytes to write can be superior to the number of * bytes remaining in the buffer * @return the number of byte written * @throws IOException * any exception thrown by the underlying system calls */ protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
f.1.2
//写文件
localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime);
//写文件
private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) throws Exception { int localWrittenBytes; //获取写请求文件FileRegion FileRegion region = (FileRegion) req.getMessage(); if (region.getRemainingBytes() > 0) { int length; if (hasFragmentation) { length = (int) Math.min(region.getRemainingBytes(), maxLength); } else { length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes()); } //委托给transferFile localWrittenBytes = transferFile(session, region, length); region.update(localWrittenBytes); } else { localWrittenBytes = 0; } session.increaseWrittenBytes(localWrittenBytes, currentTime); if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) { //触发会话消息发送事件 fireMessageSent(session, req); } return localWrittenBytes; }
/** * Write a part of a file to a {@link IoSession}, if the underlying API * isn't supporting system calls like sendfile(), you can throw a * {@link UnsupportedOperationException} so the file will be send using * usual {@link #write(AbstractIoSession, IoBuffer, int)} call. * 写文件的某个Region到会话,如果底层API不支持sendfile方法,你可以抛出一个UnsupportedOperationException, 那么将调用#write(AbstractIoSession, IoBuffer, int)发送文件。 * @param session * the session to write * @param region * the file region to write * @param length * the length of the portion to send * @return the number of written bytes * @throws Exception * any exception thrown by the underlying system calls */ protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
//触发会话消息发送事件 private void fireMessageSent(AbstractIoSession session, WriteRequest req) { session.setCurrentWriteRequest(null); IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageSent(req); }
f.2
//如果刷新完成,且会话写请求队列不为空,会话待调度 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) { //调度刷新会话 scheduleFlush(session); }
//调度写请求会话,及添加到刷新队列 private void scheduleFlush(S session) { // add the session to the queue if it's not already // in the queue if (session.setScheduledForFlush(true)) { flushingSessions.add(session); } }
f.3
scheduleRemove(session);//移除会话调度 session.closeNow();//异常立刻关闭会话 IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e);
//添加回到到移除队列 private void scheduleRemove(S session) { if (!removingSessions.contains(session)) { removingSessions.add(session); } }
g.
// And manage removed sessions //移除已关闭的会话 nSessions -= removeSessions();
//移除会话
private int removeSessions() { int removedSessions = 0; //遍历移除会话队列,如果poll的会话不为空,则获取会话状态, for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) { SessionState state = getState(session); // Now deal with the removal accordingly to the session's state switch (state) { case OPENED: // Try to remove this session //尝试移除会话 if (removeNow(session)) { removedSessions++; } break; case CLOSING: // Skip if channel is already closed // In any case, remove the session from the queue //会话关闭,则更新会话移除计数器 removedSessions++; break; case OPENING: // Remove session from the newSessions queue and // remove it //正在打开从新创建会话对垒移除会话 newSessions.remove(session); if (removeNow(session)) { removedSessions++; } break; default: throw new IllegalStateException(String.valueOf(state)); } } return removedSessions; }
//尝试移除会话
private boolean removeNow(S session) { //清除会话写请求队列 clearWriteRequestQueue(session); try { //销毁会话 destroy(session); return true; } catch (Exception e) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } finally { try { clearWriteRequestQueue(session); ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session); } catch (Exception e) { // The session was either destroyed or not at this point. // We do not want any exception thrown from this "cleanup" code // to change // the return value by bubbling up. IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } } return false; }
//清除会话写请求队列
private void clearWriteRequestQueue(S session) { WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); WriteRequest req; List<WriteRequest> failedRequests = new ArrayList<>(); if ((req = writeRequestQueue.poll(session)) != null) { Object message = req.getMessage(); if (message instanceof IoBuffer) { IoBuffer buf = (IoBuffer) message; // The first unwritten empty buffer must be // forwarded to the filter chain. //如果会话写请求buffer还有数据,添加写请求到失败写请求集合 if (buf.hasRemaining()) { buf.reset(); failedRequests.add(req); } else { IoFilterChain filterChain = session.getFilterChain(); //触发会话过滤链消息发送事件fireMessageSent filterChain.fireMessageSent(req); } } else { failedRequests.add(req); } // Discard others.丢弃其余的会话写请求 while ((req = writeRequestQueue.poll(session)) != null) { failedRequests.add(req); } } // Create an exception and notify. if (!failedRequests.isEmpty()) { WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests); //更新会话调度字节计数器 for (WriteRequest r : failedRequests) { session.decreaseScheduledBytesAndMessages(r); r.getFuture().setException(cause); } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(cause); } }
/** * Destroy the underlying client socket handle * 关闭底层客户端socket * @param session * the {@link IoSession} * @throws Exception * any exception thrown by the underlying system calls */ protected abstract void destroy(S session) throws Exception;
h.
// Last, not least, send Idle events to the idle sessions //通知会话空闲事件 notifyIdleSessions(currentTime);
private void notifyIdleSessions(long currentTime) throws Exception { // process idle sessions if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) { lastIdleCheckTime = currentTime; //通知会话空闲, AbstractIoSession.notifyIdleness(allSessions(), currentTime); } }
//AbstractIoSession
//遍历会话集,通知会话空闲 public static void notifyIdleness(Iterator sessions, long currentTime) { do { if(!sessions.hasNext()) break; IoSession session = (IoSession)sessions.next(); if(!session.getCloseFuture().isClosed()) notifyIdleSession(session, currentTime); } while(true); }
public static void notifyIdleSession(IoSession session, long currentTime) { //通知会话读写空闲 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); notifyWriteTimeout(session, currentTime); } //触发会话空闲状态 private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime) { if(idleTime > 0L && lastIoTime != 0L && currentTime - lastIoTime >= idleTime) session.getFilterChain().fireSessionIdle(status); } //通知会话超时 private static void notifyWriteTimeout(IoSession session, long currentTime) { long writeTimeout = session.getConfig().getWriteTimeoutInMillis(); if(writeTimeout > 0L && currentTime - session.getLastWriteTime() >= writeTimeout && !session.getWriteRequestQueue().isEmpty(session)) { WriteRequest request = session.getCurrentWriteRequest(); if(request != null) { //设置会话写请求超时异常 session.setCurrentWriteRequest(null); WriteTimeoutException cause = new WriteTimeoutException(request); request.getFuture().setException(cause); session.getFilterChain().fireExceptionCaught(cause); session.closeNow(); } } }
i.
// Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. //判断Io处理器是否正在关闭,如果正在关闭断开所有会话 if (isDisposing()) { boolean hasKeys = false; //获取当前处理器管理的会话,移除会话 for (Iterator<S> i = allSessions(); i.hasNext();) { IoSession session = i.next(); if (session.isActive()) { scheduleRemove((S) session); hasKeys = true; } } if (hasKeys) { wakeup(); } }
j.
t
ry { synchronized (disposalLock) { if (disposing) { //如果正在关闭,完成实际关闭工作 doDispose(); } } } catch (Exception e) { //捕捉异常 ExceptionMonitor.getInstance().exceptionCaught(e); } finally { //已关闭 disposalFuture.setValue(true); }
/** * Dispose the resources used by this {@link IoProcessor} for polling the * client connections. The implementing class doDispose method will be * called. * 释放IO处理器相关的资源 * @throws Exception * if some low level IO error occurs */ protected abstract void doDispose() throws Exception;
从上面来看处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,
如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,
则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;
遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
再来看启动处理器方法#startupProcessor的第二点
2.
//执行处理器 executor.execute(new NamePreservingRunnable(processor, threadName));
/** * A {@link Runnable} wrapper that preserves the name of the thread after the runnable is * complete (for {@link Runnable}s that change the name of the Thread they use.) * 将Runnable包装成一个新的线程,只是线程名不同,线程运行完,恢复原始线程名。 * @author The Apache MINA Project (dev@mina.apache.org) * @version $Rev: 446581 $, $Date: 2006-09-15 11:36:12Z $, */ public class NamePreservingRunnable implements Runnable { private final Logger logger = LoggerFactory.getLogger(NamePreservingRunnable.class); private final String newName;//新线程名 private final Runnable runnable;//实际线程 public NamePreservingRunnable(Runnable runnable, String newName) { this.runnable = runnable; this.newName = newName; } public void run() { Thread currentThread = Thread.currentThread(); String oldName = currentThread.getName(); if (newName != null) { setName(currentThread, newName); } try { runnable.run(); } finally { setName(currentThread, oldName); } } /** * Wraps {@link Thread#setName(String)} to catch a possible {@link Exception}s such as * {@link SecurityException} in sandbox environments, such as applets 设置线程名 */ private void setName(Thread thread, String name) { try { thread.setName(name); } catch (Exception e) { // Probably SecurityException. if (logger.isWarnEnabled()) { logger.warn("Failed to set the thread name.", e); } } } }
回到添加会话方法:
/** * {@inheritDoc} */ //会话创建时,添加回到到处理器, @Override public final void add(S session) { if (disposed || disposing) {//如果处理器已关闭,则抛出非法状态异常 throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker //添加会话到Io处理器的创建会话队列中 newSessions.add(session); //启动一个Io处理器线程 startupProcessor(); }
小节,从上面来年,添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流
关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;
如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
再来看其他方法:
/** * {@inheritDoc} 将写请求添加到会话写请求队列 */ @Override public void write(S session, WriteRequest writeRequest) { WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); writeRequestQueue.offer(session, writeRequest); if (!session.isWriteSuspended()) { //刷新会话 this.flush(session); } } /** * {@inheritDoc} //添加会话到刷新会话队列 */ @Override public final void flush(S session) { // add the session to the queue if it's not already // in the queue, then wake up the select() //设置会话正在调度flush if (session.setScheduledForFlush(true)) { flushingSessions.add(session); wakeup(); } } /** * {@inheritDoc} 移除会话,添加会话到移除会话队列,启动处理器线程 */ @Override public final void remove(S session) { scheduleRemove(session); startupProcessor(); } /** * {@inheritDoc} //释放Io处理器资源 */ @Override public final void dispose() { if (disposed || disposing) { return; } synchronized (disposalLock) { disposing = true; startupProcessor(); } disposalFuture.awaitUninterruptibly(); disposed = true; } /** * {@inheritDoc} */ @Override public final boolean isDisposing() { return disposing; } /** * {@inheritDoc} */ @Override public final boolean isDisposed() { return disposed; }
总结:
抽象Io处理器AbstractPollingIoProcessor,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话; executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话待读写的会话;Io处理器线程引用processorRef。
添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
抽象Io处理器AbstractPollingIoProcessor主要是处理IoProcessor关联会话message*事件,而所有的工作,都是通过处理器线程Processor完成。每当有会话添加到IoProcessor,则启动一个处理器线程Processor,处理会话的读写操作及相关事件。就连IoProcessor资源的释放,也是由处理器线程Processor处理。关闭IoProcessor时,现将处理器关联会话,添加移除会话队列,实际工作由IoProcessor的子类的doDispose方法完成。
附:
//SessionState会话状态
public final class SessionState extends Enum { public static final SessionState OPENING; public static final SessionState OPENED; public static final SessionState CLOSING; private static final SessionState $VALUES[]; private SessionState(String s, int i) { super(s, i); } static { OPENING = new SessionState("OPENING", 0); OPENED = new SessionState("OPENED", 1); CLOSING = new SessionState("CLOSING", 2); $VALUES = (new SessionState[] { OPENING, OPENED, CLOSING }); } public static SessionState[] values() { return (SessionState[])$VALUES.clone(); } public static SessionState valueOf(String name) { return (SessionState)Enum.valueOf(org/apache/mina/core/session/SessionState, name); } }
//原子引用AtomicReference
/** * An object reference that may be updated atomically. See the {@link * java.util.concurrent.atomic} package specification for description * of the properties of atomic variables. * @since 1.5 * @author Doug Lea * @param <V> The type of object referred to by this reference */ public class AtomicReference<V> implements java.io.Serializable { private static final long serialVersionUID = -1848883965231344442L; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicReference.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile V value; /** * Creates a new AtomicReference with the given initial value. * * @param initialValue the initial value */ public AtomicReference(V initialValue) { value = initialValue; } /** * Creates a new AtomicReference with null initial value. */ public AtomicReference() { } /** * Gets the current value. * * @return the current value */ public final V get() { return value; } /** * Sets to the given value. * * @param newValue the new value */ public final void set(V newValue) { value = newValue; } /** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */ public final void lazySet(V newValue) { unsafe.putOrderedObject(this, valueOffset, newValue); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * <p>May [url=package-summary.html#Spurious]fail spuriously[/url] * and does not provide ordering guarantees, so is only rarely an * appropriate alternative to {@code compareAndSet}. * * @param expect the expected value * @param update the new value * @return true if successful. */ public final boolean weakCompareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } /** * Atomically sets to the given value and returns the old value. * * @param newValue the new value * @return the previous value */ public final V getAndSet(V newValue) { while (true) { V x = get(); if (compareAndSet(x, newValue)) return x; } } /** * Returns the String representation of the current value. * @return the String representation of the current value. */ public String toString() { return String.valueOf(get()); } }
上一篇: Mina 抽象Io会话