欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

WindowsSelectorImpl解析二(选择操作,通道注册,通道反注册,选择器关闭等)

程序员文章站 2022-04-24 16:13:03
...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper):[url]http://donald-draper.iteye.com/blog/2370811[/url]
[url][b]引言:[/b][/url]
上一篇文章我们简单看了一下的WindowsSelectorImpl内部集合和变量及同步锁的定义,先来回顾下,
WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;
pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
FdMap主要是存储选择key的,FdMap实际上是一个HashMap,key为选择key的文件描述id,value为MapEntry,MapEntry为选择key的包装Entry,里面含有更新计数器updateCount和清除计数器clearedCount。
PollArrayWrapper存放选择key和通道及其相关的操作事件。PollArrayWrapper通过AllocatedNativeObject来存储先关的文件描述及其兴趣事件,AllocatedNativeObject
为已分配的底层内存空间,AllocatedNativeObject的内存主要NativeObject来分配,
NativeObject实际是通过Unsafe来分配内存。PollArrayWrapper作用即存放选择key和选择key关注的事件,用选择key的文件描述id,表示选择key,文件描述id为int,所以占4个字节,选择key的兴趣操作事件也为int,即4个字节,所以SIZE_POLLFD为8,文件描述id开始位置FD_OFFSET为0,兴趣事件开始位置EVENT_OFFSET为4;FD_OFFSET和EVENT_OFFSET都是相对于SIZE_POLLFD的。PollArrayWrapper同时存储唤醒等待选择操作的选择器的通道和唤醒通道关注事件,即通道注册选择器事件,即添加选择key事件。当有通道注册到选择器,则唤醒通道,唤醒等待选择操作的选择器。
这里我们把WindowsSelectorImpl的内部结构和构造代码贴出了,以便我们进一步跟进上一篇文章遗留的问题。
final class WindowsSelectorImpl extends SelectorImpl
{
private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量
private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量
private SelectionKeyImpl channelArray[];//选择器关联通道集合
private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合
private int totalChannels;//注册到选择的通道数量
private int threadsCount;//选择线程数
private final List threads = new ArrayList();//选择操作线程集合
private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操操的管道
private final int wakeupSourceFd;//唤醒管道源通道文件描述
private final int wakeupSinkFd;//唤醒管道sink通道文件描述
private Object closeLock;//选择器关闭同步锁
private final FdMap fdMap = new FdMap();//存放选择key文件描述与选择key映射关系的Map
//子选择器,主要从pollWrapper拉取读写选择key,并更新key通道的就绪操作事件集
private final SubSelector subSelector = new SubSelector();
private long timeout;//超时时间,从pollWrapper拉取文件描述的超时时间
private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步
private volatile boolean interruptTriggered;//是否唤醒等待选择操的线程
private final StartLock startLock = new StartLock();//选择操作开始锁
private final FinishLock finishLock = new FinishLock();//选择操作结束锁
private long updateCount;//已选择key,更新就绪操作事件计数器
static final boolean $assertionsDisabled = !sun/nio/ch/WindowsSelectorImpl.desiredAssertionStatus();
static
{
//加载nio,net资源库
Util.load();
}
}

