欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

【Java】NIO中Selector的select方法源码分析

程序员文章站 2023-01-30 08:39:01
该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看【Java】NIO中Channel的注册源码分析, 【Java】NIO中Selector的创建源码分析 Selector的创建在Windows下默认生成WindowsSelectorImpl对象,那么Selector ......

该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看【java】nio中channel的注册源码分析【java】nio中selector的创建源码分析

 

selector的创建在windows下默认生成windowsselectorimpl对象,那么selector的select方法使用的就是windowsselectorimpl的select方法,而在windowsselectorimpl下并没有覆盖这个方法,而是由其基类selectorimpl实现的:

1 public int select() throws ioexception {
2     return this.select(0l);
3 }

这个方法调用了另一个重载的方法:

1 public int select(long var1) throws ioexception {
2     if (var1 < 0l) {
3         throw new illegalargumentexception("negative timeout");
4     } else {
5         return this.lockanddoselect(var1 == 0l ? -1l : var1);
6     }
7 }

首先对var1参数的合法性进行判断,无参传入进来的是0,实则交给lockanddoselect方法去完成,并且令参数为-1。

private int lockanddoselect(long var1) throws ioexception {
    synchronized(this) {
        if (!this.isopen()) {
            throw new closedselectorexception();
        } else {
            set var4 = this.publickeys;
            int var10000;
            synchronized(this.publickeys) {
                set var5 = this.publicselectedkeys;
                synchronized(this.publicselectedkeys) {
                    var10000 = this.doselect(var1);
                }
            }

            return var10000;
        }
    }
}

在方法执行时先使用同步块包裹,使用this作为锁;进入同步块先判断当前的selector对象是否关闭了,因为在初始化时就是开启状态,只有在关闭后isopen才是false;isopen是由abstractselector实现的:

 1 private atomicboolean selectoropen = new atomicboolean(true);
 2 public final boolean isopen() {
 3     return selectoropen.get();
 4 }
 5 public final void close() throws ioexception {
 6     boolean open = selectoropen.getandset(false);
 7     if (!open)
 8         return;
 9     implcloseselector();
10 }

可以看到在abstractselector中使用了原子化boolean值表示开启关闭。

回到selectorimpl的lockanddoselect,若是selector已经关闭则抛出closedselectorexception异常,否则分别以publickeys以及publicselectedkeys为锁,最终的实现交给抽象方法doselect完成;

1 protected abstract int doselect(long var1) throws ioexception;

其中publickeys是供外部访问的selectionkey集合,publicselectedkeys是供外部访问并且已经就绪的selectionkey集合。

因为使用的是windowsselectorimpl,所以来看看windowsselectorimpl的doselect实现:

 1 protected int doselect(long var1) throws ioexception {
 2     if (this.channelarray == null) {
 3         throw new closedselectorexception();
 4     } else {
 5         this.timeout = var1;
 6         this.processderegisterqueue();
 7         if (this.interrupttriggered) {
 8             this.resetwakeupsocket();
 9             return 0;
10         } else {
11             this.adjustthreadscount();
12             this.finishlock.reset();
13             this.startlock.startthreads();
14 
15             try {
16                 this.begin();
17 
18                 try {
19                     this.subselector.poll();
20                 } catch (ioexception var7) {
21                     this.finishlock.setexception(var7);
22                 }
23 
24                 if (this.threads.size() > 0) {
25                     this.finishlock.waitforhelperthreads();
26                 }
27             } finally {
28                 this.end();
29             }
30 
31             this.finishlock.checkforexception();
32             this.processderegisterqueue();
33             int var3 = this.updateselectedkeys();
34             this.resetwakeupsocket();
35             return var3;
36         }
37     }
38 }

首先判断channelarray是否为空,上一篇博客说了channelarray是一个selectionkeyimpl数组,selectionkeyimpl负责记录channel和selectionkey状态,channelarray是根据连接的channel数量动态维持的,初始化大小是8。

