欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty服务端的启动源码分析

程序员文章站 2022-04-14 18:48:25
ServerBootstrap的构造: 隐式地执行了父类的无参构造: 只是初始化了几个容器成员 在ServerBootstrap创建后,需要调用group方法,绑定EventLoopGroup,有关EventLoopGroup的创建在我之前博客中写过:Netty中NioEventLoopGroup的 ......

serverbootstrap的构造:

 1 public class serverbootstrap extends abstractbootstrap<serverbootstrap, serverchannel> {
 2     private static final internallogger logger = internalloggerfactory.getinstance(serverbootstrap.class);
 3     private final map<channeloption<?>, object> childoptions = new linkedhashmap();
 4     private final map<attributekey<?>, object> childattrs = new linkedhashmap();
 5     private final serverbootstrapconfig config = new serverbootstrapconfig(this);
 6     private volatile eventloopgroup childgroup;
 7     private volatile channelhandler childhandler;
 8 
 9     public serverbootstrap() {
10     }
11     ......
12 }

隐式地执行了父类的无参构造:

 1 public abstract class abstractbootstrap<b extends abstractbootstrap<b, c>, c extends channel> implements cloneable {
 2     volatile eventloopgroup group;
 3     private volatile channelfactory<? extends c> channelfactory;
 4     private volatile socketaddress localaddress;
 5     private final map<channeloption<?>, object> options = new linkedhashmap();
 6     private final map<attributekey<?>, object> attrs = new linkedhashmap();
 7     private volatile channelhandler handler;
 8 
 9     abstractbootstrap() {
10     }
11     ......
12 }

只是初始化了几个容器成员

在serverbootstrap创建后,需要调用group方法,绑定eventloopgroup,有关eventloopgroup的创建在我之前博客中写过:netty中nioeventloopgroup的创建源码分析


serverbootstrap的group方法:

 1 public serverbootstrap group(eventloopgroup group) {
 2     return this.group(group, group);
 3 }
 4 
 5 public serverbootstrap group(eventloopgroup parentgroup, eventloopgroup childgroup) {
 6     super.group(parentgroup);
 7     if (childgroup == null) {
 8         throw new nullpointerexception("childgroup");
 9     } else if (this.childgroup != null) {
10         throw new illegalstateexception("childgroup set already");
11     } else {
12         this.childgroup = childgroup;
13         return this;
14     }
15 }

首先调用父类的group方法绑定parentgroup:

 1 public b group(eventloopgroup group) {
 2     if (group == null) {
 3         throw new nullpointerexception("group");
 4     } else if (this.group != null) {
 5         throw new illegalstateexception("group set already");
 6     } else {
 7         this.group = group;
 8         return this.self();
 9     }
10 }
11 
12 private b self() {
13     return this;
14 }

将传入的parentgroup绑定给abstractbootstrap的group成员,将childgroup绑定给serverbootstrap的childgroup成员。
group的绑定仅仅是交给了成员保存。

再来看看serverbootstrap的channel方法,,是在abstractbootstrap中实现的:

1 public b channel(class<? extends c> channelclass) {
2     if (channelclass == null) {
3         throw new nullpointerexception("channelclass");
4     } else {
5         return this.channelfactory((io.netty.channel.channelfactory)(new reflectivechannelfactory(channelclass)));
6     }
7 }

使用channelclass构建了一个reflectivechannelfactory对象:

 1 public class reflectivechannelfactory<t extends channel> implements channelfactory<t> {
 2     private final class<? extends t> clazz;
 3 
 4     public reflectivechannelfactory(class<? extends t> clazz) {
 5         if (clazz == null) {
 6             throw new nullpointerexception("clazz");
 7         } else {
 8             this.clazz = clazz;
 9         }
10     }
11 
12     public t newchannel() {
13         try {
14             return (channel)this.clazz.getconstructor().newinstance();
15         } catch (throwable var2) {
16             throw new channelexception("unable to create channel from class " + this.clazz, var2);
17         }
18     }
19 
20     public string tostring() {
21         return stringutil.simpleclassname(this.clazz) + ".class";
22     }
23 }

可以看到reflectivechannelfactory的作用就是通过反射机制,产生clazz的实例(这里以nioserversocketchannel为例)。

在创建完reflectivechannelfactory对象后, 调用channelfactory方法:

 1 public b channelfactory(io.netty.channel.channelfactory<? extends c> channelfactory) {
 2     return this.channelfactory((channelfactory)channelfactory);
 3 }
 4 
 5 public b channelfactory(channelfactory<? extends c> channelfactory) {
 6     if (channelfactory == null) {
 7         throw new nullpointerexception("channelfactory");
 8     } else if (this.channelfactory != null) {
 9         throw new illegalstateexception("channelfactory set already");
10     } else {
11         this.channelfactory = channelfactory;
12         return this.self();
13     }
14 }

将刚才创建的reflectivechannelfactory对象交给channelfactory成员,用于后续服务端nioserversocketchannel的创建。

再来看serverbootstrap的childhandler方法:

1 public serverbootstrap childhandler(channelhandler childhandler) {
2     if (childhandler == null) {
3         throw new nullpointerexception("childhandler");
4     } else {
5         this.childhandler = childhandler;
6         return this;
7     }
8 }

