欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty中NioEventLoopGroup的创建源码分析

程序员文章站 2022-04-23 10:09:08
NioEventLoopGroup的无参构造: 调用了单参的构造: 继续看到双参构造: 在这里是使用JDK中NIO的原生API:SelectorProvider的provider,产生了一个SelectorProvider对象调用,继续调用三参构造。关于SelectorProvider在我前面的博客 ......

nioeventloopgroup的无参构造:

1 public nioeventloopgroup() {
2     this(0);
3 }

调用了单参的构造:

1 public nioeventloopgroup(int nthreads) {
2     this(nthreads, (executor)null);
3 }

继续看到双参构造:

1 public nioeventloopgroup(int nthreads, executor executor) {
2     this(nthreads, executor, selectorprovider.provider());
3 }

在这里是使用jdk中nio的原生api:selectorprovider的provider,产生了一个selectorprovider对象调用,继续调用三参构造。
关于selectorprovider在我前面的博客中有介绍过:【java】nio中selector的创建源码分析,在windows下默认创建了windowsselectorprovider对象。

继续看三参构造:

1 public nioeventloopgroup(int nthreads, threadfactory threadfactory, selectorprovider selectorprovider) {
2     this(nthreads, threadfactory, selectorprovider, defaultselectstrategyfactory.instance);
3 }

在这里创建了一个单例的defaultselectstrategyfactory 对象:

 1 public final class defaultselectstrategyfactory implements selectstrategyfactory {
 2     public static final selectstrategyfactory instance = new defaultselectstrategyfactory();
 3 
 4     private defaultselectstrategyfactory() {
 5     }
 6 
 7     public selectstrategy newselectstrategy() {
 8         return defaultselectstrategy.instance;
 9     }
10 }

defaultselectstrategyfactory实现的是selectstrategyfactory 接口:

1 public interface selectstrategyfactory {
2     selectstrategy newselectstrategy();
3 }

该接口提供一个用来产生select策略的方法,selectstrategy接口如下:

1 public interface selectstrategy {
2     int select = -1;
3     int continue = -2;
4 
5     int calculatestrategy(intsupplier var1, boolean var2) throws exception;
6 }

根据intsupplier 和一个boolean值为select策略提供了一个计算策略的方法。
在netty中只提供了defaultselectstrategy这一种默认实现:

 1 final class defaultselectstrategy implements selectstrategy {
 2     static final selectstrategy instance = new defaultselectstrategy();
 3 
 4     private defaultselectstrategy() {
 5     }
 6 
 7     public int calculatestrategy(intsupplier selectsupplier, boolean hastasks) throws exception {
 8         return hastasks ? selectsupplier.get() : -1;
 9     }
10 }


其中intsupplier :

1 public interface intsupplier {
2     int get() throws exception;
3 }

结合上面的来看,最终的选择策略主要是根据intsupplier的get值来得到的。

再回到构造:

1 public nioeventloopgroup(int nthreads, threadfactory threadfactory, selectorprovider selectorprovider, selectstrategyfactory selectstrategyfactory) {
2     super(nthreads, threadfactory, new object[]{selectorprovider, selectstrategyfactory, rejectedexecutionhandlers.reject()});
3 }

这里产生了一个拒绝策略:

 1 public static rejectedexecutionhandler reject() {
 2     return reject;
 3 }
 4 
 5 private static final rejectedexecutionhandler reject = new rejectedexecutionhandler() {
 6     public void rejected(runnable task, singlethreadeventexecutor executor) {
 7         throw new rejectedexecutionexception();
 8     }
 9 };
10 
11 public interface rejectedexecutionhandler {
12     void rejected(runnable var1, singlethreadeventexecutor var2);
13 }

将selectorprovider、selectstrategyfactory以及这个拒绝策略封装在一个object数组里,再调用了父类multithreadeventloopgroup的构造:

1 protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) {
2     super(nthreads == 0 ? default_event_loop_threads : nthreads, threadfactory, args);
3 }

在这里对nthreads的大小进行了调整:

1 private static final int default_event_loop_threads = math.max(1, systempropertyutil.getint("io.netty.eventloopthreads", nettyruntime.availableprocessors() * 2));

systempropertyutil.getint是根据key值"io.netty.eventloopthreads",获取系统配置值,在没用设置时使用nettyruntime.availableprocessors() * 2的值
nettyruntime的availableprocessors实现如下:

1 synchronized int availableprocessors() {
2     if (this.availableprocessors == 0) {
3         int availableprocessors = systempropertyutil.getint("io.netty.availableprocessors", runtime.getruntime().availableprocessors());
4         this.setavailableprocessors(availableprocessors);
5     }
6 
7     return this.availableprocessors;
8 }

还是一样,根据key值"io.netty.availableprocessors",获取系统配置值,在没用设置时使用runtime.getruntime().availableprocessors(),是用来获取处理器的个数。

这样保证了在默认情况下nthreads的大小是总是cpu个数的2倍。

继续回到构造,multithreadeventloopgroup继续调用父类multithreadeventexecutorgroup的构造:

1 protected multithreadeventexecutorgroup(int nthreads, executor executor, object... args) {
2     this(nthreads, executor, defaulteventexecutorchooserfactory.instance, args);
3 }

在这里又初始化了一个单例的defaulteventexecutorchooserfactory对象:

1 public static final defaulteventexecutorchooserfactory instance = new defaulteventexecutorchooserfactory();

defaulteventexecutorchooserfactory 实现的是eventexecutorchooserfactory接口:

1 public interface eventexecutorchooserfactory {
2     eventexecutorchooserfactory.eventexecutorchooser newchooser(eventexecutor[] var1);
3 
4     public interface eventexecutorchooser {
5         eventexecutor next();
6     }
7 }

defaulteventexecutorchooserfactory 的具体实现:

1 public eventexecutorchooser newchooser(eventexecutor[] executors) {
2     return (eventexecutorchooser)(ispoweroftwo(executors.length) ? new defaulteventexecutorchooserfactory.poweroftwoeventexecutorchooser(executors) : new defaulteventexecutorchooserfactory.genericeventexecutorchooser(executors));
3 }
4 
5 private static boolean ispoweroftwo(int val) {
6     return (val & -val) == val;
7 }