1 private selectionkeyimpl[] channelarray = new selectionkeyimpl[8];

selectionkeyimpl是selectionkey的子类,只有当selector调用close方法时,在回调函数中才会令channelarray=null,所以这还是检测selector是否关闭了。
接着继续,在前面传入的long类型的参数是-1,在这里令超时时间timeout就等于-1,
接着调用processderegisterqueue方法来取消准备撤销的集合
所谓的准备撤销的集合是因为selectionkey对象在调用cancel方法时,会使selector将其加入cancelledkeys,仅仅如此,真真的取消是在selector调用selector方法时执行

selectionkey的cancel方法是在abstractselectionkey中实现的:

 1 public final void cancel() {
 2    // synchronizing "this" to prevent this key from getting canceled
 3    // multiple times by different threads, which might cause race
 4    // condition between selector's select() and channel's close().
 5    synchronized (this) {
 6        if (valid) {
 7            valid = false;
 8            ((abstractselector)selector()).cancel(this);
 9        }
10    }
11 }

这个方法在上一篇讲过,可以看到基本上什么都没做,仅仅时调用了与它关联的selector对象(abstractselector)的cancel方法:
abstractselector的cancel方法:

1 private final set<selectionkey> cancelledkeys = new hashset<selectionkey>();
2 
3 void cancel(selectionkey k) {                      
4     synchronized (cancelledkeys) {
5         cancelledkeys.add(k);
6     }
7 }

cancelledkeys就是所谓的准备撤销的集合,可以看到abstractselector的cancel方法仅仅是把此时请求取消的selectionkey对象加入到cancelledkeys集合中,并没有多余的操作。

回到doselect方法,processderegisterqueue这个方法的实现是在selectorimpl中:

 1 void processderegisterqueue() throws ioexception {
 2     set var1 = this.cancelledkeys();
 3     synchronized(var1) {
 4         if (!var1.isempty()) {
 5             iterator var3 = var1.iterator();
 6 
 7             while(var3.hasnext()) {
 8                 selectionkeyimpl var4 = (selectionkeyimpl)var3.next();
 9 
10                 try {
11                     this.impldereg(var4);
12                 } catch (socketexception var11) {
13                     throw new ioexception("error deregistering key", var11);
14                 } finally {
15                     var3.remove();
16                 }
17             }
18         }
19 
20     }
21 }

这个方法的逻辑比较简单,首先得到准备撤销的集合cancelledkeys,判断是否有请求取消的,若有那么就进行遍历,实际的取消操作主要逻辑交给了抽象方法impldereg执行,最后再从集合中删除这个selectionkeyimpl对象。

impldereg方法的实现是在windowsselectorimpl中:

 1 protected void impldereg(selectionkeyimpl var1) throws ioexception {
 2     int var2 = var1.getindex();
 3 
 4     assert var2 >= 0;
 5 
 6     object var3 = this.closelock;
 7     synchronized(this.closelock) {
 8         if (var2 != this.totalchannels - 1) {
 9             selectionkeyimpl var4 = this.channelarray[this.totalchannels - 1];
10             this.channelarray[var2] = var4;
11             var4.setindex(var2);
12             this.pollwrapper.replaceentry(this.pollwrapper, this.totalchannels - 1, this.pollwrapper, var2);
13         }
14 
15         var1.setindex(-1);
16     }
17 
18     this.channelarray[this.totalchannels - 1] = null;
19     --this.totalchannels;
20     if (this.totalchannels != 1 && this.totalchannels % 1024 == 1) {
21         --this.totalchannels;
22         --this.threadscount;
23     }
24 
25     this.fdmap.remove(var1);
26     this.keys.remove(var1);
27     this.selectedkeys.remove(var1);
28     this.deregister(var1);
29     selectablechannel var7 = var1.channel();
30     if (!var7.isopen() && !var7.isregistered()) {
31         ((selchimpl)var7).kill();
32     }
33 
34 }