还是交给了childhandler成员保存,可以看到上述这一系列的操作,都是为了填充serverbootstrap,而serverbootstrap真正的启动是在bind时:
serverbootstrap的bind方法,在abstractbootstrap中实现:

 1 public channelfuture bind(int inetport) {
 2     return this.bind(new inetsocketaddress(inetport));
 3 }
 4 
 5 public channelfuture bind(string inethost, int inetport) {
 6 return this.bind(socketutils.socketaddress(inethost, inetport));
 7 }
 8 
 9 public channelfuture bind(inetaddress inethost, int inetport) {
10     return this.bind(new inetsocketaddress(inethost, inetport));
11 }
12 
13 public channelfuture bind(socketaddress localaddress) {
14     this.validate();
15     if (localaddress == null) {
16         throw new nullpointerexception("localaddress");
17     } else {
18         return this.dobind(localaddress);
19     }
20 }

可以看到首先调用了serverbootstrap的validate方法,:

 1 public serverbootstrap validate() {
 2     super.validate();
 3     if (this.childhandler == null) {
 4         throw new illegalstateexception("childhandler not set");
 5     } else {
 6         if (this.childgroup == null) {
 7             logger.warn("childgroup is not set. using parentgroup instead.");
 8             this.childgroup = this.config.group();
 9         }
10     
11         return this;
12     }
13 }

先调用了abstractbootstrap的validate方法:

1 public b validate() {
2     if (this.group == null) {
3         throw new illegalstateexception("group not set");
4     } else if (this.channelfactory == null) {
5         throw new illegalstateexception("channel or channelfactory not set");
6     } else {
7         return this.self();
8     }
9 }


这个方法就是用来检查是否绑定了group和channel以及childhandler,所以在执行bind方法前,无论如何都要执行group、channel和childhandler方法。

实际的bind交给了dobind来完成:

 1 private channelfuture dobind(final socketaddress localaddress) {
 2     final channelfuture regfuture = this.initandregister();
 3     final channel channel = regfuture.channel();
 4     if (regfuture.cause() != null) {
 5         return regfuture;
 6     } else if (regfuture.isdone()) {
 7         channelpromise promise = channel.newpromise();
 8         dobind0(regfuture, channel, localaddress, promise);
 9         return promise;
10     } else {
11         final abstractbootstrap.pendingregistrationpromise promise = new abstractbootstrap.pendingregistrationpromise(channel);
12         regfuture.addlistener(new channelfuturelistener() {
13             public void operationcomplete(channelfuture future) throws exception {
14                 throwable cause = future.cause();
15                 if (cause != null) {
16                     promise.setfailure(cause);
17                 } else {
18                     promise.registered();
19                     abstractbootstrap.dobind0(regfuture, channel, localaddress, promise);
20                 }
21             }
22         });
23         return promise;
24     }
25 }

首先调用initandregister,完成serversocketchannel的创建以及注册:

 1 final channelfuture initandregister() {
 2     channel channel = null;
 3 
 4     try {
 5         channel = this.channelfactory.newchannel();
 6         this.init(channel);
 7     } catch (throwable var3) {
 8         if (channel != null) {
 9             channel.unsafe().closeforcibly();
10             return (new defaultchannelpromise(channel, globaleventexecutor.instance)).setfailure(var3);
11         }
12 
13         return (new defaultchannelpromise(new failedchannel(), globaleventexecutor.instance)).setfailure(var3);
14     }
15 
16     channelfuture regfuture = this.config().group().register(channel);
17     if (regfuture.cause() != null) {
18         if (channel.isregistered()) {
19             channel.close();
20         } else {
21             channel.unsafe().closeforcibly();
22         }
23     }
24 
25     return regfuture;
26 }

首先调用channelfactory的newchannel通过反射机制构建channel实例,也就是nioserversocketchannel,


nioserversocketchannel的无参构造:

1 public class nioserversocketchannel extends abstractniomessagechannel implements serversocketchannel {
2     private static final selectorprovider default_selector_provider = selectorprovider.provider();
3     
4     public nioserversocketchannel() {
5         this(newsocket(default_selector_provider));
6     }
7     ......
8 }

selectorprovider 是jdk的,关于selectorprovider在我之前的博客中有介绍:【java】nio中selector的创建源码分析

在windows系统下默认产生windowsselectorprovider,即default_selector_provider,再来看看newsocket方法:

1 private static java.nio.channels.serversocketchannel newsocket(selectorprovider provider) {
2     try {
3         return provider.openserversocketchannel();
4     } catch (ioexception var2) {
5         throw new channelexception("failed to open a server socket.", var2);
6     }
7 }

使用windowsselectorprovider创建了一个serversocketchannelimpl,其实看到这里就明白了,nioserversocketchannel是为了封装jdk的serversocketchannel

接着调用另一个重载的构造:

1 public nioserversocketchannel(java.nio.channels.serversocketchannel channel) {
2     super((channel)null, channel, 16);
3     this.config = new nioserversocketchannel.nioserversocketchannelconfig(this, this.javachannel().socket());
4 }

首先调用父类的三参构造,其中16对应的是jdk中selectionkey的accept状态:

1 public static final int op_accept = 1 << 4;

其父类的构造处于一条继承链上:

abstractniomessagechannel:

1 protected abstractniomessagechannel(channel parent, selectablechannel ch, int readinterestop) {
2     super(parent, ch, readinterestop);
3 }