WindowsSelectorImpl(SelectorProvider selectorprovider)
throws IOException
{
super(selectorprovider);
//创建选择器关联通道数组,实际存的为选择key
channelArray = new SelectionKeyImpl[8];
totalChannels = 1;//通道计数器
threadsCount = 0;//线程计数器
closeLock = new Object();//关闭锁
interruptTriggered = false;
updateCount = 0L;//更新计数器
pollWrapper = new PollArrayWrapper(8);
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();//唤醒管道源通道文件描述id
SinkChannelImpl sinkchannelimpl = (SinkChannelImpl)wakeupPipe.sink();//唤醒管道sink通道
sinkchannelimpl.sc.socket().setTcpNoDelay(true);//设置唤醒管道sink通道的Socket为无延时
wakeupSinkFd = sinkchannelimpl.getFDVal();
//将唤醒管道的源通道文件描述id添加pollWrapper的索引0位置上
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,startLock,finishLock后面两个的作用,目前还不清楚,后面再说;一个唤醒管道,作用尚不明确;一个注册到选择器的通道计数器totalChannels;updateCount计数器作用,尚不明确;通道集合channelArray,存放的元素实际为通道关联的选择key;pollWrapper用于存储选择key和相应的兴趣事件,及唤醒管道的源通道,唤醒管道的源通道存放在pollWrapper的索引0位置上。
我们要关注的几个方法为
1.注册key操作的implRegister方法
2.处理取消key集合方法中implDereg方法
3.选择操作中的doSelect(long l)
4.唤醒方法wakeup
5.实际关闭选择通道方法implClose
今天我们通过这几个方法,来理解interruptLock,startLock,finishLock,totalChannels,
threadsCount,updateCount,wakeupPipe(wakeupSinkFd,wakeupSourceFd),
subSelector等作用。
下面先看
1.注册key操作的implRegister方法
protected void implRegister(SelectionKeyImpl selectionkeyimpl)
{
//同步关闭锁,以防在注册的过程中,选择器被关闭
synchronized(closeLock)
{
if(pollWrapper == null)
//文件描述包装集合为null,即选器已关闭
throw new ClosedSelectorException();
growIfNeeded();//
channelArray[totalChannels] = selectionkeyimpl;//添加到选择器通道集合
selectionkeyimpl.setIndex(totalChannels);//设置key在选择器通道集合的索引
fdMap.put(selectionkeyimpl);//添加选择key到文件描述fdMap
keys.add(selectionkeyimpl);//添加key到key集合
//将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper
pollWrapper.addEntry(totalChannels, selectionkeyimpl);
totalChannels++;//通道计数器自增
}
}

这个方法中我们需要关注的是这一句:
growIfNeeded();

private void growIfNeeded()
{
//如果选择器通道集合已满
if(channelArray.length == totalChannels)
{
//扩容选择器通道集合的容量为原来的两倍
int i = totalChannels * 2;
//创建新的选择器通道集合,并将原始通道集合元素,拷贝到新集合中
SelectionKeyImpl aselectionkeyimpl[] = new SelectionKeyImpl[i];
System.arraycopy(channelArray, 1, aselectionkeyimpl, 1, totalChannels - 1);
channelArray = aselectionkeyimpl;
//扩容文件描述信息及关注操作事件包装集合pollWrapper
pollWrapper.grow(i);
}
//如果通道数量为1024的整数倍
if(totalChannels % 1024 == 0)
{
//添加唤醒源通道到pollWrapper的索引totalChannels
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;//通道计数器自增
threadsCount++;//线程数自增
}
}

到这里可以看到通道计数器totalChannels,是pollWrapper中文件描述符的数量,在WindowsSelectorImpl的构造中totalChannels被初始化为1,而不是0,因为在构造是已经将wakeupSourceFd添加到pollWrapper,wakeupSourceFd在pollWrapper为位置假设为i,那么通道集合相应位置上的元素实际上为null。
从implRegister的方法,可以看出,首先同步关闭锁,以防在注册的过程中,选择器被关闭;
检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增;

再来看实际的反注册key方法
2.处理取消key集合方法中implDereg方法
protected void implDereg(SelectionKeyImpl selectionkeyimpl)
throws IOException
{
//获取选择key在选择器通道集合中的索引
int i = selectionkeyimpl.getIndex();
//断言开启,且断言失败,i小于0,抛出断言异常
if(!$assertionsDisabled && i < 0)
throw new AssertionError();
if(i != totalChannels - 1)
{
//如果反注册的key不在通道集合的尾部,则与尾部交换,更新key的通道集合索引
SelectionKeyImpl selectionkeyimpl1 = channelArray[totalChannels - 1];
channelArray[i] = selectionkeyimpl1;
selectionkeyimpl1.setIndex(i);
//并将交换信息同步到pollWrapper
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, pollWrapper, i);
}
//置空通道集合totalChannels - 1索引上的元素
channelArray[totalChannels - 1] = null;
totalChannels--;//通道计数器自减
selectionkeyimpl.setIndex(-1);//设置反注册key的索引为-1,已无效
if(totalChannels != 1 && totalChannels % 1024 == 1)
{
//如果反注册后,通道计数器不为1,且当前通道为每批次(1024)的第一个通道,
//则通道计数器自减,线程计数器自减
totalChannels--;
threadsCount--;
}
//从fdMap,keys,selectedKeys集合移除选择key
fdMap.remove(selectionkeyimpl);
keys.remove(selectionkeyimpl);
selectedKeys.remove(selectionkeyimpl);
//将key从通道中移除
deregister(selectionkeyimpl);
SelectableChannel selectablechannel = selectionkeyimpl.channel();
if(!selectablechannel.isOpen() && !selectablechannel.isRegistered())
//如果通道关闭,且为注册,则kill
((SelChImpl)selectablechannel).kill();
}