首先获取selectionkeyimpl的下标index,这个下标就是其在channelarray中的下标,检验下标的合法性;
在同步块内,首先检验这个selectionkeyimpl对象是否是数组的最后一个元素,若不是那么就直接用最后一个元素覆盖当前位置的selectionkeyimpl对象,同时还需要将pollwrapper中最后一个元素对应的channel描述符和事件响应覆盖到相应位置。无论该selectionkeyimpl对象是否是最后一个,都将其下标置为-1,防止再次访问。

再完成上述操作后,channelarray中的最后一个元素必然是不需要的,直接置为null,再totalchannels再自减。
接着根据totalchannels的数量来判断是否需要减少轮询线程的个数,这和注册时同理,就不再多说。
然后在fdmap中移除掉该selectionkeyimpl和channel的描述符映射(fdmap保存的是channel的描述符和selectionkeyimpl的映射关系,在上一篇提到过),keys和selectedkeys中同样也需要移除(keys所有注册了的selectionkey集合,selectedkeys是所有有事件就绪的selectionkey集合)。

这些操作仅仅是删除了其在selector中的映射关系,而真正的channel的(虽说是selectionkey的cancel方法,实则是channel要取消对某一事件的响应)取消操作是在deregister中执行:
deregister方法在abstractselector中实现:

1 protected final void deregister(abstractselectionkey key) {
2     ((abstractselectablechannel)key.channel()).removekey(key);
3 }

可以看到直接获取selectionkey对应的channel对象,然后调用abstractselectablechannel的removekey方法:

 1 void removekey(selectionkey k) {                  
 2     synchronized (keylock) {
 3         for (int i = 0; i < keys.length; i++)
 4             if (keys[i] == k) {
 5                 keys[i] = null;
 6                 keycount--;
 7             }
 8         ((abstractselectionkey)k).invalidate();
 9     }
10 }

前面的遍历很简单,通过遍历channel的所有绑定的selectionkey,即keys,直接将要取消的置为null,keycount再自减,最后调用selectionkey(abstractselectionkey)的invalidate方法:

1 void invalidate() {                               
2     valid = false;
3 }

直接设置valid属性为false,表明不可用。

回到impldereg中,最后一步操作,检查channel的活跃性,若是channel既没有打开且当且也没有注册了的selectionkey,那么直接“杀死”该channel。
而这个kill方法,在不同的channel中有不同的实现,
socketchannelimpl中:

 1 public void kill() throws ioexception {
 2    object var1 = this.statelock;
 3     synchronized(this.statelock) {
 4         if (this.state != 4) {
 5             if (this.state == -1) {
 6                 this.state = 4;
 7             } else {
 8                 assert !this.isopen() && !this.isregistered();
 9 
10                 if (this.readerthread == 0l && this.writerthread == 0l) {
11                     nd.close(this.fd);
12                     this.state = 4;
13                 } else {
14                     this.state = 3;
15                 }
16 
17             }
18         }
19     }
20 }

其中state表示socketchannelimpl的状态,一共有六种:

1 private static final int st_uninitialized = -1;     // 尚未初始化
2 private static final int st_unconnected = 0;         // 尚未建立连接
3 private static final int st_pending = 1;              // 未决状态
4 private static final int st_connected = 2;             // 连接状态
5 private static final int st_killpending = 3;         // kill的未决状态
6 private static final int st_killed = 4;             // kill状态
7 private int state = -1;

这样就很清晰,若是socketchannelimpl尚未初始化直接变为kill状态,否则检查再次检查channel的活跃性,若是不活跃就断言为false,直接结束,否则“杀死”。
接下来的判断中的readerthread和writerthread,我在看完socketchannelimpl后,发现一直都是赋值的0,并不知道会在何时发生修改,而且这两个成员的赋值都是在有数据读、写操作后,若是有知道的朋友想请教一下!
这个就先不讨论了,但是通过它们的赋值都是发生在有数据读、写操作后,那么就可以明白,若是完成了读、写,那么直接变为kill状态,否则,等待读、写完成,就变为kill的未决状态。
其中 nd.close(this.fd),nd是socket描述符,fd是文件描述符,这就是由操作系统来关闭socket描述符对应的文件描述符。

