欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)

程序员文章站 2022-03-11 09:30:22
...
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器):http://donald-draper.iteye.com/blog/2379152
引言:
    前面一篇文章我们看了报文监听器NioDatagramAcceptor的内部变量,构造和IO处理器相关的功能,先来回顾一下:
    报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
现在我们来看NioDatagramAcceptor的IoAcceptor和Io服务相关功能的实现:先贴出报文监听器NioDatagramAcceptor的内部变量声明,以便理解后面的内容,
/**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @org.apache.xbean.XBean
 */
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {

    /**
     * A session recycler that is used to retrieve an existing session, unless it's too old.
     默认过期会话回收器
     **/
    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions 选择超时时间
     */
    private static final long SELECT_TIMEOUT = 1000L;
    /** A lock used to protect the selector to be waked up before it's created */
    private final Semaphore lock = new Semaphore(1);
    /** A queue used to store the list of pending Binds 地址绑定请求*/
    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
    //地址解绑请求队列
    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
    //刷新会话队列,IO处理器刷新操作会用到,暂存刷新操作的会话
    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
    // socket地址与报文通道映射Map,绑定操作使socket地址与报文通道关联起来
    private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
            .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
    //会话管理器,监控连接Service的会话,如果会话过期,关闭过期的会话
    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
    private volatile boolean selectable;
    /** The thread responsible of accepting incoming requests */
    private Acceptor acceptor;//监听器线程
    private long lastIdleCheckTime;//上次空闲检查时间
    /** The Selector used by this acceptor 选择器*/
    private volatile Selector selector;
}

回到上一篇文章启动监听器线程片段startupAcceptor
/**
 * Starts the inner Acceptor thread.
 */
private void startupAcceptor() throws InterruptedException {
    if (!selectable) {
        //如果选择器初始化失败,则清空注册队列,取消队列及刷新会话队列
        registerQueue.clear();
        cancelQueue.clear();
        flushingSessions.clear();
    }
    lock.acquire();
    if (acceptor == null) {
        //创建Acceptor线程实例,并执行
        acceptor = new Acceptor();
        executeWorker(acceptor);
    } else {
        lock.release();
    }
}

下面来看一下Acceptor的定义:
/**
  * This private class is used to accept incoming connection from
  * clients. It's an infinite loop, which can be stopped when all
  * the registered handles have been removed (unbound).
  接收客户端的连接。主操作是一个无限循环,当所有绑定的地址的报文通道解绑时,
  循环退出
  */
 private class Acceptor implements Runnable {
     @Override
     public void run() {
         int nHandles = 0;
         lastIdleCheckTime = System.currentTimeMillis();
         // Release the lock
         lock.release();
         while (selectable) {
             try {
	         //超时选择
                 int selected = select(SELECT_TIMEOUT);
		 //处理地址绑定请求
                 nHandles += registerHandles();
                 if (nHandles == 0) {
                     try {
		         //如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程
                         lock.acquire();
                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                             acceptor = null;
                             break;
                         }
                     } finally {
                         lock.release();
                     }
                 }
                 if (selected > 0) {
		     //处理读写操作时间就绪的会话
                     processReadySessions(selectedHandles());
                 }
                 long currentTime = System.currentTimeMillis();
		 //发送刷新队列中的写请求
                 flushSessions(currentTime);
		 //处理报文通道地址解绑请求
                 nHandles -= unregisterHandles();
		 //通知会话空闲
                 notifyIdleSessions(currentTime);
             } catch (ClosedSelectorException cse) {
                 // If the selector has been closed, we can exit the loop
                 ExceptionMonitor.getInstance().exceptionCaught(cse);
                 break;
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e1) {
                 }
             }
         }
          //如何Io处理器正在关闭,则销毁报文监听器
         if (selectable && isDisposing()) {
             selectable = false;
             try {
                 destroy();
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
             } finally {
                 disposalFuture.setValue(true);
             }
         }
     }
}