从implDereg的方法,可以看出点线程计数器threadsCount的意思,threadCount记录的是
pollWrapper中wakeupSourceFd的数量,pollWrapper的文件描述是分批次的,每批次有1024文件描述,这个批次的文件描述的第一个为wakeupSourceFd。implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,
从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
3.选择操作中的doSelect(long l)
在看这个方法之前,因为涉及到StartLock,FinishLock,SelectThread和SubSelector,先看一下这个类的定义;
private final class SubSelector
{
private final int pollArrayIndex;//pollWrapper索引
private final int readFds[];//读操作文件描述符集
private final int writeFds[];//写操作文件描述符集
private final int exceptFds[];
final WindowsSelectorImpl this$0;
private SubSelector()
{
this$0 = WindowsSelectorImpl.this;
super();
readFds = new int[1025];
writeFds = new int[1025];
exceptFds = new int[1025];
//如果索引参数(线程集合的索引),则初始化为0
pollArrayIndex = 0;
}
private SubSelector(int i)
{
this$0 = WindowsSelectorImpl.this;
super();
readFds = new int[1025];
writeFds = new int[1025];
exceptFds = new int[1025];
//有索引参数,则定位pollArrayIndex的位置,每个线程
//处理的文件描述为1024个
pollArrayIndex = (i + 1) * 1024;
}
//从pollWrapper的pollArrayAddress位置超时读取Math.min(totalChannels, 1024)个文件描述
//到读写操作文件描述符集
private int poll()
throws IOException
{
return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, 1024), readFds, writeFds, exceptFds, timeout);
}
//从pollWrapper的pollArrayAddress + (long)(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD)位置
//超时读取Math.min(1024, totalChannels - (i + 1) * 1024)个文件描述
//到读写操作文件描述符集
private int poll(int i)
throws IOException
{
return poll0(pollWrapper.pollArrayAddress + (long)(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, totalChannels - (i + 1) * 1024), readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long l, int i, int ai[], int ai1[], int ai2[], long l1);
//处理选择key集合,l为选择key清除次数
private int processSelectedKeys(long l)
{
int i = 0;
//处理读操作选择key
i += processFDSet(l, readFds, 1, false);
//处理写操作选择key
i += processFDSet(l, writeFds, 6, false);
i += processFDSet(l, exceptFds, 7, true);
return i;
}
private int processFDSet(long l, int ai[], int i, boolean flag)
{
int j = 0;
for(int k = 1; k <= ai[0]; k++)
{
int i1 = ai[k];
if(i1 == wakeupSourceFd)
{
synchronized(interruptLock)
{
//可以唤醒等待操作的选择操作线程
interruptTriggered = true;
}
continue;
}
MapEntry mapentry = fdMap.get(i1);
if(mapentry == null)
continue;
SelectionKeyImpl selectionkeyimpl = mapentry.ski;
//是否丢弃i1文件描述的UrgentData
if(flag && (selectionkeyimpl.channel() instanceof SocketChannelImpl) && discardUrgentData(i1))
continue;
//如果选择key集合selectedKeys中包括selectionkeyimpl,则根据clearedCount和当前清除计数器l
//判断是重新设置通道就绪事件,还是直接更新通道就绪事件
if(selectedKeys.contains(selectionkeyimpl))
{
if(mapentry.clearedCount != l)
{
//选择key的清除计数器不为l,则重新设置通道就绪事件
if(selectionkeyimpl.channel.translateAndSetReadyOps(i, selectionkeyimpl) && mapentry.updateCount != l)
{
//更新计数器自增
mapentry.updateCount = l;
j++;
}
} else
//直接更新通道就绪事件
if(selectionkeyimpl.channel.translateAndUpdateReadyOps(i, selectionkeyimpl) && mapentry.updateCount != l)
{
mapentry.updateCount = l;
j++;
}
mapentry.clearedCount = l;
continue;
}
/如果选择key集合selectedKeys中不包括selectionkeyimpl
if(mapentry.clearedCount != l)
{
//设置通道就绪事件,并添加到selectedKeys集合中,重置更新计数器
selectionkeyimpl.channel.translateAndSetReadyOps(i, selectionkeyimpl);
if((selectionkeyimpl.nioReadyOps() & selectionkeyimpl.nioInterestOps()) != 0)
{
selectedKeys.add(selectionkeyimpl);
mapentry.updateCount = l;
j++;
}
} else
{
//直接更新通道就绪事件
selectionkeyimpl.channel.translateAndUpdateReadyOps(i, selectionkeyimpl);
if((selectionkeyimpl.nioReadyOps() & selectionkeyimpl.nioInterestOps()) != 0)
{
selectedKeys.add(selectionkeyimpl);
mapentry.updateCount = l;
j++;
}
}
mapentry.clearedCount = l;
}

return j;
}
}