abstractniochannel:

 1 protected abstractniochannel(channel parent, selectablechannel ch, int readinterestop) {
 2     super(parent);
 3     this.ch = ch;
 4     this.readinterestop = readinterestop;
 5 
 6     try {
 7         ch.configureblocking(false);
 8     } catch (ioexception var7) {
 9         try {
10             ch.close();
11         } catch (ioexception var6) {
12             if (logger.iswarnenabled()) {
13                 logger.warn("failed to close a partially initialized socket.", var6);
14             }
15         }
16 
17         throw new channelexception("failed to enter non-blocking mode.", var7);
18     }
19 }

abstractchannel:

 1 private final channelid id;
 2 private final channel parent;
 3 private final unsafe unsafe;
 4 private final defaultchannelpipeline pipeline;
 5 
 6 protected abstractchannel(channel parent) {
 7     this.parent = parent;
 8     this.id = this.newid();
 9     this.unsafe = this.newunsafe();
10     this.pipeline = this.newchannelpipeline();
11 }

在abstractchannel中使用newunsafe和newchannelpipeline分别创建了一个unsafe和一个defaultchannelpipeline对象,
在前面的博客介绍nioeventloopgroup时候,在nioeventloop的run方法中,每次轮询完调用processselectedkeys方法时,都是通过这个unsafe根据selectedkey来完成数据的读或写,unsafe是处理基础的数据读写
(unsafe在nioserversocketchannel创建时,产生niomessageunsafe实例,在niosocketchannel创建时产生niosocketchannelunsafe实例)