ispoweroftwo是用来检查executors的大小是否是二的整数次方,若是二的整数次方,产生poweroftwoeventexecutorchooser,反之产生genericeventexecutorchooser:

 1 private static final class genericeventexecutorchooser implements eventexecutorchooser {
 2     private final atomicinteger idx = new atomicinteger();
 3     private final eventexecutor[] executors;
 4 
 5     genericeventexecutorchooser(eventexecutor[] executors) {
 6         this.executors = executors;
 7     }
 8 
 9     public eventexecutor next() {
10         return this.executors[math.abs(this.idx.getandincrement() % this.executors.length)];
11     }
12 }
13 
14 private static final class poweroftwoeventexecutorchooser implements eventexecutorchooser {
15     private final atomicinteger idx = new atomicinteger();
16     private final eventexecutor[] executors;
17 
18     poweroftwoeventexecutorchooser(eventexecutor[] executors) {
19         this.executors = executors;
20     }
21 
22     public eventexecutor next() {
23         return this.executors[this.idx.getandincrement() & this.executors.length - 1];
24     }
25 }

这两种其实都是用了取模运算,只不过因为二的整数次方的特殊性而使用位运算。

回到构造,multithreadeventexecutorgroup继续调用本省的构造:

 1 private final eventexecutor[] children;
 2 private final set<eventexecutor> readonlychildren;
 3 private final atomicinteger terminatedchildren;
 4 private final promise<?> terminationfuture;
 5 private final eventexecutorchooser chooser;
 6 
 7 protected multithreadeventexecutorgroup(int nthreads, executor executor, eventexecutorchooserfactory chooserfactory, object... args) {
 8     this.terminatedchildren = new atomicinteger();
 9     this.terminationfuture = new defaultpromise(globaleventexecutor.instance);
10     if (nthreads <= 0) {
11         throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
12     } else {
13         if (executor == null) {
14             executor = new threadpertaskexecutor(this.newdefaultthreadfactory());
15         }
16 
17         this.children = new eventexecutor[nthreads];
18 
19         int j;
20         for(int i = 0; i < nthreads; ++i) {
21             boolean success = false;
22             boolean var18 = false;
23 
24             try {
25                 var18 = true;
26                 this.children[i] = this.newchild((executor)executor, args);
27                 success = true;
28                 var18 = false;
29             } catch (exception var19) {
30                 throw new illegalstateexception("failed to create a child event loop", var19);
31             } finally {
32                 if (var18) {
33                     if (!success) {
34                         int j;
35                         for(j = 0; j < i; ++j) {
36                             this.children[j].shutdowngracefully();
37                         }
38 
39                         for(j = 0; j < i; ++j) {
40                             eventexecutor e = this.children[j];
41 
42                             try {
43                                 while(!e.isterminated()) {
44                                     e.awaittermination(2147483647l, timeunit.seconds);
45                                 }
46                             } catch (interruptedexception var20) {
47                                 thread.currentthread().interrupt();
48                                 break;
49                             }
50                         }
51                     }
52 
53                 }
54             }
55 
56             if (!success) {
57                 for(j = 0; j < i; ++j) {
58                     this.children[j].shutdowngracefully();
59                 }
60 
61                 for(j = 0; j < i; ++j) {
62                     eventexecutor e = this.children[j];
63 
64                     try {
65                         while(!e.isterminated()) {
66                             e.awaittermination(2147483647l, timeunit.seconds);
67                         }
68                     } catch (interruptedexception var22) {
69                         thread.currentthread().interrupt();
70                         break;
71                     }
72                 }
73             }
74         }
75 
76         this.chooser = chooserfactory.newchooser(this.children);
77         futurelistener<object> terminationlistener = new futurelistener<object>() {
78             public void operationcomplete(future<object> future) throws exception {
79                 if (multithreadeventexecutorgroup.this.terminatedchildren.incrementandget() == multithreadeventexecutorgroup.this.children.length) {
80                     multithreadeventexecutorgroup.this.terminationfuture.setsuccess((object)null);
81                 }
82 
83             }
84         };
85         eventexecutor[] var24 = this.children;
86         j = var24.length;
87 
88         for(int var26 = 0; var26 < j; ++var26) {
89             eventexecutor e = var24[var26];
90             e.terminationfuture().addlistener(terminationlistener);
91         }
92 
93         set<eventexecutor> childrenset = new linkedhashset(this.children.length);
94         collections.addall(childrenset, this.children);
95         this.readonlychildren = collections.unmodifiableset(childrenset);
96     }
97 }

首先是对terminatedchildren的初始化,没什么好说的,对terminationfuture的初始化使用defaultpromise,用来异步处理终止事件。executor初始化产生一个线程池。

接下来就是对children的操作,根据nthreads的大小,产生一个eventexecutor数组,然后遍历这个数组,调用newchild给每一个元素初始化。

newchild是在nioeventloopgroup中实现的:

1 protected eventloop newchild(executor executor, object... args) throws exception {
2     return new nioeventloop(this, executor, (selectorprovider)args[0], ((selectstrategyfactory)args[1]).newselectstrategy(), (rejectedexecutionhandler)args[2]);
3 }

在这里直接使用executor,和之前放在args数组中的selectorprovider、selectstrategyfactory(newselectstrategy方法产生defaultselectstrategy)和rejectedexecutionhandler产生了一个nioeventloop对象:

 1 private selector selector;
 2 private selector unwrappedselector;
 3 private selectedselectionkeyset selectedkeys;
 4 private final selectorprovider provider;
 5 private final atomicboolean wakenup = new atomicboolean();
 6 private final selectstrategy selectstrategy;
 7 
 8 nioeventloop(nioeventloopgroup parent, executor executor, selectorprovider selectorprovider, selectstrategy strategy, rejectedexecutionhandler rejectedexecutionhandler) {
 9    super(parent, executor, false, default_max_pending_tasks, rejectedexecutionhandler);
10     if (selectorprovider == null) {
11         throw new nullpointerexception("selectorprovider");
12     } else if (strategy == null) {
13         throw new nullpointerexception("selectstrategy");
14     } else {
15         this.provider = selectorprovider;
16         nioeventloop.selectortuple selectortuple = this.openselector();
17         this.selector = selectortuple.selector;
18         this.unwrappedselector = selectortuple.unwrappedselector;
19         this.selectstrategy = strategy;
20     }
21 }