private native boolean discardUrgentData(int i);

从上可以看出SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
//StartLock
private final class StartLock
{
private long runsCounter;//选择操作线程计数器
final WindowsSelectorImpl this$0;
private StartLock()
{
this$0 = WindowsSelectorImpl.this;
super();
}
private synchronized void startThreads()
{
//启动线程,唤醒所有等待开始的选择操作线程
runsCounter++;
notifyAll();
}
private synchronized boolean waitForStart(SelectThread selectthread)
{
while(runsCounter == selectthread.lastRun)
try
{
//选择操作线程等待开始信号
startLock.wait();
}
catch(InterruptedException interruptedexception)
{
Thread.currentThread().interrupt();
}
if(selectthread.isZombie())
{
//处于等待状态
return true;
} else
{
//选择操作线程正在执行,waitForStart返回else
selectthread.lastRun = runsCounter;
return false;
}
}
}

StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,
运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待
开始锁。
//FinishLock
private final class FinishLock
{
private int threadsToFinish;//选择操作线程计数器
IOException exception;
final WindowsSelectorImpl this$0;
private FinishLock()
{
this$0 = WindowsSelectorImpl.this;
super();
exception = null;
}
//重置需要完成选择操作的线程计数器
private void reset()
{
threadsToFinish = threads.size();
}
private synchronized void threadFinished()
{
if(threadsToFinish == threads.size())
//如果选择操作线程计数器为线程集合的大小,则唤醒等待选择操作的线程
wakeup();
//选择操作线程计数器自减
threadsToFinish--;
if(threadsToFinish == 0)
//如果选择线程都执行完,则唤醒等待完成锁的线程
notify();
}
private synchronized void waitForHelperThreads()
{
if(threadsToFinish == threads.size())
//唤醒等待选择操作的线程
wakeup();
//如果所有选择操作没完成,则等待所有选择操作完成
while(threadsToFinish != 0)
try
{
//只有在所有选择操作线程都完成后,完成锁才释放。
finishLock.wait();
}
catch(InterruptedException interruptedexception)
{
Thread.currentThread().interrupt();
}
}
//设置选择线程执行异常
private synchronized void setException(IOException ioexception)
{
exception = ioexception;
}
//检查异常,如果选择线程执行异常,将异常包装为IOException抛出
private void checkForException()
throws IOException
{
if(exception == null)
{
return;
} else
{
StringBuffer stringbuffer = new StringBuffer("An exception occured during the execution of select(): \n");
stringbuffer.append(exception);
stringbuffer.append('\n');
exception = null;
throw new IOException(stringbuffer.toString());
}
}
}

FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
再来看SelectThread
//SelectThread
private final class SelectThread extends Thread
{
//选择线程索引,即WindowsSelectorImpl中的线程集合对应的索引
//为什么是这样,我们后面会说
private final int index;
final SubSelector subSelector;
private long lastRun;//已经运行的选择操作线程数
//选择操作线程状态,是否处于等待状态或空闲状态
private volatile boolean zombie;
final WindowsSelectorImpl this$0;
private SelectThread(int i)
{
this$0 = WindowsSelectorImpl.this;
super();
lastRun = 0L;
index = i;
subSelector = new SubSelector(i);
lastRun = startLock.runsCounter;
}
//置选择操作状态为空闲等待
void makeZombie()
{
zombie = true;
}
boolean isZombie()
{
return zombie;
}
public void run()
{
do
{
//如果需要等待开始锁,则直接返回
if(startLock.waitForStart(this))
return;
try
{
//否则拉取索引index批次的选择key
subSelector.poll(index);
}
catch(IOException ioexception)
{
finishLock.setException(ioexception);
}
//完成,更新完成选择操作线程计数器,自减
finishLock.threadFinished();
} while(true);
}
}

SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。