监听器线程有一下几点要关注:
1.
//处理地址绑定请求
nHandles += registerHandles();

2.
if (selected > 0) {                          
    //处理读写操作时间就绪的会话             
    processReadySessions(selectedHandles()); 
}  
 
3.
//发送刷新队列中的写请求
flushSessions(currentTime);

4.
//处理报文通道地址解绑请求
nHandles -= unregisterHandles();

5.
//通知会话空闲
notifyIdleSessions(currentTime);

6.
 //如何Io处理器正在关闭,则销毁报文监听器                  
if (selectable && isDisposing()) {                         
    selectable = false;                                    
    try {                                                  
        destroy();                                         
    } catch (Exception e) {                                
        ExceptionMonitor.getInstance().exceptionCaught(e); 
    } finally {                                            
        disposalFuture.setValue(true);                     
    }                                                      
}  
                                                       
我们分别来以上几点:
1.
//处理地址绑定请求
nHandles += registerHandles();


private int registerHandles() {
    for (;;) {
        //从注册队列,poll地址绑定请求
        AcceptorOperationFuture req = registerQueue.poll();
        if (req == null) {
            break;
        }
        Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
        List<SocketAddress> localAddresses = req.getLocalAddresses();
        try {
	   //遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道
            for (SocketAddress socketAddress : localAddresses) 
                DatagramChannel handle = open(socketAddress);
		//添加socket地址与报文通道映射到集合newHandles
                newHandles.put(localAddress(handle), handle);
            }
            添加socket地址与报文通道映射到boundHandles
            boundHandles.putAll(newHandles);
            //通知service监听,服务已开启,及触发fireServiceActivated事件
            getListeners().fireServiceActivated();
	    //地址绑定结束
            req.setDone();
            return newHandles.size();
        } catch (Exception e) {
            req.setException(e);
        } finally {
            // Roll back if failed to bind all addresses.
	    //如果异常,则关闭报文通道
            if (req.getException() != null) {
                for (DatagramChannel handle : newHandles.values()) {
                    try {
                        close(handle);
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
                wakeup();
            }
        }
    }
    return 0;
}

来看打开通道方法:
protected DatagramChannel open(SocketAddress localAddress) throws Exception {
    //打开一个报文通道
    final DatagramChannel ch = DatagramChannel.open();
    boolean success = false;
    try {
       //配置通道会话及阻塞模式
        new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
        ch.configureBlocking(false);

        try {
	    //绑定地址
            ch.socket().bind(localAddress);
        } catch (IOException ioe) {
            // Add some info regarding the address we try to bind to the
            // message
            String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                    + ioe.getMessage();
            Exception e = new IOException(newMessage);
            e.initCause(ioe.getCause());
            // And close the channel
            ch.close();
            throw e;
        }
         //注册报文通道读操作事件OP_READ到选择器selector
        ch.register(selector, SelectionKey.OP_READ);
        success = true;
    } finally {
        if (!success) {
            close(ch);
        }
    }

    return ch;
}

从上面来看,处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,
通知service监听,服务已开启,触发fireServiceActivated事件;

再来看第二点:
2.
if (selected > 0) {                          
    //处理读写操作时间就绪的会话             
    processReadySessions(selectedHandles()); 
}  

 private void processReadySessions(Set<SelectionKey> handles) {
        Iterator<SelectionKey> iterator = handles.iterator();
	//遍历读写操作事件就绪的报文通道
        while (iterator.hasNext()) {
            //获取选择key,及报文通道
            SelectionKey key = iterator.next();
            DatagramChannel handle = (DatagramChannel) key.channel();
            iterator.remove();
            try {
	        //执行读操作
                if (key.isValid() && key.isReadable()) {
                    readHandle(handle);
                }
		//执行写操作
                if (key.isValid() && key.isWritable()) {
                    for (IoSession session : getManagedSessions().values()) {
                        scheduleFlush((NioSession) session);
                    }
                }
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
}

这一点有两点要关注
2.a
//执行读操作                              
if (key.isValid() && key.isReadable()) {  
    readHandle(handle);                   
}         
                               
private void readHandle(DatagramChannel handle) throws Exception {
        IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
	//接收数据
        SocketAddress remoteAddress = receive(handle, readBuf);
        if (remoteAddress != null) {
	    //创建会话
            IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
            readBuf.flip();
	    //触发会话过滤链的消息接收事件fireMessageReceived
            session.getFilterChain().fireMessageReceived(readBuf);
        }
}

来看报文读处理的数据接收和会话创建
2.a.1
//接收数据
protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
        return handle.receive(buffer.buf());
}

2.a.2
//创建会话
private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    //获取远端socket地址关联的报文通道
    DatagramChannel handle = boundHandles.get(localAddress);
    if (handle == null) {
        throw new IllegalArgumentException("Unknown local address: " + localAddress);
    }
    IoSession session;
    synchronized (sessionRecycler) {
        //从会话管理器,获取远端socket地址会话,以便重用
        session = sessionRecycler.recycle(remoteAddress);
        if (session != null) {
            return session;
        }
        // If a new session needs to be created.
	//创建会话
        NioSession newSession = newSession(this, handle, remoteAddress);
	//将会话添加会话管理器,监控会话
        getSessionRecycler().put(newSession);
        session = newSession;
    }
    //初始化会话
    initSession(session, null, null);
    try {
        //构建会话过滤链
        this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
	//通知Service监听器发生会话创建事件fireSessionCreated
        getListeners().fireSessionCreated(session);
    } catch (Exception e) {
        ExceptionMonitor.getInstance().exceptionCaught(e);
    }
    return session;
}

来看创建会话这一点
//创建会话
NioSession newSession = newSession(this, handle, remoteAddress);

//根据Io处理器,报文通道及远端socket地址创建会话
 protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
            SocketAddress remoteAddress) {
        //获取报文通道注册到选择器的选择key
        SelectionKey key = handle.keyFor(selector);
        if ((key == null) || (!key.isValid())) {
            return null;
        }
        //创建报文会话
        NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
	//设置会话选择key
        newSession.setSelectionKey(key);
        return newSession;
    }