而pipeline的实现是一条双向责任链,负责处理unsafe提供的数据,进而进行用户的业务逻辑 (netty中的channelpipeline源码分析

在abstractniochannel中调用configureblocking方法给jdk的serversocketchannel设置为非阻塞模式,且让readinterestop成员赋值为16用于未来注册accept事件。

在调用完继承链后回到nioserversocketchannel构造,调用了javachannel方法:

1 protected java.nio.channels.serversocketchannel javachannel() {
2     return (java.nio.channels.serversocketchannel)super.javachannel();
3 }

其实这个javachannel就是刚才出传入到abstractniochannel中的ch成员:

1 protected selectablechannel javachannel() {
2     return this.ch;
3 }

也就是刚才创建的jdk的serversocketchannelimpl,用其socket方法,得到一个serversocket对象,然后产生了一个nioserversocketchannelconfig对象,用于封装相关信息。

 

nioserversocketchannel构建完毕,回到initandregister方法,使用刚创建的nioserversocketchannel调用init方法,这个方法是在serverbootstrap中实现的:

 1 void init(channel channel) throws exception {
 2     map<channeloption<?>, object> options = this.options0();
 3     synchronized(options) {
 4         setchanneloptions(channel, options, logger);
 5     }
 6 
 7     map<attributekey<?>, object> attrs = this.attrs0();
 8     synchronized(attrs) {
 9         iterator var5 = attrs.entryset().iterator();
10 
11         while(true) {
12             if (!var5.hasnext()) {
13                 break;
14             }
15 
16             entry<attributekey<?>, object> e = (entry)var5.next();
17             attributekey<object> key = (attributekey)e.getkey();
18             channel.attr(key).set(e.getvalue());
19         }
20     }
21 
22     channelpipeline p = channel.pipeline();
23     final eventloopgroup currentchildgroup = this.childgroup;
24     final channelhandler currentchildhandler = this.childhandler;
25     map var9 = this.childoptions;
26     final entry[] currentchildoptions;
27     synchronized(this.childoptions) {
28         currentchildoptions = (entry[])this.childoptions.entryset().toarray(newoptionarray(0));
29     }
30 
31     var9 = this.childattrs;
32     final entry[] currentchildattrs;
33     synchronized(this.childattrs) {
34         currentchildattrs = (entry[])this.childattrs.entryset().toarray(newattrarray(0));
35     }
36 
37     p.addlast(new channelhandler[]{new channelinitializer<channel>() {
38         public void initchannel(final channel ch) throws exception {
39             final channelpipeline pipeline = ch.pipeline();
40             channelhandler handler = serverbootstrap.this.config.handler();
41             if (handler != null) {
42                 pipeline.addlast(new channelhandler[]{handler});
43             }
44 
45             ch.eventloop().execute(new runnable() {
46                 public void run() {
47                     pipeline.addlast(new channelhandler[]{new serverbootstrap.serverbootstrapacceptor(ch, currentchildgroup, currentchildhandler, currentchildoptions, currentchildattrs)});
48                 }
49             });
50         }
51     }});
52 }

首先对attrs和options这两个成员进行了填充属性配置,这不是重点,然后获取刚才创建的nioserversocketchannel的责任链pipeline,通过addlast将channelinitializer加入责任链,在channelinitializer中重写了initchannel方法,首先根据handler是否是null(这个handler是serverbootstrap调用handler方法添加的,和childhandler方法不一样),若是handler不是null,将handler加入责任链,无论如何,都会异步将一个serverbootstrapacceptor对象加入责任链(后面会说为什么是异步)

 

这个channelinitializer的initchannel方法的执行需要等到后面注册时才会被调用,在后面pipeline处理channelregistered请求时,此initchannel方法才会被执行 (netty中的channelpipeline源码分析

channelinitializer的channelregistered方法:

1 public final void channelregistered(channelhandlercontext ctx) throws exception {
2     if (initchannel(ctx)) {
3         ctx.pipeline().firechannelregistered();
4     } else {
5         ctx.firechannelregistered();
6     }
7 }

首先调用initchannel方法(和上面的initchannel不是一个):

 1 private boolean initchannel(channelhandlercontext ctx) throws exception {
 2     if (initmap.putifabsent(ctx, boolean.true) == null) { 
 3         try {
 4             initchannel((c) ctx.channel());
 5         } catch (throwable cause) {
 6             exceptioncaught(ctx, cause);
 7         } finally {
 8             remove(ctx);
 9         }
10         return true;
11     }
12     return false;
13 }

可以看到,这个channelinitializer只会在pipeline中初始化一次,仅用于channel的注册,在完成注册后,会调用remove方法将其从pipeline中移除:
remove方法:

 1 private void remove(channelhandlercontext ctx) {
 2     try {
 3         channelpipeline pipeline = ctx.pipeline();
 4         if (pipeline.context(this) != null) {
 5             pipeline.remove(this);
 6         }
 7     } finally {
 8         initmap.remove(ctx);
 9     }
10 }

在移除前,就会回调用刚才覆盖的initchannel方法,异步向pipeline添加了serverbootstrapacceptor,用于后续的nioserversocketchannel侦听到客户端连接后,完成在服务端的niosocketchannel的注册。

回到initandregister,在对nioserversocketchannel初始化完毕,接下来就是注册逻辑:

1 channelfuture regfuture = this.config().group().register(channel);

首先调用config().group(),这个就得到了一开始在serverbootstrap的group方法传入的parentgroup,调用parentgroup的register方法,parentgroup是nioeventloopgroup,这个方法是在子类multithreadeventloopgroup中实现的:

1 public channelfuture register(channel channel) {
2     return this.next().register(channel);
3 }

首先调用next方法:

1 public eventloop next() {
2     return (eventloop)super.next();
3 }

实际上调用父类multithreadeventexecutorgroup的next方法:

1 public eventexecutor next() {
2     return this.chooser.next();
3 }

关于chooser在我之前博客:netty中nioeventloopgroup的创建源码分析 介绍过,在nioeventloopgroup创建时,默认会根据cpu个数创建二倍个nioeventloop,而chooser就负责通过取模,每次选择一个nioeventloop使用

所以在multithreadeventloopgroup的register方法实际调用了nioeventloop的register方法:

nioeventloop的register方法在子类singlethreadeventloop中实现:

1 public channelfuture register(channel channel) {
2     return this.register((channelpromise)(new defaultchannelpromise(channel, this)));
3 }
4 
5 public channelfuture register(channelpromise promise) {
6    objectutil.checknotnull(promise, "promise");
7     promise.channel().unsafe().register(this, promise);
8     return promise;
9 }

先把channel包装成channelpromise,默认是defaultchannelpromise (netty中的channelfuture和channelpromise),用于处理异步操作

调用重载方法,而在重载方法里,可以看到,实际上的register操作交给了channel的unsafe来实现:

unsafe的register方法在abstractunsafe中实现:

 1 public final void register(eventloop eventloop, final channelpromise promise) {
 2     if (eventloop == null) {
 3         throw new nullpointerexception("eventloop");
 4     } else if (abstractchannel.this.isregistered()) {
 5         promise.setfailure(new illegalstateexception("registered to an event loop already"));
 6     } else if (!abstractchannel.this.iscompatible(eventloop)) {
 7         promise.setfailure(new illegalstateexception("incompatible event loop type: " + eventloop.getclass().getname()));
 8     } else {
 9         abstractchannel.this.eventloop = eventloop;
10         if (eventloop.ineventloop()) {
11             this.register0(promise);
12         } else {
13             try {
14                 eventloop.execute(new runnable() {
15                     public void run() {
16                         abstractunsafe.this.register0(promise);
17                     }
18                 });
19             } catch (throwable var4) {
20                 abstractchannel.logger.warn("force-closing a channel whose registration task was not accepted by an event loop: {}", abstractchannel.this, var4);
21                 this.closeforcibly();
22                 abstractchannel.this.closefuture.setclosed();
23                 this.safesetfailure(promise, var4);
24             }
25         }
26 
27     }
28 }

前面的判断做了一些检查就不细说了,直接看到else块
首先给当前channel绑定了eventloop,即通过刚才chooser选择的eventloop,该channel也就是nioserversocketchannel
由于unsafe的操作是在轮询线程中异步执行的,所里,这里需要判断ineventloop是否处于轮询中
在之前介绍nioeventloopgroup的时候说过,nioeventloop在没有调用dostartthread方法时并没有启动轮询的,所以ineventloop判断不成立

那么就调用eventloop的execute方法,实际上的注册方法可以看到调用了abstractunsafe的register0方法,而将这个方法封装为runnable交给eventloop作为一个task去异步执行
先来看eventloop的execute方法实现,是在nioeventloop的子类singlethreadeventexecutor中实现的:

 1 public void execute(runnable task) {
 2     if (task == null) {
 3         throw new nullpointerexception("task");
 4     } else {
 5         boolean ineventloop = this.ineventloop();
 6         this.addtask(task);
 7         if (!ineventloop) {
 8             this.startthread();
 9             if (this.isshutdown() && this.removetask(task)) {
10                 reject();
11             }
12         }
13 
14         if (!this.addtaskwakesup && this.wakesupfortask(task)) {
15             this.wakeup(ineventloop);
16         }
17 
18     }
19 }

这里首先将task,即刚才的注册事件放入阻塞任务队列中,然后调用startthread方法:

 1 private void startthread() {
 2     if (this.state == 1 && state_updater.compareandset(this, 1, 2)) {
 3         try {
 4             this.dostartthread();
 5         } catch (throwable var2) {
 6             state_updater.set(this, 1);
 7             platformdependent.throwexception(var2);
 8         }
 9     }
10 
11 }

nioeventloop此时还没有轮询,所以状态是1,对应st_not_started,此时利用cas操作,将状态修改为2,即st_started ,标志着nioeventloop要启动轮询了,果然,接下来就调用了dostartthread开启轮询线程:

  1 private void dostartthread() {
  2     assert this.thread == null;
  3 
  4     this.executor.execute(new runnable() {
  5         public void run() {
  6             singlethreadeventexecutor.this.thread = thread.currentthread();
  7             if (singlethreadeventexecutor.this.interrupted) {
  8                 singlethreadeventexecutor.this.thread.interrupt();
  9             }
 10 
 11             boolean success = false;
 12             singlethreadeventexecutor.this.updatelastexecutiontime();
 13             boolean var112 = false;
 14 
 15             int oldstate;
 16             label1907: {
 17                 try {
 18                     var112 = true;
 19                     singlethreadeventexecutor.this.run();
 20                     success = true;
 21                     var112 = false;
 22                     break label1907;
 23                 } catch (throwable var119) {
 24                     singlethreadeventexecutor.logger.warn("unexpected exception from an event executor: ", var119);
 25                     var112 = false;
 26                 } finally {
 27                     if (var112) {
 28                         int oldstatex;
 29                         do {
 30                             oldstatex = singlethreadeventexecutor.this.state;
 31                         } while(oldstatex < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstatex, 3));
 32 
 33                         if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l && singlethreadeventexecutor.logger.iserrorenabled()) {
 34                             singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 35                         }
 36 
 37                         try {
 38                             while(!singlethreadeventexecutor.this.confirmshutdown()) {
 39                                 ;
 40                             }
 41                         } finally {
 42                             try {
 43                                 singlethreadeventexecutor.this.cleanup();
 44                             } finally {
 45                                 singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
 46                                 singlethreadeventexecutor.this.threadlock.release();
 47                                 if (!singlethreadeventexecutor.this.taskqueue.isempty() && singlethreadeventexecutor.logger.iswarnenabled()) {
 48                                     singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
 49                                 }
 50 
 51                                 singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
 52                             }
 53                         }
 54 
 55                     }
 56                 }
 57 
 58                 do {
 59                     oldstate = singlethreadeventexecutor.this.state;
 60                 } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3));
 61 
 62                 if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l && singlethreadeventexecutor.logger.iserrorenabled()) {
 63                     singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 64                 }
 65 
 66                 try {
 67                     while(!singlethreadeventexecutor.this.confirmshutdown()) {
 68                         ;
 69                     }
 70 
 71                     return;
 72                 } finally {
 73                     try {
 74                         singlethreadeventexecutor.this.cleanup();
 75                     } finally {
 76                         singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
 77                         singlethreadeventexecutor.this.threadlock.release();
 78                         if (!singlethreadeventexecutor.this.taskqueue.isempty() && singlethreadeventexecutor.logger.iswarnenabled()) {
 79                             singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
 80                         }
 81 
 82                         singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
 83                     }
 84                 }
 85             }
 86 
 87             do {
 88                 oldstate = singlethreadeventexecutor.this.state;
 89             } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3));
 90 
 91             if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l && singlethreadeventexecutor.logger.iserrorenabled()) {
 92                 singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 93             }
 94 
 95             try {
 96                 while(!singlethreadeventexecutor.this.confirmshutdown()) {
 97                     ;
 98                 }
 99             } finally {
100                 try {
101                     singlethreadeventexecutor.this.cleanup();
102                 } finally {
103                     singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
104                     singlethreadeventexecutor.this.threadlock.release();
105                     if (!singlethreadeventexecutor.this.taskqueue.isempty() && singlethreadeventexecutor.logger.iswarnenabled()) {
106                         singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
107                     }
108 
109                     singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
110                 }
111             }
112 
113         }
114     });
115 }