当这里我们已经把StartLock,FinishLock,SelectThread和SubSelector看完,再来看
选择操作中的doSelect(long l)
protected int doSelect(long l)
throws IOException
{
if(channelArray == null)
//选择器已关闭
throw new ClosedSelectorException();
timeout = l;//设置超时时间
//反注册已经取消的选key
processDeregisterQueue();
if(interruptTriggered)
{
//已触发中断,则重新设置唤醒通道
resetWakeupSocket();
return 0;
}
//调整选择线程数量
adjustThreadsCount();
//重置完成锁需要完整的选择线程数量
finishLock.reset();
//启动所有等待选择操作的线程
startLock.startThreads();
begin();//这个方法与end方法配合使用记录在io操作的过程中是否被中断
try
{
//pollWrapper的起始位置拉取读写选择key
subSelector.poll();
}
catch(IOException ioexception)
{
finishLock.setException(ioexception);
}
//如果选择线程不为空,则等待所有选择线程结束
if(threads.size() > 0)
finishLock.waitForHelperThreads();
end();
break MISSING_BLOCK_LABEL_114;
Exception exception;
exception;
end();
throw exception;
//检查异常
finishLock.checkForException();
processDeregisterQueue();
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();
resetWakeupSocket();
return i;
}

这个方法中我们有几点要看
3.1
//已触发中断,则重新设置唤醒通道
 resetWakeupSocket();

3.2
//调整选择线程数量
adjustThreadsCount();

3.3
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();

下面分别来看这几点:
3.1
//已触发中断,则重新设置唤醒通道
resetWakeupSocket();


private void resetWakeupSocket()
{
label0:
{
synchronized(interruptLock)
{
if(interruptTriggered)
break label0;
}
return;
}
//重新设置唤醒通道文件描述符
resetWakeupSocket0(wakeupSourceFd);
interruptTriggered = false;
obj;
JVM INSTR monitorexit ;
goto _L1
exception;
throw exception;
_L1:
}

private native void resetWakeupSocket0(int i);


3.2
//调整选择线程数量
adjustThreadsCount();

 private void adjustThreadsCount()
{
if(threadsCount > threads.size())
{
//需要的线程数量大于线程集合实际线程数,补充选择操作线程
for(int i = threads.size(); i < threadsCount; i++)
{
SelectThread selectthread = new SelectThread(i);
threads.add(selectthread);
selectthread.setDaemon(true);
selectthread.start();
}

} else
if(threadsCount < threads.size())
{
//否则从选择线程集合中移除不必要的选择线程,并设置状态为Zombie(空闲)
for(int j = threads.size() - 1; j >= threadsCount; j--)
((SelectThread)threads.remove(j)).makeZombie();

}
}

3.3
//更新已准备就绪的通道操作事件
int i = updateSelectedKeys();


private int updateSelectedKeys()
{
updateCount++;
int i = 0
i += subSelector.processSelectedKeys(updateCount);
//遍历选择线程集,更新子选择线程中已经就绪的通道操作事件
for(Iterator iterator = threads.iterator(); iterator.hasNext();)
{
SelectThread selectthread = (SelectThread)iterator.next();
//更新通道已经就绪的操作事件
i += selectthread.subSelector.processSelectedKeys(updateCount);
}

return i;
}

doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。

