Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)
程序员文章站
2022-03-11 09:30:22
...
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器):http://donald-draper.iteye.com/blog/2379152
引言:
前面一篇文章我们看了报文监听器NioDatagramAcceptor的内部变量,构造和IO处理器相关的功能,先来回顾一下:
报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
现在我们来看NioDatagramAcceptor的IoAcceptor和Io服务相关功能的实现:先贴出报文监听器NioDatagramAcceptor的内部变量声明,以便理解后面的内容,
回到上一篇文章启动监听器线程片段startupAcceptor
下面来看一下Acceptor的定义:
监听器线程有一下几点要关注:
1.
2.
3.
4.
5.
6.
我们分别来以上几点:
1.
来看打开通道方法:
从上面来看,处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,
通知service监听,服务已开启,触发fireServiceActivated事件;
再来看第二点:
2.
这一点有两点要关注
2.a
来看报文读处理的数据接收和会话创建
2.a.1
2.a.2
来看创建会话这一点
//根据Io处理器,报文通道及远端socket地址创建会话
默认会话管理器sessionRecycler,见附;
2.b
从上面可以看出,处理报文通道就绪续事件,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列;
再来看发送刷新队列的会话写请求:
3.
从上面可以看出处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话
4.
从上可以看出处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;
5.
6.
来看剩余的方法操作,很简单,不详解:
在下面这篇文章中,我们讲过报文过滤链,可以集合本文,在回到看看下面这篇文章
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
我们贴出上面这篇文章的报文过滤链的定义:
报文过滤链发送会话写请求,即添加会话写请求队列,待报文监听器NioDatagramAcceptor(监听器线程Acceptor)调度刷新(通过会话关联的报文通道发送消息字节序列)。
总结:
监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件; 如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。
附:
来看一下默认会话管理器ExpiringSessionRecycler:
//过期Map-ExpiringMap
//对象过期监听器ExpirationListener
引言:
前面一篇文章我们看了报文监听器NioDatagramAcceptor的内部变量,构造和IO处理器相关的功能,先来回顾一下:
报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
现在我们来看NioDatagramAcceptor的IoAcceptor和Io服务相关功能的实现:先贴出报文监听器NioDatagramAcceptor的内部变量声明,以便理解后面的内容,
/** * {@link IoAcceptor} for datagram transport (UDP/IP). * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * @org.apache.xbean.XBean */ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> { /** * A session recycler that is used to retrieve an existing session, unless it's too old. 默认过期会话回收器 **/ private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler(); /** * A timeout used for the select, as we need to get out to deal with idle * sessions 选择超时时间 */ private static final long SELECT_TIMEOUT = 1000L; /** A lock used to protect the selector to be waked up before it's created */ private final Semaphore lock = new Semaphore(1); /** A queue used to store the list of pending Binds 地址绑定请求*/ private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>(); //地址解绑请求队列 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>(); //刷新会话队列,IO处理器刷新操作会用到,暂存刷新操作的会话 private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>(); // socket地址与报文通道映射Map,绑定操作使socket地址与报文通道关联起来 private final Map<SocketAddress, DatagramChannel> boundHandles = Collections .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>()); //会话管理器,监控连接Service的会话,如果会话过期,关闭过期的会话 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER; private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); private volatile boolean selectable; /** The thread responsible of accepting incoming requests */ private Acceptor acceptor;//监听器线程 private long lastIdleCheckTime;//上次空闲检查时间 /** The Selector used by this acceptor 选择器*/ private volatile Selector selector; }
回到上一篇文章启动监听器线程片段startupAcceptor
/** * Starts the inner Acceptor thread. */ private void startupAcceptor() throws InterruptedException { if (!selectable) { //如果选择器初始化失败,则清空注册队列,取消队列及刷新会话队列 registerQueue.clear(); cancelQueue.clear(); flushingSessions.clear(); } lock.acquire(); if (acceptor == null) { //创建Acceptor线程实例,并执行 acceptor = new Acceptor(); executeWorker(acceptor); } else { lock.release(); } }
下面来看一下Acceptor的定义:
/** * This private class is used to accept incoming connection from * clients. It's an infinite loop, which can be stopped when all * the registered handles have been removed (unbound). 接收客户端的连接。主操作是一个无限循环,当所有绑定的地址的报文通道解绑时, 循环退出 */ private class Acceptor implements Runnable { @Override public void run() { int nHandles = 0; lastIdleCheckTime = System.currentTimeMillis(); // Release the lock lock.release(); while (selectable) { try { //超时选择 int selected = select(SELECT_TIMEOUT); //处理地址绑定请求 nHandles += registerHandles(); if (nHandles == 0) { try { //如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程 lock.acquire(); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { acceptor = null; break; } } finally { lock.release(); } } if (selected > 0) { //处理读写操作时间就绪的会话 processReadySessions(selectedHandles()); } long currentTime = System.currentTimeMillis(); //发送刷新队列中的写请求 flushSessions(currentTime); //处理报文通道地址解绑请求 nHandles -= unregisterHandles(); //通知会话空闲 notifyIdleSessions(currentTime); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { } } } //如何Io处理器正在关闭,则销毁报文监听器 if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } } } }
监听器线程有一下几点要关注:
1.
//处理地址绑定请求 nHandles += registerHandles();
2.
if (selected > 0) { //处理读写操作时间就绪的会话 processReadySessions(selectedHandles()); }
3.
//发送刷新队列中的写请求 flushSessions(currentTime);
4.
//处理报文通道地址解绑请求 nHandles -= unregisterHandles();
5.
//通知会话空闲 notifyIdleSessions(currentTime);
6.
//如何Io处理器正在关闭,则销毁报文监听器 if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } }
我们分别来以上几点:
1.
//处理地址绑定请求 nHandles += registerHandles();
private int registerHandles() { for (;;) { //从注册队列,poll地址绑定请求 AcceptorOperationFuture req = registerQueue.poll(); if (req == null) { break; } Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>(); List<SocketAddress> localAddresses = req.getLocalAddresses(); try { //遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道 for (SocketAddress socketAddress : localAddresses) DatagramChannel handle = open(socketAddress); //添加socket地址与报文通道映射到集合newHandles newHandles.put(localAddress(handle), handle); } 添加socket地址与报文通道映射到boundHandles boundHandles.putAll(newHandles); //通知service监听,服务已开启,及触发fireServiceActivated事件 getListeners().fireServiceActivated(); //地址绑定结束 req.setDone(); return newHandles.size(); } catch (Exception e) { req.setException(e); } finally { // Roll back if failed to bind all addresses. //如果异常,则关闭报文通道 if (req.getException() != null) { for (DatagramChannel handle : newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } wakeup(); } } } return 0; }
来看打开通道方法:
protected DatagramChannel open(SocketAddress localAddress) throws Exception { //打开一个报文通道 final DatagramChannel ch = DatagramChannel.open(); boolean success = false; try { //配置通道会话及阻塞模式 new NioDatagramSessionConfig(ch).setAll(getSessionConfig()); ch.configureBlocking(false); try { //绑定地址 ch.socket().bind(localAddress); } catch (IOException ioe) { // Add some info regarding the address we try to bind to the // message String newMessage = "Error while binding on " + localAddress + "\n" + "original message : " + ioe.getMessage(); Exception e = new IOException(newMessage); e.initCause(ioe.getCause()); // And close the channel ch.close(); throw e; } //注册报文通道读操作事件OP_READ到选择器selector ch.register(selector, SelectionKey.OP_READ); success = true; } finally { if (!success) { close(ch); } } return ch; }
从上面来看,处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,
通知service监听,服务已开启,触发fireServiceActivated事件;
再来看第二点:
2.
if (selected > 0) { //处理读写操作时间就绪的会话 processReadySessions(selectedHandles()); }
private void processReadySessions(Set<SelectionKey> handles) { Iterator<SelectionKey> iterator = handles.iterator(); //遍历读写操作事件就绪的报文通道 while (iterator.hasNext()) { //获取选择key,及报文通道 SelectionKey key = iterator.next(); DatagramChannel handle = (DatagramChannel) key.channel(); iterator.remove(); try { //执行读操作 if (key.isValid() && key.isReadable()) { readHandle(handle); } //执行写操作 if (key.isValid() && key.isWritable()) { for (IoSession session : getManagedSessions().values()) { scheduleFlush((NioSession) session); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }
这一点有两点要关注
2.a
//执行读操作 if (key.isValid() && key.isReadable()) { readHandle(handle); }
private void readHandle(DatagramChannel handle) throws Exception { IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize()); //接收数据 SocketAddress remoteAddress = receive(handle, readBuf); if (remoteAddress != null) { //创建会话 IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle)); readBuf.flip(); //触发会话过滤链的消息接收事件fireMessageReceived session.getFilterChain().fireMessageReceived(readBuf); } }
来看报文读处理的数据接收和会话创建
2.a.1
//接收数据 protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception { return handle.receive(buffer.buf()); }
2.a.2
//创建会话 private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { //获取远端socket地址关联的报文通道 DatagramChannel handle = boundHandles.get(localAddress); if (handle == null) { throw new IllegalArgumentException("Unknown local address: " + localAddress); } IoSession session; synchronized (sessionRecycler) { //从会话管理器,获取远端socket地址会话,以便重用 session = sessionRecycler.recycle(remoteAddress); if (session != null) { return session; } // If a new session needs to be created. //创建会话 NioSession newSession = newSession(this, handle, remoteAddress); //将会话添加会话管理器,监控会话 getSessionRecycler().put(newSession); session = newSession; } //初始化会话 initSession(session, null, null); try { //构建会话过滤链 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); //通知Service监听器发生会话创建事件fireSessionCreated getListeners().fireSessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } return session; }
来看创建会话这一点
//创建会话 NioSession newSession = newSession(this, handle, remoteAddress);
//根据Io处理器,报文通道及远端socket地址创建会话
protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle, SocketAddress remoteAddress) { //获取报文通道注册到选择器的选择key SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return null; } //创建报文会话 NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress); //设置会话选择key newSession.setSelectionKey(key); return newSession; }
默认会话管理器sessionRecycler,见附;
2.b
//执行写操作 if (key.isValid() && key.isWritable()) { //调度Service管理的会话 for (IoSession session : getManagedSessions().values()) { scheduleFlush((NioSession) session); } }
从上面可以看出,处理报文通道就绪续事件,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列;
再来看发送刷新队列的会话写请求:
3.
//发送刷新队列中的会话写请求 flushSessions(currentTime);
private void flushSessions(long currentTime) { for (;;) { //从刷新队列获取会话 NioSession session = flushingSessions.poll(); if (session == null) { break; } // Reset the Schedule for flush flag for this session, // as we are flushing it now //设置会话为未调度 session.unscheduledForFlush(); try { //刷新会话 boolean flushedAll = flush(session, currentTime); if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) { //如果刷新成功,但会话写请求队列不为空,且未调度,则重新调度会话 scheduleFlush(session); } } catch (Exception e) { session.getFilterChain().fireExceptionCaught(e); } } }
//发送会话写请求 private boolean flush(NioSession session, long currentTime) throws Exception { //获取会话写请求队列,会话最大读buffersize final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() + (session.getConfig().getMaxReadBufferSize() >>> 1); int writtenBytes = 0; try { for (;;) { //获取会话当前写请求 WriteRequest req = session.getCurrentWriteRequest(); if (req == null) { //从写请求队列poll一个写请求 req = writeRequestQueue.poll(session); if (req == null) { //设置会话不在关注写事件 setInterestedInWrite(session, false); break; } //设置会话当前写请求 session.setCurrentWriteRequest(req); } //获取写请求消息 IoBuffer buf = (IoBuffer) req.getMessage(); if (buf.remaining() == 0) { // Clear and fire event //置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent session.setCurrentWriteRequest(null); buf.reset(); session.getFilterChain().fireMessageSent(req); continue; } //获取写请求远端地址 SocketAddress destination = req.getDestination(); //如果写请求远端地址为null,则获取会话远端地址 if (destination == null) { destination = session.getRemoteAddress(); } //发送会话写请求字节序列 int localWrittenBytes = send(session, buf, destination); if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) { // Kernel buffer is full or wrote too much //如果数据太多或发送数据失败,设置会话关注写操作事件 setInterestedInWrite(session, true); return false; } else { //数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent setInterestedInWrite(session, false); // Clear and fire event session.setCurrentWriteRequest(null); writtenBytes += localWrittenBytes; buf.reset(); session.getFilterChain().fireMessageSent(req); } } } finally { //更新会话写字节计数器 session.increaseWrittenBytes(writtenBytes, currentTime); } return true; }
//委托会话关联的报文通道发送会话消息字节序列 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception { return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress); }
从上面可以看出处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话
4.
//处理报文通道地址解绑请求 nHandles -= unregisterHandles();
private int unregisterHandles() { int nHandles = 0; for (;;) { //从取消队列,poll地址解绑请求 AcceptorOperationFuture request = cancelQueue.poll(); if (request == null) { break; } // close the channels //遍历地址解绑请求socket地址集合 for (SocketAddress socketAddress : request.getLocalAddresses()) { //从socket与报文通道映射集boundHandles移除socket地址 DatagramChannel handle = boundHandles.remove(socketAddress); if (handle == null) { continue; } try { //关闭报文通道 close(handle); //唤醒选择操作 wakeup(); // wake up again to trigger thread death } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { nHandles++; } } //解绑成功 request.setDone(); } return nHandles; }
//关闭通道 protected void close(DatagramChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); //取消选择key if (key != null) { key.cancel(); } //关闭连接及通道 handle.disconnect(); handle.close(); }
从上可以看出处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;
5.
//通知会话空闲 notifyIdleSessions(currentTime);
private void notifyIdleSessions(long currentTime) { // process idle sessions if (currentTime - lastIdleCheckTime >= 1000) { lastIdleCheckTime = currentTime; //通知service管理的会话空闲 AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime); } }
6.
//如何Io处理器正在关闭,则销毁报文监听器 if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } }
//关闭选择器 protected void destroy() throws Exception { if (selector != null) { selector.close(); } }
来看剩余的方法操作,很简单,不详解:
/** * {@inheritDoc} 创建会话 */ @Override public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { if (isDisposing()) { throw new IllegalStateException("The Acceptor is being disposed."); } if (remoteAddress == null) { throw new IllegalArgumentException("remoteAddress"); } synchronized (bindLock) { if (!isActive()) { throw new IllegalStateException("Can't create a session from a unbound service."); } try { //创建报文会话 return newSessionWithoutLock(remoteAddress, localAddress); } catch (RuntimeException | Error e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to create a session.", e); } } } /** * {@inheritDoc} 解绑地址 */ @Override protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); //添加地址解绑请求到取消队列 cancelQueue.add(request); startupAcceptor();//启动监听器线程 wakeup();//唤醒选择器 //等待解绑成功 request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } } /** * {@inheritDoc} 关闭IO处理器相关的资源 */ @Override protected void dispose0() throws Exception { unbind();//解绑地址 startupAcceptor();//启动监听器线程 wakeup(); } //选择操作 protected int select() throws Exception { return selector.select(); } protected int select(long timeout) throws Exception { return selector.select(timeout); } //上一次选择后,存在就绪事件的选择key protected Set<SelectionKey> selectedHandles() { return selector.selectedKeys(); } @Override public InetSocketAddress getDefaultLocalAddress() { return (InetSocketAddress) super.getDefaultLocalAddress(); } @Override public InetSocketAddress getLocalAddress() { return (InetSocketAddress) super.getLocalAddress(); } /** * {@inheritDoc} */ @Override public DatagramSessionConfig getSessionConfig() { return (DatagramSessionConfig) sessionConfig; } @Override public final IoSessionRecycler getSessionRecycler() { return sessionRecycler; } @Override public TransportMetadata getTransportMetadata() { return NioDatagramSession.METADATA; } protected boolean isReadable(DatagramChannel handle) { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return false; } return key.isReadable(); } protected boolean isWritable(DatagramChannel handle) { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return false; } return key.isWritable(); } @Override public void setDefaultLocalAddress(InetSocketAddress localAddress) { setDefaultLocalAddress((SocketAddress) localAddress); } @Override public final void setSessionRecycler(IoSessionRecycler sessionRecycler) { synchronized (bindLock) { if (isActive()) { throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound."); } if (sessionRecycler == null) { sessionRecycler = DEFAULT_RECYCLER; } this.sessionRecycler = sessionRecycler; } }
在下面这篇文章中,我们讲过报文过滤链,可以集合本文,在回到看看下面这篇文章
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
我们贴出上面这篇文章的报文过滤链的定义:
class DatagramFilterChain extends AbstractIoFilterChain { DatagramFilterChain(IoSession parent) { super(parent); } //会话发送写请求,及添加会话写请求队列,待报文监听器调度刷新,即通过会话关联的报文通道 //发送消息字节序列 protected void doWrite(IoSession session, WriteRequest writeRequest) { DatagramSessionImpl s = (DatagramSessionImpl) session; //获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲 Queue writeRequestQueue = s.getWriteRequestQueue(); // SocketIoProcessor.doFlush() will reset it after write is finished // because the buffer will be passed with messageSent event. //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件, //待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置 ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage(); buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { //BaseIoSession // private final AtomicInteger scheduledWriteRequests = new AtomicInteger(); //更新调度请求计数器+1 s.increaseScheduledWriteRequests(); } else { //BaseIoSession //private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); //更新调度写字节计数器+buffer.remaining() s.increaseScheduledWriteBytes(buffer.remaining()); s.increaseScheduledWriteBytes(buffer.remaining()); } synchronized (writeRequestQueue) { //将写请求添加到session写请求队列中 writeRequestQueue.push(writeRequest); } if (session.getTrafficMask().isWritable()) { //DatagramSessionImpl //private final DatagramService managerDelegate; //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作, //这个在以后在具体详说 s.getManagerDelegate().flushSession(s); } } protected void doClose(IoSession session) { DatagramSessionImpl s = (DatagramSessionImpl) session; DatagramService manager = s.getManagerDelegate(); ////委托给session关联的managerDelegate(DatagramService)关闭会话 if (manager instanceof DatagramConnectorDelegate) { //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看 ((DatagramConnectorDelegate) manager).closeSession(s); } else { //通知DatagramAcceptorDelegate的监听器会话已关闭 ((DatagramAcceptorDelegate) manager).getListeners() .fireSessionDestroyed(session); //设置会话CloseFuture为已关闭状态 session.getCloseFuture().setClosed(); } } }
报文过滤链发送会话写请求,即添加会话写请求队列,待报文监听器NioDatagramAcceptor(监听器线程Acceptor)调度刷新(通过会话关联的报文通道发送消息字节序列)。
总结:
监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件; 如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。
附:
来看一下默认会话管理器ExpiringSessionRecycler:
/** * An {@link IoSessionRecycler} with sessions that time out on inactivity. * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * @org.apache.xbean.XBean */ public class ExpiringSessionRecycler implements IoSessionRecycler { /** A map used to store the session 存储会话*/ private ExpiringMap<SocketAddress, IoSession> sessionMap; /** A map used to keep a track of the expiration ,监控会话是否过期线程*/ private ExpiringMap<SocketAddress, IoSession>.Expirer mapExpirer; /** * Create a new ExpiringSessionRecycler instance */ public ExpiringSessionRecycler() { this(ExpiringMap.DEFAULT_TIME_TO_LIVE); } /** * Create a new ExpiringSessionRecycler instance * * @param timeToLive The delay after which the session is going to be recycled */ public ExpiringSessionRecycler(int timeToLive) { this(timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL); } /** * Create a new ExpiringSessionRecycler instance * * @param timeToLive The delay after which the session is going to be recycled * @param expirationInterval The delay after which the expiration occurs */ public ExpiringSessionRecycler(int timeToLive, int expirationInterval) { sessionMap = new ExpiringMap<>(timeToLive, expirationInterval); mapExpirer = sessionMap.getExpirer(); //添加会话过期监听器 sessionMap.addExpirationListener(new DefaultExpirationListener()); } /** * {@inheritDoc} 添加会话 */ @Override public void put(IoSession session) { //如果检查线程没启动,启动检查线程,监控会话是否过期 mapExpirer.startExpiringIfNotStarted(); SocketAddress key = session.getRemoteAddress(); if (!sessionMap.containsKey(key)) { sessionMap.put(key, session); } } /** * {@inheritDoc} 获取远端socket地址对应的会话 */ @Override public IoSession recycle(SocketAddress remoteAddress) { return sessionMap.get(remoteAddress); } /** * {@inheritDoc} 移除会话 */ @Override public void remove(IoSession session) { sessionMap.remove(session.getRemoteAddress()); } /** * Stop the thread from monitoring the map 停止过期检查线程 */ public void stopExpiring() { mapExpirer.stopExpiring(); } //配置获取对象生存时间 /** * Update the value for the time-to-live * * @param timeToLive The time-to-live (seconds) */ public void setTimeToLive(int timeToLive) { sessionMap.setTimeToLive(timeToLive); } /** * @return The session time-to-live in second */ public int getTimeToLive() { return sessionMap.getTimeToLive(); } //配置获取过期检查间隔 /** * Set the interval in which a session will live in the map before it is removed. * * @param expirationInterval The session expiration time in seconds */ public void setExpirationInterval(int expirationInterval) { sessionMap.setExpirationInterval(expirationInterval); } /** * @return The session expiration time in second */ public int getExpirationInterval() { return sessionMap.getExpirationInterval(); } //默认过期监听器,即关闭会话 private class DefaultExpirationListener implements ExpirationListener<IoSession> { @Override public void expired(IoSession expiredSession) { expiredSession.closeNow(); } } }
//过期Map-ExpiringMap
** * A map with expiration. This class contains a worker thread that will * periodically check this class in order to determine if any objects * should be removed based on the provided time-to-live value. * 过期map包含一个线程,将间歇地检查监控集合delegate中的过期对象ExpiringObject的生存时间是否 大于timeToLive,大于则从监控集合delegate中移除过期元素对象ExpiringObject。 * @param <K> The key type * @param <V> The value type * * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public class ExpiringMap<K, V> implements Map<K, V> { /** The default value, 60 seconds */ public static final int DEFAULT_TIME_TO_LIVE = 60;//对象生存时间,默认60s /** The default value, 1 second */ public static final int DEFAULT_EXPIRATION_INTERVAL = 1;//默认检查间隔1s private static volatile int expirerCount = 1; private final ConcurrentHashMap<K, ExpiringObject> delegate;//检查线程expirer,监控的Map private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners; private final Expirer expirer;//过期Map元素检查线程 /** * Creates a new instance of ExpiringMap using the default values * DEFAULT_TIME_TO_LIVE and DEFAULT_EXPIRATION_INTERVAL * */ public ExpiringMap() { this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL); } /** * Creates a new instance of ExpiringMap using the supplied * time-to-live value and the default value for DEFAULT_EXPIRATION_INTERVAL * * @param timeToLive The time-to-live value (seconds) */ public ExpiringMap(int timeToLive) { this(timeToLive, DEFAULT_EXPIRATION_INTERVAL); } /** * Creates a new instance of ExpiringMap using the supplied values and * a {@link ConcurrentHashMap} for the internal data structure. * * @param timeToLive The time-to-live value (seconds) * @param expirationInterval The time between checks to see if a value should be removed (seconds) */ public ExpiringMap(int timeToLive, int expirationInterval) { this(new ConcurrentHashMap<K, ExpiringObject>(), new CopyOnWriteArrayList<ExpirationListener<V>>(), timeToLive, expirationInterval); } private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate, CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners, int timeToLive, int expirationInterval) { this.delegate = delegate;//需要过期检查的对象集合(报文会话) this.expirationListeners = expirationListeners;//过期监听器 this.expirer = new Expirer();//过期检查线程 expirer.setTimeToLive(timeToLive);//设置对象存活时间 expirer.setExpirationInterval(expirationInterval);//设置检查线程检查过期元素间隔 } //此处省略一些方法,主要是put,get,contain,remove等操作 ... //过期map元素 private class ExpiringObject { private K key; private V value; private long lastAccessTime;//上次访问时间 //可重入读写锁,保护lastAccessTime的读写操作 private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock(); ExpiringObject(K key, V value, long lastAccessTime) { if (value == null) { throw new IllegalArgumentException("An expiring object cannot be null."); } this.key = key; this.value = value; this.lastAccessTime = lastAccessTime; } public long getLastAccessTime() { lastAccessTimeLock.readLock().lock(); try { return lastAccessTime; } finally { lastAccessTimeLock.readLock().unlock(); } } public void setLastAccessTime(long lastAccessTime) { lastAccessTimeLock.writeLock().lock(); try { this.lastAccessTime = lastAccessTime; } finally { lastAccessTimeLock.writeLock().unlock(); } } public K getKey() { return key; } public V getValue() { return value; } @Override public boolean equals(Object obj) { return value.equals(obj); } @Override public int hashCode() { return value.hashCode(); } } /** * A Thread that monitors an {@link ExpiringMap} and will remove * elements that have passed the threshold. * */ public class Expirer implements Runnable { //状态锁 private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); private long timeToLiveMillis;//保活时间 private long expirationIntervalMillis;//过期检查间隔时间 private boolean running = false; private final Thread expirerThread; /** * Creates a new instance of Expirer. * */ public Expirer() { expirerThread = new Thread(this, "ExpiringMapExpirer-" + expirerCount++); expirerThread.setDaemon(true); } /** * {@inheritDoc} */ @Override public void run() { while (running) { processExpires(); try { Thread.sleep(expirationIntervalMillis); } catch (InterruptedException e) { // Do nothing } } } private void processExpires() { long timeNow = System.currentTimeMillis(); //遍历代理Map中的过期元素ExpiringObject for (ExpiringObject o : delegate.values()) { if (timeToLiveMillis <= 0) { continue; } long timeIdle = timeNow - o.getLastAccessTime(); if (timeIdle >= timeToLiveMillis) { //如果过期,则从代理Map中移除对象 delegate.remove(o.getKey()); for (ExpirationListener<V> listener : expirationListeners) { //通知过期监听器,过期对象已移除 listener.expired(o.getValue()); } } } } /** * Kick off this thread which will look for old objects and remove them. *启动过期检查线程 */ public void startExpiring() { stateLock.writeLock().lock(); try { if (!running) { running = true; expirerThread.start(); } } finally { stateLock.writeLock().unlock(); } } /** * If this thread has not started, then start it. * Otherwise just return; 如果过期检查线程没有启动,则启动 */ public void startExpiringIfNotStarted() { stateLock.readLock().lock(); try { if (running) { return; } } finally { stateLock.readLock().unlock(); } stateLock.writeLock().lock(); try { if (!running) { running = true; expirerThread.start(); } } finally { stateLock.writeLock().unlock(); } } /** * Stop the thread from monitoring the map. 中断过期检查线程,监控过期Map */ public void stopExpiring() { stateLock.writeLock().lock(); try { if (running) { running = false; expirerThread.interrupt(); } } finally { stateLock.writeLock().unlock(); } } /** * Checks to see if the thread is running * * @return * If the thread is running, true. Otherwise false. */ public boolean isRunning() { stateLock.readLock().lock(); try { return running; } finally { stateLock.readLock().unlock(); } } //配置获取对象生存时间 /** * @return the Time-to-live value in seconds. */ public int getTimeToLive() { stateLock.readLock().lock(); try { return (int) timeToLiveMillis / 1000; } finally { stateLock.readLock().unlock(); } } /** * Update the value for the time-to-live * * @param timeToLive * The time-to-live (seconds) */ public void setTimeToLive(long timeToLive) { stateLock.writeLock().lock(); try { this.timeToLiveMillis = timeToLive * 1000; } finally { stateLock.writeLock().unlock(); } } //配置获取过期检查间隔 /** * Get the interval in which an object will live in the map before * it is removed. * * @return * The time in seconds. */ public int getExpirationInterval() { stateLock.readLock().lock(); try { return (int) expirationIntervalMillis / 1000; } finally { stateLock.readLock().unlock(); } } /** * Set the interval in which an object will live in the map before * it is removed. * * @param expirationInterval * The time in seconds */ public void setExpirationInterval(long expirationInterval) { stateLock.writeLock().lock(); try { this.expirationIntervalMillis = expirationInterval * 1000; } finally { stateLock.writeLock().unlock(); } } } }
//对象过期监听器ExpirationListener
public interface ExpirationListener { public abstract void expired(Object obj); }
上一篇: Ruby 1.9概要(3)类和模块
下一篇: Ruby 1.9概要(3)类和模块