nioeventloop首先在继承链上调用父类的构造,都是一些成员的赋值操作,简单看一看:

 1 protected singlethreadeventloop(eventloopgroup parent, executor executor, boolean addtaskwakesup, int maxpendingtasks, rejectedexecutionhandler rejectedexecutionhandler) {
 2     super(parent, executor, addtaskwakesup, maxpendingtasks, rejectedexecutionhandler);
 3     this.tailtasks = this.newtaskqueue(maxpendingtasks);
 4 }
 5 
 6 protected singlethreadeventexecutor(eventexecutorgroup parent, executor executor, boolean addtaskwakesup, int maxpendingtasks, rejectedexecutionhandler rejectedhandler) {
 7     super(parent);
 8     this.threadlock = new semaphore(0);
 9     this.shutdownhooks = new linkedhashset();
10     this.state = 1;
11     this.terminationfuture = new defaultpromise(globaleventexecutor.instance);
12     this.addtaskwakesup = addtaskwakesup;
13     this.maxpendingtasks = math.max(16, maxpendingtasks);
14     this.executor = (executor)objectutil.checknotnull(executor, "executor");
15     this.taskqueue = this.newtaskqueue(this.maxpendingtasks);
16     this.rejectedexecutionhandler = (rejectedexecutionhandler)objectutil.checknotnull(rejectedhandler, "rejectedhandler");
17 }
18 
19 protected abstractscheduledeventexecutor(eventexecutorgroup parent) {
20     super(parent);
21 }
22 
23 protected abstracteventexecutor(eventexecutorgroup parent) {
24     this.selfcollection = collections.singleton(this);
25     this.parent = parent;
26 }

在经过这继承链上的一系列调用后,给provider成员赋值selectorprovider,就是之前创建好的windowsselectorprovider,然后使用openselector方法,最终创建jdk原生的selector:

 1 private nioeventloop.selectortuple openselector() {
 2     final abstractselector unwrappedselector;
 3     try {
 4         unwrappedselector = this.provider.openselector();
 5     } catch (ioexception var7) {
 6         throw new channelexception("failed to open a new selector", var7);
 7     }
 8 
 9     if (disable_keyset_optimization) {
10         return new nioeventloop.selectortuple(unwrappedselector);
11     } else {
12         final selectedselectionkeyset selectedkeyset = new selectedselectionkeyset();
13         object maybeselectorimplclass = accesscontroller.doprivileged(new privilegedaction<object>() {
14             public object run() {
15                 try {
16                     return class.forname("sun.nio.ch.selectorimpl", false, platformdependent.getsystemclassloader());
17                 } catch (throwable var2) {
18                     return var2;
19                 }
20             }
21         });
22         if (maybeselectorimplclass instanceof class && ((class)maybeselectorimplclass).isassignablefrom(unwrappedselector.getclass())) {
23             final class<?> selectorimplclass = (class)maybeselectorimplclass;
24             object maybeexception = accesscontroller.doprivileged(new privilegedaction<object>() {
25                 public object run() {
26                     try {
27                         field selectedkeysfield = selectorimplclass.getdeclaredfield("selectedkeys");
28                         field publicselectedkeysfield = selectorimplclass.getdeclaredfield("publicselectedkeys");
29                         throwable cause = reflectionutil.trysetaccessible(selectedkeysfield, true);
30                         if (cause != null) {
31                             return cause;
32                         } else {
33                             cause = reflectionutil.trysetaccessible(publicselectedkeysfield, true);
34                             if (cause != null) {
35                                 return cause;
36                             } else {
37                                 selectedkeysfield.set(unwrappedselector, selectedkeyset);
38                                 publicselectedkeysfield.set(unwrappedselector, selectedkeyset);
39                                 return null;
40                             }
41                         }
42                     } catch (nosuchfieldexception var4) {
43                         return var4;
44                     } catch (illegalaccessexception var5) {
45                         return var5;
46                     }
47                 }
48             });
49             if (maybeexception instanceof exception) {
50                 this.selectedkeys = null;
51                 exception e = (exception)maybeexception;
52                 logger.trace("failed to instrument a special java.util.set into: {}", unwrappedselector, e);
53                 return new nioeventloop.selectortuple(unwrappedselector);
54             } else {
55                 this.selectedkeys = selectedkeyset;
56                 logger.trace("instrumented a special java.util.set into: {}", unwrappedselector);
57                 return new nioeventloop.selectortuple(unwrappedselector, new selectedselectionkeysetselector(unwrappedselector, selectedkeyset));
58             }
59         } else {
60             if (maybeselectorimplclass instanceof throwable) {
61                 throwable t = (throwable)maybeselectorimplclass;
62                 logger.trace("failed to instrument a special java.util.set into: {}", unwrappedselector, t);
63             }
64 
65             return new nioeventloop.selectortuple(unwrappedselector);
66         }
67     }
68 }

可以看到在一开始就使用provider的openselector方法,即windowsselectorprovider的openselector方法,创建了windowsselectorimpl对象(【java】nio中selector的创建源码分析 )

然后根据disable_keyset_optimization判断:

1 private static final boolean disable_keyset_optimization = systempropertyutil.getboolean("io.netty.nokeysetoptimization", false);

可以看到这个系统配置在没有设置默认是false,如果设置了则直接创建一个selectortuple对象返回:

 1 private static final class selectortuple {
 2     final selector unwrappedselector;
 3     final selector selector;
 4 
 5     selectortuple(selector unwrappedselector) {
 6         this.unwrappedselector = unwrappedselector;
 7         this.selector = unwrappedselector;
 8     }
 9 
10     selectortuple(selector unwrappedselector, selector selector) {
11         this.unwrappedselector = unwrappedselector;
12         this.selector = selector;
13     }
14 }

可以看到仅仅是将unwrappedselector和selector封装了,unwrappedselector对应的是jdk原生selector没有经过更改的,而selector对应的是经过更改修饰操作的。

在没有系统配置下,就对selector进行更改修饰操作:
首先创建selectedselectionkeyset对象,这个selectedselectionkeyset继承自abstractset:

1 final class selectedselectionkeyset extends abstractset<selectionkey> {
2     selectionkey[] keys = new selectionkey[1024];
3     int size;
4     
5     selectedselectionkeyset() {
6     }
7     ......
8 }

后面是通过反射机制,使得windowsselectorimpl的selectedkeys和publicselectedkeys成员直接赋值为selectedselectionkeyset对象。

windowsselectorimpl的这两个成员是在selectorimpl中定义的:

1 protected set<selectionkey> selectedkeys = new hashset();
2 private set<selectionkey> publicselectedkeys;

从这里就可以明白,在jdk原生的selector中,selectedkeys和publicselectedkeys这两个set的初始化大小都为0,而在这里仅仅就是使其初始化大小变为1024。
后面就是对一些异常的处理,没什么好说的。

openselector结束后,就可以分别对包装过的selector和未包装过的selector,即selector和unwrappedselector成员赋值,再由selectstrategy保存刚才产生的选择策略,用于selector的轮询。

回到multithreadeventexecutorgroup的构造,在调用newchild方法时即nioeventloop创建的过程中可能出现异常情况,就需要遍历children数组,将之前创建好的nioeventloop使用shutdowngracefully优雅地关闭掉:
shutdowngracefully在abstracteventexecutor中实现:

1 public future<?> shutdowngracefully() {
2     return this.shutdowngracefully(2l, 15l, timeunit.seconds);
3 }

这里设置了超时时间,继续调用singlethreadeventexecutor的shutdowngracefully方法:

 1 public future<?> shutdowngracefully(long quietperiod, long timeout, timeunit unit) {
 2     if (quietperiod < 0l) {
 3         throw new illegalargumentexception("quietperiod: " + quietperiod + " (expected >= 0)");
 4     } else if (timeout < quietperiod) {
 5         throw new illegalargumentexception("timeout: " + timeout + " (expected >= quietperiod (" + quietperiod + "))");
 6     } else if (unit == null) {
 7         throw new nullpointerexception("unit");
 8     } else if (this.isshuttingdown()) {
 9         return this.terminationfuture();
10     } else {
11         boolean ineventloop = this.ineventloop();
12 
13         while(!this.isshuttingdown()) {
14             boolean wakeup = true;
15             int oldstate = this.state;
16             int newstate;
17             if (ineventloop) {
18                 newstate = 3;
19             } else {
20                 switch(oldstate) {
21                 case 1:
22                 case 2:
23                     newstate = 3;
24                     break;
25                 default:
26                     newstate = oldstate;
27                     wakeup = false;
28                 }
29             }
30 
31             if (state_updater.compareandset(this, oldstate, newstate)) {
32                 this.gracefulshutdownquietperiod = unit.tonanos(quietperiod);
33                 this.gracefulshutdowntimeout = unit.tonanos(timeout);
34                 if (oldstate == 1) {
35                     try {
36                         this.dostartthread();
37                     } catch (throwable var10) {
38                         state_updater.set(this, 5);
39                         this.terminationfuture.tryfailure(var10);
40                         if (!(var10 instanceof exception)) {
41                             platformdependent.throwexception(var10);
42                         }
43 
44                         return this.terminationfuture;
45                     }
46                 }
47 
48                 if (wakeup) {
49                     this.wakeup(ineventloop);
50                 }
51 
52                 return this.terminationfuture();
53             }
54         }
55 
56         return this.terminationfuture();
57     }
58 }

前三个判断没什么好说的,isshuttingdown判断:

1 public boolean isshuttingdown() {
2     return this.state >= 3;
3 }

在之前nioeventloop创建的时候,调用了一系列的继承链,其中state是在singlethreadeventexecutor的构造方法中实现的,初始值是1,state有如下几种状态:

1 private static final int st_not_started = 1;
2 private static final int st_started = 2;
3 private static final int st_shutting_down = 3;
4 private static final int st_shutdown = 4;
5 private static final int st_terminated = 5;

可见在nioeventloop初始化后处于尚未启动状态,并没有channel的注册,也就不需要轮询。

isshuttingdown就必然是false,就进入了else块:
首先得到ineventloop的返回值,该方法在abstracteventexecutor中实现:

1 public boolean ineventloop() {
2     return this.ineventloop(thread.currentthread());
3 }

他传入了一个当前线程,接着调用ineventloop的重载,这个方法是在singlethreadeventexecutor中实现:

1 public boolean ineventloop(thread thread) {
2     return thread == this.thread;
3 }

通过观察之前的singlethreadeventexecutor构造方法,发现并没有对thread成员初始化,此时的this.thread就是null,那么返回值就是false,即ineventloop为false。

在while循环中又对isshuttingdown进行了判断,shutdowngracefully当让不仅仅使用在创建nioeventloop对象失败时才调用的,无论是在eventloopgroup的关闭,还是bootstrap的关闭,都会关闭绑定的nioeventloop,所以在多线程环境中,有可能会被其他线程关闭。

在while循环中,结合上面可知满足进入switch块,在switch块中令newstate为3;
然后调用state_updater的compareandset方法,state_updater是用来原子化更新state成员的:

1 private static final atomicintegerfieldupdater<singlethreadeventexecutor> state_updater = atomicintegerfieldupdater.newupdater(singlethreadeventexecutor.class, "state");

所以这里就是使用cas操作,原子化更新state成员为3,也就是使当前状态由st_not_started 变为了st_shutting_down 状态。

gracefulshutdownquietperiod和gracefulshutdowntimeout分别保存quietperiod和timeout的纳秒级颗粒度。