关于dostartthread方法,我在 netty中nioeventloopgroup的创建源码分析 中已经说的很细了,这里就不再一步一步分析了

因为此时还没真正意义上的启动轮询,所以thread等于null成立的,然后调用executor的execute方法,这里的executor是一个线程池,在之前说过的,所以里面的run方法是处于一个线程里面的,然后给thread成员赋值为当前线程,表明正式进入了轮询。
而singlethreadeventexecutor.this.run()才是真正的轮询逻辑,这在之前也说过,这个run的实现是在父类nioeventloop中:

 1 protected void run() {
 2     while(true) {
 3         while(true) {
 4             try {
 5                 switch(this.selectstrategy.calculatestrategy(this.selectnowsupplier, this.hastasks())) {
 6                 case -2:
 7                     continue;
 8                 case -1:
 9                     this.select(this.wakenup.getandset(false));
10                     if (this.wakenup.get()) {
11                         this.selector.wakeup();
12                     }
13                 default:
14                     this.cancelledkeys = 0;
15                     this.needstoselectagain = false;
16                     int ioratio = this.ioratio;
17                     if (ioratio == 100) {
18                         try {
19                             this.processselectedkeys();
20                         } finally {
21                             this.runalltasks();
22                         }
23                     } else {
24                         long iostarttime = system.nanotime();
25                         boolean var13 = false;
26 
27                         try {
28                             var13 = true;
29                             this.processselectedkeys();
30                             var13 = false;
31                         } finally {
32                             if (var13) {
33                                 long iotime = system.nanotime() - iostarttime;
34                                 this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio);
35                             }
36                         }
37 
38                         long iotime = system.nanotime() - iostarttime;
39                         this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio);
40                     }
41                 }
42             } catch (throwable var21) {
43                 handleloopexception(var21);
44             }
45 
46             try {
47                 if (this.isshuttingdown()) {
48                     this.closeall();
49                     if (this.confirmshutdown()) {
50                         return;
51                     }
52                 }
53             } catch (throwable var18) {
54                 handleloopexception(var18);
55             }
56         }
57     }
58 }