默认会话管理器sessionRecycler,见附;
2.b
//执行写操作                                                  
if (key.isValid() && key.isWritable()) {    
    //调度Service管理的会话
    for (IoSession session : getManagedSessions().values()) { 
        scheduleFlush((NioSession) session);                  
    }                                                         
}   
                                                         
从上面可以看出,处理报文通道就绪续事件,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列;

再来看发送刷新队列的会话写请求:
3.
//发送刷新队列中的会话写请求
flushSessions(currentTime);


private void flushSessions(long currentTime) {
    for (;;) {
        //从刷新队列获取会话
        NioSession session = flushingSessions.poll();
        if (session == null) {
            break;
        }
        // Reset the Schedule for flush flag for this session,
        // as we are flushing it now
	//设置会话为未调度
        session.unscheduledForFlush();
        try {
	    //刷新会话
            boolean flushedAll = flush(session, currentTime);
            if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
	        //如果刷新成功,但会话写请求队列不为空,且未调度,则重新调度会话
                scheduleFlush(session);
            }
        } catch (Exception e) {
            session.getFilterChain().fireExceptionCaught(e);
        }
    }
}

//发送会话写请求
 private boolean flush(NioSession session, long currentTime) throws Exception {
        //获取会话写请求队列,会话最大读buffersize
        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                + (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;
        try {
            for (;;) {
	        //获取会话当前写请求
                WriteRequest req = session.getCurrentWriteRequest();
                if (req == null) {
                    //从写请求队列poll一个写请求
                    req = writeRequestQueue.poll(session);
                    if (req == null) {
		        //设置会话不在关注写事件
                        setInterestedInWrite(session, false);
                        break;
                    }
                    //设置会话当前写请求
                    session.setCurrentWriteRequest(req);
                }
                //获取写请求消息
                IoBuffer buf = (IoBuffer) req.getMessage();

                if (buf.remaining() == 0) {
                    // Clear and fire event
		    //置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent
                    session.setCurrentWriteRequest(null);
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                    continue;
                }
                //获取写请求远端地址
                SocketAddress destination = req.getDestination();
                //如果写请求远端地址为null,则获取会话远端地址
                if (destination == null) {
                    destination = session.getRemoteAddress();
                }
                //发送会话写请求字节序列
                int localWrittenBytes = send(session, buf, destination);

                if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
                    // Kernel buffer is full or wrote too much
		    //如果数据太多或发送数据失败,设置会话关注写操作事件
                    setInterestedInWrite(session, true);
                    return false;
                } else {
		   //数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent
                    setInterestedInWrite(session, false);

                    // Clear and fire event
                    session.setCurrentWriteRequest(null);
                    writtenBytes += localWrittenBytes;
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                }
            }
        } finally {
	    //更新会话写字节计数器
            session.increaseWrittenBytes(writtenBytes, currentTime);
        }

        return true;
}