前面可知oldstate使1,调用dostartthread方法:

  1 private void dostartthread() {
  2     assert this.thread == null;
  3 
  4     this.executor.execute(new runnable() {
  5         public void run() {
  6             singlethreadeventexecutor.this.thread = thread.currentthread();
  7             if (singlethreadeventexecutor.this.interrupted) {
  8                 singlethreadeventexecutor.this.thread.interrupt();
  9             }
 10 
 11             boolean success = false;
 12             singlethreadeventexecutor.this.updatelastexecutiontime();
 13             boolean var112 = false;
 14 
 15             int oldstate;
 16             label1685: {
 17                 try {
 18                     var112 = true;
 19                     singlethreadeventexecutor.this.run();
 20                     success = true;
 21                     var112 = false;
 22                     break label1685;
 23                 } catch (throwable var119) {
 24                     singlethreadeventexecutor.logger.warn("unexpected exception from an event executor: ", var119);
 25                     var112 = false;
 26                 } finally {
 27                     if (var112) {
 28                         int oldstatex;
 29                         do {
 30                             oldstatex = singlethreadeventexecutor.this.state;
 31                         } while(oldstatex < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstatex, 3));
 32 
 33                         if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l) {
 34                             singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 35                         }
 36 
 37                         try {
 38                             while(!singlethreadeventexecutor.this.confirmshutdown()) {
 39                                 ;
 40                             }
 41                         } finally {
 42                             try {
 43                                 singlethreadeventexecutor.this.cleanup();
 44                             } finally {
 45                                 singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
 46                                 singlethreadeventexecutor.this.threadlock.release();
 47                                 if (!singlethreadeventexecutor.this.taskqueue.isempty()) {
 48                                     singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
 49                                 }
 50 
 51                                 singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
 52                             }
 53                         }
 54 
 55                     }
 56                 }
 57 
 58                 do {
 59                     oldstate = singlethreadeventexecutor.this.state;
 60                 } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3));
 61 
 62                 if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l) {
 63                     singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 64                 }
 65 
 66                 try {
 67                     while(!singlethreadeventexecutor.this.confirmshutdown()) {
 68                         ;
 69                     }
 70 
 71                     return;
 72                 } finally {
 73                     try {
 74                         singlethreadeventexecutor.this.cleanup();
 75                     } finally {
 76                         singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
 77                         singlethreadeventexecutor.this.threadlock.release();
 78                         if (!singlethreadeventexecutor.this.taskqueue.isempty()) {
 79                             singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
 80                         }
 81 
 82                         singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
 83                     }
 84                 }
 85             }
 86 
 87             do {
 88                 oldstate = singlethreadeventexecutor.this.state;
 89             } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3));
 90 
 91             if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l) {
 92                 singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 93             }
 94 
 95             try {
 96                 while(!singlethreadeventexecutor.this.confirmshutdown()) {
 97                     ;
 98                 }
 99             } finally {
100                 try {
101                     singlethreadeventexecutor.this.cleanup();
102                 } finally {
103                     singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
104                     singlethreadeventexecutor.this.threadlock.release();
105                     if (!singlethreadeventexecutor.this.taskqueue.isempty()) {
106                         singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
107                     }
108 
109                     singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
110                 }
111             }
112 
113         }
114     });
115 }

刚才说过this.thread并没有初始化,所以等于null成立,断言可以继续。

然后直接使executor运行了一个线程,这个executor其实就是在刚才的multithreadeventexecutorgroup中产生的threadpertaskexecutor对象。

在线程中,首先将singlethreadeventexecutor的thread成员初始化为当前线程。

在这里可能就有疑问了,为什么会在关闭时会调用名为dostartthread的方法,这个方法不因该在启动时调用吗?
其实dostartthread在启动时是会被调用的,当在启动时被调用的话,每一个nioeventloop都会被绑定一个线程用来处理真正的selector操作,根据之前的说法就可以知道,每个eventloopgroup在创建后都会被绑定cpu个数的二倍个nioeventloop,而每个nioeventloop都会绑定一个selector对象,上面又说了在启动时singlethreadeventexecutor绑定了一个线程,即nioeventloop绑定了一个线程来处理其绑定的selector的轮询。
至于关闭时还会启动selector的轮询,就是为了解决注册了的channel没有被处理的情况。

回到dostartthread方法,其实这个dostartthread方法的核心是singlethreadeventexecutor.this.run(),这个方法就是正真的selector的轮询操作,在nioeventloop中实现:

 1 protected void run() {
 2     while(true) {
 3         while(true) {
 4             try {
 5                 switch(this.selectstrategy.calculatestrategy(this.selectnowsupplier, this.hastasks())) {
 6                 case -2:
 7                     continue;
 8                 case -1:
 9                     this.select(this.wakenup.getandset(false));
10                     if (this.wakenup.get()) {
11                         this.selector.wakeup();
12                     }
13                 default:
14                     this.cancelledkeys = 0;
15                     this.needstoselectagain = false;
16                     int ioratio = this.ioratio;
17                     if (ioratio == 100) {
18                         try {
19                             this.processselectedkeys();
20                         } finally {
21                             this.runalltasks();
22                         }
23                     } else {
24                         long iostarttime = system.nanotime();
25                         boolean var13 = false;
26 
27                         try {
28                             var13 = true;
29                             this.processselectedkeys();
30                             var13 = false;
31                         } finally {
32                             if (var13) {
33                                 long iotime = system.nanotime() - iostarttime;
34                                 this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio);
35                             }
36                         }
37 
38                         long iotime = system.nanotime() - iostarttime;
39                         this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio);
40                     }
41                 }
42             } catch (throwable var21) {
43                 handleloopexception(var21);
44             }
45 
46             try {
47                 if (this.isshuttingdown()) {
48                     this.closeall();
49                     if (this.confirmshutdown()) {
50                         return;
51                     }
52                 }
53             } catch (throwable var18) {
54                 handleloopexception(var18);
55             }
56         }
57     }
58 }

进入switch块,首先调用之前准备好的选择策略,其中this.selectnowsupplier在nioeventloop创建的时候就被创建了:

1 private final intsupplier selectnowsupplier = new intsupplier() {
2     public int get() throws exception {
3         return nioeventloop.this.selectnow();
4     }
5 };

实际上调用了selectnow方法:

 1 int selectnow() throws ioexception {
 2     int var1;
 3     try {
 4         var1 = this.selector.selectnow();
 5     } finally {
 6         if (this.wakenup.get()) {
 7             this.selector.wakeup();
 8         }
 9 
10     }
11 
12     return var1;
13 }

这里就直接调用了jdk原生的selectnow方法。
之前说过的选择策略:

1 public int calculatestrategy(intsupplier selectsupplier, boolean hastasks) throws exception {
2     return hastasks ? selectsupplier.get() : -1;
3 }