首先由于task已经有一个了,就是刚才的注册事件,所以选择策略calculatestrategy最终调用selectnow(也是之前说过的):

 1 private final intsupplier selectnowsupplier = new intsupplier() {
 2     public int get() throws exception {
 3         return nioeventloop.this.selectnow();
 4     }
 5 };
 6 
 7 int selectnow() throws ioexception {
 8     int var1;
 9     try {
10         var1 = this.selector.selectnow();
11     } finally {
12         if (this.wakenup.get()) {
13             this.selector.wakeup();
14         }
15 
16     }
17 
18     return var1;
19 }

使用jdk原生selector进行selectnow,由于此时没有任何channel的注册,所以selectnow会立刻返回0,此时就进入default逻辑,由于没有任何注册,processselectedkeys方法也做不了什么,所以在这一次的轮询实质上只进行了runalltasks方法,此方法会执行阻塞队列中的task的run方法(还是在之前博客中介绍过),由于轮询是在线程池中的一个线程中运行的,所以task的执行是一个异步操作。(在执行完task,将task移除阻塞对立,线程继续轮询)

这时就可以回到abstractchannel的register方法中了,由上面可以知道task实际上异步执行了:

1 abstractunsafe.this.register0(promise);

register0方法:

 1 private void register0(channelpromise promise) {
 2     try {
 3         if (!promise.setuncancellable() || !this.ensureopen(promise)) {
 4             return;
 5         }
 6 
 7         boolean firstregistration = this.neverregistered;
 8         abstractchannel.this.doregister();
 9         this.neverregistered = false;
10         abstractchannel.this.registered = true;
11         abstractchannel.this.pipeline.invokehandleraddedifneeded();
12         this.safesetsuccess(promise);
13         abstractchannel.this.pipeline.firechannelregistered();
14         if (abstractchannel.this.isactive()) {
15             if (firstregistration) {
16                 abstractchannel.this.pipeline.firechannelactive();
17             } else if (abstractchannel.this.config().isautoread()) {
18                 this.beginread();
19             }
20         }
21     } catch (throwable var3) {
22         this.closeforcibly();
23         abstractchannel.this.closefuture.setclosed();
24         this.safesetfailure(promise, var3);
25     }
26 
27 }

可以看到实际上的注册逻辑又交给了abstractchannel的doregister,而这个方法在abstractniochannel中实现:

 1 protected void doregister() throws exception {
 2     boolean selected = false;
 3 
 4     while(true) {
 5         try {
 6             this.selectionkey = this.javachannel().register(this.eventloop().unwrappedselector(), 0, this);
 7             return;
 8         } catch (cancelledkeyexception var3) {
 9             if (selected) {
10                 throw var3;
11             }
12 
13             this.eventloop().selectnow();
14             selected = true;
15         }
16     }
17 }

javachannel就是之前产生的jdk的serversocketchannel,unwrappedselector在之前说过,就是未经修改的jdk原生selector,这个selector和eventloop是一对一绑定的,可以看到调用jdk原生的注册方法,完成了对serversocketchannel的注册,但是注册的是一个0状态(缺省值),而传入的this,即abstractniochannel对象作为了一个附件,用于以后processselectedkeys方法从selectionkey中得到对应的netty的channel(还是之前博客说过)
关于缺省值,是由于abstractniochannel不仅用于nioserversocketchannel的注册,还用于niosocketchannel的注册,只有都使用缺省值注册才不会产生异常  【java】nio中channel的注册源码分析 ,并且,在以后processselectedkeys方法会对0状态判断,再使用unsafe进行相应的逻辑处理。

 

在完成jdk的注册后,调用pipeline的invokehandleraddedifneeded方法(netty中的channelpipeline源码分析),处理channelhandler的handleradded的回调,即调用用户添加的channelhandler的handleradded方法。
调用safesetsuccess,标志异步操作完成:

1 protected final void safesetsuccess(channelpromise promise) {
2     if (!(promise instanceof voidchannelpromise) && !promise.trysuccess()) {
3         logger.warn("failed to mark a promise as success because it is done already: {}", promise);
4     }
5 }

