Netty中NioEventLoopGroup的创建源码分析
nioeventloopgroup的无参构造:
1 public nioeventloopgroup() { 2 this(0); 3 }
调用了单参的构造:
1 public nioeventloopgroup(int nthreads) { 2 this(nthreads, (executor)null); 3 }
继续看到双参构造:
1 public nioeventloopgroup(int nthreads, executor executor) { 2 this(nthreads, executor, selectorprovider.provider()); 3 }
在这里是使用jdk中nio的原生api:selectorprovider的provider,产生了一个selectorprovider对象调用,继续调用三参构造。
关于selectorprovider在我前面的博客中有介绍过:【java】nio中selector的创建源码分析,在windows下默认创建了windowsselectorprovider对象。
继续看三参构造:
1 public nioeventloopgroup(int nthreads, threadfactory threadfactory, selectorprovider selectorprovider) { 2 this(nthreads, threadfactory, selectorprovider, defaultselectstrategyfactory.instance); 3 }
在这里创建了一个单例的defaultselectstrategyfactory 对象:
1 public final class defaultselectstrategyfactory implements selectstrategyfactory { 2 public static final selectstrategyfactory instance = new defaultselectstrategyfactory(); 3 4 private defaultselectstrategyfactory() { 5 } 6 7 public selectstrategy newselectstrategy() { 8 return defaultselectstrategy.instance; 9 } 10 }
defaultselectstrategyfactory实现的是selectstrategyfactory 接口:
1 public interface selectstrategyfactory { 2 selectstrategy newselectstrategy(); 3 }
该接口提供一个用来产生select策略的方法,selectstrategy接口如下:
1 public interface selectstrategy { 2 int select = -1; 3 int continue = -2; 4 5 int calculatestrategy(intsupplier var1, boolean var2) throws exception; 6 }
根据intsupplier 和一个boolean值为select策略提供了一个计算策略的方法。
在netty中只提供了defaultselectstrategy这一种默认实现:
1 final class defaultselectstrategy implements selectstrategy { 2 static final selectstrategy instance = new defaultselectstrategy(); 3 4 private defaultselectstrategy() { 5 } 6 7 public int calculatestrategy(intsupplier selectsupplier, boolean hastasks) throws exception { 8 return hastasks ? selectsupplier.get() : -1; 9 } 10 }
其中intsupplier :
1 public interface intsupplier { 2 int get() throws exception; 3 }
结合上面的来看,最终的选择策略主要是根据intsupplier的get值来得到的。
再回到构造:
1 public nioeventloopgroup(int nthreads, threadfactory threadfactory, selectorprovider selectorprovider, selectstrategyfactory selectstrategyfactory) { 2 super(nthreads, threadfactory, new object[]{selectorprovider, selectstrategyfactory, rejectedexecutionhandlers.reject()}); 3 }
这里产生了一个拒绝策略:
1 public static rejectedexecutionhandler reject() { 2 return reject; 3 } 4 5 private static final rejectedexecutionhandler reject = new rejectedexecutionhandler() { 6 public void rejected(runnable task, singlethreadeventexecutor executor) { 7 throw new rejectedexecutionexception(); 8 } 9 }; 10 11 public interface rejectedexecutionhandler { 12 void rejected(runnable var1, singlethreadeventexecutor var2); 13 }
将selectorprovider、selectstrategyfactory以及这个拒绝策略封装在一个object数组里,再调用了父类multithreadeventloopgroup的构造:
1 protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) { 2 super(nthreads == 0 ? default_event_loop_threads : nthreads, threadfactory, args); 3 }
在这里对nthreads的大小进行了调整:
1 private static final int default_event_loop_threads = math.max(1, systempropertyutil.getint("io.netty.eventloopthreads", nettyruntime.availableprocessors() * 2));
systempropertyutil.getint是根据key值"io.netty.eventloopthreads",获取系统配置值,在没用设置时使用nettyruntime.availableprocessors() * 2的值
nettyruntime的availableprocessors实现如下:
1 synchronized int availableprocessors() { 2 if (this.availableprocessors == 0) { 3 int availableprocessors = systempropertyutil.getint("io.netty.availableprocessors", runtime.getruntime().availableprocessors()); 4 this.setavailableprocessors(availableprocessors); 5 } 6 7 return this.availableprocessors; 8 }
还是一样,根据key值"io.netty.availableprocessors",获取系统配置值,在没用设置时使用runtime.getruntime().availableprocessors(),是用来获取处理器的个数。
这样保证了在默认情况下nthreads的大小是总是cpu个数的2倍。
继续回到构造,multithreadeventloopgroup继续调用父类multithreadeventexecutorgroup的构造:
1 protected multithreadeventexecutorgroup(int nthreads, executor executor, object... args) { 2 this(nthreads, executor, defaulteventexecutorchooserfactory.instance, args); 3 }
在这里又初始化了一个单例的defaulteventexecutorchooserfactory对象:
1 public static final defaulteventexecutorchooserfactory instance = new defaulteventexecutorchooserfactory();
defaulteventexecutorchooserfactory 实现的是eventexecutorchooserfactory接口:
1 public interface eventexecutorchooserfactory { 2 eventexecutorchooserfactory.eventexecutorchooser newchooser(eventexecutor[] var1); 3 4 public interface eventexecutorchooser { 5 eventexecutor next(); 6 } 7 }
defaulteventexecutorchooserfactory 的具体实现:
1 public eventexecutorchooser newchooser(eventexecutor[] executors) { 2 return (eventexecutorchooser)(ispoweroftwo(executors.length) ? new defaulteventexecutorchooserfactory.poweroftwoeventexecutorchooser(executors) : new defaulteventexecutorchooserfactory.genericeventexecutorchooser(executors)); 3 } 4 5 private static boolean ispoweroftwo(int val) { 6 return (val & -val) == val; 7 }
ispoweroftwo是用来检查executors的大小是否是二的整数次方,若是二的整数次方,产生poweroftwoeventexecutorchooser,反之产生genericeventexecutorchooser:
1 private static final class genericeventexecutorchooser implements eventexecutorchooser { 2 private final atomicinteger idx = new atomicinteger(); 3 private final eventexecutor[] executors; 4 5 genericeventexecutorchooser(eventexecutor[] executors) { 6 this.executors = executors; 7 } 8 9 public eventexecutor next() { 10 return this.executors[math.abs(this.idx.getandincrement() % this.executors.length)]; 11 } 12 } 13 14 private static final class poweroftwoeventexecutorchooser implements eventexecutorchooser { 15 private final atomicinteger idx = new atomicinteger(); 16 private final eventexecutor[] executors; 17 18 poweroftwoeventexecutorchooser(eventexecutor[] executors) { 19 this.executors = executors; 20 } 21 22 public eventexecutor next() { 23 return this.executors[this.idx.getandincrement() & this.executors.length - 1]; 24 } 25 }
这两种其实都是用了取模运算,只不过因为二的整数次方的特殊性而使用位运算。
回到构造,multithreadeventexecutorgroup继续调用本省的构造:
1 private final eventexecutor[] children; 2 private final set<eventexecutor> readonlychildren; 3 private final atomicinteger terminatedchildren; 4 private final promise<?> terminationfuture; 5 private final eventexecutorchooser chooser; 6 7 protected multithreadeventexecutorgroup(int nthreads, executor executor, eventexecutorchooserfactory chooserfactory, object... args) { 8 this.terminatedchildren = new atomicinteger(); 9 this.terminationfuture = new defaultpromise(globaleventexecutor.instance); 10 if (nthreads <= 0) { 11 throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads)); 12 } else { 13 if (executor == null) { 14 executor = new threadpertaskexecutor(this.newdefaultthreadfactory()); 15 } 16 17 this.children = new eventexecutor[nthreads]; 18 19 int j; 20 for(int i = 0; i < nthreads; ++i) { 21 boolean success = false; 22 boolean var18 = false; 23 24 try { 25 var18 = true; 26 this.children[i] = this.newchild((executor)executor, args); 27 success = true; 28 var18 = false; 29 } catch (exception var19) { 30 throw new illegalstateexception("failed to create a child event loop", var19); 31 } finally { 32 if (var18) { 33 if (!success) { 34 int j; 35 for(j = 0; j < i; ++j) { 36 this.children[j].shutdowngracefully(); 37 } 38 39 for(j = 0; j < i; ++j) { 40 eventexecutor e = this.children[j]; 41 42 try { 43 while(!e.isterminated()) { 44 e.awaittermination(2147483647l, timeunit.seconds); 45 } 46 } catch (interruptedexception var20) { 47 thread.currentthread().interrupt(); 48 break; 49 } 50 } 51 } 52 53 } 54 } 55 56 if (!success) { 57 for(j = 0; j < i; ++j) { 58 this.children[j].shutdowngracefully(); 59 } 60 61 for(j = 0; j < i; ++j) { 62 eventexecutor e = this.children[j]; 63 64 try { 65 while(!e.isterminated()) { 66 e.awaittermination(2147483647l, timeunit.seconds); 67 } 68 } catch (interruptedexception var22) { 69 thread.currentthread().interrupt(); 70 break; 71 } 72 } 73 } 74 } 75 76 this.chooser = chooserfactory.newchooser(this.children); 77 futurelistener<object> terminationlistener = new futurelistener<object>() { 78 public void operationcomplete(future<object> future) throws exception { 79 if (multithreadeventexecutorgroup.this.terminatedchildren.incrementandget() == multithreadeventexecutorgroup.this.children.length) { 80 multithreadeventexecutorgroup.this.terminationfuture.setsuccess((object)null); 81 } 82 83 } 84 }; 85 eventexecutor[] var24 = this.children; 86 j = var24.length; 87 88 for(int var26 = 0; var26 < j; ++var26) { 89 eventexecutor e = var24[var26]; 90 e.terminationfuture().addlistener(terminationlistener); 91 } 92 93 set<eventexecutor> childrenset = new linkedhashset(this.children.length); 94 collections.addall(childrenset, this.children); 95 this.readonlychildren = collections.unmodifiableset(childrenset); 96 } 97 }
首先是对terminatedchildren的初始化,没什么好说的,对terminationfuture的初始化使用defaultpromise,用来异步处理终止事件。executor初始化产生一个线程池。
接下来就是对children的操作,根据nthreads的大小,产生一个eventexecutor数组,然后遍历这个数组,调用newchild给每一个元素初始化。
newchild是在nioeventloopgroup中实现的:
1 protected eventloop newchild(executor executor, object... args) throws exception { 2 return new nioeventloop(this, executor, (selectorprovider)args[0], ((selectstrategyfactory)args[1]).newselectstrategy(), (rejectedexecutionhandler)args[2]); 3 }
在这里直接使用executor,和之前放在args数组中的selectorprovider、selectstrategyfactory(newselectstrategy方法产生defaultselectstrategy)和rejectedexecutionhandler产生了一个nioeventloop对象:
1 private selector selector; 2 private selector unwrappedselector; 3 private selectedselectionkeyset selectedkeys; 4 private final selectorprovider provider; 5 private final atomicboolean wakenup = new atomicboolean(); 6 private final selectstrategy selectstrategy; 7 8 nioeventloop(nioeventloopgroup parent, executor executor, selectorprovider selectorprovider, selectstrategy strategy, rejectedexecutionhandler rejectedexecutionhandler) { 9 super(parent, executor, false, default_max_pending_tasks, rejectedexecutionhandler); 10 if (selectorprovider == null) { 11 throw new nullpointerexception("selectorprovider"); 12 } else if (strategy == null) { 13 throw new nullpointerexception("selectstrategy"); 14 } else { 15 this.provider = selectorprovider; 16 nioeventloop.selectortuple selectortuple = this.openselector(); 17 this.selector = selectortuple.selector; 18 this.unwrappedselector = selectortuple.unwrappedselector; 19 this.selectstrategy = strategy; 20 } 21 }
nioeventloop首先在继承链上调用父类的构造,都是一些成员的赋值操作,简单看一看:
1 protected singlethreadeventloop(eventloopgroup parent, executor executor, boolean addtaskwakesup, int maxpendingtasks, rejectedexecutionhandler rejectedexecutionhandler) { 2 super(parent, executor, addtaskwakesup, maxpendingtasks, rejectedexecutionhandler); 3 this.tailtasks = this.newtaskqueue(maxpendingtasks); 4 } 5 6 protected singlethreadeventexecutor(eventexecutorgroup parent, executor executor, boolean addtaskwakesup, int maxpendingtasks, rejectedexecutionhandler rejectedhandler) { 7 super(parent); 8 this.threadlock = new semaphore(0); 9 this.shutdownhooks = new linkedhashset(); 10 this.state = 1; 11 this.terminationfuture = new defaultpromise(globaleventexecutor.instance); 12 this.addtaskwakesup = addtaskwakesup; 13 this.maxpendingtasks = math.max(16, maxpendingtasks); 14 this.executor = (executor)objectutil.checknotnull(executor, "executor"); 15 this.taskqueue = this.newtaskqueue(this.maxpendingtasks); 16 this.rejectedexecutionhandler = (rejectedexecutionhandler)objectutil.checknotnull(rejectedhandler, "rejectedhandler"); 17 } 18 19 protected abstractscheduledeventexecutor(eventexecutorgroup parent) { 20 super(parent); 21 } 22 23 protected abstracteventexecutor(eventexecutorgroup parent) { 24 this.selfcollection = collections.singleton(this); 25 this.parent = parent; 26 }
在经过这继承链上的一系列调用后,给provider成员赋值selectorprovider,就是之前创建好的windowsselectorprovider,然后使用openselector方法,最终创建jdk原生的selector:
1 private nioeventloop.selectortuple openselector() { 2 final abstractselector unwrappedselector; 3 try { 4 unwrappedselector = this.provider.openselector(); 5 } catch (ioexception var7) { 6 throw new channelexception("failed to open a new selector", var7); 7 } 8 9 if (disable_keyset_optimization) { 10 return new nioeventloop.selectortuple(unwrappedselector); 11 } else { 12 final selectedselectionkeyset selectedkeyset = new selectedselectionkeyset(); 13 object maybeselectorimplclass = accesscontroller.doprivileged(new privilegedaction<object>() { 14 public object run() { 15 try { 16 return class.forname("sun.nio.ch.selectorimpl", false, platformdependent.getsystemclassloader()); 17 } catch (throwable var2) { 18 return var2; 19 } 20 } 21 }); 22 if (maybeselectorimplclass instanceof class && ((class)maybeselectorimplclass).isassignablefrom(unwrappedselector.getclass())) { 23 final class<?> selectorimplclass = (class)maybeselectorimplclass; 24 object maybeexception = accesscontroller.doprivileged(new privilegedaction<object>() { 25 public object run() { 26 try { 27 field selectedkeysfield = selectorimplclass.getdeclaredfield("selectedkeys"); 28 field publicselectedkeysfield = selectorimplclass.getdeclaredfield("publicselectedkeys"); 29 throwable cause = reflectionutil.trysetaccessible(selectedkeysfield, true); 30 if (cause != null) { 31 return cause; 32 } else { 33 cause = reflectionutil.trysetaccessible(publicselectedkeysfield, true); 34 if (cause != null) { 35 return cause; 36 } else { 37 selectedkeysfield.set(unwrappedselector, selectedkeyset); 38 publicselectedkeysfield.set(unwrappedselector, selectedkeyset); 39 return null; 40 } 41 } 42 } catch (nosuchfieldexception var4) { 43 return var4; 44 } catch (illegalaccessexception var5) { 45 return var5; 46 } 47 } 48 }); 49 if (maybeexception instanceof exception) { 50 this.selectedkeys = null; 51 exception e = (exception)maybeexception; 52 logger.trace("failed to instrument a special java.util.set into: {}", unwrappedselector, e); 53 return new nioeventloop.selectortuple(unwrappedselector); 54 } else { 55 this.selectedkeys = selectedkeyset; 56 logger.trace("instrumented a special java.util.set into: {}", unwrappedselector); 57 return new nioeventloop.selectortuple(unwrappedselector, new selectedselectionkeysetselector(unwrappedselector, selectedkeyset)); 58 } 59 } else { 60 if (maybeselectorimplclass instanceof throwable) { 61 throwable t = (throwable)maybeselectorimplclass; 62 logger.trace("failed to instrument a special java.util.set into: {}", unwrappedselector, t); 63 } 64 65 return new nioeventloop.selectortuple(unwrappedselector); 66 } 67 } 68 }
可以看到在一开始就使用provider的openselector方法,即windowsselectorprovider的openselector方法,创建了windowsselectorimpl对象(【java】nio中selector的创建源码分析 )
然后根据disable_keyset_optimization判断:
1 private static final boolean disable_keyset_optimization = systempropertyutil.getboolean("io.netty.nokeysetoptimization", false);
可以看到这个系统配置在没有设置默认是false,如果设置了则直接创建一个selectortuple对象返回:
1 private static final class selectortuple { 2 final selector unwrappedselector; 3 final selector selector; 4 5 selectortuple(selector unwrappedselector) { 6 this.unwrappedselector = unwrappedselector; 7 this.selector = unwrappedselector; 8 } 9 10 selectortuple(selector unwrappedselector, selector selector) { 11 this.unwrappedselector = unwrappedselector; 12 this.selector = selector; 13 } 14 }
可以看到仅仅是将unwrappedselector和selector封装了,unwrappedselector对应的是jdk原生selector没有经过更改的,而selector对应的是经过更改修饰操作的。
在没有系统配置下,就对selector进行更改修饰操作:
首先创建selectedselectionkeyset对象,这个selectedselectionkeyset继承自abstractset:
1 final class selectedselectionkeyset extends abstractset<selectionkey> { 2 selectionkey[] keys = new selectionkey[1024]; 3 int size; 4 5 selectedselectionkeyset() { 6 } 7 ...... 8 }
后面是通过反射机制,使得windowsselectorimpl的selectedkeys和publicselectedkeys成员直接赋值为selectedselectionkeyset对象。
windowsselectorimpl的这两个成员是在selectorimpl中定义的:
1 protected set<selectionkey> selectedkeys = new hashset(); 2 private set<selectionkey> publicselectedkeys;
从这里就可以明白,在jdk原生的selector中,selectedkeys和publicselectedkeys这两个set的初始化大小都为0,而在这里仅仅就是使其初始化大小变为1024。
后面就是对一些异常的处理,没什么好说的。
openselector结束后,就可以分别对包装过的selector和未包装过的selector,即selector和unwrappedselector成员赋值,再由selectstrategy保存刚才产生的选择策略,用于selector的轮询。
回到multithreadeventexecutorgroup的构造,在调用newchild方法时即nioeventloop创建的过程中可能出现异常情况,就需要遍历children数组,将之前创建好的nioeventloop使用shutdowngracefully优雅地关闭掉:
shutdowngracefully在abstracteventexecutor中实现:
1 public future<?> shutdowngracefully() { 2 return this.shutdowngracefully(2l, 15l, timeunit.seconds); 3 }
这里设置了超时时间,继续调用singlethreadeventexecutor的shutdowngracefully方法:
1 public future<?> shutdowngracefully(long quietperiod, long timeout, timeunit unit) { 2 if (quietperiod < 0l) { 3 throw new illegalargumentexception("quietperiod: " + quietperiod + " (expected >= 0)"); 4 } else if (timeout < quietperiod) { 5 throw new illegalargumentexception("timeout: " + timeout + " (expected >= quietperiod (" + quietperiod + "))"); 6 } else if (unit == null) { 7 throw new nullpointerexception("unit"); 8 } else if (this.isshuttingdown()) { 9 return this.terminationfuture(); 10 } else { 11 boolean ineventloop = this.ineventloop(); 12 13 while(!this.isshuttingdown()) { 14 boolean wakeup = true; 15 int oldstate = this.state; 16 int newstate; 17 if (ineventloop) { 18 newstate = 3; 19 } else { 20 switch(oldstate) { 21 case 1: 22 case 2: 23 newstate = 3; 24 break; 25 default: 26 newstate = oldstate; 27 wakeup = false; 28 } 29 } 30 31 if (state_updater.compareandset(this, oldstate, newstate)) { 32 this.gracefulshutdownquietperiod = unit.tonanos(quietperiod); 33 this.gracefulshutdowntimeout = unit.tonanos(timeout); 34 if (oldstate == 1) { 35 try { 36 this.dostartthread(); 37 } catch (throwable var10) { 38 state_updater.set(this, 5); 39 this.terminationfuture.tryfailure(var10); 40 if (!(var10 instanceof exception)) { 41 platformdependent.throwexception(var10); 42 } 43 44 return this.terminationfuture; 45 } 46 } 47 48 if (wakeup) { 49 this.wakeup(ineventloop); 50 } 51 52 return this.terminationfuture(); 53 } 54 } 55 56 return this.terminationfuture(); 57 } 58 }
前三个判断没什么好说的,isshuttingdown判断:
1 public boolean isshuttingdown() { 2 return this.state >= 3; 3 }
在之前nioeventloop创建的时候,调用了一系列的继承链,其中state是在singlethreadeventexecutor的构造方法中实现的,初始值是1,state有如下几种状态:
1 private static final int st_not_started = 1; 2 private static final int st_started = 2; 3 private static final int st_shutting_down = 3; 4 private static final int st_shutdown = 4; 5 private static final int st_terminated = 5;
可见在nioeventloop初始化后处于尚未启动状态,并没有channel的注册,也就不需要轮询。
isshuttingdown就必然是false,就进入了else块:
首先得到ineventloop的返回值,该方法在abstracteventexecutor中实现:
1 public boolean ineventloop() { 2 return this.ineventloop(thread.currentthread()); 3 }
他传入了一个当前线程,接着调用ineventloop的重载,这个方法是在singlethreadeventexecutor中实现:
1 public boolean ineventloop(thread thread) { 2 return thread == this.thread; 3 }
通过观察之前的singlethreadeventexecutor构造方法,发现并没有对thread成员初始化,此时的this.thread就是null,那么返回值就是false,即ineventloop为false。
在while循环中又对isshuttingdown进行了判断,shutdowngracefully当让不仅仅使用在创建nioeventloop对象失败时才调用的,无论是在eventloopgroup的关闭,还是bootstrap的关闭,都会关闭绑定的nioeventloop,所以在多线程环境中,有可能会被其他线程关闭。
在while循环中,结合上面可知满足进入switch块,在switch块中令newstate为3;
然后调用state_updater的compareandset方法,state_updater是用来原子化更新state成员的:
1 private static final atomicintegerfieldupdater<singlethreadeventexecutor> state_updater = atomicintegerfieldupdater.newupdater(singlethreadeventexecutor.class, "state");
所以这里就是使用cas操作,原子化更新state成员为3,也就是使当前状态由st_not_started 变为了st_shutting_down 状态。
gracefulshutdownquietperiod和gracefulshutdowntimeout分别保存quietperiod和timeout的纳秒级颗粒度。
前面可知oldstate使1,调用dostartthread方法:
1 private void dostartthread() { 2 assert this.thread == null; 3 4 this.executor.execute(new runnable() { 5 public void run() { 6 singlethreadeventexecutor.this.thread = thread.currentthread(); 7 if (singlethreadeventexecutor.this.interrupted) { 8 singlethreadeventexecutor.this.thread.interrupt(); 9 } 10 11 boolean success = false; 12 singlethreadeventexecutor.this.updatelastexecutiontime(); 13 boolean var112 = false; 14 15 int oldstate; 16 label1685: { 17 try { 18 var112 = true; 19 singlethreadeventexecutor.this.run(); 20 success = true; 21 var112 = false; 22 break label1685; 23 } catch (throwable var119) { 24 singlethreadeventexecutor.logger.warn("unexpected exception from an event executor: ", var119); 25 var112 = false; 26 } finally { 27 if (var112) { 28 int oldstatex; 29 do { 30 oldstatex = singlethreadeventexecutor.this.state; 31 } while(oldstatex < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstatex, 3)); 32 33 if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l) { 34 singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates."); 35 } 36 37 try { 38 while(!singlethreadeventexecutor.this.confirmshutdown()) { 39 ; 40 } 41 } finally { 42 try { 43 singlethreadeventexecutor.this.cleanup(); 44 } finally { 45 singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5); 46 singlethreadeventexecutor.this.threadlock.release(); 47 if (!singlethreadeventexecutor.this.taskqueue.isempty()) { 48 singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')'); 49 } 50 51 singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null); 52 } 53 } 54 55 } 56 } 57 58 do { 59 oldstate = singlethreadeventexecutor.this.state; 60 } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3)); 61 62 if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l) { 63 singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates."); 64 } 65 66 try { 67 while(!singlethreadeventexecutor.this.confirmshutdown()) { 68 ; 69 } 70 71 return; 72 } finally { 73 try { 74 singlethreadeventexecutor.this.cleanup(); 75 } finally { 76 singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5); 77 singlethreadeventexecutor.this.threadlock.release(); 78 if (!singlethreadeventexecutor.this.taskqueue.isempty()) { 79 singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')'); 80 } 81 82 singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null); 83 } 84 } 85 } 86 87 do { 88 oldstate = singlethreadeventexecutor.this.state; 89 } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3)); 90 91 if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l) { 92 singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates."); 93 } 94 95 try { 96 while(!singlethreadeventexecutor.this.confirmshutdown()) { 97 ; 98 } 99 } finally { 100 try { 101 singlethreadeventexecutor.this.cleanup(); 102 } finally { 103 singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5); 104 singlethreadeventexecutor.this.threadlock.release(); 105 if (!singlethreadeventexecutor.this.taskqueue.isempty()) { 106 singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')'); 107 } 108 109 singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null); 110 } 111 } 112 113 } 114 }); 115 }
刚才说过this.thread并没有初始化,所以等于null成立,断言可以继续。
然后直接使executor运行了一个线程,这个executor其实就是在刚才的multithreadeventexecutorgroup中产生的threadpertaskexecutor对象。
在线程中,首先将singlethreadeventexecutor的thread成员初始化为当前线程。
在这里可能就有疑问了,为什么会在关闭时会调用名为dostartthread的方法,这个方法不因该在启动时调用吗?
其实dostartthread在启动时是会被调用的,当在启动时被调用的话,每一个nioeventloop都会被绑定一个线程用来处理真正的selector操作,根据之前的说法就可以知道,每个eventloopgroup在创建后都会被绑定cpu个数的二倍个nioeventloop,而每个nioeventloop都会绑定一个selector对象,上面又说了在启动时singlethreadeventexecutor绑定了一个线程,即nioeventloop绑定了一个线程来处理其绑定的selector的轮询。
至于关闭时还会启动selector的轮询,就是为了解决注册了的channel没有被处理的情况。
回到dostartthread方法,其实这个dostartthread方法的核心是singlethreadeventexecutor.this.run(),这个方法就是正真的selector的轮询操作,在nioeventloop中实现:
1 protected void run() { 2 while(true) { 3 while(true) { 4 try { 5 switch(this.selectstrategy.calculatestrategy(this.selectnowsupplier, this.hastasks())) { 6 case -2: 7 continue; 8 case -1: 9 this.select(this.wakenup.getandset(false)); 10 if (this.wakenup.get()) { 11 this.selector.wakeup(); 12 } 13 default: 14 this.cancelledkeys = 0; 15 this.needstoselectagain = false; 16 int ioratio = this.ioratio; 17 if (ioratio == 100) { 18 try { 19 this.processselectedkeys(); 20 } finally { 21 this.runalltasks(); 22 } 23 } else { 24 long iostarttime = system.nanotime(); 25 boolean var13 = false; 26 27 try { 28 var13 = true; 29 this.processselectedkeys(); 30 var13 = false; 31 } finally { 32 if (var13) { 33 long iotime = system.nanotime() - iostarttime; 34 this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio); 35 } 36 } 37 38 long iotime = system.nanotime() - iostarttime; 39 this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio); 40 } 41 } 42 } catch (throwable var21) { 43 handleloopexception(var21); 44 } 45 46 try { 47 if (this.isshuttingdown()) { 48 this.closeall(); 49 if (this.confirmshutdown()) { 50 return; 51 } 52 } 53 } catch (throwable var18) { 54 handleloopexception(var18); 55 } 56 } 57 } 58 }
进入switch块,首先调用之前准备好的选择策略,其中this.selectnowsupplier在nioeventloop创建的时候就被创建了:
1 private final intsupplier selectnowsupplier = new intsupplier() { 2 public int get() throws exception { 3 return nioeventloop.this.selectnow(); 4 } 5 };
实际上调用了selectnow方法:
1 int selectnow() throws ioexception { 2 int var1; 3 try { 4 var1 = this.selector.selectnow(); 5 } finally { 6 if (this.wakenup.get()) { 7 this.selector.wakeup(); 8 } 9 10 } 11 12 return var1; 13 }
这里就直接调用了jdk原生的selectnow方法。
之前说过的选择策略:
1 public int calculatestrategy(intsupplier selectsupplier, boolean hastasks) throws exception { 2 return hastasks ? selectsupplier.get() : -1; 3 }
其中hastasks是根据hastasks方法来判断,而hastasks方法就是判断任务队列是否为空,那么在一开始初始化,必然是空的,所以这里calculatestrategy的返回值就是-1;
那么case为-1条件成立,执行this.select(this.wakenup.getandset(false)),其中wakenup是一个原子化的boolean,用来表示是需要唤醒selector的轮询阻塞,初始化是为true,这里通过cas操作设置为false代表不需要唤醒,后面在select执行完后,又判断wakenup是否需要唤醒,说明在select中对selector的阻塞进行了检查,若是需要唤醒,就通过selector的原生api完成唤醒【java】nio中selector的select方法源码分析
来看看这里的select实现:
1 private void select(boolean oldwakenup) throws ioexception { 2 selector selector = this.selector; 3 4 try { 5 int selectcnt = 0; 6 long currenttimenanos = system.nanotime(); 7 long selectdeadlinenanos = currenttimenanos + this.delaynanos(currenttimenanos); 8 9 while(true) { 10 long timeoutmillis = (selectdeadlinenanos - currenttimenanos + 500000l) / 1000000l; 11 if (timeoutmillis <= 0l) { 12 if (selectcnt == 0) { 13 selector.selectnow(); 14 selectcnt = 1; 15 } 16 break; 17 } 18 19 if (this.hastasks() && this.wakenup.compareandset(false, true)) { 20 selector.selectnow(); 21 selectcnt = 1; 22 break; 23 } 24 25 int selectedkeys = selector.select(timeoutmillis); 26 ++selectcnt; 27 if (selectedkeys != 0 || oldwakenup || this.wakenup.get() || this.hastasks() || this.hasscheduledtasks()) { 28 break; 29 } 30 31 if (thread.interrupted()) { 32 if (logger.isdebugenabled()) { 33 logger.debug("selector.select() returned prematurely because thread.currentthread().interrupt() was called. use nioeventloop.shutdowngracefully() to shutdown the nioeventloop."); 34 } 35 36 selectcnt = 1; 37 break; 38 } 39 40 long time = system.nanotime(); 41 if (time - timeunit.milliseconds.tonanos(timeoutmillis) >= currenttimenanos) { 42 selectcnt = 1; 43 } else if (selector_auto_rebuild_threshold > 0 && selectcnt >= selector_auto_rebuild_threshold) { 44 logger.warn("selector.select() returned prematurely {} times in a row; rebuilding selector {}.", selectcnt, selector); 45 this.rebuildselector(); 46 selector = this.selector; 47 selector.selectnow(); 48 selectcnt = 1; 49 break; 50 } 51 52 currenttimenanos = time; 53 } 54 55 if (selectcnt > 3 && logger.isdebugenabled()) { 56 logger.debug("selector.select() returned prematurely {} times in a row for selector {}.", selectcnt - 1, selector); 57 } 58 } catch (cancelledkeyexception var13) { 59 if (logger.isdebugenabled()) { 60 logger.debug(cancelledkeyexception.class.getsimplename() + " raised by a selector {} - jdk bug?", selector, var13); 61 } 62 } 63 64 }
这个方法虽然看着很长,但核心就是判断这个存放任务的阻塞队列是否还有任务,若是有,就直接调用selector的selectnow方法获取就绪的文件描述符,若是没有就绪的文件描述符该方法也会立即返回;若是阻塞队列中没有任务,就调用selector的select(timeout)方法,尝试在超时时间内取获取就绪的文件描述符。
因为现在是在执行nioeventloopgroup的创建,并没有channel的注册,也就没有轮询到任何文件描述符就绪。
在轮询结束后,回到run方法,进入default块:
其中ioratio是执行io操作和执行任务队列的任务用时比率,默认是50。若是ioratio设置为100,就必须等到tasks阻塞队列中的所有任务执行完毕才再次进行轮询;若是小于100,那么就根据(100 - ioratio) / ioratio的比值乘以iotime计算出的超时时间让所有任务尝试在超时时间内执行完毕,若是到达超时时间还没执行完毕,就在下一轮的轮询中执行。
processselectedkeys方法就是获取selector轮询的selectedkeys结果:
1 private void processselectedkeys() { 2 if (this.selectedkeys != null) { 3 this.processselectedkeysoptimized(); 4 } else { 5 this.processselectedkeysplain(this.selector.selectedkeys()); 6 } 7 8 }
selectedkeys 在openselector时被初始化过了,若是在openselector中出现异常selectedkeys才会为null。
processselectedkeysoptimized方法:
1 private void processselectedkeysoptimized() { 2 for(int i = 0; i < this.selectedkeys.size; ++i) { 3 selectionkey k = this.selectedkeys.keys[i]; 4 this.selectedkeys.keys[i] = null; 5 object a = k.attachment(); 6 if (a instanceof abstractniochannel) { 7 this.processselectedkey(k, (abstractniochannel)a); 8 } else { 9 niotask<selectablechannel> task = (niotask)a; 10 processselectedkey(k, task); 11 } 12 13 if (this.needstoselectagain) { 14 this.selectedkeys.reset(i + 1); 15 this.selectagain(); 16 i = -1; 17 } 18 } 19 20 }
这里就通过遍历在openselector中注入进selector的selectedkeys,得到selectionkey 对象。
在这里可以看到netty很巧妙地通过selectionkey的attachment附件,将jdk中的channel和netty中的channel联系了起来。
根据得到的附件channel的类型,执行不同的processselectedkey方法,去处理io操作。
processselectedkey(selectionkey k, abstractniochannel ch)方法:
1 private void processselectedkey(selectionkey k, abstractniochannel ch) { 2 niounsafe unsafe = ch.unsafe(); 3 if (!k.isvalid()) { 4 nioeventloop eventloop; 5 try { 6 eventloop = ch.eventloop(); 7 } catch (throwable var6) { 8 return; 9 } 10 11 if (eventloop == this && eventloop != null) { 12 unsafe.close(unsafe.voidpromise()); 13 } 14 } else { 15 try { 16 int readyops = k.readyops(); 17 if ((readyops & 8) != 0) { 18 int ops = k.interestops(); 19 ops &= -9; 20 k.interestops(ops); 21 unsafe.finishconnect(); 22 } 23 24 if ((readyops & 4) != 0) { 25 ch.unsafe().forceflush(); 26 } 27 28 if ((readyops & 17) != 0 || readyops == 0) { 29 unsafe.read(); 30 } 31 } catch (cancelledkeyexception var7) { 32 unsafe.close(unsafe.voidpromise()); 33 } 34 35 } 36 }
这里的主要核心就是根据selectedkey的readyops值来判断,处理不同的就绪事件,有如下几种事件:
1 public static final int op_read = 1 << 0; 2 public static final int op_write = 1 << 2; 3 public static final int op_connect = 1 << 3; 4 public static final int op_accept = 1 << 4;
结合来看上面的判断就对应:连接就绪、写就绪、侦听或者读就绪,交由netty的abstractniochannel的niounsafe去处理不同事件的byte数据,niounsafe会将数据再交由channelpipeline双向链表去处理。
关于channelpipeline会在后续的博客中详细介绍。
processselectedkey(selectionkey k, niotask<selectablechannel> task)这个方法的实现细节需要由使用者实现niotask<selectablechannel>接口,就不说了。
回到processselectedkeys方法,在this.selectedkeys等于null的情况下:
1 private void processselectedkeysplain(set<selectionkey> selectedkeys) { 2 if (!selectedkeys.isempty()) { 3 iterator i = selectedkeys.iterator(); 4 5 while(true) { 6 selectionkey k = (selectionkey)i.next(); 7 object a = k.attachment(); 8 i.remove(); 9 if (a instanceof abstractniochannel) { 10 this.processselectedkey(k, (abstractniochannel)a); 11 } else { 12 niotask<selectablechannel> task = (niotask)a; 13 processselectedkey(k, task); 14 } 15 16 if (!i.hasnext()) { 17 break; 18 } 19 20 if (this.needstoselectagain) { 21 this.selectagain(); 22 selectedkeys = this.selector.selectedkeys(); 23 if (selectedkeys.isempty()) { 24 break; 25 } 26 27 i = selectedkeys.iterator(); 28 } 29 } 30 31 } 32 }
这是在openselector中注入进selector的selectedkeys失败的情况下,直接遍历selector本身的selectedkeys,和processselectedkeysoptimized没有差别。
继续回到run方法,在调用完processselectedkeys方法后,就需要调用runalltasks处理任务队列中的任务:
runalltasks()方法:
1 protected boolean runalltasks() { 2 assert this.ineventloop(); 3 4 boolean ranatleastone = false; 5 6 boolean fetchedall; 7 do { 8 fetchedall = this.fetchfromscheduledtaskqueue(); 9 if (this.runalltasksfrom(this.taskqueue)) { 10 ranatleastone = true; 11 } 12 } while(!fetchedall); 13 14 if (ranatleastone) { 15 this.lastexecutiontime = scheduledfuturetask.nanotime(); 16 } 17 18 this.afterrunningalltasks(); 19 return ranatleastone; 20 }
首先调用fetchfromscheduledtaskqueue方法:
1 private boolean fetchfromscheduledtaskqueue() { 2 long nanotime = abstractscheduledeventexecutor.nanotime(); 3 4 for(runnable scheduledtask = this.pollscheduledtask(nanotime); scheduledtask != null; scheduledtask = this.pollscheduledtask(nanotime)) { 5 if (!this.taskqueue.offer(scheduledtask)) { 6 this.scheduledtaskqueue().add((scheduledfuturetask)scheduledtask); 7 return false; 8 } 9 } 10 11 return true; 12 }
这里就是通过pollscheduledtask不断地从延时任务队列获取到期的任务,将到期的任务添加到taskqueue任务队列中,为上面的runalltasksfrom执行做准备;若是添加失败,再把它放进延时任务队列。
pollscheduledtask方法:
1 protected final runnable pollscheduledtask(long nanotime) { 2 assert this.ineventloop(); 3 4 queue<scheduledfuturetask<?>> scheduledtaskqueue = this.scheduledtaskqueue; 5 scheduledfuturetask<?> scheduledtask = scheduledtaskqueue == null ? null : (scheduledfuturetask)scheduledtaskqueue.peek(); 6 if (scheduledtask == null) { 7 return null; 8 } else if (scheduledtask.deadlinenanos() <= nanotime) { 9 scheduledtaskqueue.remove(); 10 return scheduledtask; 11 } else { 12 return null; 13 } 14 }
从延时任务队列中获取队首的任务scheduledtask,若是scheduledtask的deadlinenanos小于等于nanotime,说明该任务到期。
回到runalltasks,将到期了的延时任务放在了任务队列,由runalltasksfrom执行这些任务:
1 protected final boolean runalltasksfrom(queue<runnable> taskqueue) { 2 runnable task = polltaskfrom(taskqueue); 3 if (task == null) { 4 return false; 5 } else { 6 do { 7 safeexecute(task); 8 task = polltaskfrom(taskqueue); 9 } while(task != null); 10 11 return true; 12 } 13 }
不断地从任务队列队首获取任务,然后执行,直到没有任务。
polltaskfrom是获取队首任务:
1 protected static runnable polltaskfrom(queue<runnable> taskqueue) { 2 runnable task; 3 do { 4 task = (runnable)taskqueue.poll(); 5 } while(task == wakeup_task); 6 7 return task; 8 }
其中wakeup_task,是用来巧妙地控制循环:
1 private static final runnable wakeup_task = new runnable() { 2 public void run() { 3 } 4 };
safeexecute是执行任务:
1 protected static void safeexecute(runnable task) { 2 try { 3 task.run(); 4 } catch (throwable var2) { 5 logger.warn("a task raised an exception. task: {}", task, var2); 6 } 7 8 }
实际上就是执行runnable 的run方法。
继续回到runalltasks方法,当所有到期任务执行完毕后,根据ranatleastone判断是否需要修改最后一次执行时间lastexecutiontime,最后调用afterrunningalltasks方法,该方法是在singlethreadeventloop中实现的:
1 protected void afterrunningalltasks() { 2 this.runalltasksfrom(this.tailtasks); 3 }
这里就仅仅执行了tailtasks队列中的任务。runalltasks到这里执行完毕。
再来看看runalltasks(timeoutnanos)方法:
1 protected boolean runalltasks(long timeoutnanos) { 2 this.fetchfromscheduledtaskqueue(); 3 runnable task = this.polltask(); 4 if (task == null) { 5 this.afterrunningalltasks(); 6 return false; 7 } else { 8 long deadline = scheduledfuturetask.nanotime() + timeoutnanos; 9 long runtasks = 0l; 10 11 long lastexecutiontime; 12 while(true) { 13 safeexecute(task); 14 ++runtasks; 15 if ((runtasks & 63l) == 0l) { 16 lastexecutiontime = scheduledfuturetask.nanotime(); 17 if (lastexecutiontime >= deadline) { 18 break; 19 } 20 } 21 22 task = this.polltask(); 23 if (task == null) { 24 lastexecutiontime = scheduledfuturetask.nanotime(); 25 break; 26 } 27 } 28 29 this.afterrunningalltasks(); 30 this.lastexecutiontime = lastexecutiontime; 31 return true; 32 } 33 }
这个方法前面的runalltasks方法类似,先通过fetchfromscheduledtaskqueue将所有到期了的延时任务放在taskqueue中,然后不断从taskqueue队首获取任务,但是,若是执行到了到超过了63个任务,判断是否达到了超时时间deadline,若是达到结束循环,留着下次执行,反之继续循环执行任务。
回到run方法,在轮询完毕,并且执行完任务后,通过isshuttingdown判断当前状态,在之前的cas操作中,state已经变为了3,所以isshuttingdown成立,就需要调用closeall方法
1 private void closeall() { 2 this.selectagain(); 3 set<selectionkey> keys = this.selector.keys(); 4 collection<abstractniochannel> channels = new arraylist(keys.size()); 5 iterator var3 = keys.iterator(); 6 7 while(var3.hasnext()) { 8 selectionkey k = (selectionkey)var3.next(); 9 object a = k.attachment(); 10 if (a instanceof abstractniochannel) { 11 channels.add((abstractniochannel)a); 12 } else { 13 k.cancel(); 14 niotask<selectablechannel> task = (niotask)a; 15 invokechannelunregistered(task, k, (throwable)null); 16 } 17 } 18 19 var3 = channels.iterator(); 20 21 while(var3.hasnext()) { 22 abstractniochannel ch = (abstractniochannel)var3.next(); 23 ch.unsafe().close(ch.unsafe().voidpromise()); 24 } 25 26 }
在这里首先调用selectagain进行一次轮询:
1 private void selectagain() { 2 this.needstoselectagain = false; 3 4 try { 5 this.selector.selectnow(); 6 } catch (throwable var2) { 7 logger.warn("failed to update selectionkeys.", var2); 8 } 9 10 }
通过这次的轮询,将当前仍有事件就绪的jdk的selectionkey中绑定的netty的channel添加到channels集合中,遍历这个集合,通过unsafe的close方法关闭netty的channel。
之后调用confirmshutdown方法:
1 protected boolean confirmshutdown() { 2 if (!this.isshuttingdown()) { 3 return false; 4 } else if (!this.ineventloop()) { 5 throw new illegalstateexception("must be invoked from an event loop"); 6 } else { 7 this.cancelscheduledtasks(); 8 if (this.gracefulshutdownstarttime == 0l) { 9 this.gracefulshutdownstarttime = scheduledfuturetask.nanotime(); 10 } 11 12 if (!this.runalltasks() && !this.runshutdownhooks()) { 13 long nanotime = scheduledfuturetask.nanotime(); 14 if (!this.isshutdown() && nanotime - this.gracefulshutdownstarttime <= this.gracefulshutdowntimeout) { 15 if (nanotime - this.lastexecutiontime <= this.gracefulshutdownquietperiod) { 16 this.wakeup(true); 17 18 try { 19 thread.sleep(100l); 20 } catch (interruptedexception var4) { 21 ; 22 } 23 24 return false; 25 } else { 26 return true; 27 } 28 } else { 29 return true; 30 } 31 } else if (this.isshutdown()) { 32 return true; 33 } else if (this.gracefulshutdownquietperiod == 0l) { 34 return true; 35 } else { 36 this.wakeup(true); 37 return false; 38 } 39 } 40 }
首先调用cancelscheduledtasks,取消所有的延时任务:
1 protected void cancelscheduledtasks() { 2 assert this.ineventloop(); 3 4 priorityqueue<scheduledfuturetask<?>> scheduledtaskqueue = this.scheduledtaskqueue; 5 if (!isnullorempty(scheduledtaskqueue)) { 6 scheduledfuturetask<?>[] scheduledtasks = (scheduledfuturetask[])scheduledtaskqueue.toarray(new scheduledfuturetask[scheduledtaskqueue.size()]); 7 scheduledfuturetask[] var3 = scheduledtasks; 8 int var4 = scheduledtasks.length; 9 10 for(int var5 = 0; var5 < var4; ++var5) { 11 scheduledfuturetask<?> task = var3[var5]; 12 task.cancelwithoutremove(false); 13 } 14 15 scheduledtaskqueue.clearignoringindexes(); 16 } 17 }
遍历scheduledtasks这个延时任务对立中所有的任务,通过cancelwithoutremove将该任务取消。
至此轮询的整个生命周期完成。
回到singlethreadeventexecutor的dostartthread方法,在run方法执行完毕后,说明selector轮询结束,调用singlethreadeventexecutor.this.cleanup()方法关闭selector:
1 protected void cleanup() { 2 try { 3 this.selector.close(); 4 } catch (ioexception var2) { 5 logger.warn("failed to close a selector.", var2); 6 } 7 8 }
这次终于可以回到multithreadeventexecutorgroup的构造,在children创建完毕后,用chooserfactory根据children的大小创建chooser,前面说过。
然后产生terminationlistener异步中断监听对象,给每个nioeventloop设置中断监听,然后对children进行了备份处理,通过readonlychildren保存。
至此nioeventloopgroup的创建全部结束。
上一篇: 微信公众平台原创声明怎么添加到文章中?
推荐阅读
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
Java日期时间API系列8-----Jdk8中java.time包中的新的日期时间API类的LocalDate源码分析
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)
-
深入源码分析Spring中的构造器注入
-
laravel框架中控制器的创建和使用方法分析
-
netty之NioEventLoopGroup源码分析二
-
【Java】NIO中Selector的select方法源码分析
-
Netty源码分析 (一)----- NioEventLoopGroup
-
netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析
-
Bootstrap初始化过程源码分析--netty客户端的启动