serversocketchannelimpl中kill:

 1 private static final int st_uninitialized = -1;      // 尚未初始化
 2 private static final int st_inuse = 0;                 // 使用中
 3 private static final int st_killed = 1;             // kill状态
 4 private int state = -1;
 5 
 6 public void kill() throws ioexception {
 7     object var1 = this.statelock;
 8     synchronized(this.statelock) {
 9         if (this.state != 1) {
10             if (this.state == -1) {
11                 this.state = 1;
12             } else {
13                 assert !this.isopen() && !this.isregistered();
14 
15                 nd.close(this.fd);
16                 this.state = 1;
17             }
18         }
19     }
20 }

serversocketchannelimpl就要简单一点,基本上一样,由于serversocketchannel只能注册accept事件响应,所以就没有判断读、写。

impldereg方法结束,processderegisterqueue也彻底结束,再回到doselect方法
接着检验interrupttriggered,表示是否触发中断。
interrupttriggered初始化时就是false,表示未触发中断,而在调用close或者wakeup方法时会触发中断,赋值true;

先看wakeup方法:

 1 public selector wakeup() {
 2     object var1 = this.interruptlock;
 3     synchronized(this.interruptlock) {
 4         if (!this.interrupttriggered) {
 5             this.setwakeupsocket();
 6             this.interrupttriggered = true;
 7         }
 8         
 9         return this;
10     }
11 }

可以看到核心是setwakeupsocket方法,当目前没有触发中断调用setwakeupsocket:

1 private void setwakeupsocket() {
2     this.setwakeupsocket0(this.wakeupsinkfd);
3 }
4 private native void setwakeupsocket0(int var1);

在讲selector的创建时说过,在selector创建时会产生一对socketchannel,分别是sourcechannelimpl和sinkchannelimpl,wakeupsinkfd是sinkchannelimpl的描述符。

再来看看setwakeupsocket0的实现:

java_sun_nio_ch_windowsselectorimpl_setwakeupsocket0(jnienv *env, jclass this,
                                                jint scoutfd) {
    /* write one byte into the pipe */
    const char byte = 1;
    send(scoutfd, &byte, 1, 0);
}

虽然是用c写的,但是依旧很清晰,就是通过这个双向通道的sink端向source发送一个字节的数据,这样source端描述符就进入就绪状态,就能被select感知到,selector便被唤醒。

再来看下close方法,在abstractselector中实现的:

1 public final void close() throws ioexception {
2     boolean open = selectoropen.getandset(false);
3     if (!open)
4         return;
5     implcloseselector();
6 }

核心是implcloseselector,在selectorimpl中实现:

 1 public void implcloseselector() throws ioexception {
 2     this.wakeup();
 3     synchronized(this) {
 4         set var2 = this.publickeys;
 5         synchronized(this.publickeys) {
 6             set var3 = this.publicselectedkeys;
 7             synchronized(this.publicselectedkeys) {
 8                 this.implclose();
 9             }
10         }
11 
12     }
13 }

一开始就直接调用wakeup方法唤醒,然后调用implclose方法:
implclose是在windowsselectorimpl中实现的:

 1 protected void implclose() throws ioexception {
 2     object var1 = this.closelock;
 3     synchronized(this.closelock) {
 4         if (this.channelarray != null && this.pollwrapper != null) {
 5             object var2 = this.interruptlock;
 6             synchronized(this.interruptlock) {
 7                 this.interrupttriggered = true;
 8             }
 9 
10             this.wakeuppipe.sink().close();
11             this.wakeuppipe.source().close();
12 
13             for(int var7 = 1; var7 < this.totalchannels; ++var7) {
14                 if (var7 % 1024 != 0) {
15                     this.deregister(this.channelarray[var7]);
16                     selectablechannel var3 = this.channelarray[var7].channel();
17                     if (!var3.isopen() && !var3.isregistered()) {
18                         ((selchimpl)var3).kill();
19                     }
20                 }
21             }
22 
23             this.pollwrapper.free();
24             this.pollwrapper = null;
25             this.selectedkeys = null;
26             this.channelarray = null;
27             iterator var8 = this.threads.iterator();
28 
29             while(var8.hasnext()) {
30                 windowsselectorimpl.selectthread var9 = (windowsselectorimpl.selectthread)var8.next();
31                 var9.makezombie();
32             }
33 
34             this.startlock.startthreads();
35         }
36 
37     }
38 }