关于异步操作我在之前的博客中说的很清楚了:netty中的channelfuture和channelpromise


接着调用pipeline的firechannelregistered方法,也就是在责任链上调用channelregistered方法,这时,就会调用之在serverbootstrap中向pipeline添加的channelinitializer的channelregistered,进而回调initchannel方法,完成对serverbootstrapacceptor的添加。

回到register0方法,在处理完pipeline的责任链后,根据当前abstractchannel即nioserversocketchannel的isactive:

1 public boolean isactive() {
2     return this.javachannel().socket().isbound();
3 }

获得nioserversocketchannel绑定的jdk的serversocketchannel,进而获取serversocket,判断isbound:

1 public boolean isbound() {
2    // before 1.3 serversockets were always bound during creation
3     return bound || oldimpl;
4 }

这里实际上就是判断serversocket是否调用了bind方法,前面说过register0方法是一个异步操作,在多线程环境下不能保证执行顺序,若是此时已经完成了serversocket的bind,根据firstregistration判断是否需要pipeline传递channelactive请求,首先会执行pipeline的head即headcontext的channelactive方法:

1 @override
2 public void channelactive(channelhandlercontext ctx) throws exception {
3     ctx.firechannelactive();
4 
5     readifisautoread();
6 }

在headcontext通过channelhandlercontext 传递完channelactive请求后,会调用readifisautoread方法:

1 private void readifisautoread() {
2     if (channel.config().isautoread()) {
3         channel.read();
4     }
5 }

此时调用abstractchannel的read方法:

1 public channel read() {
2     pipeline.read();
3     return this;
4 }

最终在请求链由headcontext执行read方法:

1 public void read(channelhandlercontext ctx) {
2     unsafe.beginread();
3 }

终于可以看到此时调用unsafe的beginread方法:

 1 public final void beginread() {
 2     asserteventloop();
 3 
 4     if (!isactive()) {
 5         return;
 6     }
 7 
 8     try {
 9         dobeginread();
10     } catch (final exception e) {
11         invokelater(new runnable() {
12             @override
13             public void run() {
14                 pipeline.fireexceptioncaught(e);
15             }
16         });
17         close(voidpromise());
18     }
19 }

最终执行了dobeginread方法,由abstractniochannel实现:

 1 protected void dobeginread() throws exception {
 2     final selectionkey selectionkey = this.selectionkey;
 3     if (!selectionkey.isvalid()) {
 4         return;
 5     }
 6     
 7     readpending = true;
 8     
 9     final int interestops = selectionkey.interestops();
10     if ((interestops & readinterestop) == 0) {
11         selectionkey.interestops(interestops | readinterestop);
12     }
13 }

这里,就完成了向selector注册readinterestop事件,从前面来看就是accept事件

 

回到abstractbootstrap的dobind方法,在initandregister逻辑结束后,由上面可以知道,实际上的register注册逻辑是一个异步操作,在register0中完成
根据channelfuture来判断异步操作是否完成,如果isdone,则表明异步操作先完成,即完成了safesetsuccess方法,
然后调用newpromise方法:

1 public channelpromise newpromise() {
2     return pipeline.newpromise();
3 }

给channel的pipeline绑定异步操作channelpromise
然后调用dobind0方法完成serversocket的绑定,若是register0这个异步操作还没完成,就需要给channelfuture产生一个异步操作的侦听channelfuturelistener对象,等到register0方法调用safesetsuccess时,在promise的trysuccess中会回调channelfuturelistener的operationcomplete方法,进而调用dobind0方法

dobind0方法:

 1 private static void dobind0(
 2         final channelfuture regfuture, final channel channel,
 3         final socketaddress localaddress, final channelpromise promise) {
 4     channel.eventloop().execute(new runnable() {
 5         @override
 6         public void run() {
 7             if (regfuture.issuccess()) {
 8                 channel.bind(localaddress, promise).addlistener(channelfuturelistener.close_on_failure);
 9             } else {
10                 promise.setfailure(regfuture.cause());
11             }
12         }
13     });
14 }

向轮询线程提交了一个任务,异步处理bind,可以看到,只有在regfuture异步操作成功结束后,调用channel的bind方法:

1 public channelfuture bind(socketaddress localaddress, channelpromise promise) {
2    return pipeline.bind(localaddress, promise);
3 }

实际上的bind又交给pipeline,去完成,pipeline中就会交给责任链去完成,最终会交给headcontext完成:

1 public void bind(
2                 channelhandlercontext ctx, socketaddress localaddress, channelpromise promise)
3                 throws exception {
4     unsafe.bind(localaddress, promise);
5 }

可以看到,绕了一大圈,交给了unsafe完成:

 1 public final void bind(final socketaddress localaddress, final channelpromise promise) {
 2     asserteventloop();
 3 
 4     if (!promise.setuncancellable() || !ensureopen(promise)) {
 5         return;
 6     }
 7     
 8     if (boolean.true.equals(config().getoption(channeloption.so_broadcast)) &&
 9         localaddress instanceof inetsocketaddress &&
10         !((inetsocketaddress) localaddress).getaddress().isanylocaladdress() &&
11         !platformdependent.iswindows() && !platformdependent.maybesuperuser()) {
12         logger.warn(
13                 "a non-root user can't receive a broadcast packet if the socket " +
14                 "is not bound to a wildcard address; binding to a non-wildcard " +
15                 "address (" + localaddress + ") anyway as requested.");
16     }
17 
18     boolean wasactive = isactive();
19     try {
20         dobind(localaddress);
21     } catch (throwable t) {
22         safesetfailure(promise, t);
23         closeifclosed();
24         return;
25     }
26 
27     if (!wasactive && isactive()) {
28         invokelater(new runnable() {
29             @override
30             public void run() {
31                 pipeline.firechannelactive();
32             }
33         });
34     }
35 
36     safesetsuccess(promise);
37 }