4.唤醒方法wakeup
public Selector wakeup()
{
synchronized(interruptLock)
{
if(!interruptTriggered)
{
//设置唤醒sink通道
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
private void setWakeupSocket()
{
//唤醒所有等待选择操作的线程
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int i);

wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
5.实际关闭选择通道方法implClose

protected void implClose()
throws IOException
{
synchronized(closeLock)
{
if(channelArray != null && pollWrapper != null)
{
synchronized(interruptLock)
{
interruptTriggered = true;
}
//关闭唤醒管道的source和sink通道
wakeupPipe.sink().close();
wakeupPipe.source().close();
for(int i = 1; i < totalChannels; i++)
{
if(i % 1024 == 0)
//1024的整数位置为唤醒source通道,跳过
continue;
//反注册通道
deregister(channelArray[i]);
SelectableChannel selectablechannel = channelArray[i].channel();
if(!selectablechannel.isOpen() && !selectablechannel.isRegistered())
((SelChImpl)selectablechannel).kill();
}
//释放pollWrapper空间
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
SelectThread selectthread;
//结束所有选择线程集合中的线程
for(Iterator iterator = threads.iterator(); iterator.hasNext(); selectthread.makeZombie())
selectthread = (SelectThread)iterator.next();

startLock.startThreads();
}
}
}

implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程。
再回到WindowsSelectorImpl集合,变量声明和构造
final class WindowsSelectorImpl extends SelectorImpl
{
private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量
private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量
private SelectionKeyImpl channelArray[];//选择器关联通道集合
private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合
private int totalChannels;//注册到选择的通道数量
private int threadsCount;//选择线程数
private final List threads = new ArrayList();//选择操作线程集合
private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操操的管道
private final int wakeupSourceFd;//唤醒管道源通道文件描述
private final int wakeupSinkFd;//唤醒管道sink通道文件描述
private Object closeLock;//选择器关闭同步锁
private final FdMap fdMap = new FdMap();//存放选择key文件描述与选择key映射关系的Map
//子选择器,主要从pollWrapper拉取读写选择key,并更新key通道的就绪操作事件集
private final SubSelector subSelector = new SubSelector();
private long timeout;//超时时间,从pollWrapper拉取文件描述的超时时间
private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步
private volatile boolean interruptTriggered;//是否唤醒等待选择操的线程
private final StartLock startLock = new StartLock();//选择操作开始锁
private final FinishLock finishLock = new FinishLock();//选择操作结束锁
private long updateCount;//已选择key,更新就绪操作事件计数器
static final boolean $assertionsDisabled = !sun/nio/ch/WindowsSelectorImpl.desiredAssertionStatus();
static
{
//加载nio,net资源库
Util.load();
}
}

WindowsSelectorImpl(SelectorProvider selectorprovider)
throws IOException
{
super(selectorprovider);
//创建选择器关联通道数组,实际存的为选择key
channelArray = new SelectionKeyImpl[8];
totalChannels = 1;//通道计数器
threadsCount = 0;//线程计数器
closeLock = new Object();//关闭锁
interruptTriggered = false;//唤醒是否触发
updateCount = 0L;//更新计数器
pollWrapper = new PollArrayWrapper(8);
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();//唤醒管道源通道文件描述id
SinkChannelImpl sinkchannelimpl = (SinkChannelImpl)wakeupPipe.sink();//唤醒管道sink通道
sinkchannelimpl.sc.socket().setTcpNoDelay(true);//设置唤醒管道sink通道的Socket为无延时
wakeupSinkFd = sinkchannelimpl.getFDVal();
//将唤醒管道的源通道文件描述id添加pollWrapper的索引0位置上
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

WindowsSelectorImpl默认加载net和nio资源库;WindowsSelectorImpl内锁4个,分别为关闭锁closeLock,中断锁interruptLock,选择操作开始锁startLock,选择操作结束finishLock控制;一个唤醒管道(wakeupSourceFd,wakeupSinkFd),所有阻塞选择操的子选择线程与wakeupSourceFd相关联;唤醒方法主要是通过Sink通道发送消息给source通道,以唤醒所有选择操作线程。注册到选择器的通道计数器totalChannels;updateCount计数器,已选择key集合更新的记录数;通道集合channelArray,存放的元素实际为通道关联的选择key;

总结:
[color=blue] implRegister方法,首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。
implDereg方法首选判断反注册的key是不是在通道key尾部,不在交换,并将交换信息更新到pollWrapper,从fdMap,keys,selectedKeys集合移除选择key,并将key从通道中移除。
SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key;
processSelectedKeys方法主要是更新关注读写事件的选择key的相关通道的已经就绪的操作事件集。
StartLock主要控制选择线程,startThreads方法为唤醒所有等待选择操作的线程,运行计数器runsCounter自增,waitForStart方法为,判断选择线程是否需要等待开始锁。
FinishLock用于控制线程集合中的选择线程,完成锁只有在所有线程集合中的执行完,
才释放,waitForHelperThreads方法为等待完成锁,threadFinished方法为当前选择线程
已结束,更新完成的选择线程计数器threadsToFinish(减一),reset方法重置threadsToFinish为线程集合大小。
SelectThread线程启动时等待startLock,从pollWrapper拉取索引index对应的关注读写事件的选择key如果运行异常,则设置finishLock的finishLock,运行结束则更新完成选择操作线程计数器(自减)。
doSelect方法将选择操作分成多个选择线程SelectThread放在选择线程放在threads集合中,每个SelectThread使用SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock。下一个选择操作才能开始,即startLock可用。
wakeup主要是通过sink通道发送信息给source通道(native实现),通知子选择线程可以进行选择操作。子选择线程选择主要处理相应批次的1024个通道就绪事件(每批次通道关联到source通道)。
implClose方法主要关闭唤醒管道的sink和source通道,反注册选择器的所有通道,释放所有通道空间,结束所有选择线程集合中的线程


[/color]
相关标签: nio