根据channelarray和pollwrapper是否为null来检验是否有必要关闭资源,后面就是对一些资源的关闭,可以看到关闭了我们一开始建立的双向通道,取消了所有注册事件,顺便“杀死”不活跃的channel,删除所有映射关系,将所有轮询线程从阻塞中唤醒,关于makezombie和startlock后面给出。

再次回到doselect上,若是发生了中断,调用resetwakeupsocket方法恢复中断:

1 private void resetwakeupsocket() {
2     object var1 = this.interruptlock;
3     synchronized(this.interruptlock) {
4         if (this.interrupttriggered) {
5             this.resetwakeupsocket0(this.wakeupsourcefd);
6             this.interrupttriggered = false;
7         }
8     }
9 }

resetwakeupsocket0也是一个native方法,和setwakeupsocket0正好互补,用来读取setwakeupsocket0中发送的数据,再将interrupttriggered设置为false,最后doselect将会立即返回0,而不会调用poll操作。

在doselect判断没有触发中断后,首先调用adjustthreadscount调整轮询线程数量:

 1 private void adjustthreadscount() {
 2     int var1;
 3     if (this.threadscount > this.threads.size()) {
 4         for(var1 = this.threads.size(); var1 < this.threadscount; ++var1) {
 5             windowsselectorimpl.selectthread var2 = new windowsselectorimpl.selectthread(var1);
 6             this.threads.add(var2);
 7             var2.setdaemon(true);
 8             var2.start();
 9         }
10     } else if (this.threadscount < this.threads.size()) {
11         for(var1 = this.threads.size() - 1; var1 >= this.threadscount; --var1) {
12             ((windowsselectorimpl.selectthread)this.threads.remove(var1)).makezombie();
13         }
14     }
15 
16 }

threads是用arraylist存放的:

1 private final list<windowsselectorimpl.selectthread> threads = new arraylist();

逻辑比较简单,通过检查threadscount的数量和threads的大小比较,若是threadscount大于threads,则产生一个新的轮询线程selectthread,将其加入threads,并且设置轮询线程是守护线程,直接启动;若是threadscount小于threads,则移除并唤醒多余的轮询线程;若是threadscount等于threads什么都不做。

来看一下selectthread这个轮询线程具体是怎么工作的:

 1 private final class selectthread extends thread {
 2     private final int index;
 3     final windowsselectorimpl.subselector subselector;
 4     private long lastrun;
 5     private volatile boolean zombie;
 6 
 7     private selectthread(int var2) {
 8         this.lastrun = 0l;
 9         this.index = var2;
10         this.subselector = windowsselectorimpl.this.new subselector(var2);
11         this.lastrun = windowsselectorimpl.this.startlock.runscounter;
12     }
13 
14     void makezombie() {
15         this.zombie = true;
16     }
17 
18     boolean iszombie() {
19         return this.zombie;
20     }
21 
22     public void run() {
23         for(; !windowsselectorimpl.this.startlock.waitforstart(this); windowsselectorimpl.this.finishlock.threadfinished()) {
24             try {
25                 this.subselector.poll(this.index);
26             } catch (ioexception var2) {
27                 windowsselectorimpl.this.finishlock.setexception(var2);
28             }
29         }
30 
31     }
32 }

