Source analysis of Netty server-side startup
Construction of ServerBootstrap:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;

    public ServerBootstrap() {
    }
    ......
}
This implicitly runs the parent class's no-arg constructor:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap();
    private volatile ChannelHandler handler;

    AbstractBootstrap() {
    }
    ......
}
So construction only initializes a few container fields.
After the ServerBootstrap is created, the group method must be called to bind the EventLoopGroup. The creation of the EventLoopGroup was covered in my earlier post on the source analysis of NioEventLoopGroup creation in Netty.
ServerBootstrap's group method:
public ServerBootstrap group(EventLoopGroup group) {
    return this.group(group, group);
}

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    } else if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    } else {
        this.childGroup = childGroup;
        return this;
    }
}
It first calls the parent class's group method to bind the parentGroup:
public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    } else if (this.group != null) {
        throw new IllegalStateException("group set already");
    } else {
        this.group = group;
        return this.self();
    }
}

private B self() {
    return this;
}
The passed-in parentGroup is stored in AbstractBootstrap's group field, and the childGroup in ServerBootstrap's childGroup field.
Binding the groups is nothing more than saving them in fields.
Next, look at ServerBootstrap's channel method, which is implemented in AbstractBootstrap:
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    } else {
        return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
    }
}
It uses the channelClass to build a ReflectiveChannelFactory object:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        } else {
            this.clazz = clazz;
        }
    }

    public T newChannel() {
        try {
            return (T) this.clazz.getConstructor().newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
        }
    }

    public String toString() {
        return StringUtil.simpleClassName(this.clazz) + ".class";
    }
}
As you can see, ReflectiveChannelFactory's job is simply to create an instance of clazz via reflection (here we take NioServerSocketChannel as the example).
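In other words, for a NioServerSocketChannel the factory call boils down to the following reflective lookup; this is only a minimal illustrative sketch, not Netty code:

import io.netty.channel.socket.nio.NioServerSocketChannel;

final class ReflectionSketch {
    // A minimal sketch of what ReflectiveChannelFactory.newChannel() amounts to for
    // NioServerSocketChannel: look up the no-arg constructor and invoke it reflectively.
    static NioServerSocketChannel newChannelByReflection() throws Exception {
        return NioServerSocketChannel.class.getConstructor().newInstance();
    }
}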
After the ReflectiveChannelFactory is created, the channelFactory method is called:
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return this.channelFactory((ChannelFactory)channelFactory);
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    } else if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    } else {
        this.channelFactory = channelFactory;
        return this.self();
    }
}
The ReflectiveChannelFactory just created is stored in the channelFactory field, to be used later when the server-side NioServerSocketChannel is created.
Now look at ServerBootstrap's childHandler method:
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    } else {
        this.childHandler = childHandler;
        return this;
    }
}
Again it just saves the handler in the childHandler field. As you can see, this whole series of calls merely populates the ServerBootstrap; the real startup happens when bind is called. A typical bootstrap sequence is sketched below.
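For orientation, here is a minimal sketch of how these setters are typically chained in user code; the port, the LoggingHandler and the group sizes are placeholders chosen for the example, not part of the source being analyzed:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;

public final class EchoServerSketch {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // parentGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // childGroup
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)                    // only stores the groups
             .channel(NioServerSocketChannel.class)            // only stores a ReflectiveChannelFactory
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new LoggingHandler()); // placeholder handler
                 }
             });                                               // only stores the childHandler
            ChannelFuture f = b.bind(8080).sync();             // the real startup happens here
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}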
ServerBootstrap's bind method, implemented in AbstractBootstrap:
public ChannelFuture bind(int inetPort) {
    return this.bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(String inetHost, int inetPort) {
    return this.bind(SocketUtils.socketAddress(inetHost, inetPort));
}

public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    return this.bind(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    this.validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    } else {
        return this.doBind(localAddress);
    }
}
The first thing it does is call ServerBootstrap's validate method:
public ServerBootstrap validate() {
    super.validate();
    if (this.childHandler == null) {
        throw new IllegalStateException("childHandler not set");
    } else {
        if (this.childGroup == null) {
            logger.warn("childGroup is not set. Using parentGroup instead.");
            this.childGroup = this.config.group();
        }

        return this;
    }
}
which in turn first calls AbstractBootstrap's validate method:
public B validate() {
    if (this.group == null) {
        throw new IllegalStateException("group not set");
    } else if (this.channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    } else {
        return this.self();
    }
}
These checks verify that the group, the channel, and the childHandler have all been set, so group, channel, and childHandler must always be called before bind is executed.
The actual binding is delegated to doBind:
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = this.initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    } else if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
First initAndRegister is called, which creates and registers the ServerSocketChannel:
final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
        channel = this.channelFactory.newChannel();
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }

        return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

    ChannelFuture regFuture = this.config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}
It first calls channelFactory's newChannel to build the channel instance via reflection, i.e. the NioServerSocketChannel.
NioServerSocketChannel's no-arg constructor:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    ......
}
SelectorProvider comes from the JDK; I covered it in an earlier post on the source analysis of Selector creation in Java NIO.
On Windows the default provider is a WindowsSelectorProvider, which becomes DEFAULT_SELECTOR_PROVIDER. Now look at the newSocket method:
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException var2) {
        throw new ChannelException("Failed to open a server socket.", var2);
    }
}
The WindowsSelectorProvider is used to create a ServerSocketChannelImpl. At this point it becomes clear that NioServerSocketChannel exists to wrap the JDK's ServerSocketChannel.
It then calls another overloaded constructor:
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
    super((Channel)null, channel, 16);
    this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
}
It first calls the parent class's three-argument constructor; the literal 16 is the JDK SelectionKey's accept operation, i.e. OP_ACCEPT = 1 << 4 = 16:
public static final int OP_ACCEPT = 1 << 4;
The parent constructors form an inheritance chain:
AbstractNioMessageChannel:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
AbstractNioChannel:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;

    try {
        ch.configureBlocking(false);
    } catch (IOException var7) {
        try {
            ch.close();
        } catch (IOException var6) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", var6);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", var7);
    }
}
AbstractChannel:
private final ChannelId id;
private final Channel parent;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.id = this.newId();
    this.unsafe = this.newUnsafe();
    this.pipeline = this.newChannelPipeline();
}
In AbstractChannel, newUnsafe and newChannelPipeline create an Unsafe instance and a DefaultChannelPipeline respectively.
As described in my earlier post on NioEventLoopGroup, whenever NioEventLoop's run method finishes a poll and calls processSelectedKeys, it is this Unsafe that performs the actual read or write for each selected key; Unsafe handles the low-level data I/O.
(For a NioServerSocketChannel, the Unsafe created is a NioMessageUnsafe instance; for a NioSocketChannel it is a NioSocketChannelUnsafe instance.)
The pipeline is implemented as a doubly linked chain of responsibility; it processes the data supplied by Unsafe and drives the user's business logic (see my post on ChannelPipeline source analysis).
In AbstractNioChannel, configureBlocking(false) puts the JDK ServerSocketChannel into non-blocking mode, and the readInterestOp field is set to 16 so that the accept event can be registered later.
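Stripped of the Netty wrappers, what the constructor chain has done so far is roughly equivalent to the following plain JDK NIO calls (an orientation sketch, not Netty code):

import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class JdkOpenSketch {
    static ServerSocketChannel openLikeNioServerSocketChannel() throws Exception {
        // newSocket(): open the underlying JDK channel through the SelectorProvider
        ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
        // AbstractNioChannel constructor: switch the channel to non-blocking mode
        ch.configureBlocking(false);
        // readInterestOp is remembered as OP_ACCEPT (1 << 4 == 16) for later registration
        int readInterestOp = SelectionKey.OP_ACCEPT;
        assert readInterestOp == 16;
        return ch;
    }
}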
After the inheritance chain finishes, control returns to the NioServerSocketChannel constructor, which calls javaChannel:
protected java.nio.channels.ServerSocketChannel javaChannel() {
    return (java.nio.channels.ServerSocketChannel)super.javaChannel();
}
This javaChannel is simply the ch field that was just passed into AbstractNioChannel:
protected SelectableChannel javaChannel() {
    return this.ch;
}
i.e. the JDK ServerSocketChannelImpl created earlier. Its socket method yields a ServerSocket, which is then wrapped in a NioServerSocketChannelConfig object that holds the related configuration.
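One practical consequence: options set on the bootstrap end up in this config object. For example, the backlog read later in doBind via config.getBacklog() can be supplied from user code as in the sketch below (the value 128 is only an example):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

final class BacklogOptionSketch {
    // SO_BACKLOG set on the bootstrap is applied to the channel in ServerBootstrap.init()
    // and stored in the NioServerSocketChannelConfig, where doBind() later reads it back
    // through config.getBacklog().
    static void configureBacklog(ServerBootstrap b) {
        b.option(ChannelOption.SO_BACKLOG, 128);
    }
}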
With the NioServerSocketChannel fully constructed, we return to initAndRegister, which calls init on the newly created channel; this method is implemented in ServerBootstrap:
void init(Channel channel) throws Exception {
    Map<ChannelOption<?>, Object> options = this.options0();
    synchronized(options) {
        setChannelOptions(channel, options, logger);
    }

    Map<AttributeKey<?>, Object> attrs = this.attrs0();
    synchronized(attrs) {
        Iterator var5 = attrs.entrySet().iterator();

        while(true) {
            if (!var5.hasNext()) {
                break;
            }

            Entry<AttributeKey<?>, Object> e = (Entry)var5.next();
            AttributeKey<Object> key = (AttributeKey)e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = this.childGroup;
    final ChannelHandler currentChildHandler = this.childHandler;
    Map var9 = this.childOptions;
    final Entry[] currentChildOptions;
    synchronized(this.childOptions) {
        currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(0));
    }

    var9 = this.childAttrs;
    final Entry[] currentChildAttrs;
    synchronized(this.childAttrs) {
        currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = ServerBootstrap.this.config.handler();
            if (handler != null) {
                pipeline.addLast(new ChannelHandler[]{handler});
            }

            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                }
            });
        }
    }});
}
First the attrs and options fields are used to apply attributes and options to the channel, which is not the focus here. Then the pipeline of the newly created NioServerSocketChannel is fetched, and a ChannelInitializer is appended to it via addLast. Its overridden initChannel first checks whether handler is null (this handler is the one set by ServerBootstrap's handler method, which is different from childHandler); if it is not null, the handler is added to the pipeline. In any case, a ServerBootstrapAcceptor is then added to the pipeline asynchronously (why asynchronously is explained later).
This ChannelInitializer's initChannel is not executed until registration: only when the pipeline later handles the channelRegistered event does initChannel run (see my ChannelPipeline source analysis post).
ChannelInitializer's channelRegistered method:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    if (initChannel(ctx)) {
        ctx.pipeline().fireChannelRegistered();
    } else {
        ctx.fireChannelRegistered();
    }
}
It first calls initChannel (not the same initChannel as above):
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}
As you can see, this ChannelInitializer runs only once in the pipeline: it exists solely for channel registration, and once registration is done the remove method takes it out of the pipeline.
The remove method:
private void remove(ChannelHandlerContext ctx) {
    try {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
    } finally {
        initMap.remove(ctx);
    }
}
Before the removal, the overridden initChannel callback has already run, asynchronously adding the ServerBootstrapAcceptor to the pipeline; later, when the NioServerSocketChannel accepts a client connection, this acceptor completes the registration of the server-side NioSocketChannel.
Back in initAndRegister, with the NioServerSocketChannel initialized, the next step is the registration logic:
ChannelFuture regFuture = this.config().group().register(channel);
config().group() returns the parentGroup passed into ServerBootstrap's group method at the very beginning, and its register method is called. The parentGroup is a NioEventLoopGroup, and this method is implemented in its parent class MultithreadEventLoopGroup:
public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}
It first calls next:
public EventLoop next() {
    return (EventLoop)super.next();
}
which actually invokes the next method of the parent class MultithreadEventExecutorGroup:
public EventExecutor next() {
    return this.chooser.next();
}
The chooser was covered in my earlier NioEventLoopGroup post: when a NioEventLoopGroup is created, by default it creates twice as many NioEventLoops as there are CPU cores, and the chooser picks one NioEventLoop on each call by taking the counter modulo the pool size.
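As a rough illustration of that selection strategy (class and field names here are invented for the sketch; Netty's actual choosers also include a power-of-two variant that uses a bit mask instead of the modulo):

import java.util.concurrent.atomic.AtomicLong;

// Minimal round-robin chooser sketch: each call to next() returns executors[i % n]
// with a monotonically increasing counter, so channels spread evenly over the loops.
final class RoundRobinChooserSketch<T> {
    private final AtomicLong idx = new AtomicLong();
    private final T[] executors;

    RoundRobinChooserSketch(T[] executors) {
        this.executors = executors;
    }

    T next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
}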
So MultithreadEventLoopGroup's register actually ends up calling register on a NioEventLoop.
NioEventLoop's register method is implemented in its parent class SingleThreadEventLoop:
public ChannelFuture register(Channel channel) {
    return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}

public ChannelFuture register(ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
The channel is first wrapped in a ChannelPromise, by default a DefaultChannelPromise (see my post on ChannelFuture and ChannelPromise in Netty), which is used to track the asynchronous operation.
The overload is then called, and inside it you can see that the actual register operation is delegated to the channel's Unsafe.
Unsafe's register method is implemented in AbstractUnsafe:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    } else if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    public void run() {
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var4);
            }
        }

    }
}
The checks at the top are straightforward, so let's go straight to the else block.
First the current channel (the NioServerSocketChannel) is bound to the EventLoop that the chooser just selected.
Because Unsafe operations execute asynchronously on the polling thread, inEventLoop is checked to see whether the current thread is already that polling thread.
As explained in the earlier NioEventLoopGroup post, a NioEventLoop does not start polling until doStartThread has been called, so the inEventLoop check fails here.
The EventLoop's execute method is therefore called: the real registration, AbstractUnsafe's register0, is wrapped in a Runnable and handed to the EventLoop as a task to run asynchronously.
Look first at the EventLoop's execute method, which is implemented in NioEventLoop's ancestor class SingleThreadEventExecutor:
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    } else {
        boolean inEventLoop = this.inEventLoop();
        this.addTask(task);
        if (!inEventLoop) {
            this.startThread();
            if (this.isShutdown() && this.removeTask(task)) {
                reject();
            }
        }

        if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
            this.wakeup(inEventLoop);
        }

    }
}
The task (the registration task from above) is first added to the task queue, and then startThread is called:
private void startThread() {
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        try {
            this.doStartThread();
        } catch (Throwable var2) {
            STATE_UPDATER.set(this, 1);
            PlatformDependent.throwException(var2);
        }
    }

}
At this point the NioEventLoop has not started polling, so its state is 1 (ST_NOT_STARTED). A CAS operation switches it to 2 (ST_STARTED), marking that the NioEventLoop is about to start polling, and sure enough doStartThread is then called to start the polling thread:
private void doStartThread() {
    assert this.thread == null;

    this.executor.execute(new Runnable() {
        public void run() {
            SingleThreadEventExecutor.this.thread = Thread.currentThread();
            if (SingleThreadEventExecutor.this.interrupted) {
                SingleThreadEventExecutor.this.thread.interrupt();
            }

            boolean success = false;
            SingleThreadEventExecutor.this.updateLastExecutionTime();

            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                int oldState;
                do {
                    oldState = SingleThreadEventExecutor.this.state;
                } while (oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));

                if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) {
                    SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");
                }

                try {
                    while (!SingleThreadEventExecutor.this.confirmShutdown()) {
                        ;
                    }
                } finally {
                    try {
                        SingleThreadEventExecutor.this.cleanup();
                    } finally {
                        SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);
                        SingleThreadEventExecutor.this.threadLock.release();
                        if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) {
                            SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')');
                        }

                        SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);
                    }
                }
            }
        }
    });
}
I went through doStartThread in detail in my post on NioEventLoopGroup creation, so I won't step through it line by line again.
Since polling has not really started yet, the assertion that thread is null holds. The executor's execute method is then called; this executor is a thread pool (as discussed before), so the run method of the Runnable executes on its own thread. Inside it, the thread field is assigned the current thread, marking the formal start of polling.
SingleThreadEventExecutor.this.run() is where the real polling logic lives; as mentioned before, this run is implemented in the subclass NioEventLoop:
protected void run() {
    while(true) {
        while(true) {
            try {
                switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
                case -2:
                    continue;
                case -1:
                    this.select(this.wakenUp.getAndSet(false));
                    if (this.wakenUp.get()) {
                        this.selector.wakeup();
                    }
                default:
                    this.cancelledKeys = 0;
                    this.needsToSelectAgain = false;
                    int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            this.processSelectedKeys();
                        } finally {
                            this.runAllTasks();
                        }
                    } else {
                        long ioStartTime = System.nanoTime();
                        boolean var13 = false;

                        try {
                            var13 = true;
                            this.processSelectedKeys();
                            var13 = false;
                        } finally {
                            if (var13) {
                                long ioTime = System.nanoTime() - ioStartTime;
                                this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
                            }
                        }

                        long ioTime = System.nanoTime() - ioStartTime;
                        this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
                    }
                }
            } catch (Throwable var21) {
                handleLoopException(var21);
            }

            try {
                if (this.isShuttingDown()) {
                    this.closeAll();
                    if (this.confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable var18) {
                handleLoopException(var18);
            }
        }
    }
}
Since there is already one task in the queue (the registration task), the select strategy's calculateStrategy ends up calling selectNow (also covered previously):
private final IntSupplier selectNowSupplier = new IntSupplier() {
    public int get() throws Exception {
        return NioEventLoop.this.selectNow();
    }
};

int selectNow() throws IOException {
    int var1;
    try {
        var1 = this.selector.selectNow();
    } finally {
        if (this.wakenUp.get()) {
            this.selector.wakeup();
        }

    }

    return var1;
}
The JDK's native Selector performs the selectNow. Since no channel has been registered yet, selectNow returns 0 immediately and execution falls into the default branch. With nothing registered, processSelectedKeys has nothing to do either, so this first pass effectively only runs runAllTasks, which executes the run method of each task in the queue (again, covered in an earlier post). Because the polling runs on a thread from the thread pool, task execution is asynchronous. (After a task runs it is removed from the task queue, and the thread keeps polling.)
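As an aside, the ioTime * (100 - ioRatio) / ioRatio expression visible in the run() code above is the time budget handed to runAllTasks; a small worked example with illustrative numbers (ioRatio defaults to 50):

final class IoRatioSketch {
    // Task-time budget from NioEventLoop.run(): with the default ioRatio of 50,
    // tasks get exactly as much time as the preceding I/O processing took.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
    // Example: taskBudgetNanos(2_000_000L, 50) == 2_000_000L (2 ms of I/O -> 2 ms for tasks);
    //          taskBudgetNanos(2_000_000L, 80) ==   500_000L (a higher ioRatio squeezes task time).
}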
Now we can go back to AbstractChannel's register method: as shown above, the task actually executes asynchronously:
AbstractUnsafe.this.register0(promise);
The register0 method:
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
            return;
        }

        boolean firstRegistration = this.neverRegistered;
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
        if (AbstractChannel.this.isActive()) {
            if (firstRegistration) {
                AbstractChannel.this.pipeline.fireChannelActive();
            } else if (AbstractChannel.this.config().isAutoRead()) {
                this.beginRead();
            }
        }
    } catch (Throwable var3) {
        this.closeForcibly();
        AbstractChannel.this.closeFuture.setClosed();
        this.safeSetFailure(promise, var3);
    }

}
The actual registration logic is delegated once more, to AbstractChannel's doRegister, which is implemented in AbstractNioChannel:
protected void doRegister() throws Exception {
    boolean selected = false;

    while(true) {
        try {
            this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException var3) {
            if (selected) {
                throw var3;
            }

            this.eventLoop().selectNow();
            selected = true;
        }
    }
}
javaChannel is the JDK ServerSocketChannel created earlier, and unwrappedSelector (as explained before) is the unmodified native JDK Selector, which is bound one-to-one to this EventLoop. The JDK's native register method is called to register the ServerSocketChannel, but with an interest set of 0 (the default value); this, the AbstractNioChannel instance, is passed as the attachment so that later, in processSelectedKeys, the corresponding Netty channel can be retrieved from the SelectionKey (also covered in an earlier post).
The reason for registering with the default 0 is that AbstractNioChannel handles registration not only for NioServerSocketChannel but also for NioSocketChannel, and only registering both with the default interest set avoids exceptions (see my post on channel registration in Java NIO). Later, processSelectedKeys checks for this 0 state and lets the Unsafe perform the appropriate handling.
After the JDK registration completes, the pipeline's invokeHandlerAddedIfNeeded is called (see the ChannelPipeline post), which handles the handlerAdded callbacks, i.e. it invokes the handlerAdded method of the ChannelHandlers the user added.
Then safeSetSuccess is called, marking the asynchronous operation as complete:
protected final void safeSetSuccess(ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
        logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
    }
}
I explained these asynchronous operations in detail in my earlier post on ChannelFuture and ChannelPromise in Netty.
Next, the pipeline's fireChannelRegistered is called, which propagates channelRegistered along the handler chain. At this point the channelRegistered of the ChannelInitializer that ServerBootstrap added to the pipeline earlier is invoked, which in turn calls back into initChannel and completes the addition of the ServerBootstrapAcceptor.
Back in register0, after the pipeline chain has been processed, the current AbstractChannel (the NioServerSocketChannel) is checked with isActive:
public boolean isActive() {
    return this.javaChannel().socket().isBound();
}
It fetches the JDK ServerSocketChannel wrapped by the NioServerSocketChannel, obtains its ServerSocket, and checks isBound:
public boolean isBound() {
    // Before 1.3 ServerSockets were always bound during creation
    return bound || oldImpl;
}
This simply checks whether bind has already been called on the ServerSocket. As noted earlier, register0 runs asynchronously, so in a multithreaded environment the ordering is not guaranteed. If the ServerSocket has already been bound at this point, firstRegistration decides whether the pipeline should propagate a channelActive event, which is handled first by the pipeline's head, HeadContext's channelActive method:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}
After HeadContext propagates the channelActive event through the ChannelHandlerContext, it calls readIfIsAutoRead:
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}
which calls AbstractChannel's read method:
public Channel read() {
    pipeline.read();
    return this;
}
The read request travels down the chain and is ultimately executed by HeadContext's read:
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
Here, finally, the Unsafe's beginRead is called:
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
which ultimately executes doBeginRead, implemented by AbstractNioChannel:
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
This is where the readInterestOp is registered with the Selector, which, as we saw earlier, means the accept event.
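At the JDK level, the two steps — doRegister with an interest set of 0, then doBeginRead turning on OP_ACCEPT — amount to the following sketch (the selector, channel and attachment are placeholders):

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterSketch {
    static SelectionKey registerLikeNetty(ServerSocketChannel ch, Selector selector, Object nettyChannel) throws Exception {
        // doRegister(): register with an empty interest set, attaching the Netty channel
        SelectionKey key = ch.register(selector, 0, nettyChannel);
        // doBeginRead(): later, switch on OP_ACCEPT (16) on the existing key
        key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
        return key;
    }
}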
Back in AbstractBootstrap's doBind: once initAndRegister returns, we know from the above that the actual registration is an asynchronous operation completed inside register0.
The returned ChannelFuture is used to check whether that asynchronous operation has finished. If isDone is true, the registration completed first, i.e. safeSetSuccess has already run.
newPromise is then called:
public ChannelPromise newPromise() {
    return pipeline.newPromise();
}
which obtains a ChannelPromise from the channel's pipeline, the handle for the next asynchronous operation.
doBind0 is then called to complete the ServerSocket binding. If the register0 operation has not finished yet, a ChannelFutureListener is attached to the ChannelFuture instead; when register0 eventually calls safeSetSuccess, the promise's trySuccess calls back the listener's operationComplete, which in turn invokes doBind0.
The doBind0 method:
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
A task is submitted to the polling thread so that the bind is handled asynchronously; only if the regFuture operation ended successfully does it call the channel's bind:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
The actual bind is again handed to the pipeline, which passes it down the handler chain until it reaches HeadContext:
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}
So after this long detour the work is handed to Unsafe:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
The real bind, however, is delegated once more to doBind, which is ultimately implemented by NioServerSocketChannel:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
Here, at last, the bind on the JDK ServerSocketChannel is performed.
Going back to this check in the bind method above:
if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}
this check covers the case where, back in register0, isActive was still false because the bind had not completed yet, so beginRead never got the chance to register the accept event with the Selector; in that case the channelActive event is fired here instead, which triggers that registration and lets the ServerSocket start listening for client connections.
Once the server accepts a client connection, the NioEventLoop poll loop calls processSelectedKeys to handle the ready accept event, which is then handed to the Unsafe's read (see the NioEventLoopGroup post).
On the server side this read is implemented by NioMessageUnsafe:
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
The core is doReadMessages, implemented by NioServerSocketChannel:
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
SocketUtils.accept simply calls the JDK ServerSocketChannel's native accept to obtain a JDK SocketChannel, which is then wrapped in a NioSocketChannel and added to the buf list.
So doReadMessages picks up every pending connection, wraps each one in a NioSocketChannel, and places it in the list; a JDK-level sketch of what this reduces to is shown below.
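A minimal sketch of that accept-and-wrap step, assuming the JDK channel is already in non-blocking mode (the method and class names here are invented for the sketch):

import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

final class AcceptSketch {
    // What doReadMessages() reduces to: a non-blocking JDK accept, with the resulting
    // SocketChannel wrapped into a NioSocketChannel and added to the message list.
    static int acceptOne(NioServerSocketChannel parent, ServerSocketChannel jdk, List<Object> buf) throws Exception {
        SocketChannel ch = jdk.accept();   // returns null when no connection is pending
        if (ch != null) {
            buf.add(new NioSocketChannel(parent, ch));
            return 1;
        }
        return 0;
    }
}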
The list is then iterated, and for each element the NioServerSocketChannel's pipeline fires channelRead, propagating the channelRead event.
The ServerBootstrapAcceptor that was added to the pipeline earlier responds to this event, and its channelRead is called back:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
msg is the accepted NioSocketChannel; its pipeline receives the childHandler, i.e. the handler we added to the ServerBootstrap via the childHandler method.
The register method then completes the registration of the NioSocketChannel onto the childGroup (the same registration logic as for the NioServerSocketChannel).
And with that, the startup of the Netty server is complete.