
Source Code Analysis of ChannelPipeline in Netty

程序员文章站 (Programmer Article Station), 2022-04-29 17:34:14

In Netty, ChannelPipeline is the chain of responsibility that handles requests (I/O events). Its default implementation is DefaultChannelPipeline, whose constructor is as follows:

private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

protected DefaultChannelPipeline(Channel channel) {
    this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
    this.voidPromise = new VoidChannelPromise(channel, true);
    // Create the two sentinel nodes and link them: head <-> tail
    this.tail = new DefaultChannelPipeline.TailContext(this);
    this.head = new DefaultChannelPipeline.HeadContext(this);
    this.head.next = this.tail;
    this.tail.prev = this.head;
}

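ChannelPipeline and Channel are in a one-to-one relationship: each Channel is bound to exactly one ChannelPipeline chain.
succeededFuture and voidPromise are used to handle asynchronous operations.
AbstractChannelHandlerContext is the context object that wraps a ChannelHandler. Normally one context corresponds to one ChannelHandler (with the @Sharable annotation, several AbstractChannelHandlerContext instances may refer to the same ChannelHandler). What the ChannelPipeline chain actually links and traverses is AbstractChannelHandlerContext objects; each context then hands the event to its ChannelHandler, which performs the real processing.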

The constructor of AbstractChannelHandlerContext is as follows:

private final String name;
private final DefaultChannelPipeline pipeline;
final EventExecutor executor;
private final boolean inbound;
private final boolean outbound;
private final boolean ordered;
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
    this.name = (String)ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    this.ordered = executor == null || executor instanceof OrderedEventExecutor;
}

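name is the name of the AbstractChannelHandlerContext, and pipeline is the ChannelPipeline described above. executor is used to run operations asynchronously; when none is supplied, the Channel's own event loop is used, which by default is the NioEventLoop discussed in an earlier post (Source Code Analysis of NioEventLoopGroup Creation in Netty).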

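inbound and outbound mark the two directions of event handling and correspond to Netty's I/O operations. An inbound context handles input events: the ChannelPipeline walks the list from head toward the tail and only stops at AbstractChannelHandlerContext objects whose handler is a ChannelInboundHandler. An outbound context handles output operations: the ChannelPipeline walks the list from tail toward the head and only stops at AbstractChannelHandlerContext objects whose handler is a ChannelOutboundHandler.
ordered records whether the executor guarantees ordered execution: it is true when no executor was supplied or when the executor is an OrderedEventExecutor.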

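The next and prev members show that ChannelPipeline maintains a doubly linked list of AbstractChannelHandlerContext objects.
Its head node head and tail node tail are initialized to a HeadContext and a TailContext respectively.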

Construction of HeadContext:

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
        this.unsafe = pipeline.channel().unsafe();
        this.setAddComplete();
    }
}

setAddComplete is implemented by AbstractChannelHandlerContext:

final void setAddComplete() {
    int oldState;
    do {
        oldState = this.handlerState;
        // 3 is REMOVE_COMPLETE, 2 is ADD_COMPLETE (the decompiler inlined the constants)
    } while(oldState != 3 && !HANDLER_STATE_UPDATER.compareAndSet(this, oldState, 2));

}

handlerState represents the state of the ChannelHandler that belongs to this AbstractChannelHandlerContext. The possible values are:

private static final int ADD_PENDING = 1;
private static final int ADD_COMPLETE = 2;
private static final int REMOVE_COMPLETE = 3;
private static final int INIT = 0;
private volatile int handlerState = 0;

handlerState starts in the INIT state.

HANDLER_STATE_UPDATER is an atomic field updater:

private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

So setAddComplete uses a CAS loop to move handlerState to ADD_COMPLETE, unless the state is already REMOVE_COMPLETE.
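The CAS-based state transitions can be tried out in isolation. The following is a minimal sketch using only the JDK; HandlerStateSketch is a hypothetical stand-in written for this illustration, not Netty's class, and its setRemoved method is an assumption that simply stores the final state.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Simplified model of the handlerState field (hypothetical, not Netty code). */
final class HandlerStateSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "handlerState");

    // The updater only accepts a volatile int field.
    private volatile int handlerState = INIT;

    /** Retries the CAS until it succeeds; a removed handler is never revived. */
    void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE
                    || UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    /** Only succeeds from INIT; returns whether the transition happened. */
    boolean setAddPending() {
        return UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    }

    /** Assumed behaviour for this sketch: unconditionally mark as removed. */
    void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }

    int state() {
        return handlerState;
    }

    public static void main(String[] args) {
        HandlerStateSketch ctx = new HandlerStateSketch();
        ctx.setAddPending();
        ctx.setAddComplete();
        System.out.println(ctx.state()); // prints 2
        ctx.setRemoved();
        ctx.setAddComplete();            // ignored: already REMOVE_COMPLETE
        System.out.println(ctx.state()); // prints 3
    }
}
```

The loop matters only under contention: if another thread changes the state between the read and the CAS, the CAS fails and the transition is retried against the fresh value.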

Construction of TailContext:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
        this.setAddComplete();
    }
}

As with HeadContext, handlerState is updated to ADD_COMPLETE.


The ChannelPipeline diagram from the official documentation makes this easier to understand:

                                               I/O Request
                                          via Channel or
                                      ChannelHandlerContext
                                                    |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
                |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

 

The main methods are analyzed below.
The addFirst method has the following overloads:

public final ChannelPipeline addFirst(ChannelHandler handler) {
    return this.addFirst((String)null, (ChannelHandler)handler);
}

public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
    return this.addFirst((EventExecutorGroup)null, name, handler);
}

public final ChannelPipeline addFirst(ChannelHandler... handlers) {
    return this.addFirst((EventExecutorGroup)null, (ChannelHandler[])handlers);
}

public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    } else if (handlers.length != 0 && handlers[0] != null) {
        // Count the leading non-null handlers
        int size;
        for(size = 1; size < handlers.length && handlers[size] != null; ++size) {
            ;
        }

        // Add in reverse so the handlers keep their array order in the pipeline
        for(int i = size - 1; i >= 0; --i) {
            ChannelHandler h = handlers[i];
            this.addFirst(executor, (String)null, h);
        }

        return this;
    } else {
        return this;
    }
}

public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized(this) {
        checkMultiplicity(handler);
        name = this.filterName(name, handler);
        newCtx = this.newContext(group, name, handler);
        this.addFirst0(newCtx);
        if (!this.registered) {
            newCtx.setAddPending();
            this.callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                public void run() {
                    DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }

    this.callHandlerAdded0(newCtx);
    return this;
}

The first four overloads all end up calling the last one, so we go straight to the last overload, addFirst(EventExecutorGroup, String, ChannelHandler).
It first calls checkMultiplicity, which checks whether a non-sharable ChannelHandlerAdapter is being added more than once:

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
        }

        h.added = true;
    }

}

The isSharable method:

public boolean isSharable() {
    Class<?> clazz = this.getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = (Boolean)cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }

    return sharable;
}

It first tries to obtain handlerSharableCache from the current thread's InternalThreadLocalMap. (InternalThreadLocalMap backs FastThreadLocal, the faster replacement that Netty uses for the JDK's ThreadLocal; see Source Code Analysis of FastThreadLocal in Netty.)
The handlerSharableCache method of InternalThreadLocalMap:

public Map<Class<?>, Boolean> handlerSharableCache() {
    Map<Class<?>, Boolean> cache = this.handlerSharableCache;
    if (cache == null) {
        this.handlerSharableCache = (Map)(cache = new WeakHashMap(4));
    }

    return (Map)cache;
}

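If the current thread's InternalThreadLocalMap has no handlerSharableCache yet, a WeakHashMap (a map with weakly referenced keys) with an initial capacity of 4 is created.

The cache is then queried with clazz. On a miss, the class is checked for the @Sharable annotation. A ChannelHandlerAdapter annotated with @Sharable can be shared as a single instance across different Channels, provided it is thread-safe.
The class and whether it carries the @Sharable annotation are then stored in the cache.
The added field of ChannelHandlerAdapter records whether the handler has already been added.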
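The per-thread annotation cache can be sketched with the JDK alone. In the example below, the Shared annotation, the two handler classes and the use of a plain ThreadLocal are all assumptions made for illustration; Netty itself uses @Sharable and InternalThreadLocalMap.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.WeakHashMap;

/** Sketch of a per-thread "is this class annotated?" cache (not Netty code). */
final class SharableCacheSketch {
    /** Hypothetical marker annotation standing in for Netty's @Sharable. */
    @Retention(RetentionPolicy.RUNTIME) // must be visible to reflection
    @Target(ElementType.TYPE)
    @interface Shared {}

    // One map per thread, so no locking is needed. Weak keys allow handler
    // classes to be unloaded even though they appear in the cache.
    private static final ThreadLocal<Map<Class<?>, Boolean>> CACHE =
            ThreadLocal.withInitial(() -> new WeakHashMap<>(4));

    static boolean isSharable(Object handler) {
        Class<?> clazz = handler.getClass();
        Map<Class<?>, Boolean> cache = CACHE.get();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            // Cache miss: do the reflective lookup once, then remember it.
            sharable = clazz.isAnnotationPresent(Shared.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

    @Shared
    static final class StatelessHandler {}

    static final class StatefulHandler {}

    public static void main(String[] args) {
        System.out.println(isSharable(new StatelessHandler())); // prints true
        System.out.println(isSharable(new StatefulHandler()));  // prints false
    }
}
```

The cache only saves the repeated reflective lookup; the result for a given class never changes, which is why a stale entry is not a concern.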

Back in addFirst:
After checkMultiplicity succeeds, filterName is called to produce a name for the AbstractChannelHandlerContext about to be created,
and then newContext is called to create the AbstractChannelHandlerContext object:

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
}

What is actually created here is a DefaultChannelHandlerContext:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        } else {
            this.handler = handler;
        }
    }

    public ChannelHandler handler() {
        return this.handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

As shown, DefaultChannelHandlerContext simply combines AbstractChannelHandlerContext with a ChannelHandler.

After the DefaultChannelHandlerContext has been created, addFirst0 is called:

private void addFirst0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext nextCtx = this.head.next;
    newCtx.prev = this.head;
    newCtx.next = nextCtx;
    this.head.next = newCtx;
    nextCtx.prev = newCtx;
}

This is a plain doubly-linked-list insertion: newCtx is placed immediately after head.
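To see why the head and tail sentinels keep the insertion free of null checks, here is a minimal stand-alone sketch. PipelineListSketch and its Node class are hypothetical names for this illustration and carry only a name instead of a handler.

```java
import java.util.ArrayList;
import java.util.List;

/** Doubly linked list with fixed head/tail sentinels (simplified model). */
final class PipelineListSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineListSketch() {
        // An empty pipeline is just head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    /** Same four pointer updates as addFirst0: insert right after head. */
    void addFirst(String name) {
        Node newCtx = new Node(name);
        Node nextCtx = head.next; // never null, at worst it is tail
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

    /** Names of the user nodes between the two sentinels, front to back. */
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        PipelineListSketch pipeline = new PipelineListSketch();
        pipeline.addFirst("decoder");
        pipeline.addFirst("logger");
        System.out.println(pipeline.names()); // prints [logger, decoder]
    }
}
```

Because head.next is never null, the insertion code is identical for an empty and a non-empty list.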

Next, the state of the registered member is checked:

private boolean registered;

It is false initially.

If registered is false, the setAddPending method of AbstractChannelHandlerContext is called first:

final void setAddPending() {
    // 0 is INIT, 1 is ADD_PENDING
    boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, 0, 1);

    assert updated;

}

Like setAddComplete above, it uses a CAS operation, here to move handlerState from INIT to ADD_PENDING.
Then callHandlerCallbackLater is called:

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !this.registered;

    DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx);
    DefaultChannelPipeline.PendingHandlerCallback pending = this.pendingHandlerCallbackHead;
    if (pending == null) {
        this.pendingHandlerCallbackHead = (DefaultChannelPipeline.PendingHandlerCallback)task;
    } else {
        // Walk to the end of the list and append
        while(pending.next != null) {
            pending = pending.next;
        }

        pending.next = (DefaultChannelPipeline.PendingHandlerCallback)task;
    }

}

It first asserts that the pipeline has not been registered yet, then uses added to decide which kind of PendingHandlerCallback to create.
PendingHandlerCallback handles the two kinds of ChannelHandler callbacks and is defined as follows:

private abstract static class PendingHandlerCallback implements Runnable {
    final AbstractChannelHandlerContext ctx;
    DefaultChannelPipeline.PendingHandlerCallback next;

    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    abstract void execute();
}


PendingHandlerAddedTask is defined as follows:

private final class PendingHandlerAddedTask extends DefaultChannelPipeline.PendingHandlerCallback {
    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    public void run() {
        DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
    }

    void execute() {
        EventExecutor executor = this.ctx.executor();
        if (executor.inEventLoop()) {
            DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);
        } else {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException var3) {
                if (DefaultChannelPipeline.logger.isWarnEnabled()) {
                    DefaultChannelPipeline.logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
                }

                DefaultChannelPipeline.remove0(this.ctx);
                this.ctx.setRemoved();
            }
        }

    }
}

Leaving exception handling aside, both execute and run boil down to calling callHandlerAdded0, either directly when already on the executor's thread or as a task submitted to that executor:

private void callHandlerAdded0(AbstractChannelHandlerContext ctx) {
    try {
        ctx.setAddComplete();
        ctx.handler().handlerAdded(ctx);
    } catch (Throwable var10) {
        boolean removed = false;

        try {
            remove0(ctx);

            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }

            removed = true;
        } catch (Throwable var9) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), var9);
            }
        }

        if (removed) {
            this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", var10));
        } else {
            this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", var10));
        }
    }

}

Leaving exception handling aside, the core is just two lines: setAddComplete sets handlerState to ADD_COMPLETE, and then the ChannelHandler's handlerAdded method is invoked. handlerAdded is the familiar callback that application code overrides when writing business logic with Netty.

PendingHandlerRemovedTask is defined as follows:

private final class PendingHandlerRemovedTask extends DefaultChannelPipeline.PendingHandlerCallback {
    PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    public void run() {
        DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
    }

    void execute() {
        EventExecutor executor = this.ctx.executor();
        if (executor.inEventLoop()) {
            DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);
        } else {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException var3) {
                if (DefaultChannelPipeline.logger.isWarnEnabled()) {
                    DefaultChannelPipeline.logger.warn("Can't invoke handlerRemoved() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});
                }

                this.ctx.setRemoved();
            }
        }

    }
}

As with PendingHandlerAddedTask, the main job is to call callHandlerRemoved0 on the executor's thread:

private void callHandlerRemoved0(AbstractChannelHandlerContext ctx) {
    try {
        try {
            ctx.handler().handlerRemoved(ctx);
        } finally {
            ctx.setRemoved();
        }
    } catch (Throwable var6) {
        this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", var6));
    }

}

It first invokes the ChannelHandler's handlerRemoved callback and then, in the finally block, calls setRemoved to set handlerState to REMOVE_COMPLETE.

Back in callHandlerCallbackLater, the member pendingHandlerCallbackHead is defined as:

private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

Together with the definition of PendingHandlerCallback, this shows that pendingHandlerCallbackHead is the head of a singly linked list of PendingHandlerCallback objects kept by DefaultChannelPipeline. The list holds the handlerAdded and handlerRemoved callbacks of ChannelHandlers until they can be run. The add methods always call callHandlerCallbackLater with added set to true, so adding a ChannelHandler only appends a handlerAdded callback to the list.
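The idea of queuing callbacks until registration and then draining them once can be modelled in a few lines. This is a simplified sketch: PendingCallbackSketch, callLater and register are hypothetical names, and plain Runnable tasks stand in for the handlerAdded and handlerRemoved callbacks.

```java
import java.util.ArrayList;
import java.util.List;

/** Callbacks are queued in a singly linked list until register() runs them. */
final class PendingCallbackSketch {
    private static final class Pending {
        final Runnable task;
        Pending next;

        Pending(Runnable task) {
            this.task = task;
        }
    }

    private Pending pendingHead;
    private boolean registered;

    /** Appends to the end of the list; only legal before registration. */
    synchronized void callLater(Runnable task) {
        if (registered) {
            throw new IllegalStateException("already registered");
        }
        Pending node = new Pending(task);
        if (pendingHead == null) {
            pendingHead = node;
            return;
        }
        Pending p = pendingHead;
        while (p.next != null) {
            p = p.next;
        }
        p.next = node;
    }

    /** Detaches the list under the lock, then runs the tasks outside it. */
    void register() {
        Pending first;
        synchronized (this) {
            registered = true;
            first = pendingHead;
            pendingHead = null;
        }
        for (Pending p = first; p != null; p = p.next) {
            p.task.run();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.callLater(() -> log.add("handlerAdded(A)"));
        pipeline.callLater(() -> log.add("handlerAdded(B)"));
        System.out.println(log); // prints []
        pipeline.register();
        System.out.println(log); // prints [handlerAdded(A), handlerAdded(B)]
    }
}
```

Running the tasks outside the synchronized block keeps user callbacks from executing while the pipeline lock is held.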

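Back in addFirst: if registered is true, the EventExecutor is obtained and inEventLoop() checks whether the current thread is that executor's event loop thread. If it is not, the context is marked ADD_PENDING and callHandlerAdded0 is submitted to the executor as a task, so it runs asynchronously on the event loop thread. If it is, callHandlerAdded0 is simply called directly on the current thread after leaving the synchronized block.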
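The "run inline if already on the loop thread, otherwise hand the task over" pattern recurs throughout the pipeline code. The sketch below models it with a JDK single-thread executor; EventLoopSketch, runOnLoop and demo are hypothetical names, and the real EventExecutor is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Minimal model of an event loop with an inEventLoop() check. */
final class EventLoopSketch {
    private volatile Thread loopThread;

    // The thread factory records the worker thread so inEventLoop() can compare.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs inline on the loop thread, otherwise submits to it. */
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    /** Records what happens when a task is submitted from outside and from inside. */
    static List<String> demo() {
        EventLoopSketch loop = new EventLoopSketch();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        log.add("caller inEventLoop=" + loop.inEventLoop());
        loop.runOnLoop(() -> {
            log.add("outer inEventLoop=" + loop.inEventLoop());
            // Already on the loop thread, so this runs before "outer done".
            loop.runOnLoop(() -> log.add("nested ran inline"));
            log.add("outer done");
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The nested call shows the synchronous branch: it is logged before "outer done" because it never goes through the executor's queue.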

That concludes addFirst. Next comes addLast, which likewise has several overloads:

public final ChannelPipeline addLast(ChannelHandler handler) {
    return this.addLast((String)null, (ChannelHandler)handler);
}

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return this.addLast((EventExecutorGroup)null, name, handler);
}

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return this.addLast((EventExecutorGroup)null, (ChannelHandler[])handlers);
}

public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    } else {
        ChannelHandler[] var3 = handlers;
        int var4 = handlers.length;

        for(int var5 = 0; var5 < var4; ++var5) {
            ChannelHandler h = var3[var5];
            if (h == null) {
                break;
            }

            this.addLast(executor, (String)null, h);
        }

        return this;
    }
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized(this) {
        checkMultiplicity(handler);
        newCtx = this.newContext(group, this.filterName(name, handler), handler);
        this.addLast0(newCtx);
        if (!this.registered) {
            newCtx.setAddPending();
            this.callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                public void run() {
                    DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }

    this.callHandlerAdded0(newCtx);
    return this;
}

Again, everything delegates to the last overload.
Compared with addFirst, only addLast0 differs:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = this.tail.prev;
    newCtx.prev = prev;
    newCtx.next = this.tail;
    prev.next = newCtx;
    this.tail.prev = newCtx;
}

This is again a very simple doubly-linked-list operation, except that this time the AbstractChannelHandlerContext is inserted just before tail.
The remaining two methods, addBefore and addAfter, work similarly and are not covered here.


Next, let's look at how ChannelPipeline passes events along the chain.
The invokeHandlerAddedIfNeeded method:

final void invokeHandlerAddedIfNeeded() {
    assert this.channel.eventLoop().inEventLoop();

    if (this.firstRegistration) {
        this.firstRegistration = false;
        this.callHandlerAddedForAllHandlers();
    }

}

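The assertion checks that the call is made on the event loop thread (the ChannelPipeline processes its events on the event loop thread).
The firstRegistration member is true when DefaultChannelPipeline is initialized:

private boolean firstRegistration = true;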

It is set to false here, so callHandlerAddedForAllHandlers below runs only on this first call and never again:

private void callHandlerAddedForAllHandlers() {
    DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized(this) {
        assert !this.registered;

        this.registered = true;
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        this.pendingHandlerCallbackHead = null;
    }

    for(DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead; task != null; task = task.next) {
        task.execute();
    }

}

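As mentioned, registered is initially false, which satisfies the assertion here; it is then set to true. The method takes the callback list from pendingHandlerCallbackHead and sets the field to null.
It then walks this singly linked list and runs the pending handlerAdded and handlerRemoved callbacks of the ChannelHandlers.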

The fireChannelRegistered method is called back by the Channel's Unsafe once the Channel has finished registering with the Selector:

public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
    return this;
}

The actual work is done by the static method invokeChannelRegistered of AbstractChannelHandlerContext. The head argument passed here is the HeadContext created when DefaultChannelPipeline was initialized:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }

}

As shown, head's invokeChannelRegistered method is run on its executor, directly if the current thread is already the event loop thread and as a submitted task otherwise:

private void invokeChannelRegistered() {
    if (this.invokeHandler()) {
        try {
            ((ChannelInboundHandler)this.handler()).channelRegistered(this);
        } catch (Throwable var2) {
            this.notifyHandlerException(var2);
        }
    } else {
        this.fireChannelRegistered();
    }

}


invokeHandler checks the current handlerState:

private boolean invokeHandler() {
    int handlerState = this.handlerState;
    // 2 is ADD_COMPLETE, 1 is ADD_PENDING
    return handlerState == 2 || !this.ordered && handlerState == 1;
}

It returns true if handlerState is ADD_COMPLETE, or if the executor does not guarantee ordering (ordered is false) and the state is ADD_PENDING; otherwise it returns false.
When it returns true, the channelRegistered method of the ChannelInboundHandler is called. Since the current context is head, this is HeadContext's implementation:

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
    ctx.fireChannelRegistered();
}

It first calls invokeHandlerAddedIfNeeded to run the pending handlerAdded and handlerRemoved callbacks of the ChannelHandlers,
and then calls ctx's fireChannelRegistered method:

public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(this.findContextInbound());
    return this;
}

The findContextInbound method finds the next inbound context:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;

    do {
        ctx = ctx.next;
    } while(!ctx.inbound);

    return ctx;
}

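Starting from the current node and walking toward the tail, the method uses the inbound flag described earlier to find the next AbstractChannelHandlerContext whose handler is a ChannelInboundHandler. The static invokeChannelRegistered is then called on it and the steps above repeat. As long as each ChannelInboundHandler either keeps the default channelRegistered or calls ctx.fireChannelRegistered() in its override, the event travels on until the channelRegistered method of every inbound ChannelHandler has run.
The default channelRegistered method in ChannelInboundHandlerAdapter:

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelRegistered();
}

This is even simpler than the HeadContext implementation: it just calls fireChannelRegistered to pass the event on.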
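The inbound hand-off can be modelled without Netty to show two consequences of this design: outbound-only contexts are skipped, and an event stops as soon as a handler does not forward it. Everything in the sketch below (InboundPropagationSketch, Ctx, InboundHandler, demo) is a hypothetical simplification with executors and handler state left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified inbound propagation along a doubly linked context list. */
final class InboundPropagationSketch {
    interface InboundHandler {
        void channelRegistered(Ctx ctx);
    }

    static final class Ctx {
        final boolean inbound;
        final InboundHandler handler;
        Ctx prev;
        Ctx next;

        Ctx(boolean inbound, InboundHandler handler) {
            this.inbound = inbound;
            this.handler = handler;
        }

        /** Skips forward to the next inbound context and invokes its handler. */
        void fireChannelRegistered() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound); // terminates because tail is inbound
            ctx.handler.channelRegistered(ctx);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Ctx head = new Ctx(false, null);
        Ctx a = new Ctx(true, ctx -> {
            log.add("A");
            ctx.fireChannelRegistered(); // forwards the event
        });
        Ctx outboundOnly = new Ctx(false, null); // skipped by the traversal
        Ctx b = new Ctx(true, ctx -> log.add("B")); // does not forward
        Ctx c = new Ctx(true, ctx -> log.add("C")); // never reached
        Ctx tail = new Ctx(true, ctx -> { });
        Ctx[] chain = {head, a, outboundOnly, b, c, tail};
        for (int i = 1; i < chain.length; i++) {
            chain[i - 1].next = chain[i];
            chain[i].prev = chain[i - 1];
        }
        head.fireChannelRegistered();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A, B]
    }
}
```

Handler C is never invoked because B swallows the event, which is the usual cause of "my handler is not called" problems in real pipelines.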


The fireChannelRead method is called back by the Channel's Unsafe when the Selector reports that a read event is ready:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
    return this;
}

Again it starts from head and calls the static method invokeChannelRead of AbstractChannelHandlerContext:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }

}

Following the same logic as above, the invokeChannelRead method of the AbstractChannelHandlerContext object is run on its executor:

private void invokeChannelRead(Object msg) {
    if (this.invokeHandler()) {
        try {
            ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
        } catch (Throwable var3) {
            this.notifyHandlerException(var3);
        }
    } else {
        this.fireChannelRead(msg);
    }

}

As before, this calls HeadContext's channelRead method:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

HeadContext does no processing of its own here and simply calls the fireChannelRead method of ChannelHandlerContext:

public ChannelHandlerContext fireChannelRead(Object msg) {
    invokeChannelRead(this.findContextInbound(), msg);
    return this;
}

As with registration, the next ChannelInboundHandler is selected and the steps above repeat.


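Now for writeAndFlush, which differs from the methods above: it is not triggered by the event loop but by user code, indirectly through the Channel. It is implemented in AbstractChannel:

public ChannelFuture writeAndFlush(Object msg) {
    return this.pipeline.writeAndFlush(msg);
}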

This directly calls the writeAndFlush method of DefaultChannelPipeline:

public final ChannelFuture writeAndFlush(Object msg) {
    return this.tail.writeAndFlush(msg);
}

Here is another difference: it is tail's writeAndFlush that is called, that is, TailContext's writeAndFlush, which is implemented in AbstractChannelHandlerContext:

public ChannelFuture writeAndFlush(Object msg) {
    return this.writeAndFlush(msg, this.newPromise());
}

newPromise creates a ChannelPromise, which is used to track the asynchronous operation. The call then goes to the writeAndFlush overload:

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    } else if (this.isNotValidPromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        return promise;
    } else {
        this.write(msg, true, promise);
        return promise;
    }
}

Next comes the write method:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = this.findContextOutbound();
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        Object task;
        if (flush) {
            task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
        }

        safeExecute(executor, (Runnable)task, promise, m);
    }

}

The pattern is still very similar, except that findContextOutbound is called first to find the next ChannelHandlerContext whose handler is a ChannelOutboundHandler, and the traversal runs from the tail toward the head. This matches the diagram shown earlier.
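The reverse traversal can be illustrated with a small model in which each outbound handler transforms the message before passing it on toward head. This is a sketch under simplifying assumptions: OutboundPropagationSketch, Ctx, OutboundHandler and demo are hypothetical names, messages are plain strings, and promises and executors are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Simplified outbound propagation: tail toward head, outbound contexts only. */
final class OutboundPropagationSketch {
    interface OutboundHandler {
        void write(Ctx ctx, String msg);
    }

    static final class Ctx {
        final boolean outbound;
        final OutboundHandler handler;
        Ctx prev;
        Ctx next;

        /** A null handler models an inbound-only context in this sketch. */
        Ctx(OutboundHandler handler) {
            this.handler = handler;
            this.outbound = handler != null;
        }

        /** Walks backwards to the previous outbound context and invokes it. */
        void write(String msg) {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound); // terminates because head is outbound
            ctx.handler.write(ctx, msg);
        }
    }

    static List<String> demo() {
        List<String> wire = new ArrayList<>();
        // head stands in for the transport: it records instead of forwarding.
        Ctx head = new Ctx((ctx, msg) -> wire.add(msg));
        Ctx upper = new Ctx((ctx, msg) -> ctx.write(msg.toUpperCase(Locale.ROOT)));
        Ctx inboundOnly = new Ctx(null); // skipped by the traversal
        Ctx bracket = new Ctx((ctx, msg) -> ctx.write("[" + msg + "]"));
        Ctx tail = new Ctx(null);
        Ctx[] chain = {head, upper, inboundOnly, bracket, tail};
        for (int i = 1; i < chain.length; i++) {
            chain[i - 1].next = chain[i];
            chain[i].prev = chain[i - 1];
        }
        tail.write("hi"); // bracket runs first, then upper, then head
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[HI]]
    }
}
```

The handler added last (closest to tail) sees the message first, which is why outbound handlers are usually listed in the reverse of the order in which they should apply.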
Once that outbound context has been found, invokeWriteAndFlush or invokeWrite is called on it.
The invokeWriteAndFlush method:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (this.invokeHandler()) {
        this.invokeWrite0(msg, promise);
        this.invokeFlush0();
    } else {
        this.writeAndFlush(msg, promise);
    }

}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
    } catch (Throwable var4) {
        notifyOutboundHandlerException(var4, promise);
    }

}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler)this.handler()).flush(this);
    } catch (Throwable var2) {
        this.notifyHandlerException(var2);
    }

}

As shown, invokeWriteAndFlush calls the write and flush methods of the ChannelOutboundHandler.

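Eventually the write and flush methods of HeadContext are called:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    this.unsafe.write(msg, promise);
}

public void flush(ChannelHandlerContext ctx) throws Exception {
    this.unsafe.flush();
}

As shown, these call the write and flush methods of Unsafe. write places the message in Unsafe's outbound buffer, and flush then hands the buffered content to the JDK SocketChannel for the actual write; if the socket cannot accept all of it at once, the rest is written when the Selector reports the channel as writable.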


This concludes the analysis of ChannelPipeline.