//委托会话关联的报文通道发送会话消息字节序列
 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
        return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
 }

从上面可以看出处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话
4.
//处理报文通道地址解绑请求
nHandles -= unregisterHandles();

private int unregisterHandles() {
    int nHandles = 0;
    for (;;) {
       //从取消队列,poll地址解绑请求
        AcceptorOperationFuture request = cancelQueue.poll();
        if (request == null) {
            break;
        }
        // close the channels
	//遍历地址解绑请求socket地址集合
        for (SocketAddress socketAddress : request.getLocalAddresses()) {
	    //从socket与报文通道映射集boundHandles移除socket地址
            DatagramChannel handle = boundHandles.remove(socketAddress);
            if (handle == null) {
                continue;
            }
            try {
	        //关闭报文通道
                close(handle);
		//唤醒选择操作
                wakeup(); // wake up again to trigger thread death
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                nHandles++;
            }
        }
	//解绑成功
        request.setDone();
    }
    return nHandles;
}

//关闭通道
protected void close(DatagramChannel handle) throws Exception {
    SelectionKey key = handle.keyFor(selector);
    //取消选择key
    if (key != null) {
        key.cancel();
    }
   //关闭连接及通道
    handle.disconnect();
    handle.close();
}


从上可以看出处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;

5.
//通知会话空闲
notifyIdleSessions(currentTime);


 private void notifyIdleSessions(long currentTime) {
     // process idle sessions
     if (currentTime - lastIdleCheckTime >= 1000) {
         lastIdleCheckTime = currentTime;
	 //通知service管理的会话空闲
         AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
     }
 }

6.
//如何Io处理器正在关闭,则销毁报文监听器                  
if (selectable && isDisposing()) {                         
    selectable = false;                                    
    try {                                                  
        destroy();                                         
    } catch (Exception e) {                                
        ExceptionMonitor.getInstance().exceptionCaught(e); 
    } finally {                                            
        disposalFuture.setValue(true);                     
    }                                                      
}  

//关闭选择器
 protected void destroy() throws Exception {
        if (selector != null) {
            selector.close();
        }
}

来看剩余的方法操作,很简单,不详解:
 
