netty之NioEventLoopGroup源码分析二
大家好,今天我准备死磕nioeventloopgroup的源码,首先讲下概念,nioeventloopgroup 它是一个线程池,存放nioeventloop,一个数组,今天打算先看下这行代码的初始化
eventloopgroup bossgroup = new nioeventloopgroup();
一、初始化
1、时序图:
2.类的关系
3.源码说明:
步骤1:
1 eventloopgroup bossgroup = new nioeventloopgroup(); 2 //调用链1 3 public nioeventloopgroup() { 4 this(0); 5 } 6 //调用链2 7 public nioeventloopgroup(int nthreads) { 8 this(nthreads, (executor) null); 9 } 10 //调用链3 11 public nioeventloopgroup(int nthreads, executor executor) { 12 this(nthreads, executor, selectorprovider.provider()); 13 } 14 //调用链4 15 public nioeventloopgroup( 16 int nthreads, executor executor, final selectorprovider selectorprovider) { 17 super(nthreads, executor, selectorprovider); 18 }
如上四个方法是nioeventloopgroup类的四个构造函数,首先会执行第3行的无参构造函数,然后调用第7行的构造函数,再继续调用第11行的构造函数,这里需要获取一个selectorprovider,见步骤2,再继续调用第15行的构造函数,这里内部实现调用父类multithreadeventloopgroup的构造函数。
步骤2:
1 public static selectorprovider provider() { 2 synchronized (lock) { 3 if (provider != null) 4 return provider; 5 return accesscontroller.doprivileged( 6 new privilegedaction<selectorprovider>() { 7 public selectorprovider run() { 8 if (loadproviderfromproperty()) 9 return provider; 10 if (loadproviderasservice()) 11 return provider; 12 provider = sun.nio.ch.defaultselectorprovider.create(); 13 return provider; 14 } 15 }); 16 } 17 }
这里selectorprovider的静态方法provider获取一个selectorprovider,全局唯一的,有加锁,依赖底层系统的实现provider不一样,后续另开专题分析;
步骤3:
1 protected multithreadeventloopgroup(int nthreads, executor executor, object... args) { 2 super(nthreads == 0 ? default_event_loop_threads : nthreads, executor, args); 3 }
如上是multithreadeventloopgroup的构造函数,内部又调用了父类multithreadeventexecutorgroup的构造函数,继续往下看;
步骤4:
1 protected multithreadeventexecutorgroup(int nthreads, executor executor, object... args) { 2 if (nthreads <= 0) { 3 throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads)); 4 } 5 6 if (executor == null) { 7 executor = new threadpertaskexecutor(newdefaultthreadfactory()); 8 } 9 10 children = new eventexecutor[nthreads]; 11 for (int i = 0; i < nthreads; i ++) { 12 boolean success = false; 13 try { 14 children[i] = newchild(executor, args); 15 success = true; 16 } catch (exception e) { 17 // todo: think about if this is a good exception type 18 throw new illegalstateexception("failed to create a child event loop", e); 19 } finally { 20 if (!success) { 21 for (int j = 0; j < i; j ++) { 22 children[j].shutdowngracefully(); 23 } 24 25 for (int j = 0; j < i; j ++) { 26 eventexecutor e = children[j]; 27 try { 28 while (!e.isterminated()) { 29 e.awaittermination(integer.max_value, timeunit.seconds); 30 } 31 } catch (interruptedexception interrupted) { 32 thread.currentthread().interrupt(); 33 break; 34 } 35 } 36 } 37 } 38 } 39 40 final futurelistener<object> terminationlistener = new futurelistener<object>() { 41 @override 42 public void operationcomplete(future<object> future) throws exception { 43 if (terminatedchildren.incrementandget() == children.length) { 44 terminationfuture.setsuccess(null); 45 } 46 } 47 }; 48 49 for (eventexecutor e: children) { 50 e.terminationfuture().addlistener(terminationlistener); 51 } 52 53 set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length); 54 collections.addall(childrenset, children); 55 readonlychildren = collections.unmodifiableset(childrenset); 56 }
如上是核心实现的构造函数,代码比较多,我们分段来看下都做了哪些事?
为了方便查看,把代码又粘贴了一遍
if (nthreads <= 0) { throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads)); }
判断线程数是否小于等于0,否则抛出异常。
if (executor == null) { executor = new threadpertaskexecutor(newdefaultthreadfactory()); }
第6-8行,会调用子类multithreadeventloopgroup的newdefaultthreadfactory方法生成threadfactory,这里涉及到java继承初始化的知识,不是调用multithreadeventexecutorgroup的newdefaultthreadfactory方法,需要注意一下;
//调用链1,此方法是multithreadeventloopgroup类的
1 protected threadfactory newdefaultthreadfactory() { 2 return new defaultthreadfactory(getclass(), thread.max_priority); 3 } 4 5 //调用链2 6 public defaultthreadfactory(class<?> pooltype, int priority) { 7 this(pooltype, false, priority); 8 } 9 //调用链3 10 public defaultthreadfactory(class<?> pooltype, boolean daemon, int priority) { 11 this(topoolname(pooltype), daemon, priority); 12 } 13 14 //调用链4 15 private static string topoolname(class<?> pooltype) { 16 if (pooltype == null) { 17 throw new nullpointerexception("pooltype"); 18 } 19 string poolname; 20 package pkg = pooltype.getpackage(); 21 if (pkg != null) { 22 poolname = pooltype.getname().substring(pkg.getname().length() + 1); 23 } else { 24 poolname = pooltype.getname(); 25 } 26 27 switch (poolname.length()) { 28 case 0: 29 return "unknown"; 30 case 1: 31 return poolname.tolowercase(locale.us); 32 default: 33 if (character.isuppercase(poolname.charat(0)) && character.islowercase(poolname.charat(1))) { 34 return character.tolowercase(poolname.charat(0)) + poolname.substring(1); 35 } else { 36 return poolname; 37 } 38 } 39 } 40 41 //调用链5 42 public defaultthreadfactory(string poolname, boolean daemon, int priority) { 43 if (poolname == null) { 44 throw new nullpointerexception("poolname"); 45 } 46 if (priority < thread.min_priority || priority > thread.max_priority) { 47 throw new illegalargumentexception( 48 "priority: " + priority + " (expected: thread.min_priority <= priority <= thread.max_priority)"); 49 } 50 51 prefix = poolname + '-' + poolid.incrementandget() + '-'; 52 this.daemon = daemon; 53 this.priority = priority; 54 }
如上依次从调用链到调用链5,完成defaultthreadfactory初始化;
再继续初始化threadpertaskexecutor类
1 public threadpertaskexecutor(threadfactory threadfactory) { 2 if (threadfactory == null) { 3 throw new nullpointerexception("threadfactory"); 4 } 5 this.threadfactory = threadfactory; 6 }
这里就是new 一个线程工程;
1 children = new eventexecutor[nthreads]; 2 for (int i = 0; i < nthreads; i ++) { 3 boolean success = false; 4 try { 5 children[i] = newchild(executor, args); 6 success = true; 7 } catch (exception e) { 8 // todo: think about if this is a good exception type 9 throw new illegalstateexception("failed to create a child event loop", e); 10 } finally { 11 if (!success) { 12 for (int j = 0; j < i; j ++) { 13 children[j].shutdowngracefully(); 14 } 15 16 for (int j = 0; j < i; j ++) { 17 eventexecutor e = children[j]; 18 try { 19 while (!e.isterminated()) { 20 e.awaittermination(integer.max_value, timeunit.seconds); 21 } 22 } catch (interruptedexception interrupted) { 23 thread.currentthread().interrupt(); 24 break; 25 } 26 } 27 } 28 } 29 }
上面初始化了8个(默认,可修改)nioeventloop, nioeventloop另外专题分析;
final futurelistener<object> terminationlistener = new futurelistener<object>() { @override public void operationcomplete(future<object> future) throws exception { if (terminatedchildren.incrementandget() == children.length) { terminationfuture.setsuccess(null); } } };
创建了一个监听器
for (eventexecutor e: children) { e.terminationfuture().addlistener(terminationlistener); }
这里循环调用增加这个监听器;
set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length); collections.addall(childrenset, children);
readonlychildren = collections.unmodifiableset(childrenset);
这里创建一个linkedhashset,在把children中的元素全部加进去,然后调用unmodifiableset,返回一个只读的chidren,readonlychildren;
好了,暂时先分析这么多。
下一篇: 吃饭的时候地震
推荐阅读
-
九、Spring之BeanFactory源码分析(一)
-
Spring源码分析之IoC容器初始化
-
Mybaits 源码解析 (九)----- 全网最详细,没有之一:一级缓存和二级缓存源码分析
-
PHP网页游戏学习之Xnova(ogame)源码解读(二)
-
ThinkPHP6源码分析之应用初始化
-
Tomcat源码分析 (二)----- Tomcat整体架构及组件
-
并发编程(五)——AbstractQueuedSynchronizer 之 ReentrantLock源码分析
-
Android8.1 SystemUI源码分析之 电池时钟刷新
-
Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
-
MapReduce之Job提交流程源码和切片源码分析