在构造方法中对几个成员完成初始化,index对应的是其在arraylist中的下标,lastrun 和startlock有关等会再说,subselector是真正执行轮询的对象;zombie是一个标志,在startlock中会使用到。
再来看run方法,核心就是调用subselector的poll方法,而何时调用该方法由startlock来决定。

startlock的定义:

 1 private final class startlock {
 2     private long runscounter;
 3 
 4     private startlock() {
 5     }
 6 
 7     private synchronized void startthreads() {
 8         ++this.runscounter;
 9         this.notifyall();
10     }
11 
12     private synchronized boolean waitforstart(windowsselectorimpl.selectthread var1) {
13         while(this.runscounter == var1.lastrun) {
14             try {
15                 windowsselectorimpl.this.startlock.wait();
16             } catch (interruptedexception var3) {
17                 thread.currentthread().interrupt();
18             }
19         }
20 
21         if (var1.iszombie()) {
22             return true;
23         } else {
24             var1.lastrun = this.runscounter;
25             return false;
26         }
27     }
28 }

在startthreads方法中,仅仅是通过synchronized 包裹,使runscounter自增,然后notifyall唤醒所有持有startlock对象锁的阻塞。
在windowsselectorimpl中startlock对象有且只有一份,对于所有selectthread来说startlock是公共的
waitforstart方法需要结合selectthread的run方法来看,首先先检验selectthread的lastrun成员是否和runscounter相等,若是相等直接阻塞,等待startthreads方法将其唤醒;若是不相等,说明它的run是在startthreads之后运行的,需要将lastrun更新后再执行。

回到selectthread中,我们再来看看subselector的定义:

 1 private final class subselector {
 2     private final int pollarrayindex;
 3     private final int[] readfds;
 4     private final int[] writefds;
 5     private final int[] exceptfds;
 6     
 7     private subselector() {
 8         this.readfds = new int[1025];
 9         this.writefds = new int[1025];
10         this.exceptfds = new int[1025];
11         this.pollarrayindex = 0;
12     }
13     
14     private subselector(int var2) {
15         this.readfds = new int[1025];
16         this.writefds = new int[1025];
17         this.exceptfds = new int[1025];
18         this.pollarrayindex = (var2 + 1) * 1024;
19     }
20     ......
21 }

其中无参构造是windowsselectorimpl使用的,单参构造由selectthread使用。
之前在讲channel的注册时说过,每1024个注册了的channel会开启一个selectthread轮询,如果是1024个以内,那么直接由windowsselectorimpl轮询,不交给selectthread处理,超过1024则windowsselectorimpl和selectthread一起轮询。

readfds 、writefds、exceptfds 分别对应读、写、异常描述符 ,在subselector构造中初始化大小都是1025,多出来的一个就是前面说过的wakeupsourcefd描述符,用于唤醒,所以是1025。pollarrayindex 对应其在pollwrapper中的wakeupsourcefd描述符的起始位置。

再来看看poll方法:

1 private int poll() throws ioexception {
2     return this.poll0(windowsselectorimpl.this.pollwrapper.pollarrayaddress, math.min(windowsselectorimpl.this.totalchannels, 1024), this.readfds, this.writefds, this.exceptfds, windowsselectorimpl.this.timeout);
3 }
4 
5 private int poll(int var1) throws ioexception {
6     return this.poll0(windowsselectorimpl.this.pollwrapper.pollarrayaddress + (long)(this.pollarrayindex * pollarraywrapper.size_pollfd), math.min(1024, windowsselectorimpl.this.totalchannels - (var1 + 1) * 1024), this.readfds, this.writefds, this.exceptfds, windowsselectorimpl.this.timeout);
7 }
8 
9 private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);