/**
  * {@inheritDoc}
  创建会话
  */
 @Override
 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
     if (isDisposing()) {
         throw new IllegalStateException("The Acceptor is being disposed.");
     }
     if (remoteAddress == null) {
         throw new IllegalArgumentException("remoteAddress");
     }
     synchronized (bindLock) {
         if (!isActive()) {
             throw new IllegalStateException("Can't create a session from a unbound service.");
         }
         try {
	     //创建报文会话
             return newSessionWithoutLock(remoteAddress, localAddress);
         } catch (RuntimeException | Error e) {
             throw e;
         } catch (Exception e) {
             throw new RuntimeIoException("Failed to create a session.", e);
         }
     }
 }
 /**
  * {@inheritDoc}
  解绑地址
  */
 @Override
 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
     AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
     //添加地址解绑请求到取消队列
     cancelQueue.add(request);
     startupAcceptor();//启动监听器线程
     wakeup();//唤醒选择器
     //等待解绑成功
     request.awaitUninterruptibly();
     if (request.getException() != null) {
         throw request.getException();
     }
 }
 /**
  * {@inheritDoc}
  关闭IO处理器相关的资源
  */
 @Override
 protected void dispose0() throws Exception {
     unbind();//解绑地址
     startupAcceptor();//启动监听器线程
     wakeup();
 }
 //选择操作
 protected int select() throws Exception {
     return selector.select();
 }
 protected int select(long timeout) throws Exception {
     return selector.select(timeout);
 }
 //上一次选择后,存在就绪事件的选择key
 protected Set<SelectionKey> selectedHandles() {
     return selector.selectedKeys();
 }
 @Override
 public InetSocketAddress getDefaultLocalAddress() {
     return (InetSocketAddress) super.getDefaultLocalAddress();
 }
 @Override
 public InetSocketAddress getLocalAddress() {
     return (InetSocketAddress) super.getLocalAddress();
 }
 /**
  * {@inheritDoc}
  */
 @Override
 public DatagramSessionConfig getSessionConfig() {
     return (DatagramSessionConfig) sessionConfig;
 }

 @Override
 public final IoSessionRecycler getSessionRecycler() {
     return sessionRecycler;
 }

 @Override
 public TransportMetadata getTransportMetadata() {
     return NioDatagramSession.METADATA;
 }
 protected boolean isReadable(DatagramChannel handle) {
     SelectionKey key = handle.keyFor(selector);

     if ((key == null) || (!key.isValid())) {
         return false;
     }

     return key.isReadable();
 }
 protected boolean isWritable(DatagramChannel handle) {
     SelectionKey key = handle.keyFor(selector);

     if ((key == null) || (!key.isValid())) {
         return false;
     }
     return key.isWritable();
 }
 @Override
 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
     setDefaultLocalAddress((SocketAddress) localAddress);
 }
 @Override
 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
     synchronized (bindLock) {
         if (isActive()) {
             throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
         }

         if (sessionRecycler == null) {
             sessionRecycler = DEFAULT_RECYCLER;
         }

         this.sessionRecycler = sessionRecycler;
     }
 }

在下面这篇文章中,我们讲过报文过滤链,可以集合本文,在回到看看下面这篇文章
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
我们贴出上面这篇文章的报文过滤链的定义:
class DatagramFilterChain extends AbstractIoFilterChain {
    DatagramFilterChain(IoSession parent) {
        super(parent);
    }
    //会话发送写请求,及添加会话写请求队列,待报文监听器调度刷新,即通过会话关联的报文通道
    //发送消息字节序列
    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
        //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	     //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	    //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        
        if (session.getTrafficMask().isWritable()) {
	     //DatagramSessionImpl
	     //private final DatagramService managerDelegate;
	    //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作,
	    //这个在以后在具体详说
            s.getManagerDelegate().flushSession(s);
        }
    }

    protected void doClose(IoSession session) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
        DatagramService manager = s.getManagerDelegate();
	////委托给session关联的managerDelegate(DatagramService)关闭会话
        if (manager instanceof DatagramConnectorDelegate) {
	    //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看
            ((DatagramConnectorDelegate) manager).closeSession(s);
        } else {
	    //通知DatagramAcceptorDelegate的监听器会话已关闭
            ((DatagramAcceptorDelegate) manager).getListeners()
                    .fireSessionDestroyed(session);
	    //设置会话CloseFuture为已关闭状态
            session.getCloseFuture().setClosed();
        }
    }
}

报文过滤链发送会话写请求,即添加会话写请求队列,待报文监听器NioDatagramAcceptor(监听器线程Acceptor)调度刷新(通过会话关联的报文通道发送消息字节序列)。