其中hastasks是根据hastasks方法来判断,而hastasks方法就是判断任务队列是否为空,那么在一开始初始化,必然是空的,所以这里calculatestrategy的返回值就是-1;

那么case为-1条件成立,执行this.select(this.wakenup.getandset(false)),其中wakenup是一个原子化的boolean,用来表示是需要唤醒selector的轮询阻塞,初始化是为true,这里通过cas操作设置为false代表不需要唤醒,后面在select执行完后,又判断wakenup是否需要唤醒,说明在select中对selector的阻塞进行了检查,若是需要唤醒,就通过selector的原生api完成唤醒【java】nio中selector的select方法源码分析

来看看这里的select实现:

 1 private void select(boolean oldwakenup) throws ioexception {
 2     selector selector = this.selector;
 3 
 4     try {
 5         int selectcnt = 0;
 6         long currenttimenanos = system.nanotime();
 7         long selectdeadlinenanos = currenttimenanos + this.delaynanos(currenttimenanos);
 8 
 9         while(true) {
10             long timeoutmillis = (selectdeadlinenanos - currenttimenanos + 500000l) / 1000000l;
11             if (timeoutmillis <= 0l) {
12                 if (selectcnt == 0) {
13                     selector.selectnow();
14                     selectcnt = 1;
15                 }
16                 break;
17             }
18 
19             if (this.hastasks() && this.wakenup.compareandset(false, true)) {
20                 selector.selectnow();
21                 selectcnt = 1;
22                 break;
23             }
24 
25             int selectedkeys = selector.select(timeoutmillis);
26             ++selectcnt;
27             if (selectedkeys != 0 || oldwakenup || this.wakenup.get() || this.hastasks() || this.hasscheduledtasks()) {
28                 break;
29             }
30 
31             if (thread.interrupted()) {
32                 if (logger.isdebugenabled()) {
33                     logger.debug("selector.select() returned prematurely because thread.currentthread().interrupt() was called. use nioeventloop.shutdowngracefully() to shutdown the nioeventloop.");
34                 }
35 
36                 selectcnt = 1;
37                 break;
38             }
39 
40             long time = system.nanotime();
41             if (time - timeunit.milliseconds.tonanos(timeoutmillis) >= currenttimenanos) {
42                 selectcnt = 1;
43             } else if (selector_auto_rebuild_threshold > 0 && selectcnt >= selector_auto_rebuild_threshold) {
44                 logger.warn("selector.select() returned prematurely {} times in a row; rebuilding selector {}.", selectcnt, selector);
45                 this.rebuildselector();
46                 selector = this.selector;
47                 selector.selectnow();
48                 selectcnt = 1;
49                 break;
50             }
51 
52             currenttimenanos = time;
53         }
54 
55         if (selectcnt > 3 && logger.isdebugenabled()) {
56             logger.debug("selector.select() returned prematurely {} times in a row for selector {}.", selectcnt - 1, selector);
57         }
58     } catch (cancelledkeyexception var13) {
59         if (logger.isdebugenabled()) {
60             logger.debug(cancelledkeyexception.class.getsimplename() + " raised by a selector {} - jdk bug?", selector, var13);
61         }
62     }
63 
64 }

这个方法虽然看着很长,但核心就是判断这个存放任务的阻塞队列是否还有任务,若是有,就直接调用selector的selectnow方法获取就绪的文件描述符,若是没有就绪的文件描述符该方法也会立即返回;若是阻塞队列中没有任务,就调用selector的select(timeout)方法,尝试在超时时间内取获取就绪的文件描述符。

因为现在是在执行nioeventloopgroup的创建,并没有channel的注册,也就没有轮询到任何文件描述符就绪。
在轮询结束后,回到run方法,进入default块:
其中ioratio是执行io操作和执行任务队列的任务用时比率,默认是50。若是ioratio设置为100,就必须等到tasks阻塞队列中的所有任务执行完毕才再次进行轮询;若是小于100,那么就根据(100 - ioratio) / ioratio的比值乘以iotime计算出的超时时间让所有任务尝试在超时时间内执行完毕,若是到达超时时间还没执行完毕,就在下一轮的轮询中执行。

processselectedkeys方法就是获取selector轮询的selectedkeys结果:

1 private void processselectedkeys() {
2     if (this.selectedkeys != null) {
3         this.processselectedkeysoptimized();
4     } else {
5         this.processselectedkeysplain(this.selector.selectedkeys());
6     }
7 
8 }

selectedkeys 在openselector时被初始化过了,若是在openselector中出现异常selectedkeys才会为null。

processselectedkeysoptimized方法:

 1 private void processselectedkeysoptimized() {
 2     for(int i = 0; i < this.selectedkeys.size; ++i) {
 3         selectionkey k = this.selectedkeys.keys[i];
 4         this.selectedkeys.keys[i] = null;
 5         object a = k.attachment();
 6         if (a instanceof abstractniochannel) {
 7             this.processselectedkey(k, (abstractniochannel)a);
 8         } else {
 9             niotask<selectablechannel> task = (niotask)a;
10             processselectedkey(k, task);
11         }
12 
13         if (this.needstoselectagain) {
14             this.selectedkeys.reset(i + 1);
15             this.selectagain();
16             i = -1;
17         }
18     }
19 
20 }

这里就通过遍历在openselector中注入进selector的selectedkeys,得到selectionkey 对象。
在这里可以看到netty很巧妙地通过selectionkey的attachment附件,将jdk中的channel和netty中的channel联系了起来。
根据得到的附件channel的类型,执行不同的processselectedkey方法,去处理io操作。

processselectedkey(selectionkey k, abstractniochannel ch)方法:

 1 private void processselectedkey(selectionkey k, abstractniochannel ch) {
 2     niounsafe unsafe = ch.unsafe();
 3     if (!k.isvalid()) {
 4         nioeventloop eventloop;
 5         try {
 6             eventloop = ch.eventloop();
 7         } catch (throwable var6) {
 8             return;
 9         }
10 
11         if (eventloop == this && eventloop != null) {
12             unsafe.close(unsafe.voidpromise());
13         }
14     } else {
15         try {
16             int readyops = k.readyops();
17             if ((readyops & 8) != 0) {
18                 int ops = k.interestops();
19                 ops &= -9;
20                 k.interestops(ops);
21                 unsafe.finishconnect();
22             }
23 
24             if ((readyops & 4) != 0) {
25                 ch.unsafe().forceflush();
26             }
27 
28             if ((readyops & 17) != 0 || readyops == 0) {
29                 unsafe.read();
30             }
31         } catch (cancelledkeyexception var7) {
32             unsafe.close(unsafe.voidpromise());
33         }
34 
35     }
36 }