无参poll方法是windowsselectorimpl执行的,单参poll是由selectthread执行;
最后都调用poll0这个native方法,这个方法是真正的轮询核心,交由操作系统来完成。
其中pollarrayaddress是pollarray在内存空间的起始位置,在poll()中直接定位到最开始,而在poll(int var1)中通过加上pollarrayindex * pollarraywrapper.size_pollfd这个偏移量定位。
pollarraywrapper.size_pollfd是8,表示pollwrapper中存放的一对channel描述符和事件响应共8位,0-3位保存channel描述符fdval,4-7位保存事件响应events。
第二个参数表明需要底层轮询的描述符fd个数,最后一个是超时时间,若是底层超时是会结束的。

还是回到doselect方法,在adjustthreadscount调整完轮询线程后,调用finishlock的reset方法
finishlock定义如下:

 1 private final class finishlock {
 2     private int threadstofinish;
 3     ioexception exception;
 4     
 5     private finishlock() {
 6         this.exception = null;
 7     }
 8     
 9     private void reset() {
10         this.threadstofinish = windowsselectorimpl.this.threads.size();
11     }
12     
13     private synchronized void threadfinished() {
14         if (this.threadstofinish == windowsselectorimpl.this.threads.size()) {
15             windowsselectorimpl.this.wakeup();
16         }
17     
18         --this.threadstofinish;
19         if (this.threadstofinish == 0) {
20             this.notify();
21         }
22     
23     }
24     ......
25 }

这个和startlock很相似,也是windowsselectorimpl持有,有且仅有一份,所有selectthread共享,reset方法用来记录在当前select方法执行时需要的轮询线程个数,在selectthread的run方法中执行完poll方法后,会执行threadfinished,首先this.threadstofinish == windowsselectorimpl.this.threads.size()的判断是为帮助唤醒所有处于poll阻塞的轮询。selectthread执行完毕,就需要让threadstofinish自减,至于notify的唤醒和后面有关系。

doselect中执行完finishlock的reset后,就需要调用startlock的startthreads唤醒所有轮询线程工作。接着调用begin方法:
begin方法在abstractselector中实现:

 1 private interruptible interruptor = null;
 2 
 3 protected final void begin() {
 4     if (interruptor == null) {
 5         interruptor = new interruptible() {
 6                 public void interrupt(thread ignore) {
 7                     abstractselector.this.wakeup();
 8                 }};
 9     }
10     abstractinterruptiblechannel.blockedon(interruptor);
11     thread me = thread.currentthread();
12     if (me.isinterrupted())
13         interruptor.interrupt(me);
14 }

若是中断器interruptor=null,就创建一个,当当前线程阻塞在i/o操作上并且发生了线程级别的中断时,就会调用wakeup方法唤醒selector。

doselect中begin完毕后,调用subselector的poll方*询;若是poll上有事件就绪,那么就不会阻塞,继续往下进行;若poll上没有事件就绪就会等待selectthread上的事件就绪,通过threadfinished将其唤醒;若是selectthread上也没有事件就绪就会一直阻塞,除非被外部唤醒,或者调用的是select的单参方法,会阻塞到超时结束。