总结:                                                              
监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件;  如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。



附:
来看一下默认会话管理器ExpiringSessionRecycler:
/**
 * An {@link IoSessionRecycler} with sessions that time out on inactivity.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @org.apache.xbean.XBean
 */
public class ExpiringSessionRecycler implements IoSessionRecycler {
    /** A map used to store the session 存储会话*/
    private ExpiringMap<SocketAddress, IoSession> sessionMap;
    /** A map used to keep a track of the expiration ,监控会话是否过期线程*/ 
    private ExpiringMap<SocketAddress, IoSession>.Expirer mapExpirer;
    /**
     * Create a new ExpiringSessionRecycler instance
     */
    public ExpiringSessionRecycler() {
        this(ExpiringMap.DEFAULT_TIME_TO_LIVE);
    }
    /**
     * Create a new ExpiringSessionRecycler instance
     * 
     * @param timeToLive The delay after which the session is going to be recycled
     */
    public ExpiringSessionRecycler(int timeToLive) {
        this(timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL);
    }
    /**
     * Create a new ExpiringSessionRecycler instance
     * 
     * @param timeToLive The delay after which the session is going to be recycled
     * @param expirationInterval The delay after which the expiration occurs
     */
    public ExpiringSessionRecycler(int timeToLive, int expirationInterval) {
        sessionMap = new ExpiringMap<>(timeToLive, expirationInterval);
        mapExpirer = sessionMap.getExpirer();
	//添加会话过期监听器
        sessionMap.addExpirationListener(new DefaultExpirationListener());
    }

    /**
     * {@inheritDoc}
     添加会话
     */
    @Override
    public void put(IoSession session) {
       //如果检查线程没启动,启动检查线程,监控会话是否过期
        mapExpirer.startExpiringIfNotStarted();
        SocketAddress key = session.getRemoteAddress();
        if (!sessionMap.containsKey(key)) {
            sessionMap.put(key, session);
        }
    }
    /**
     * {@inheritDoc}
     获取远端socket地址对应的会话
     */
    @Override
    public IoSession recycle(SocketAddress remoteAddress) {
        return sessionMap.get(remoteAddress);
    }
    /**
     * {@inheritDoc}
     移除会话
     */
    @Override
    public void remove(IoSession session) {
        sessionMap.remove(session.getRemoteAddress());
    }
    /**
     * Stop the thread from monitoring the map
     停止过期检查线程
     */
    public void stopExpiring() {
        mapExpirer.stopExpiring();
    }
    //配置获取对象生存时间
    /**
     * Update the value for the time-to-live
     *
     * @param timeToLive The time-to-live (seconds)
     */
    public void setTimeToLive(int timeToLive) {
        sessionMap.setTimeToLive(timeToLive);
    }
    /**
     * @return The session time-to-live in second
     */
    public int getTimeToLive() {
        return sessionMap.getTimeToLive();
    }
    //配置获取过期检查间隔
    /**
     * Set the interval in which a session will live in the map before it is removed.
     * 
     * @param expirationInterval The session expiration time in seconds
     */
    public void setExpirationInterval(int expirationInterval) {
        sessionMap.setExpirationInterval(expirationInterval);
    }
     /**
     * @return The session expiration time in second
     */
    public int getExpirationInterval() {
        return sessionMap.getExpirationInterval();
    }
    
    //默认过期监听器,即关闭会话
    private class DefaultExpirationListener implements ExpirationListener<IoSession> {
        @Override
        public void expired(IoSession expiredSession) {
            expiredSession.closeNow();
        }
    }
}