然而,真正的bind还是回调了dobind方法,最终是由nioserversocketchannel来实现:

1 @override
2 protected void dobind(socketaddress localaddress) throws exception {
3     if (platformdependent.javaversion() >= 7) {
4         javachannel().bind(localaddress, config.getbacklog());
5     } else {
6         javachannel().socket().bind(localaddress, config.getbacklog());
7     }
8 }

在这里终于完成了对jdk的serversocketchannel的bind操作


在上面的

1 if (!wasactive && isactive()) {
2     invokelater(new runnable() {
3         @override
4         public void run() {
5             pipeline.firechannelactive();
6         }
7     });
8 }

这个判断,就是确保在register0中isactive时,还没完成绑定,也就没有beginread操作来向selector注册accept事件,那么就在这里进行注册,进而让serversocket去侦听客户端的连接


在服务端accept到客户端的连接后,在nioeventloop轮询中,就会调用processselectedkeys处理accept的事件就绪,然后交给unsafe的read去处理  netty中nioeventloopgroup的创建源码分析

 

在服务端,由niomessageunsafe实现:

 1 public void read() {
 2         assert eventloop().ineventloop();
 3         final channelconfig config = config();
 4         final channelpipeline pipeline = pipeline();
 5         final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle();
 6         allochandle.reset(config);
 7 
 8         boolean closed = false;
 9         throwable exception = null;
10         try {
11             try {
12                 do {
13                     int localread = doreadmessages(readbuf);
14                     if (localread == 0) {
15                         break;
16                     }
17                     if (localread < 0) {
18                         closed = true;
19                         break;
20                     }
21 
22                     allochandle.incmessagesread(localread);
23                 } while (allochandle.continuereading());
24             } catch (throwable t) {
25                 exception = t;
26             }
27 
28             int size = readbuf.size();
29             for (int i = 0; i < size; i ++) {
30                 readpending = false;
31                 pipeline.firechannelread(readbuf.get(i));
32             }
33             readbuf.clear();
34             allochandle.readcomplete();
35             pipeline.firechannelreadcomplete();
36 
37             if (exception != null) {
38                 closed = closeonreaderror(exception);
39 
40                 pipeline.fireexceptioncaught(exception);
41             }
42 
43             if (closed) {
44                 inputshutdown = true;
45                 if (isopen()) {
46                     close(voidpromise());
47                 }
48             }
49         } finally {
50             if (!readpending && !config.isautoread()) {
51                 removereadop();
52             }
53         }
54     }
55 }

核心在doreadmessages方法,由nioserversocketchannel实现:

 1 protected int doreadmessages(list<object> buf) throws exception {
 2     socketchannel ch = socketutils.accept(javachannel());
 3 
 4     try {
 5         if (ch != null) {
 6             buf.add(new niosocketchannel(this, ch));
 7             return 1;
 8         }
 9     } catch (throwable t) {
10         logger.warn("failed to create a new channel from an accepted socket.", t);
11 
12         try {
13             ch.close();
14         } catch (throwable t2) {
15             logger.warn("failed to close a socket.", t2);
16         }
17     }
18 
19     return 0;
20 }

socketutils的accept方法其实就是用来调用jdk中serversocketchannel原生的accept方法,来得到一个jdk的socketchannel对象,然后通过这个socketchannel对象,将其包装成niosocketchannel对象添加在buf这个list中

由此可以看到doreadmessages用来侦听所有就绪的连接,包装成niosocketchannel将其放在list中
然后遍历这个list,调用 nioserversocketchannel的pipeline的firechannelread方法,传递channelread请求,、
在前面向pipeline中添加了serverbootstrapacceptor这个channelhandler,此时,它也会响应这个请求,回调channelread方法:

 1 public void channelread(channelhandlercontext ctx, object msg) {
 2     final channel child = (channel) msg;
 3 
 4     child.pipeline().addlast(childhandler);
 5 
 6     setchanneloptions(child, childoptions, logger);
 7 
 8     for (entry<attributekey<?>, object> e: childattrs) {
 9         child.attr((attributekey<object>) e.getkey()).set(e.getvalue());
10     }
11 
12     try {
13         childgroup.register(child).addlistener(new channelfuturelistener() {
14             @override
15             public void operationcomplete(channelfuture future) throws exception {
16                 if (!future.issuccess()) {
17                     forceclose(child, future.cause());
18                 }
19             }
20         });
21     } catch (throwable t) {
22         forceclose(child, t);
23     }
24 }

msg就是侦听到的niosocketchannel对象,给该对象的pipeline添加childhandler,也就是我们在serverbootstrap中通过childhandler方法添加的
然后通过register方法完成对niosocketchannel的注册(和nioserversocketchannel注册逻辑一样)


至此netty服务端的启动结束。