这里的主要核心就是根据selectedkey的readyops值来判断,处理不同的就绪事件,有如下几种事件:

1 public static final int op_read = 1 << 0;
2 public static final int op_write = 1 << 2;
3 public static final int op_connect = 1 << 3;
4 public static final int op_accept = 1 << 4;

结合来看上面的判断就对应:连接就绪、写就绪、侦听或者读就绪,交由netty的abstractniochannel的niounsafe去处理不同事件的byte数据,niounsafe会将数据再交由channelpipeline双向链表去处理。
关于channelpipeline会在后续的博客中详细介绍。

processselectedkey(selectionkey k, niotask<selectablechannel> task)这个方法的实现细节需要由使用者实现niotask<selectablechannel>接口,就不说了。

回到processselectedkeys方法,在this.selectedkeys等于null的情况下:

 1 private void processselectedkeysplain(set<selectionkey> selectedkeys) {
 2     if (!selectedkeys.isempty()) {
 3         iterator i = selectedkeys.iterator();
 4 
 5         while(true) {
 6             selectionkey k = (selectionkey)i.next();
 7             object a = k.attachment();
 8             i.remove();
 9             if (a instanceof abstractniochannel) {
10                 this.processselectedkey(k, (abstractniochannel)a);
11             } else {
12                 niotask<selectablechannel> task = (niotask)a;
13                 processselectedkey(k, task);
14             }
15 
16             if (!i.hasnext()) {
17                 break;
18             }
19 
20             if (this.needstoselectagain) {
21                 this.selectagain();
22                 selectedkeys = this.selector.selectedkeys();
23                 if (selectedkeys.isempty()) {
24                     break;
25                 }
26 
27                 i = selectedkeys.iterator();
28             }
29         }
30 
31     }
32 }

这是在openselector中注入进selector的selectedkeys失败的情况下,直接遍历selector本身的selectedkeys,和processselectedkeysoptimized没有差别。

继续回到run方法,在调用完processselectedkeys方法后,就需要调用runalltasks处理任务队列中的任务:
runalltasks()方法:

 1 protected boolean runalltasks() {
 2     assert this.ineventloop();
 3 
 4     boolean ranatleastone = false;
 5 
 6     boolean fetchedall;
 7     do {
 8         fetchedall = this.fetchfromscheduledtaskqueue();
 9         if (this.runalltasksfrom(this.taskqueue)) {
10             ranatleastone = true;
11         }
12     } while(!fetchedall);
13 
14     if (ranatleastone) {
15         this.lastexecutiontime = scheduledfuturetask.nanotime();
16     }
17 
18     this.afterrunningalltasks();
19     return ranatleastone;
20 }

首先调用fetchfromscheduledtaskqueue方法:

 1 private boolean fetchfromscheduledtaskqueue() {
 2     long nanotime = abstractscheduledeventexecutor.nanotime();
 3 
 4     for(runnable scheduledtask = this.pollscheduledtask(nanotime); scheduledtask != null; scheduledtask = this.pollscheduledtask(nanotime)) {
 5         if (!this.taskqueue.offer(scheduledtask)) {
 6             this.scheduledtaskqueue().add((scheduledfuturetask)scheduledtask);
 7             return false;
 8         }
 9     }
10 
11     return true;
12 }

这里就是通过pollscheduledtask不断地从延时任务队列获取到期的任务,将到期的任务添加到taskqueue任务队列中,为上面的runalltasksfrom执行做准备;若是添加失败,再把它放进延时任务队列。

pollscheduledtask方法:

 1 protected final runnable pollscheduledtask(long nanotime) {
 2     assert this.ineventloop();
 3 
 4     queue<scheduledfuturetask<?>> scheduledtaskqueue = this.scheduledtaskqueue;
 5     scheduledfuturetask<?> scheduledtask = scheduledtaskqueue == null ? null : (scheduledfuturetask)scheduledtaskqueue.peek();
 6     if (scheduledtask == null) {
 7         return null;
 8     } else if (scheduledtask.deadlinenanos() <= nanotime) {
 9         scheduledtaskqueue.remove();
10         return scheduledtask;
11     } else {
12         return null;
13     }
14 }

从延时任务队列中获取队首的任务scheduledtask,若是scheduledtask的deadlinenanos小于等于nanotime,说明该任务到期。

回到runalltasks,将到期了的延时任务放在了任务队列,由runalltasksfrom执行这些任务:

 1 protected final boolean runalltasksfrom(queue<runnable> taskqueue) {
 2     runnable task = polltaskfrom(taskqueue);
 3     if (task == null) {
 4         return false;
 5     } else {
 6         do {
 7             safeexecute(task);
 8             task = polltaskfrom(taskqueue);
 9         } while(task != null);
10 
11         return true;
12     }
13 }

不断地从任务队列队首获取任务,然后执行,直到没有任务。

polltaskfrom是获取队首任务:

1 protected static runnable polltaskfrom(queue<runnable> taskqueue) {
2     runnable task;
3     do {
4         task = (runnable)taskqueue.poll();
5     } while(task == wakeup_task);
6 
7     return task;
8 }

其中wakeup_task,是用来巧妙地控制循环:

1 private static final runnable wakeup_task = new runnable() {
2     public void run() {
3     }
4 };


safeexecute是执行任务:

1 protected static void safeexecute(runnable task) {
2     try {
3         task.run();
4     } catch (throwable var2) {
5         logger.warn("a task raised an exception. task: {}", task, var2);
6     }
7 
8 }

实际上就是执行runnable 的run方法。

继续回到runalltasks方法,当所有到期任务执行完毕后,根据ranatleastone判断是否需要修改最后一次执行时间lastexecutiontime,最后调用afterrunningalltasks方法,该方法是在singlethreadeventloop中实现的:

1 protected void afterrunningalltasks() {
2     this.runalltasksfrom(this.tailtasks);
3 }

