Mina: The Abstract IoSession
程序员文章站
2022-03-11 09:29:58
Mina IoSession interface definition: http://donald-draper.iteye.com/blog/2377737
In the previous post we looked at the definition of the IoSession interface; this post walks through its base implementation, AbstractIoSession.
/**
 * Base implementation of {@link IoSession}.
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public abstract class AbstractIoSession implements IoSession {
    /** The associated handler */
    private final IoHandler handler;

    /** The session config */
    protected IoSessionConfig config;

    /** The service which will manage this session */
    private final IoService service;

    private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
            "readyReadFutures");

    private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
            "waitingReadFutures");

    // Close-future listener: resets the scheduled-write counters and the throughput values when the session closes
    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
        public void operationComplete(CloseFuture future) {
            AbstractIoSession session = (AbstractIoSession) future.getSession();
            session.scheduledWriteBytes.set(0);
            session.scheduledWriteMessages.set(0);
            session.readBytesThroughput = 0;
            session.readMessagesThroughput = 0;
            session.writtenBytesThroughput = 0;
            session.writtenMessagesThroughput = 0;
        }
    };

    /**
     * An internal write request object that triggers session close.
     */
    public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());

    /**
     * An internal write request object that triggers message sent events.
     */
    public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);

    private final Object lock = new Object();

    private IoSessionAttributeMap attributes; // session attribute map

    private WriteRequestQueue writeRequestQueue; // write request queue

    private WriteRequest currentWriteRequest; // write request currently being processed

    /** The Session creation's time */
    private final long creationTime;

    /** An id generator guaranteed to generate unique IDs for the session */
    private static AtomicLong idGenerator = new AtomicLong(0);

    /** The session ID */
    private long sessionId;

    /**
     * A future that will be set 'closed' when the connection is closed.
     */
    private final CloseFuture closeFuture = new DefaultCloseFuture(this);

    private volatile boolean closing;

    // traffic control
    private boolean readSuspended = false; // whether reads are suspended

    private boolean writeSuspended = false; // whether writes are suspended

    // Status variables
    private final AtomicBoolean scheduledForFlush = new AtomicBoolean(); // whether the session is scheduled for flush

    private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); // bytes scheduled to be written (not yet sent)

    private final AtomicInteger scheduledWriteMessages = new AtomicInteger(); // messages scheduled to be written (not yet sent)

    private long readBytes; // bytes read

    private long writtenBytes; // bytes written

    private long readMessages; // messages read

    private long writtenMessages; // messages written

    // Time of the last read and write operations
    private long lastReadTime;

    private long lastWriteTime;

    // Time of the last throughput calculation
    private long lastThroughputCalculationTime;

    // Byte and message totals at the last throughput calculation
    private long lastReadBytes;

    private long lastWrittenBytes;

    private long lastReadMessages;

    private long lastWrittenMessages;

    // Read/written byte and message throughput
    private double readBytesThroughput;

    private double writtenBytesThroughput;

    private double readMessagesThroughput;

    private double writtenMessagesThroughput;

    // Idle counters
    private AtomicInteger idleCountForBoth = new AtomicInteger();

    private AtomicInteger idleCountForRead = new AtomicInteger();

    private AtomicInteger idleCountForWrite = new AtomicInteger();

    // Time of the last idle event for each idle status
    private long lastIdleTimeForBoth;

    private long lastIdleTimeForRead;

    private long lastIdleTimeForWrite;

    private boolean deferDecreaseReadBuffer = true;
}
The constructor:
/**
 * Create a Session for a service
 * @param service the Service for this session
 */
protected AbstractIoSession(IoService service) {
    this.service = service;
    this.handler = service.getHandler();

    // Initialize all the Session counters to the current time
    long currentTime = System.currentTimeMillis();
    creationTime = currentTime;
    lastThroughputCalculationTime = currentTime;
    lastReadTime = currentTime;
    lastWriteTime = currentTime;
    lastIdleTimeForBoth = currentTime;
    lastIdleTimeForRead = currentTime;
    lastIdleTimeForWrite = currentTime;

    // TODO add documentation
    closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);

    // Set a new ID for this session
    sessionId = idGenerator.incrementAndGet();
}
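As the constructor shows, initialization stores the associated IoService and takes the session's IoHandler from that service. It then sets every session timestamp (creation time, last read and write times, last idle times, last throughput calculation time) to the current time, registers SCHEDULED_COUNTER_RESETTER on the close future, and assigns the session a unique id.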
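The id and timestamp setup can be illustrated with a small stand-alone sketch. The class SketchSession below is hypothetical and only mirrors the pattern (a shared AtomicLong for ids, one clock reading for all timestamps); it is not a MINA class.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a session; only the initialization pattern is modeled. */
class SketchSession {
    // Shared by all sessions, so every instance receives a distinct id.
    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

    final long id;
    final long creationTime;
    long lastReadTime;
    long lastWriteTime;

    SketchSession(long currentTime) {
        // One clock reading seeds every timestamp.
        creationTime = currentTime;
        lastReadTime = currentTime;
        lastWriteTime = currentTime;
        id = ID_GENERATOR.incrementAndGet();
    }

    /** Same idea as getLastIoTime(): the later of the last read and last write. */
    long lastIoTime() {
        return Math.max(lastReadTime, lastWriteTime);
    }
}
```

Because the timestamps start at the creation time rather than 0, a freshly created session is not immediately considered idle.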
Now the remaining session methods:
/**
 * {@inheritDoc}
 * Returns the session id.
 * We use an AtomicLong to guarantee that the session ID are unique.
 */
public final long getId() {
    return sessionId;
}

/**
 * @return The associated IoProcessor for this session
 */
public abstract IoProcessor getProcessor();

/** {@inheritDoc} Whether the session is connected. */
public final boolean isConnected() {
    return !closeFuture.isClosed();
}

/** {@inheritDoc} Whether the session is active. */
public boolean isActive() {
    // Return true by default
    return true;
}

/** {@inheritDoc} Whether the session is closing or already closed. */
public final boolean isClosing() {
    return closing || closeFuture.isClosed();
}

/** {@inheritDoc} Whether the session is secured. */
public boolean isSecured() {
    // Always false...
    return false;
}

/** {@inheritDoc} Returns the close future. */
public final CloseFuture getCloseFuture() {
    return closeFuture;
}

/**
 * Tells if the session is scheduled for flushed,
 * i.e. whether the processor is due to send the session's write requests.
 * @return true if the session is scheduled for flush
 */
public final boolean isScheduledForFlush() {
    return scheduledForFlush.get();
}

/** Schedule the session for flushed */
public final void scheduledForFlush() {
    scheduledForFlush.set(true);
}

/** Change the session's status : it's not anymore scheduled for flush */
public final void unscheduledForFlush() {
    scheduledForFlush.set(false);
}

/**
 * Set the scheduledForFLush flag. As we may have concurrent access to this
 * flag, we compare and set it in one call.
 * @param schedule
 *            the new value to set if not already set.
 * @return true if the session flag has been set, and if it wasn't set
 *         already.
 */
public final boolean setScheduledForFlush(boolean schedule) {
    if (schedule) {
        // If the current tag is set to false, switch it to true,
        // otherwise, we do nothing but return false : the session
        // is already scheduled for flush
        return scheduledForFlush.compareAndSet(false, schedule);
    }
    scheduledForFlush.set(schedule);
    return true;
}

/** {@inheritDoc} Closes the session. */
public final CloseFuture close() {
    return closeNow();
}

/** {@inheritDoc} */
public final CloseFuture closeNow() {
    synchronized (lock) {
        if (isClosing()) {
            return closeFuture;
        }
        closing = true;
        try {
            destroy();
        } catch (Exception e) {
            IoFilterChain filterChain = getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
    // Fire the filterClose event on the filter chain
    getFilterChain().fireFilterClose();
    return closeFuture;
}

/**
 * Destroy the session
 */
protected void destroy() {
    if (writeRequestQueue != null) {
        // Drain the write request queue and mark each pending write future as written
        while (!writeRequestQueue.isEmpty(this)) {
            WriteRequest writeRequest = writeRequestQueue.poll(this);
            if (writeRequest != null) {
                WriteFuture writeFuture = writeRequest.getFuture();
                // The WriteRequest may not always have a future : The CLOSE_REQUEST
                // and MESSAGE_SENT_REQUEST don't.
                if (writeFuture != null) {
                    writeFuture.setWritten();
                }
            }
        }
    }
}

/** {@inheritDoc} Closes the session, either immediately or after flushing. */
public final CloseFuture close(boolean rightNow) {
    if (rightNow) {
        return closeNow();
    } else {
        // Flush the pending write requests before closing
        return closeOnFlush();
    }
}

/** {@inheritDoc} Closes the session after the pending write requests have been flushed. */
public final CloseFuture closeOnFlush() {
    if (!isClosing()) {
        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
        getProcessor().flush(this);
    }
    return closeFuture;
}
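As the close methods show, the default close (close() delegates to closeNow()) does not flush the write request queue: it drains the queue, marks the future of every pending write request as written, and fires the filterClose event on the filter chain. closeOnFlush() instead appends CLOSE_REQUEST to the write request queue and asks the processor to flush, so the pending write requests are flushed before the session is closed.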
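The difference between the two close modes can be shown with a simplified model. Everything below (CloseModesSketch, CLOSE_MARKER, the flush loop) is an illustrative assumption: the post does not show how the real processor reacts to CLOSE_REQUEST, so the loop here only demonstrates the idea of a sentinel queued behind the pending writes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model of the two close modes; names are illustrative, not MINA's. */
class CloseModesSketch {
    /** Sentinel queued behind pending writes, playing the role of CLOSE_REQUEST. */
    static final Object CLOSE_MARKER = new Object();

    final Queue<Object> writeQueue = new ArrayDeque<>();
    final List<Object> sent = new ArrayList<>();
    boolean closed;

    void write(Object message) {
        if (!closed) {
            writeQueue.offer(message);
        }
    }

    /** Like closeNow(): discard whatever is still queued, then close. */
    void closeNow() {
        writeQueue.clear();
        closed = true;
    }

    /** Like closeOnFlush(): queue the marker and flush; closing happens when the marker is reached. */
    void closeOnFlush() {
        if (!closed) {
            writeQueue.offer(CLOSE_MARKER);
            flush();
        }
    }

    /** Assumed stand-in for the processor's flush: send in order until the marker shows up. */
    void flush() {
        Object next;
        while ((next = writeQueue.poll()) != null) {
            if (next == CLOSE_MARKER) {
                closed = true;
                writeQueue.clear();
                return;
            }
            sent.add(next);
        }
    }
}
```

With two queued messages, closeNow() leaves `sent` empty, while closeOnFlush() sends both before the session is marked closed.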
Now the handler, config and read methods:
/** {@inheritDoc} Returns the IoHandler associated with the session. */
public IoHandler getHandler() {
    return handler;
}

/** {@inheritDoc} Returns the session configuration. */
public IoSessionConfig getConfig() {
    return config;
}

/** {@inheritDoc} */
public final ReadFuture read() {
    if (!getConfig().isUseReadOperation()) {
        throw new IllegalStateException("useReadOperation is not enabled.");
    }
    // Get the session's queue of ready read futures
    Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
    ReadFuture future;
    synchronized (readyReadFutures) {
        future = readyReadFutures.poll();
        if (future != null) {
            // A future marked closed is put back into the ready queue
            if (future.isClosed()) {
                // Let other readers get notified.
                readyReadFutures.offer(future);
            }
        } else {
            // Nothing is ready: create a DefaultReadFuture and add it to the waiting queue
            future = new DefaultReadFuture(this);
            getWaitingReadFutures().offer(future);
        }
    }
    return future;
}

/**
 * @return a queue of ReadFuture (the ready read futures)
 */
private Queue<ReadFuture> getReadyReadFutures() {
    Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
    if (readyReadFutures == null) {
        readyReadFutures = new ConcurrentLinkedQueue<>();
        Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
                readyReadFutures);
        if (oldReadyReadFutures != null) {
            readyReadFutures = oldReadyReadFutures;
        }
    }
    return readyReadFutures;
}

/**
 * @return the queue of waiting ReadFuture
 */
private Queue<ReadFuture> getWaitingReadFutures() {
    Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
    if (waitingReadyReadFutures == null) {
        waitingReadyReadFutures = new ConcurrentLinkedQueue<>();
        Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
                WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
        if (oldWaitingReadyReadFutures != null) {
            waitingReadyReadFutures = oldWaitingReadyReadFutures;
        }
    }
    return waitingReadyReadFutures;
}
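As shown above, read() first obtains the session's queue of ready read futures and polls one future from it. If a future is available it is returned; when that future is marked closed it is first put back into the queue so that other readers see the close as well. If the queue is empty, a new DefaultReadFuture is created, added to the session's queue of waiting read futures, and returned.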
/**
 * Associates a message to a ReadFuture
 * @param message the message to associate to the ReadFuture
 */
public final void offerReadFuture(Object message) {
    newReadFuture().setRead(message);
}

/**
 * Associates a failure to a ReadFuture
 * @param exception the exception to associate to the ReadFuture
 */
public final void offerFailedReadFuture(Throwable exception) {
    newReadFuture().setException(exception);
}

/**
 * Inform the ReadFuture that the session has been closed
 */
public final void offerClosedReadFuture() {
    Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
    synchronized (readyReadFutures) {
        newReadFuture().setClosed();
    }
}

/**
 * @return a readFuture get from the waiting ReadFuture; if none is waiting,
 *         a new one is created and added to the ready queue
 */
private ReadFuture newReadFuture() {
    Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
    Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
    ReadFuture future;
    synchronized (readyReadFutures) {
        future = waitingReadFutures.poll();
        if (future == null) {
            future = new DefaultReadFuture(this);
            readyReadFutures.offer(future);
        }
    }
    return future;
}
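The pairing of read() with the offer methods above is a hand-off between two queues: whichever side arrives first parks a future for the other. A minimal model follows, using CompletableFuture in place of ReadFuture. The class and method names are hypothetical, and the re-queueing of closed futures is left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Simplified model of the ready/waiting future queues; not MINA code. */
class ReadQueuesSketch {
    private final Queue<CompletableFuture<String>> ready = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> waiting = new ArrayDeque<>();

    /** Reader side: take a future that already has data, otherwise park a new one. */
    synchronized CompletableFuture<String> read() {
        CompletableFuture<String> future = ready.poll();
        if (future == null) {
            future = new CompletableFuture<>();
            waiting.offer(future);
        }
        return future;
    }

    /** I/O side: complete the oldest parked future, or store a completed one for a later read(). */
    synchronized void offer(String message) {
        CompletableFuture<String> future = waiting.poll();
        if (future == null) {
            future = new CompletableFuture<>();
            ready.offer(future);
        }
        future.complete(message);
    }
}
```

If read() is called before data arrives, the returned future completes on the next offer(); if data arrives first, the next read() returns an already completed future.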
Now the write operation:
/** {@inheritDoc} */
public WriteFuture write(Object message) {
    return write(message, null);
}

/** {@inheritDoc} */
public WriteFuture write(Object message, SocketAddress remoteAddress) {
    if (message == null) { // null messages are rejected
        throw new IllegalArgumentException("Trying to write a null message : not allowed");
    }

    // We can't send a message to a connected session if we don't have
    // the remote address
    if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
        throw new UnsupportedOperationException();
    }

    // If the session has been closed or is closing, we can't either
    // send a message to the remote side. We generate a future
    // containing an exception.
    if (isClosing() || !isConnected()) {
        WriteFuture future = new DefaultWriteFuture(this);
        WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
        WriteException writeException = new WriteToClosedSessionException(request);
        future.setException(writeException);
        return future;
    }

    FileChannel openedFileChannel = null;

    // TODO: remove this code as soon as we use InputStream
    // instead of Object for the message.
    try {
        if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
            // The buffer has no remaining data
            // Nothing to write : probably an error in the user code
            throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
        } else if (message instanceof FileChannel) {
            FileChannel fileChannel = (FileChannel) message;
            message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
        } else if (message instanceof File) {
            File file = (File) message;
            openedFileChannel = new FileInputStream(file).getChannel();
            message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
        }
    } catch (IOException e) {
        ExceptionMonitor.getInstance().exceptionCaught(e);
        return DefaultWriteFuture.newNotWrittenFuture(this, e);
    }

    // Now, we can write the message. First, create a future
    // (build the write future and the write request)
    WriteFuture writeFuture = new DefaultWriteFuture(this);
    WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

    // Then, get the chain and inject the WriteRequest into it
    // (fires the filterWrite event on the session's filter chain)
    IoFilterChain filterChain = getFilterChain();
    filterChain.fireFilterWrite(writeRequest);

    // TODO : This is not our business ! The caller has created a
    // FileChannel,
    // he has to close it !
    if (openedFileChannel != null) {
        // If we opened a FileChannel, it needs to be closed when the write
        // has completed
        final FileChannel finalChannel = openedFileChannel;
        writeFuture.addListener(new IoFutureListener<WriteFuture>() {
            public void operationComplete(WriteFuture future) {
                try {
                    finalChannel.close();
                } catch (IOException e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        });
    }

    // Return the WriteFuture.
    return writeFuture;
}
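As shown above, write() first rejects a null message, and throws UnsupportedOperationException when a remote address is supplied on a transport that is not connectionless (a destination address only makes sense for connectionless transports). If the session is closing or no longer connected, it does not throw; it returns a write future that carries a WriteToClosedSessionException. If the message is an IoBuffer, the buffer must have remaining data. If the message is a FileChannel it is wrapped in a DefaultFileRegion; if it is a File, the session opens a channel on it and wraps it in a FilenameFileRegion. The method then creates a DefaultWriteRequest and fires the filterWrite event on the session's filter chain. Only when the session opened the file channel itself (the File case) does it register a listener on the write future that closes the channel once the write has completed. Finally it returns the DefaultWriteFuture.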
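The File branch follows a "close what you opened once the write completes" pattern. The sketch below reproduces only that pattern with JDK classes. FileWriteSketch, writeFile and lastOpened are hypothetical names, and a CompletableFuture stands in for MINA's WriteFuture; no actual bytes are sent.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.CompletableFuture;

/** Sketch of closing a self-opened channel when the write future completes; not MINA code. */
class FileWriteSketch {
    /** Channel opened by the last writeFile call, kept only so a caller can inspect it. */
    FileChannel lastOpened;

    /** Rejects null, opens a channel on the file, and closes it when the returned future completes. */
    CompletableFuture<Long> writeFile(File file) {
        if (file == null) {
            throw new IllegalArgumentException("Trying to write a null message : not allowed");
        }
        CompletableFuture<Long> writeFuture = new CompletableFuture<>();
        final FileChannel channel;
        try {
            channel = new FileInputStream(file).getChannel();
        } catch (IOException e) {
            // Mirrors the "not written" future: report the failure through the future.
            writeFuture.completeExceptionally(e);
            return writeFuture;
        }
        lastOpened = channel;
        // Listener: runs whether the write succeeds or fails.
        writeFuture.whenComplete((bytes, error) -> {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        return writeFuture;
    }
}
```

The caller that completes the future (in MINA, the I/O layer) never needs to know that a channel was opened on its behalf, which is why the cleanup is attached as a listener rather than done inline.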
Next, the methods for the session attachment and attributes. They are plain delegations and need little comment:
/** {@inheritDoc} */
public final Object getAttachment() {
    return getAttribute("");
}

/** {@inheritDoc} */
public final Object setAttachment(Object attachment) {
    return setAttribute("", attachment);
}

/** {@inheritDoc} */
public final Object getAttribute(Object key) {
    return getAttribute(key, null);
}

/** {@inheritDoc} */
public final Object getAttribute(Object key, Object defaultValue) {
    return attributes.getAttribute(this, key, defaultValue);
}

/** {@inheritDoc} */
public final Object setAttribute(Object key, Object value) {
    return attributes.setAttribute(this, key, value);
}

/** {@inheritDoc} */
public final Object setAttribute(Object key) {
    return setAttribute(key, Boolean.TRUE);
}

/** {@inheritDoc} */
public final Object setAttributeIfAbsent(Object key, Object value) {
    return attributes.setAttributeIfAbsent(this, key, value);
}

/** {@inheritDoc} */
public final Object setAttributeIfAbsent(Object key) {
    return setAttributeIfAbsent(key, Boolean.TRUE);
}

/** {@inheritDoc} */
public final Object removeAttribute(Object key) {
    return attributes.removeAttribute(this, key);
}

/** {@inheritDoc} */
public final boolean removeAttribute(Object key, Object value) {
    return attributes.removeAttribute(this, key, value);
}

/** {@inheritDoc} */
public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
    return attributes.replaceAttribute(this, key, oldValue, newValue);
}

/** {@inheritDoc} */
public final boolean containsAttribute(Object key) {
    return attributes.containsAttribute(this, key);
}

/** {@inheritDoc} */
public final Set<Object> getAttributeKeys() {
    return attributes.getAttributeKeys(this);
}
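As shown above, session attributes are handled entirely by delegating to IoSessionAttributeMap; the attachment is simply the attribute stored under the empty-string key.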
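To make the delegation concrete, here is a map-backed sketch of the operations the session relies on most, including the get-or-create idiom used by getReadyReadFutures(). AttributeStoreSketch is a hypothetical stand-in built on ConcurrentHashMap; the post does not show MINA's actual IoSessionAttributeMap implementation, so this is an assumption about one reasonable way to back it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/** Map-backed attribute store; a hypothetical stand-in, not MINA's implementation. */
class AttributeStoreSketch {
    private final ConcurrentMap<Object, Object> attributes = new ConcurrentHashMap<>();

    Object getAttribute(Object key, Object defaultValue) {
        Object value = attributes.get(key);
        return value != null ? value : defaultValue;
    }

    /** Returns the previous value, or null if the key was absent and the new value was stored. */
    Object setAttributeIfAbsent(Object key, Object value) {
        return attributes.putIfAbsent(key, value);
    }

    /** Get-or-create in the style of getReadyReadFutures(): the first queue stored wins. */
    @SuppressWarnings("unchecked")
    Queue<String> queueFor(Object key) {
        Queue<String> queue = (Queue<String>) getAttribute(key, null);
        if (queue == null) {
            queue = new ConcurrentLinkedQueue<>();
            Queue<String> old = (Queue<String>) setAttributeIfAbsent(key, queue);
            if (old != null) {
                // Another thread stored its queue first; use that one.
                queue = old;
            }
        }
        return queue;
    }
}
```

The check of the value returned by setAttributeIfAbsent is what keeps two racing threads from ending up with different queues for the same key.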
/**
 * @return The map of attributes associated with the session
 */
public final IoSessionAttributeMap getAttributeMap() {
    return attributes;
}

/**
 * Set the map of attributes associated with the session
 *
 * @param attributes The Map of attributes
 */
public final void setAttributeMap(IoSessionAttributeMap attributes) {
    this.attributes = attributes;
}

/**
 * Create a new close aware write queue, based on the given write queue.
 *
 * @param writeRequestQueue The write request queue
 */
public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
    this.writeRequestQueue = writeRequestQueue;
}

/** {@inheritDoc} */
public final void suspendRead() {
    readSuspended = true;
    if (isClosing() || !isConnected()) {
        return;
    }
    // Update the session's traffic-control state in the IoProcessor
    getProcessor().updateTrafficControl(this);
}

/** {@inheritDoc} */
public final void suspendWrite() {
    writeSuspended = true;
    if (isClosing() || !isConnected()) {
        return;
    }
    // Update the session's traffic-control state in the IoProcessor
    getProcessor().updateTrafficControl(this);
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
public final void resumeRead() {
    readSuspended = false;
    if (isClosing() || !isConnected()) {
        return;
    }
    // Update the session's traffic-control state in the IoProcessor
    getProcessor().updateTrafficControl(this);
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
public final void resumeWrite() {
    writeSuspended = false;
    if (isClosing() || !isConnected()) {
        return;
    }
    // Update the session's traffic-control state in the IoProcessor
    getProcessor().updateTrafficControl(this);
}

/** {@inheritDoc} */
public boolean isReadSuspended() {
    return readSuspended;
}

/** {@inheritDoc} */
public boolean isWriteSuspended() {
    return writeSuspended;
}
Next come the getters for the read/written byte and message counts and the throughput values:
/** {@inheritDoc} */
public final long getReadBytes() {
    return readBytes;
}

/** {@inheritDoc} */
public final long getWrittenBytes() {
    return writtenBytes;
}

/** {@inheritDoc} */
public final long getReadMessages() {
    return readMessages;
}

/** {@inheritDoc} */
public final long getWrittenMessages() {
    return writtenMessages;
}

/** {@inheritDoc} */
public final double getReadBytesThroughput() {
    return readBytesThroughput;
}

/** {@inheritDoc} */
public final double getWrittenBytesThroughput() {
    return writtenBytesThroughput;
}

/** {@inheritDoc} */
public final double getReadMessagesThroughput() {
    return readMessagesThroughput;
}

/** {@inheritDoc} */
public final double getWrittenMessagesThroughput() {
    return writtenMessagesThroughput;
}

/** {@inheritDoc} Updates the throughput values. */
public final void updateThroughput(long currentTime, boolean force) {
    int interval = (int) (currentTime - lastThroughputCalculationTime);

    // Minimum interval between two throughput calculations
    long minInterval = getConfig().getThroughputCalculationIntervalInMillis();

    if (((minInterval == 0) || (interval < minInterval)) && !force) {
        return;
    }

    // Compute the read/written byte and message throughput (per second)
    readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
    writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
    readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
    writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;

    // Remember the current totals as the baseline for the next calculation
    lastReadBytes = readBytes;
    lastWrittenBytes = writtenBytes;
    lastReadMessages = readMessages;
    lastWrittenMessages = writtenMessages;

    // Record the time of this throughput calculation
    lastThroughputCalculationTime = currentTime;
}
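The rate formula in updateThroughput() is a counter delta scaled from milliseconds to seconds. A stand-alone version with a worked value follows; ThroughputSketch and perSecond are hypothetical names used only for illustration.

```java
/** Worked example of the rate formula; a standalone sketch, not MINA code. */
class ThroughputSketch {
    /** Units per second, given a counter delta over an interval in milliseconds. */
    static double perSecond(long current, long previous, long intervalMillis) {
        return (current - previous) * 1000.0 / intervalMillis;
    }
}
```

For example, a counter that grows from 2000 to 5000 bytes over 1500 ms gives perSecond(5000, 2000, 1500) = 2000.0 bytes per second. Because the division is done in double arithmetic, a zero interval does not throw; it yields NaN or Infinity, which is worth keeping in mind when a calculation is forced.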
Getting and setting the scheduled write byte and message counts, and updating the read/write counters:
/** {@inheritDoc} */
public final long getScheduledWriteBytes() {
    return scheduledWriteBytes.get();
}

/** {@inheritDoc} */
public final int getScheduledWriteMessages() {
    return scheduledWriteMessages.get();
}

/**
 * Set the number of scheduled write bytes
 *
 * @param byteCount The number of scheduled bytes for write
 */
protected void setScheduledWriteBytes(int byteCount) {
    scheduledWriteBytes.set(byteCount);
}

/**
 * Set the number of scheduled write messages
 *
 * @param messages The number of scheduled messages for write
 */
protected void setScheduledWriteMessages(int messages) {
    scheduledWriteMessages.set(messages);
}

/**
 * Increase the number of read bytes
 * @param increment The number of read bytes
 * @param currentTime The current time
 */
public final void increaseReadBytes(long increment, long currentTime) {
    if (increment <= 0) {
        return;
    }
    readBytes += increment;
    lastReadTime = currentTime;
    // Reset the both-idle and read-idle counters to 0
    idleCountForBoth.set(0);
    idleCountForRead.set(0);
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
    }
}

/**
 * Increase the number of read messages
 * @param currentTime The current time
 */
public final void increaseReadMessages(long currentTime) {
    readMessages++;
    lastReadTime = currentTime;
    idleCountForBoth.set(0);
    idleCountForRead.set(0);
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
    }
}

/**
 * Increase the number of written bytes
 * @param increment The number of written bytes
 * @param currentTime The current time
 */
public final void increaseWrittenBytes(int increment, long currentTime) {
    if (increment <= 0) {
        return;
    }
    writtenBytes += increment;
    lastWriteTime = currentTime;
    idleCountForBoth.set(0);
    idleCountForWrite.set(0);
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
    }
    increaseScheduledWriteBytes(-increment);
}

/**
 * Increase the number of written messages
 * @param request The written message
 * @param currentTime The current time
 */
public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
    Object message = request.getMessage();
    if (message instanceof IoBuffer) {
        IoBuffer b = (IoBuffer) message;
        if (b.hasRemaining()) {
            return;
        }
    }
    writtenMessages++;
    lastWriteTime = currentTime;
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
    }
    decreaseScheduledWriteMessages();
}

/**
 * Increase the number of scheduled write bytes for the session
 * @param increment The number of newly added bytes to write
 */
public final void increaseScheduledWriteBytes(int increment) {
    scheduledWriteBytes.addAndGet(increment);
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
    }
}

/**
 * Increase the number of scheduled message to write
 */
public final void increaseScheduledWriteMessages() {
    scheduledWriteMessages.incrementAndGet();
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
    }
}

/**
 * Decrease the number of scheduled message written
 */
private void decreaseScheduledWriteMessages() {
    scheduledWriteMessages.decrementAndGet();
    if (getService() instanceof AbstractIoService) {
        ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
    }
}

/**
 * Decrease the counters of written messages and written bytes when a message has been written
 * @param request The written message
 */
public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
    Object message = request.getMessage();
    if (message instanceof IoBuffer) {
        IoBuffer b = (IoBuffer) message;
        if (b.hasRemaining()) {
            increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
        } else {
            decreaseScheduledWriteMessages();
        }
    } else {
        decreaseScheduledWriteMessages();
    }
}

/** {@inheritDoc} Returns the session's write request queue. */
public final WriteRequestQueue getWriteRequestQueue() {
    if (writeRequestQueue == null) {
        throw new IllegalStateException();
    }
    return writeRequestQueue;
}

/** {@inheritDoc} Returns the write request currently being processed. */
public final WriteRequest getCurrentWriteRequest() {
    return currentWriteRequest;
}

/** {@inheritDoc} */
public final Object getCurrentWriteMessage() {
    WriteRequest req = getCurrentWriteRequest();
    if (req == null) {
        return null;
    }
    return req.getMessage();
}

/** {@inheritDoc} */
public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
    this.currentWriteRequest = currentWriteRequest;
}

/**
 * Increase the ReadBuffer size (it will double)
 */
public final void increaseReadBufferSize() {
    int newReadBufferSize = getConfig().getReadBufferSize() << 1;
    if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
        getConfig().setReadBufferSize(newReadBufferSize);
    } else {
        getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
    }
    deferDecreaseReadBuffer = true;
}

/**
 * Decrease the ReadBuffer size (it will be divided by a factor 2)
 */
public final void decreaseReadBufferSize() {
    if (deferDecreaseReadBuffer) {
        deferDecreaseReadBuffer = false;
        return;
    }
    if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
        getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
    }
    deferDecreaseReadBuffer = true;
}

/** {@inheritDoc} Session creation time. */
public final long getCreationTime() {
    return creationTime;
}

/** {@inheritDoc} Time of the last I/O operation. */
public final long getLastIoTime() {
    return Math.max(lastReadTime, lastWriteTime);
}

/** {@inheritDoc} Time of the last read operation. */
public final long getLastReadTime() {
    return lastReadTime;
}

/** {@inheritDoc} Time of the last write operation. */
public final long getLastWriteTime() {
    return lastWriteTime;
}
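The read-buffer resizing above has one subtle part: a decrease only takes effect on the second consecutive request, because the first one just clears the deferDecreaseReadBuffer flag. The model below isolates that behavior. ReadBufferSizerSketch is a hypothetical class, and the bounds used in the usage note are illustrative values rather than MINA defaults.

```java
/** Model of the adaptive read-buffer sizing; bounds are supplied by the caller. */
class ReadBufferSizerSketch {
    private final int min;
    private final int max;
    private int size;
    private boolean deferDecrease = true;

    ReadBufferSizerSketch(int min, int initial, int max) {
        this.min = min;
        this.size = initial;
        this.max = max;
    }

    int size() {
        return size;
    }

    /** Double the size, capped at max, and re-arm the deferral. */
    void increase() {
        size = Math.min(size << 1, max);
        deferDecrease = true;
    }

    /** Halve the size on the second consecutive request, and only while it is above min. */
    void decrease() {
        if (deferDecrease) {
            // First request since the last change: just note it.
            deferDecrease = false;
            return;
        }
        if (size > min) {
            size >>>= 1;
        }
        deferDecrease = true;
    }
}
```

Starting from new ReadBufferSizerSketch(64, 2048, 4096), one increase() gives 4096, the first decrease() leaves 4096, and the second decrease() gives 2048. The deferral keeps a single short read from shrinking a buffer that was just grown.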
Now the read/write idle counters.
Whether the session is idle for a given status (reader idle, writer idle, or both idle):
/** {@inheritDoc} */
public final boolean isIdle(IdleStatus status) {
    if (status == IdleStatus.BOTH_IDLE) {
        return idleCountForBoth.get() > 0;
    }
    if (status == IdleStatus.READER_IDLE) {
        return idleCountForRead.get() > 0;
    }
    if (status == IdleStatus.WRITER_IDLE) {
        return idleCountForWrite.get() > 0;
    }
    throw new IllegalArgumentException("Unknown idle status: " + status);
}

/** {@inheritDoc} */
public final boolean isBothIdle() {
    return isIdle(IdleStatus.BOTH_IDLE);
}

/** {@inheritDoc} */
public final boolean isReaderIdle() {
    return isIdle(IdleStatus.READER_IDLE);
}

/** {@inheritDoc} */
public final boolean isWriterIdle() {
    return isIdle(IdleStatus.WRITER_IDLE);
}

/** {@inheritDoc} Returns how many times the session has been idle for the given status. */
public final int getIdleCount(IdleStatus status) {
    if (getConfig().getIdleTime(status) == 0) {
        if (status == IdleStatus.BOTH_IDLE) {
            idleCountForBoth.set(0);
        }
        if (status == IdleStatus.READER_IDLE) {
            idleCountForRead.set(0);
        }
        if (status == IdleStatus.WRITER_IDLE) {
            idleCountForWrite.set(0);
        }
    }
    if (status == IdleStatus.BOTH_IDLE) {
        return idleCountForBoth.get();
    }
    if (status == IdleStatus.READER_IDLE) {
        return idleCountForRead.get();
    }
    if (status == IdleStatus.WRITER_IDLE) {
        return idleCountForWrite.get();
    }
    throw new IllegalArgumentException("Unknown idle status: " + status);
}

/** {@inheritDoc} Returns the time of the last idle event for the given status. */
public final long getLastIdleTime(IdleStatus status) {
    if (status == IdleStatus.BOTH_IDLE) {
        return lastIdleTimeForBoth;
    }
    if (status == IdleStatus.READER_IDLE) {
        return lastIdleTimeForRead;
    }
    if (status == IdleStatus.WRITER_IDLE) {
        return lastIdleTimeForWrite;
    }
    throw new IllegalArgumentException("Unknown idle status: " + status);
}

/**
 * Increase the count of the various Idle counter
 * @param status The current status
 * @param currentTime The current time
 */
public final void increaseIdleCount(IdleStatus status, long currentTime) {
    if (status == IdleStatus.BOTH_IDLE) {
        idleCountForBoth.incrementAndGet();
        lastIdleTimeForBoth = currentTime;
    } else if (status == IdleStatus.READER_IDLE) {
        idleCountForRead.incrementAndGet();
        lastIdleTimeForRead = currentTime;
    } else if (status == IdleStatus.WRITER_IDLE) {
        idleCountForWrite.incrementAndGet();
        lastIdleTimeForWrite = currentTime;
    } else {
        throw new IllegalArgumentException("Unknown idle status: " + status);
    }
}

/** {@inheritDoc} */
public final int getBothIdleCount() {
    return getIdleCount(IdleStatus.BOTH_IDLE);
}

/** {@inheritDoc} */
public final long getLastBothIdleTime() {
    return getLastIdleTime(IdleStatus.BOTH_IDLE);
}

/** {@inheritDoc} */
public final long getLastReaderIdleTime() {
    return getLastIdleTime(IdleStatus.READER_IDLE);
}

/** {@inheritDoc} */
public final long getLastWriterIdleTime() {
    return getLastIdleTime(IdleStatus.WRITER_IDLE);
}

/** {@inheritDoc} */
public final int getReaderIdleCount() {
    return getIdleCount(IdleStatus.READER_IDLE);
}

/** {@inheritDoc} */
public final int getWriterIdleCount() {
    return getIdleCount(IdleStatus.WRITER_IDLE);
}

/**
 * {@inheritDoc}
 * Returns the service address: the acceptor's local (listening) address if the
 * service is an IoAcceptor, otherwise the session's remote address.
 */
public SocketAddress getServiceAddress() {
    IoService service = getService();
    if (service instanceof IoAcceptor) {
        return ((IoAcceptor) service).getLocalAddress();
    }
    return getRemoteAddress();
}

/**
 * Get the Id as a String
 */
private String getIdAsString() {
    String id = Long.toHexString(getId()).toUpperCase();
    if (id.length() <= 8) {
        return "0x00000000".substring(0, 10 - id.length()) + id;
    } else {
        return "0x" + id;
    }
}

/**
 * Get the Service name
 */
private String getServiceName() {
    TransportMetadata tm = getTransportMetadata();
    if (tm == null) {
        return "null";
    }
    return tm.getProviderName() + ' ' + tm.getName();
}

/** {@inheritDoc} Returns the IoService associated with the session. */
public IoService getService() {
    return service;
}

/**
 * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
 * in the specified collection.
 *
 * @param sessions The sessions that are notified
 * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
 */
public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
    while (sessions.hasNext()) {
        IoSession session = sessions.next();
        if (!session.getCloseFuture().isClosed()) {
            notifyIdleSession(session, currentTime);
        }
    }
}

/**
 * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
 * specified {@code session}.
 * @param session The session that is notified
 * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
 */
public static void notifyIdleSession(IoSession session, long currentTime) {
    notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
            IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
    notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
            IdleStatus.READER_IDLE,
            Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
    notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
            IdleStatus.WRITER_IDLE,
            Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
    notifyWriteTimeout(session, currentTime);
}

// Fires the sessionIdle event on the session's filter chain when the idle time has elapsed
private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
        long lastIoTime) {
    if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
        session.getFilterChain().fireSessionIdle(status);
    }
}

// Handles a session write timeout
private static void notifyWriteTimeout(IoSession session, long currentTime) {
    long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
    if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
            && !session.getWriteRequestQueue().isEmpty(session)) {
        WriteRequest request = session.getCurrentWriteRequest();
        if (request != null) {
            session.setCurrentWriteRequest(null);
            WriteTimeoutException cause = new WriteTimeoutException(request);
            request.getFuture().setException(cause);
            session.getFilterChain().fireExceptionCaught(cause);
            // WriteException is an IOException, so we close the session.
            session.closeNow();
        }
    }
}
}
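The idle test in notifyIdleSession0, together with the Math.max reference point computed by its caller, can be exercised in isolation. IdleCheckSketch and its method names below are hypothetical; the conditions themselves are copied from the code above.

```java
/** Standalone version of the idle test; not MINA code. */
class IdleCheckSketch {
    /** True when idle detection is enabled and at least idleTimeMillis has passed since lastIoTime. */
    static boolean isIdle(long currentTime, long idleTimeMillis, long lastIoTime) {
        return idleTimeMillis > 0 && lastIoTime != 0 && currentTime - lastIoTime >= idleTimeMillis;
    }

    /** Reference point for the check: the later of the last I/O and the last idle notification. */
    static long referenceTime(long lastIoTime, long lastIdleTime) {
        return Math.max(lastIoTime, lastIdleTime);
    }
}
```

Taking the maximum with the last idle time means that, once an idle event has been recorded, the next one for the same status is only due a full idle period later, rather than on every subsequent check. An idle time of 0 disables the check altogether.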
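Summary:
AbstractIoSession holds an associated IoService and IoHandler, a write request queue for the session's pending write requests, an attribute map for session attributes, and a set of counters: read/written bytes and messages, the corresponding throughput values, and the times of the last read, write and idle events.
Initialization stores the associated service and takes the IoHandler from it, then sets all session timestamps to the current time.
The default close (closeNow()) does not flush the write request queue: it drains the queue, marks the future of each pending write request as written, and fires the filterClose event on the filter chain. closeOnFlush() flushes the session's write request queue before the session is closed.
read() obtains the session's queue of ready read futures and polls one. If a future is available it is returned, and if it is marked closed it is put back into the queue first. If none is available, a new DefaultReadFuture is created and added to the session's queue of waiting read futures.
write() rejects a null message, rejects a remote address on transports that are not connectionless, and returns a future carrying a WriteToClosedSessionException if the session is closing or closed. If the message is an IoBuffer, it must have remaining data; a FileChannel is wrapped in a DefaultFileRegion and a File in a FilenameFileRegion. The method then creates a DefaultWriteRequest and fires the filterWrite event on the session's filter chain. If the session opened the file channel itself (the File case), it registers a listener on the write future that closes the channel once the message has been sent. It returns the DefaultWriteFuture.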
Appendix:
//IoFutureListener
/**
 * Something interested in being notified when the result
 * of an {@link IoFuture} becomes available.
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public interface IoFutureListener extends EventListener {
    /**
     * An {@link IoFutureListener} that closes the {@link IoSession} which is
     * associated with the specified {@link IoFuture}.
     */
    static IoFutureListener CLOSE = new IoFutureListener() {
        public void operationComplete(IoFuture future) {
            future.getSession().close();
        }
    };

    /**
     * Invoked when the operation associated with the {@link IoFuture}
     * has been completed.
     * @param future The source {@link IoFuture} which called this
     *               callback.
     */
    void operationComplete(IoFuture future);
}
//EventListener
/**
 * A tagging interface that all event listener interfaces must extend.
 * @since JDK1.1
 */
public interface EventListener {
}
//DefaultWriteRequest
public class DefaultWriteRequest implements WriteRequest {
    private final Object message;
    private final WriteFuture future;
    private final SocketAddress destination;
    ...
}
//IoSessionAttributeMap
public interface IoSessionAttributeMap {
    public abstract Object getAttribute(IoSession iosession, Object obj, Object obj1);
    public abstract Object setAttribute(IoSession iosession, Object obj, Object obj1);
    public abstract Object setAttributeIfAbsent(IoSession iosession, Object obj, Object obj1);
    public abstract Object removeAttribute(IoSession iosession, Object obj);
    public abstract boolean removeAttribute(IoSession iosession, Object obj, Object obj1);
    public abstract boolean replaceAttribute(IoSession iosession, Object obj, Object obj1, Object obj2);
    public abstract boolean containsAttribute(IoSession iosession, Object obj);
    public abstract Set getAttributeKeys(IoSession iosession);
    public abstract void dispose(IoSession iosession) throws Exception;
}