接着判断是否有轮询线程的工作,调用waitforhelperthreads等待轮询线程的结束:

 1 private synchronized void waitforhelperthreads() {
 2     if (this.threadstofinish == windowsselectorimpl.this.threads.size() {
 3         windowsselectorimpl.this.wakeup();
 4     }
 5 
 6     while(this.threadstofinish != 0) {
 7         try {
 8             windowsselectorimpl.this.finishlock.wait();
 9         } catch (interruptedexception var2) {
10             thread.currentthread().interrupt();
11         }
12     }
13 
14 }

waitforhelperthreads方法就呼应了threadfinished方法,若是threadstofinish != 0说明还有轮询线程没有结束,就wait阻塞,一直等到threadstofinish == 0时再将其唤醒。

当所有轮询结束后,调用end方法:

1 protected final void end() {
2     abstractinterruptiblechannel.blockedon(null);
3 }

这个方法是处理发生中断,具体就不详细介绍了。

然后调用finishlock的checkforexception方法检查异常,这个没啥好说的,然后又调用processderegisterqueue来取消可能在select轮询时发生的selectionkeyl的撤销。

接着调用updateselectedkeys方法:

 1 private long updatecount = 0l;
 2 
 3 private int updateselectedkeys() {
 4     ++this.updatecount;
 5     byte var1 = 0;
 6     int var4 = var1 + this.subselector.processselectedkeys(this.updatecount);
 7 
 8     windowsselectorimpl.selectthread var3;
 9     for(iterator var2 = this.threads.iterator(); var2.hasnext(); var4 += var3.subselector.processselectedkeys(this.updatecount)) {
10         var3 = (windowsselectorimpl.selectthread)var2.next();
11     }
12 
13     return var4;
14 }

updatecount记录更新次数,即select调用次数;然后调用subselector的processselectedkeys方法,得到poll返回的就绪的channel描述符,也就是得到事件就绪的channel个数,同理也就需要得到所有selectthread中的。

其中processselectedkeys方法如下:

1 private int processselectedkeys(long var1) {
2     byte var3 = 0;
3     int var4 = var3 + this.processfdset(var1, this.readfds, net.pollin, false);
4     var4 += this.processfdset(var1, this.writefds, net.pollconn | net.pollout, false);
5     var4 += this.processfdset(var1, this.exceptfds, net.pollin | net.pollconn | net.pollout, true);
6     return var4;
7 }

分别对读、写、异常都处理了,主要还是调用processfdset方法:

 1 private int processfdset(long var1, int[] var3, int var4, boolean var5) {
 2     int var6 = 0;
 3 
 4     for(int var7 = 1; var7 <= var3[0]; ++var7) {
 5         int var8 = var3[var7];
 6         if (var8 == windowsselectorimpl.this.wakeupsourcefd) {
 7             synchronized(windowsselectorimpl.this.interruptlock) {
 8                 windowsselectorimpl.this.interrupttriggered = true;
 9             }
10         } else {
11             windowsselectorimpl.mapentry var9 = windowsselectorimpl.this.fdmap.get(var8);
12             if (var9 != null) {
13                 selectionkeyimpl var10 = var9.ski;
14                 if (!var5 || !(var10.channel() instanceof socketchannelimpl) || !windowsselectorimpl.this.discardurgentdata(var8)) {
15                     if (windowsselectorimpl.this.selectedkeys.contains(var10)) {
16                         if (var9.clearedcount != var1) {
17                             if (var10.channel.translateandsetreadyops(var4, var10) && var9.updatecount != var1) {
18                                 var9.updatecount = var1;
19                                 ++var6;
20                             }
21                         } else if (var10.channel.translateandupdatereadyops(var4, var10) && var9.updatecount != var1) {
22                             var9.updatecount = var1;
23                             ++var6;
24                         }
25 
26                         var9.clearedcount = var1;
27                     } else {
28                         if (var9.clearedcount != var1) {
29                             var10.channel.translateandsetreadyops(var4, var10);
30                             if ((var10.nioreadyops() & var10.niointerestops()) != 0) {
31                                 windowsselectorimpl.this.selectedkeys.add(var10);
32                                 var9.updatecount = var1;
33                                 ++var6;
34                             }
35                         } else {
36                             var10.channel.translateandupdatereadyops(var4, var10);
37                             if ((var10.nioreadyops() & var10.niointerestops()) != 0) {
38                                 windowsselectorimpl.this.selectedkeys.add(var10);
39                                 var9.updatecount = var1;
40                                 ++var6;
41                             }
42                         }
43 
44                         var9.clearedcount = var1;
45                     }
46                 }
47             }
48         }
49     }
50 
51     return var6;
52 }

这个方法其实就是把poll0方*询的描述符结果放入传入的数组中,然后通过遍历这个数组,得到相应的channel描述符,因为之前通过fdmap保存了channel的描述符和selectionkeyimpl的映射关系,那么就可以根据channel描述符找到对应的selectionkeyimpl对象,再根据传入的状态值var4来更新channel的状态,最后将其保存在selectedkeys集合*外部访问。


selector的select方法到此全部结束。