这里就仅仅执行了tailtasks队列中的任务。runalltasks到这里执行完毕。

再来看看runalltasks(timeoutnanos)方法:

 1 protected boolean runalltasks(long timeoutnanos) {
 2     this.fetchfromscheduledtaskqueue();
 3     runnable task = this.polltask();
 4     if (task == null) {
 5         this.afterrunningalltasks();
 6         return false;
 7     } else {
 8         long deadline = scheduledfuturetask.nanotime() + timeoutnanos;
 9         long runtasks = 0l;
10 
11         long lastexecutiontime;
12         while(true) {
13             safeexecute(task);
14             ++runtasks;
15             if ((runtasks & 63l) == 0l) {
16                 lastexecutiontime = scheduledfuturetask.nanotime();
17                 if (lastexecutiontime >= deadline) {
18                     break;
19                 }
20             }
21 
22             task = this.polltask();
23             if (task == null) {
24                 lastexecutiontime = scheduledfuturetask.nanotime();
25                 break;
26             }
27         }
28 
29         this.afterrunningalltasks();
30         this.lastexecutiontime = lastexecutiontime;
31         return true;
32     }
33 }

这个方法前面的runalltasks方法类似,先通过fetchfromscheduledtaskqueue将所有到期了的延时任务放在taskqueue中,然后不断从taskqueue队首获取任务,但是,若是执行到了到超过了63个任务,判断是否达到了超时时间deadline,若是达到结束循环,留着下次执行,反之继续循环执行任务。

 

回到run方法,在轮询完毕,并且执行完任务后,通过isshuttingdown判断当前状态,在之前的cas操作中,state已经变为了3,所以isshuttingdown成立,就需要调用closeall方法

 1 private void closeall() {
 2     this.selectagain();
 3     set<selectionkey> keys = this.selector.keys();
 4     collection<abstractniochannel> channels = new arraylist(keys.size());
 5     iterator var3 = keys.iterator();
 6 
 7     while(var3.hasnext()) {
 8         selectionkey k = (selectionkey)var3.next();
 9         object a = k.attachment();
10         if (a instanceof abstractniochannel) {
11             channels.add((abstractniochannel)a);
12         } else {
13             k.cancel();
14             niotask<selectablechannel> task = (niotask)a;
15             invokechannelunregistered(task, k, (throwable)null);
16         }
17     }
18 
19     var3 = channels.iterator();
20 
21     while(var3.hasnext()) {
22         abstractniochannel ch = (abstractniochannel)var3.next();
23         ch.unsafe().close(ch.unsafe().voidpromise());
24     }
25 
26 }

在这里首先调用selectagain进行一次轮询:

 1 private void selectagain() {
 2     this.needstoselectagain = false;
 3 
 4     try {
 5         this.selector.selectnow();
 6     } catch (throwable var2) {
 7         logger.warn("failed to update selectionkeys.", var2);
 8     }
 9 
10 }

通过这次的轮询,将当前仍有事件就绪的jdk的selectionkey中绑定的netty的channel添加到channels集合中,遍历这个集合,通过unsafe的close方法关闭netty的channel。

之后调用confirmshutdown方法:

 1 protected boolean confirmshutdown() {
 2     if (!this.isshuttingdown()) {
 3         return false;
 4     } else if (!this.ineventloop()) {
 5         throw new illegalstateexception("must be invoked from an event loop");
 6     } else {
 7         this.cancelscheduledtasks();
 8         if (this.gracefulshutdownstarttime == 0l) {
 9             this.gracefulshutdownstarttime = scheduledfuturetask.nanotime();
10         }
11 
12         if (!this.runalltasks() && !this.runshutdownhooks()) {
13             long nanotime = scheduledfuturetask.nanotime();
14             if (!this.isshutdown() && nanotime - this.gracefulshutdownstarttime <= this.gracefulshutdowntimeout) {
15                 if (nanotime - this.lastexecutiontime <= this.gracefulshutdownquietperiod) {
16                     this.wakeup(true);
17 
18                     try {
19                         thread.sleep(100l);
20                     } catch (interruptedexception var4) {
21                         ;
22                     }
23 
24                     return false;
25                 } else {
26                     return true;
27                 }
28             } else {
29                 return true;
30             }
31         } else if (this.isshutdown()) {
32             return true;
33         } else if (this.gracefulshutdownquietperiod == 0l) {
34             return true;
35         } else {
36             this.wakeup(true);
37             return false;
38         }
39     }
40 }

首先调用cancelscheduledtasks,取消所有的延时任务:

 1 protected void cancelscheduledtasks() {
 2     assert this.ineventloop();
 3 
 4     priorityqueue<scheduledfuturetask<?>> scheduledtaskqueue = this.scheduledtaskqueue;
 5     if (!isnullorempty(scheduledtaskqueue)) {
 6         scheduledfuturetask<?>[] scheduledtasks = (scheduledfuturetask[])scheduledtaskqueue.toarray(new scheduledfuturetask[scheduledtaskqueue.size()]);
 7         scheduledfuturetask[] var3 = scheduledtasks;
 8         int var4 = scheduledtasks.length;
 9 
10         for(int var5 = 0; var5 < var4; ++var5) {
11             scheduledfuturetask<?> task = var3[var5];
12             task.cancelwithoutremove(false);
13         }
14 
15         scheduledtaskqueue.clearignoringindexes();
16     }
17 }

遍历scheduledtasks这个延时任务对立中所有的任务,通过cancelwithoutremove将该任务取消。

至此轮询的整个生命周期完成。

回到singlethreadeventexecutor的dostartthread方法,在run方法执行完毕后,说明selector轮询结束,调用singlethreadeventexecutor.this.cleanup()方法关闭selector:

1 protected void cleanup() {
2     try {
3         this.selector.close();
4     } catch (ioexception var2) {
5         logger.warn("failed to close a selector.", var2);
6     }
7 
8 }

这次终于可以回到multithreadeventexecutorgroup的构造,在children创建完毕后,用chooserfactory根据children的大小创建chooser,前面说过。

然后产生terminationlistener异步中断监听对象,给每个nioeventloop设置中断监听,然后对children进行了备份处理,通过readonlychildren保存。


至此nioeventloopgroup的创建全部结束。