//过期Map-ExpiringMap
**
 * A map with expiration.  This class contains a worker thread that will 
 * periodically check this class in order to determine if any objects 
 * should be removed based on the provided time-to-live value.
 * 过期map包含一个线程,将间歇地检查监控集合delegate中的过期对象ExpiringObject的生存时间是否
 大于timeToLive,大于则从监控集合delegate中移除过期元素对象ExpiringObject。
 * @param <K> The key type
 * @param <V> The value type
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public class ExpiringMap<K, V> implements Map<K, V> {
    /** The default value, 60 seconds */
    public static final int DEFAULT_TIME_TO_LIVE = 60;//对象生存时间,默认60s
    /** The default value, 1 second */
    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;//默认检查间隔1s
    private static volatile int expirerCount = 1;
    private final ConcurrentHashMap<K, ExpiringObject> delegate;//检查线程expirer,监控的Map
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
    private final Expirer expirer;//过期Map元素检查线程
    /**
     * Creates a new instance of ExpiringMap using the default values 
     * DEFAULT_TIME_TO_LIVE and DEFAULT_EXPIRATION_INTERVAL
     *
     */
    public ExpiringMap() {
        this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL);
    }
    /**
     * Creates a new instance of ExpiringMap using the supplied 
     * time-to-live value and the default value for DEFAULT_EXPIRATION_INTERVAL
     *
     * @param timeToLive The time-to-live value (seconds)
     */
    public ExpiringMap(int timeToLive) {
        this(timeToLive, DEFAULT_EXPIRATION_INTERVAL);
    }

    /**
     * Creates a new instance of ExpiringMap using the supplied values and 
     * a {@link ConcurrentHashMap} for the internal data structure.
     *
     * @param timeToLive The time-to-live value (seconds)
     * @param expirationInterval The time between checks to see if a value should be removed (seconds)
     */
    public ExpiringMap(int timeToLive, int expirationInterval) {
        this(new ConcurrentHashMap<K, ExpiringObject>(), new CopyOnWriteArrayList<ExpirationListener<V>>(), timeToLive,
                expirationInterval);
    }

    private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate,
            CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners, int timeToLive, int expirationInterval) {
        this.delegate = delegate;//需要过期检查的对象集合(报文会话)
        this.expirationListeners = expirationListeners;//过期监听器
        this.expirer = new Expirer();//过期检查线程
        expirer.setTimeToLive(timeToLive);//设置对象存活时间
        expirer.setExpirationInterval(expirationInterval);//设置检查线程检查过期元素间隔
   }
   //此处省略一些方法,主要是put,get,contain,remove等操作
   ...
   //过期map元素
    private class ExpiringObject {
        private K key;
        private V value;
        private long lastAccessTime;//上次访问时间
	//可重入读写锁,保护lastAccessTime的读写操作
        private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
        ExpiringObject(K key, V value, long lastAccessTime) {
            if (value == null) {
                throw new IllegalArgumentException("An expiring object cannot be null.");
            }
            this.key = key;
            this.value = value;
            this.lastAccessTime = lastAccessTime;
        }
        public long getLastAccessTime() {
            lastAccessTimeLock.readLock().lock();

            try {
                return lastAccessTime;
            } finally {
                lastAccessTimeLock.readLock().unlock();
            }
        }
        public void setLastAccessTime(long lastAccessTime) {
            lastAccessTimeLock.writeLock().lock();

            try {
                this.lastAccessTime = lastAccessTime;
            } finally {
                lastAccessTimeLock.writeLock().unlock();
            }
        }
        public K getKey() {
            return key;
        }
        public V getValue() {
            return value;
        }
        @Override
        public boolean equals(Object obj) {
            return value.equals(obj);
        }
        @Override
        public int hashCode() {
            return value.hashCode();
        }
    }

    /**
     * A Thread that monitors an {@link ExpiringMap} and will remove
     * elements that have passed the threshold.
     *
     */
    public class Expirer implements Runnable {
       //状态锁
        private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
        private long timeToLiveMillis;//保活时间
        private long expirationIntervalMillis;//过期检查间隔时间
        private boolean running = false;
        private final Thread expirerThread;
        /**
         * Creates a new instance of Expirer.  
         *
         */
        public Expirer() {
            expirerThread = new Thread(this, "ExpiringMapExpirer-" + expirerCount++);
            expirerThread.setDaemon(true);
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            while (running) {
                processExpires();
                try {
                    Thread.sleep(expirationIntervalMillis);
                } catch (InterruptedException e) {
                    // Do nothing
                }
            }
        }

        private void processExpires() {
            long timeNow = System.currentTimeMillis();
            //遍历代理Map中的过期元素ExpiringObject
            for (ExpiringObject o : delegate.values()) {
                if (timeToLiveMillis <= 0) {
                    continue;
                }
                long timeIdle = timeNow - o.getLastAccessTime();
                if (timeIdle >= timeToLiveMillis) {
		   //如果过期,则从代理Map中移除对象
                    delegate.remove(o.getKey());
                    for (ExpirationListener<V> listener : expirationListeners) {
		        //通知过期监听器,过期对象已移除
                        listener.expired(o.getValue());
                    }
                }
            }
        }

        /**
         * Kick off this thread which will look for old objects and remove them.
         *启动过期检查线程
         */
        public void startExpiring() {
            stateLock.writeLock().lock();
            try {
                if (!running) {
                    running = true;
                    expirerThread.start();
                }
            } finally {
                stateLock.writeLock().unlock();
            }
        }

        /**
         * If this thread has not started, then start it.  
         * Otherwise just return;
	 如果过期检查线程没有启动,则启动
         */
        public void startExpiringIfNotStarted() {
            stateLock.readLock().lock();
            
            try {
                if (running) {
                    return;
                }
            } finally {
                stateLock.readLock().unlock();
            }
            stateLock.writeLock().lock();
            try {
                if (!running) {
                    running = true;
                    expirerThread.start();
                }
            } finally {
                stateLock.writeLock().unlock();
            }
        }

        /**
         * Stop the thread from monitoring the map.
	 中断过期检查线程,监控过期Map
         */
        public void stopExpiring() {
            stateLock.writeLock().lock();
            try {
                if (running) {
                    running = false;
                    expirerThread.interrupt();
                }
            } finally {
                stateLock.writeLock().unlock();
            }
        }

        /**
         * Checks to see if the thread is running
         *
         * @return
         *  If the thread is running, true.  Otherwise false.
         */
        public boolean isRunning() {
            stateLock.readLock().lock();

            try {
                return running;
            } finally {
                stateLock.readLock().unlock();
            }
        }
        //配置获取对象生存时间
        /**
         * @return the Time-to-live value in seconds.
         */
        public int getTimeToLive() {
            stateLock.readLock().lock();

            try {
                return (int) timeToLiveMillis / 1000;
            } finally {
                stateLock.readLock().unlock();
            }
        }
        /**
         * Update the value for the time-to-live
         *
         * @param timeToLive
         *  The time-to-live (seconds)
         */
        public void setTimeToLive(long timeToLive) {
            stateLock.writeLock().lock();

            try {
                this.timeToLiveMillis = timeToLive * 1000;
            } finally {
                stateLock.writeLock().unlock();
            }
        }
       //配置获取过期检查间隔
        /**
         * Get the interval in which an object will live in the map before
         * it is removed.
         *
         * @return
         *  The time in seconds.
         */
        public int getExpirationInterval() {
            stateLock.readLock().lock();
            try {
                return (int) expirationIntervalMillis / 1000;
            } finally {
                stateLock.readLock().unlock();
            }
        }
        /**
         * Set the interval in which an object will live in the map before
         * it is removed.
         *
         * @param expirationInterval
         *  The time in seconds
         */
        public void setExpirationInterval(long expirationInterval) {
            stateLock.writeLock().lock();

            try {
                this.expirationIntervalMillis = expirationInterval * 1000;
            } finally {
                stateLock.writeLock().unlock();
            }
        }
    }
}

//对象过期监听器ExpirationListener
public interface ExpirationListener
{
    public abstract void expired(Object obj);
}
相关标签: mina