Netty源码分析 (一)----- NioEventLoopGroup
提到netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到nioeventloopgroup
这个线程池,接下来进入正题。
线程模型
首先来看一段netty的使用示例
package com.wrh.server; import io.netty.bootstrap.serverbootstrap; import io.netty.channel.*; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.socketchannel; import io.netty.channel.socket.nio.nioserversocketchannel; public final class simpleserver { public static void main(string[] args) throws exception { eventloopgroup bossgroup = new nioeventloopgroup(1); eventloopgroup workergroup = new nioeventloopgroup(); try { serverbootstrap b = new serverbootstrap(); b.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .handler(new simpleserverhandler()) .childhandler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel ch) throws exception { } }); channelfuture f = b.bind(8888).sync(); f.channel().closefuture().sync(); } finally { bossgroup.shutdowngracefully(); workergroup.shutdowngracefully(); } } private static class simpleserverhandler extends channelinboundhandleradapter { @override public void channelactive(channelhandlercontext ctx) throws exception { system.out.println("channelactive"); } @override public void channelregistered(channelhandlercontext ctx) throws exception { system.out.println("channelregistered"); } @override public void handleradded(channelhandlercontext ctx) throws exception { system.out.println("handleradded"); } } }
下面将分析第一、二行代码,看下nioeventloopgroup类的构造函数干了些什么。其余的部分将在其他博文中分析。
eventloopgroup bossgroup = new nioeventloopgroup(1); eventloopgroup workergroup = new nioeventloopgroup();
从代码中可以看到这里使用了两个线程池bossgroup
和workergroup
,那么为什么需要定义两个线程池呢?这就要说到netty的线程模型了。
netty的线程模型被称为reactor模型,具体如图所示,图上的mainreactor指的就是bossgroup
,这个线程池处理客户端的连接请求,并将accept的连接注册到subreactor的其中一个线程上;图上的subreactor当然指的就是workergroup
,负责处理已建立的客户端通道上的数据读写;图上还有一块threadpool是具体的处理业务逻辑的线程池,一般情况下可以复用subreactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的threadpool。
nioeventloopgroup构造函数
nioeventloopgroup的构造函数的代码如下
public nioeventloopgroup() { this(0); } public nioeventloopgroup(int nthreads) { this(nthreads, null); } public nioeventloopgroup(int nthreads, threadfactory threadfactory) { this(nthreads, threadfactory, selectorprovider.provider()); } public nioeventloopgroup( int nthreads, threadfactory threadfactory, final selectorprovider selectorprovider) { super(nthreads, threadfactory, selectorprovider); }
nioeventloopgroup类中的构造函数最终都是调用的父类multithreadeventloopgroup如下的构造函数:
protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) { super(nthreads == 0? default_event_loop_threads : nthreads, threadfactory, args); }
从上面的构造函数可以得到 如果使用eventloopgroup workergroup = new nioeventloopgroup()
来创建对象,即不指定线程个数,则netty给我们使用默认的线程个数,如果指定则用我们指定的线程个数。
默认线程个数相关的代码如下:
static { default_event_loop_threads = math.max(1, systempropertyutil.getint( "io.netty.eventloopthreads", runtime.getruntime().availableprocessors() * 2)); if (logger.isdebugenabled()) { logger.debug("-dio.netty.eventloopthreads: {}", default_event_loop_threads); } }
而systempropertyutil.getint函数的功能为:得到系统属性中指定key(这里:key=”io.netty.eventloopthreads”)所对应的value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu的核数的2倍。
结论:如果没有设置程序启动参数(或者说没有指定key=”io.netty.eventloopthreads”的属性值),那么默认情况下线程的个数为cpu的核数乘以2。
继续看,由于multithreadeventloopgroup的构造函数是调用的是其父类multithreadeventexecutorgroup的构造函数,因此,看下此类的构造函数
protected multithreadeventexecutorgroup(int nthreads, threadfactory threadfactory, object... args) { if (nthreads <= 0) { throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads)); } if (threadfactory == null) { threadfactory = newdefaultthreadfactory(); } children = new singlethreadeventexecutor[nthreads]; //根据线程个数是否为2的幂次方,采用不同策略初始化chooser if (ispoweroftwo(children.length)) { chooser = new poweroftwoeventexecutorchooser(); } else { chooser = new genericeventexecutorchooser(); } //产生ntreads个nioeventloop对象保存在children数组中 for (int i = 0; i < nthreads; i ++) { boolean success = false; try { children[i] = newchild(threadfactory, args); success = true; } catch (exception e) { // todo: think about if this is a good exception type throw new illegalstateexception("failed to create a child event loop", e); } finally { //如果newchild方法执行失败,则对前面执行new成功的几个nioeventloop进行shutdown处理 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdowngracefully(); } for (int j = 0; j < i; j ++) { eventexecutor e = children[j]; try { while (!e.isterminated()) { e.awaittermination(integer.max_value, timeunit.seconds); } } catch (interruptedexception interrupted) { thread.currentthread().interrupt(); break; } } } } } }
该构造函数干了如下三件事:
1、产生了一个线程工场:threadfactory = newdefaultthreadfactory();
multithreadeventexecutorgroup.java protected threadfactory newdefaultthreadfactory() { return new defaultthreadfactory(getclass());//getclass()为:nioeventloopgroup.class } defaultthreadfactory.java public defaultthreadfactory(class<?> pooltype) { this(pooltype, false, thread.norm_priority); }
2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser
private static boolean ispoweroftwo(int val) { return (val & -val) == val; }
3、 产生ntreads个nioeventloop对象保存在children数组中 ,线程都是通过调用newchild方法来产生的。
@override protected eventexecutor newchild( threadfactory threadfactory, object... args) throws exception { return new nioeventloop(this, threadfactory, (selectorprovider) args[0]); }
这里传给nioeventloop构造函数的参数为:nioeventloopgroup、defaultthreadfactory、selectorprovider。
nioeventloop构造函数分析
既然上面提到来new一个nioeventloop对象,下面我们就看下这个类以及其父类。
nioeventloop(nioeventloopgroup parent, threadfactory threadfactory, selectorprovider selectorprovider) { super(parent, threadfactory, false); if (selectorprovider == null) { throw new nullpointerexception("selectorprovider"); } provider = selectorprovider; selector = openselector(); }
继续看父类 singlethreadeventloop的构造函数
protected singlethreadeventloop(eventloopgroup parent, threadfactory threadfactory, boolean addtaskwakesup) { super(parent, threadfactory, addtaskwakesup); }
又是直接调用来父类singlethreadeventexecutor的构造函数,继续看
protected singlethreadeventexecutor( eventexecutorgroup parent, threadfactory threadfactory, boolean addtaskwakesup) { if (threadfactory == null) { throw new nullpointerexception("threadfactory"); } this.parent = parent; this.addtaskwakesup = addtaskwakesup;//false thread = threadfactory.newthread(new runnable() { @override public void run() { boolean success = false; updatelastexecutiontime(); try { //调用nioeventloop类的run方法 singlethreadeventexecutor.this.run(); success = true; } catch (throwable t) { logger.warn("unexpected exception from an event executor: ", t); } finally { for (;;) { int oldstate = state_updater.get(singlethreadeventexecutor.this); if (oldstate >= st_shutting_down || state_updater.compareandset( singlethreadeventexecutor.this, oldstate, st_shutting_down)) { break; } } // check if confirmshutdown() was called at the end of the loop. if (success && gracefulshutdownstarttime == 0) { logger.error( "buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called " + "before run() implementation terminates."); } try { // run all remaining tasks and shutdown hooks. for (;;) { if (confirmshutdown()) { break; } } } finally { try { cleanup(); } finally { state_updater.set(singlethreadeventexecutor.this, st_terminated); threadlock.release(); if (!taskqueue.isempty()) { logger.warn( "an event executor terminated with " + "non-empty task queue (" + taskqueue.size() + ')'); } terminationfuture.setsuccess(null); } } } } }); taskqueue = newtaskqueue(); } protected queue<runnable> newtaskqueue() { return new linkedblockingqueue<runnable>(); }
主要干如下两件事:
1、利用threadfactory创建来一个thread,传入了一个runnable对象,该runnable重写的run代码比较长,不过重点仅仅是调用nioeventloop类的run方法。
2、使用linkedblockingqueue类初始化taskqueue 。
其中,newthread方法的代码如下:
defaultthreadfactory.java
@override public thread newthread(runnable r) { thread t = newthread(new defaultrunnabledecorator(r), prefix + nextid.incrementandget()); try { //判断是否是守护线程,并进行设置 if (t.isdaemon()) { if (!daemon) { t.setdaemon(false); } } else { if (daemon) { t.setdaemon(true); } } //设置其优先级 if (t.getpriority() != priority) { t.setpriority(priority); } } catch (exception ignored) { // doesn't matter even if failed to set. } return t; } protected thread newthread(runnable r, string name) { return new fastthreadlocalthread(r, name); }
fastthreadlocalthread.java
public fastthreadlocalthread(runnable target, string name) { super(target, name);// fastthreadlocalthread extends thread }
到这里,可以看到底层还是借助于类似于thread thread = new thread(r)这种方式来创建线程。
关于nioeventloop对象可以得到的点有,初始化了如下4个属性。
1、nioeventloopgroup (在父类singlethreadeventexecutor中)
2、selector
3、provider
4、thread (在父类singlethreadeventexecutor中)
总结
关于nioeventloopgroup,总结如下
1、 如果不指定线程数,则线程数为:cpu的核数*2
2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser
3、产生nthreads个nioeventloop对象保存在children数组中。
可以理解nioeventloop就是一个线程,线程nioeventloop中里面有如下几个属性:
1、nioeventloopgroup (在父类singlethreadeventexecutor中)
2、selector
3、provider
4、thread (在父类singlethreadeventexecutor中)
更通俗点就是:nioeventloopgroup就是一个线程池,nioeventloop就是一个线程。nioeventloopgroup线程池中有n个nioeventloop线程。
推荐阅读
-
Python的socket模块源码中的一些实现要点分析
-
go开源项目influxdb-relay源码分析(一)
-
vuex 源码分析(一) 使用方法和代码结构
-
Netty源码分析 (四)----- ChannelPipeline
-
kubernetes垃圾回收器GarbageCollector源码分析(一)
-
Netty源码分析 (三)----- 服务端启动源码分析
-
netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来
-
Flink中watermark为什么选择最小一条(源码分析)
-
Python的socket模块源码中的一些实现要点分析
-
Tomcat源码分析三:Tomcat启动加载过程(一)的源码解析