Netty中的ChannelPipeline源码分析
channelpipeline在netty中是用来处理请求的责任链,默认实现是defaultchannelpipeline,其构造方法如下:
1 private final channel channel; 2 private final channelfuture succeededfuture; 3 private final voidchannelpromise voidpromise; 4 final abstractchannelhandlercontext head; 5 final abstractchannelhandlercontext tail; 6 7 protected defaultchannelpipeline(channel channel) { 8 this.channel = (channel)objectutil.checknotnull(channel, "channel"); 9 this.succeededfuture = new succeededchannelfuture(channel, (eventexecutor)null); 10 this.voidpromise = new voidchannelpromise(channel, true); 11 this.tail = new defaultchannelpipeline.tailcontext(this); 12 this.head = new defaultchannelpipeline.headcontext(this); 13 this.head.next = this.tail; 14 this.tail.prev = this.head; 15 }
channelpipeline和channel是一一对应关系,一个channel绑定一条channelpipeline责任链
succeededfuture 和voidpromise用来处理异步操作
abstractchannelhandlercontext 是持有请求的上下文对象,其和channelhandler是对应关系(在使用sharable注解的情况下,不同的abstractchannelhandlercontext 还可以对应同一个channelhandler),channelpipeline责任链
处理的就abstractchannelhandlercontext ,再将最后的abstractchannelhandlercontext 交给channelhandler去做正真的逻辑处理
abstractchannelhandlercontext构造方法如下:
1 private final string name; 2 private final defaultchannelpipeline pipeline; 3 final eventexecutor executor; 4 private final boolean inbound; 5 private final boolean outbound; 6 private final boolean ordered; 7 volatile abstractchannelhandlercontext next; 8 volatile abstractchannelhandlercontext prev; 9 10 abstractchannelhandlercontext(defaultchannelpipeline pipeline, eventexecutor executor, string name, boolean inbound, boolean outbound) { 11 this.name = (string)objectutil.checknotnull(name, "name"); 12 this.pipeline = pipeline; 13 this.executor = executor; 14 this.inbound = inbound; 15 this.outbound = outbound; 16 this.ordered = executor == null || executor instanceof orderedeventexecutor; 17 }
name是abstractchannelhandlercontext的名称,pipeline就是上面说的channelpipeline;executor是用来进行异步操作的,默认使用的是在前面博客中说过的nioeventloop (netty中nioeventloopgroup的创建源码分析)
inbound 和outbound 代表两种请求处理方式,对应netty中的i/o操作,若是inbound则处理input操作,由channelpipeline从head 开始向后遍历链表,并且只处理channelinboundhandler类型的abstractchannelhandlercontext;若是outbound 则处理output操作,由channelpipeline从tail开始向前遍历链表,并且只处理channeloutboundhandler类型的abstractchannelhandlercontext;
ordered 是判断是否需要提供executor。
由next和prev成员可以知道,channelpipeline维护的是一条abstractchannelhandlercontext的双向链表
其头节点head和尾节点tail分别默认初始化了headcontext和tailcontext
headcontext的构造:
1 final class headcontext extends abstractchannelhandlercontext implements channeloutboundhandler, channelinboundhandler { 2 private final unsafe unsafe; 3 4 headcontext(defaultchannelpipeline pipeline) { 5 super(pipeline, (eventexecutor)null, defaultchannelpipeline.head_name, false, true); 6 this.unsafe = pipeline.channel().unsafe(); 7 this.setaddcomplete(); 8 } 9 }
其中setaddcomplete是由abstractchannelhandlercontext实现的:
1 final void setaddcomplete() { 2 int oldstate; 3 do { 4 oldstate = this.handlerstate; 5 } while(oldstate != 3 && !handler_state_updater.compareandset(this, oldstate, 2)); 6 7 }
handlerstate表示abstractchannelhandlercontext对应的channelhandler的状态,有一下几种:
1 private static final int add_pending = 1; 2 private static final int add_complete = 2; 3 private static final int remove_complete = 3; 4 private static final int init = 0; 5 private volatile int handlerstate = 0;
handlerstate初始化默认是init状态。
handler_state_updater是一个原子更新器:
1 private static final atomicintegerfieldupdater<abstractchannelhandlercontext> handler_state_updater = atomicintegerfieldupdater.newupdater(abstractchannelhandlercontext.class, "handlerstate");
所以setaddcomplete方法,就是通过cas操作,将handlerstate状态更新为add_complete
tailcontext的构造:
1 final class tailcontext extends abstractchannelhandlercontext implements channelinboundhandler { 2 tailcontext(defaultchannelpipeline pipeline) { 3 super(pipeline, (eventexecutor)null, defaultchannelpipeline.tail_name, true, false); 4 this.setaddcomplete(); 5 } 6 }
和headcontext一样,将handlerstate状态更新为add_complete
结合官方给出的channelpipeline的图示更容易理解:
1 i/o request 2 via channel or 3 channelhandlercontext 4 | 5 +---------------------------------------------------+---------------+ 6 | channelpipeline | | 7 | \|/ | 8 | +---------------------+ +-----------+----------+ | 9 | | inbound handler n | | outbound handler 1 | | 10 | +----------+----------+ +-----------+----------+ | 11 | /|\ | | 12 | | \|/ | 13 | +----------+----------+ +-----------+----------+ | 14 | | inbound handler n-1 | | outbound handler 2 | | 15 | +----------+----------+ +-----------+----------+ | 16 | /|\ . | 17 | . . | 18 | channelhandlercontext.firein_evt() channelhandlercontext.out_evt()| 19 | [ method call] [method call] | 20 | . . | 21 | . \|/ | 22 | +----------+----------+ +-----------+----------+ | 23 | | inbound handler 2 | | outbound handler m-1 | | 24 | +----------+----------+ +-----------+----------+ | 25 | /|\ | | 26 | | \|/ | 27 | +----------+----------+ +-----------+----------+ | 28 | | inbound handler 1 | | outbound handler m | | 29 | +----------+----------+ +-----------+----------+ | 30 | /|\ | | 31 +---------------+-----------------------------------+---------------+ 32 | \|/ 33 +---------------+-----------------------------------+---------------+ 34 | | | | 35 | [ socket.read() ] [ socket.write() ] | 36 | | 37 | netty internal i/o threads (transport implementation) | 38 +-------------------------------------------------------------------+
下面对一些主要方法分析:
addfirst方法,有如下几种重载:
1 public final channelpipeline addfirst(channelhandler handler) { 2 return this.addfirst((string)null, (channelhandler)handler); 3 } 4 5 public final channelpipeline addfirst(string name, channelhandler handler) { 6 return this.addfirst((eventexecutorgroup)null, name, handler); 7 } 8 9 public final channelpipeline addfirst(channelhandler... handlers) { 10 return this.addfirst((eventexecutorgroup)null, (channelhandler[])handlers); 11 } 12 13 public final channelpipeline addfirst(eventexecutorgroup executor, channelhandler... handlers) { 14 if (handlers == null) { 15 throw new nullpointerexception("handlers"); 16 } else if (handlers.length != 0 && handlers[0] != null) { 17 int size; 18 for(size = 1; size < handlers.length && handlers[size] != null; ++size) { 19 ; 20 } 21 22 for(int i = size - 1; i >= 0; --i) { 23 channelhandler h = handlers[i]; 24 this.addfirst(executor, (string)null, h); 25 } 26 27 return this; 28 } else { 29 return this; 30 } 31 } 32 33 public final channelpipeline addfirst(eventexecutorgroup group, string name, channelhandler handler) { 34 final abstractchannelhandlercontext newctx; 35 synchronized(this) { 36 checkmultiplicity(handler); 37 name = this.filtername(name, handler); 38 newctx = this.newcontext(group, name, handler); 39 this.addfirst0(newctx); 40 if (!this.registered) { 41 newctx.setaddpending(); 42 this.callhandlercallbacklater(newctx, true); 43 return this; 44 } 45 46 eventexecutor executor = newctx.executor(); 47 if (!executor.ineventloop()) { 48 newctx.setaddpending(); 49 executor.execute(new runnable() { 50 public void run() { 51 defaultchannelpipeline.this.callhandleradded0(newctx); 52 } 53 }); 54 return this; 55 } 56 } 57 58 this.callhandleradded0(newctx); 59 return this; 60 }
前面几种都是间接调用的第四种没什么好说的,直接看第四种addfirst
首先调用checkmultiplicity,检查channelhandleradapter在不共享的情况下是否重复:
1 private static void checkmultiplicity(channelhandler handler) { 2 if (handler instanceof channelhandleradapter) { 3 channelhandleradapter h = (channelhandleradapter)handler; 4 if (!h.issharable() && h.added) { 5 throw new channelpipelineexception(h.getclass().getname() + " is not a @sharable handler, so can't be added or removed multiple times."); 6 } 7 8 h.added = true; 9 } 10 11 }
issharable方法:
1 public boolean issharable() { 2 class<?> clazz = this.getclass(); 3 map<class<?>, boolean> cache = internalthreadlocalmap.get().handlersharablecache(); 4 boolean sharable = (boolean)cache.get(clazz); 5 if (sharable == null) { 6 sharable = clazz.isannotationpresent(sharable.class); 7 cache.put(clazz, sharable); 8 } 9 10 return sharable; 11 }
首先尝试从当前线程的internalthreadlocalmap中获取handlersharablecache,(internalthreadlocalmap是在netty中使用高效的fastthreadlocal替代jdk的threadlocal使用的 netty中fastthreadlocal源码分析)
internalthreadlocalmap的handlersharablecache方法:
1 public map<class<?>, boolean> handlersharablecache() { 2 map<class<?>, boolean> cache = this.handlersharablecache; 3 if (cache == null) { 4 this.handlersharablecache = (map)(cache = new weakhashmap(4)); 5 } 6 7 return (map)cache; 8 }
当当前线程的internalthreadlocalmap中没有handlersharablecache时,直接创建一个大小为4的weakhashmap弱引用map;
根据clazz从map中get,若是没有,需要检测当前clazz是否有sharable注解,添加了sharable注解的channelhandleradapter可以在不同channel*享使用一个单例,前提是确保线程安全;
之后会将该clazz以及是否实现sharable注解的情况添加在cache缓存中;
其中channelhandler的added是用来标识是否添加过;
回到addfirst方法:
checkmultiplicity成功结束后,调用filtername方法,给当前要产生的abstractchannelhandlercontext对象产生一个名称,
然后调用newcontext方法,产生abstractchannelhandlercontext对象:
1 private abstractchannelhandlercontext newcontext(eventexecutorgroup group, string name, channelhandler handler) { 2 return new defaultchannelhandlercontext(this, this.childexecutor(group), name, handler); 3 }
这里实际上产生了一个defaultchannelhandlercontext对象:
1 final class defaultchannelhandlercontext extends abstractchannelhandlercontext { 2 private final channelhandler handler; 3 4 defaultchannelhandlercontext(defaultchannelpipeline pipeline, eventexecutor executor, string name, channelhandler handler) { 5 super(pipeline, executor, name, isinbound(handler), isoutbound(handler)); 6 if (handler == null) { 7 throw new nullpointerexception("handler"); 8 } else { 9 this.handler = handler; 10 } 11 } 12 13 public channelhandler handler() { 14 return this.handler; 15 } 16 17 private static boolean isinbound(channelhandler handler) { 18 return handler instanceof channelinboundhandler; 19 } 20 21 private static boolean isoutbound(channelhandler handler) { 22 return handler instanceof channeloutboundhandler; 23 } 24 }
可以看到defaultchannelhandlercontext 仅仅是将abstractchannelhandlercontext和channelhandler封装了
在产生了defaultchannelhandlercontext 对象后,调用addfirst0方法:
1 private void addfirst0(abstractchannelhandlercontext newctx) { 2 abstractchannelhandlercontext nextctx = this.head.next; 3 newctx.prev = this.head; 4 newctx.next = nextctx; 5 this.head.next = newctx; 6 nextctx.prev = newctx; 7 }
这里就是一个简单的双向链表的操作,将newctx节点插入到了head后面
然后判断registered成员的状态:
1 private boolean registered;
在初始化时是false
registered若是false,首先调用abstractchannelhandlercontext的setaddpending方法:
1 final void setaddpending() { 2 boolean updated = handler_state_updater.compareandset(this, 0, 1); 3 4 assert updated; 5 6 }
和前面说过的setaddcomplete方法同理,通过cas操作,将handlerstate状态设置为add_pending
接着调用callhandlercallbacklater方法:
1 private void callhandlercallbacklater(abstractchannelhandlercontext ctx, boolean added) { 2 assert !this.registered; 3 4 defaultchannelpipeline.pendinghandlercallback task = added ? new defaultchannelpipeline.pendinghandleraddedtask(ctx) : new defaultchannelpipeline.pendinghandlerremovedtask(ctx); 5 defaultchannelpipeline.pendinghandlercallback pending = this.pendinghandlercallbackhead; 6 if (pending == null) { 7 this.pendinghandlercallbackhead = (defaultchannelpipeline.pendinghandlercallback)task; 8 } else { 9 while(pending.next != null) { 10 pending = pending.next; 11 } 12 13 pending.next = (defaultchannelpipeline.pendinghandlercallback)task; 14 } 15 16 }
首先断言判断registered可能存在的多线程改变,然后根据added判断产生何种类型的pendinghandlercallback
pendinghandlercallback是用来处理channelhandler的两种回调,定义如下:
1 private abstract static class pendinghandlercallback implements runnable { 2 final abstractchannelhandlercontext ctx; 3 defaultchannelpipeline.pendinghandlercallback next; 4 5 pendinghandlercallback(abstractchannelhandlercontext ctx) { 6 this.ctx = ctx; 7 } 8 9 abstract void execute(); 10 }
pendinghandleraddedtask定义如下:
1 private final class pendinghandleraddedtask extends defaultchannelpipeline.pendinghandlercallback { 2 pendinghandleraddedtask(abstractchannelhandlercontext ctx) { 3 super(ctx); 4 } 5 6 public void run() { 7 defaultchannelpipeline.this.callhandleradded0(this.ctx); 8 } 9 10 void execute() { 11 eventexecutor executor = this.ctx.executor(); 12 if (executor.ineventloop()) { 13 defaultchannelpipeline.this.callhandleradded0(this.ctx); 14 } else { 15 try { 16 executor.execute(this); 17 } catch (rejectedexecutionexception var3) { 18 if (defaultchannelpipeline.logger.iswarnenabled()) { 19 defaultchannelpipeline.logger.warn("can't invoke handleradded() as the eventexecutor {} rejected it, removing handler {}.", new object[]{executor, this.ctx.name(), var3}); 20 } 21 22 defaultchannelpipeline.remove0(this.ctx); 23 this.ctx.setremoved(); 24 } 25 } 26 27 } 28 }
除去异常处理,无论是在execute方法还是在run方法中,主要核心是异步执行callhandleradded0方法:
1 private void callhandleradded0(abstractchannelhandlercontext ctx) { 2 try { 3 ctx.setaddcomplete(); 4 ctx.handler().handleradded(ctx); 5 } catch (throwable var10) { 6 boolean removed = false; 7 8 try { 9 remove0(ctx); 10 11 try { 12 ctx.handler().handlerremoved(ctx); 13 } finally { 14 ctx.setremoved(); 15 } 16 17 removed = true; 18 } catch (throwable var9) { 19 if (logger.iswarnenabled()) { 20 logger.warn("failed to remove a handler: " + ctx.name(), var9); 21 } 22 } 23 24 if (removed) { 25 this.fireexceptioncaught(new channelpipelineexception(ctx.handler().getclass().getname() + ".handleradded() has thrown an exception; removed.", var10)); 26 } else { 27 this.fireexceptioncaught(new channelpipelineexception(ctx.handler().getclass().getname() + ".handleradded() has thrown an exception; also failed to remove.", var10)); 28 } 29 } 30 31 }
除去异常处理,主要核心就两行代码,首先通过setaddcomplete方法,设置handlerstate状态为add_complete,然后回调channelhandler的handleradded方法,这个handleradded方法就很熟悉了,在使用netty处理业务逻辑时,会覆盖这个方法。
pendinghandlerremovedtask定义如下:
1 private final class pendinghandlerremovedtask extends defaultchannelpipeline.pendinghandlercallback { 2 pendinghandlerremovedtask(abstractchannelhandlercontext ctx) { 3 super(ctx); 4 } 5 6 public void run() { 7 defaultchannelpipeline.this.callhandlerremoved0(this.ctx); 8 } 9 10 void execute() { 11 eventexecutor executor = this.ctx.executor(); 12 if (executor.ineventloop()) { 13 defaultchannelpipeline.this.callhandlerremoved0(this.ctx); 14 } else { 15 try { 16 executor.execute(this); 17 } catch (rejectedexecutionexception var3) { 18 if (defaultchannelpipeline.logger.iswarnenabled()) { 19 defaultchannelpipeline.logger.warn("can't invoke handlerremoved() as the eventexecutor {} rejected it, removing handler {}.", new object[]{executor, this.ctx.name(), var3}); 20 } 21 22 this.ctx.setremoved(); 23 } 24 } 25 26 } 27 }
和pendinghandleraddedtask一样,主要还是异步调用callhandlerremoved0方法:
1 private void callhandlerremoved0(abstractchannelhandlercontext ctx) { 2 try { 3 try { 4 ctx.handler().handlerremoved(ctx); 5 } finally { 6 ctx.setremoved(); 7 } 8 } catch (throwable var6) { 9 this.fireexceptioncaught(new channelpipelineexception(ctx.handler().getclass().getname() + ".handlerremoved() has thrown an exception.", var6)); 10 } 11 12 }
首先直接回调channelhandler的handlerremoved方法,然后通过setremoved方法将handlerstate状态设置为remove_complete
回到callhandlercallbacklater,其中成员pendinghandlercallbackhead定义:
1 private defaultchannelpipeline.pendinghandlercallback pendinghandlercallbackhead;
结合pendinghandlercallback 可知,这个pendinghandlercallbackhead是 defaultchannelpipeline存储的一条pendinghandlercallback单链表,用来处理channelhandler的handleradded和handlerremoved的回调,在add的这些方法里调用callhandlercallbacklater时,added参数都为true,所以add的channelhandler只向pendinghandlercallbackhead添加了handleradded的回调。
回到addfirst方法,若是registered为true,先获取eventexecutor,判断是否处于轮询中,若不是,则需要开启轮询线程直接异步执行callhandleradded0方法,若处于轮询,由于channelpipeline的调用是发生在轮询时的,所以还是直接异步执行callhandleradded0方法。
addfirst方法到此结束,再来看addlast方法,同样有好几种重载:
1 public final channelpipeline addlast(channelhandler handler) { 2 return this.addlast((string)null, (channelhandler)handler); 3 } 4 5 public final channelpipeline addlast(string name, channelhandler handler) { 6 return this.addlast((eventexecutorgroup)null, name, handler); 7 } 8 9 public final channelpipeline addlast(channelhandler... handlers) { 10 return this.addlast((eventexecutorgroup)null, (channelhandler[])handlers); 11 } 12 13 public final channelpipeline addlast(eventexecutorgroup executor, channelhandler... handlers) { 14 if (handlers == null) { 15 throw new nullpointerexception("handlers"); 16 } else { 17 channelhandler[] var3 = handlers; 18 int var4 = handlers.length; 19 20 for(int var5 = 0; var5 < var4; ++var5) { 21 channelhandler h = var3[var5]; 22 if (h == null) { 23 break; 24 } 25 26 this.addlast(executor, (string)null, h); 27 } 28 29 return this; 30 } 31 } 32 33 public final channelpipeline addlast(eventexecutorgroup group, string name, channelhandler handler) { 34 final abstractchannelhandlercontext newctx; 35 synchronized(this) { 36 checkmultiplicity(handler); 37 newctx = this.newcontext(group, this.filtername(name, handler), handler); 38 this.addlast0(newctx); 39 if (!this.registered) { 40 newctx.setaddpending(); 41 this.callhandlercallbacklater(newctx, true); 42 return this; 43 } 44 45 eventexecutor executor = newctx.executor(); 46 if (!executor.ineventloop()) { 47 newctx.setaddpending(); 48 executor.execute(new runnable() { 49 public void run() { 50 defaultchannelpipeline.this.callhandleradded0(newctx); 51 } 52 }); 53 return this; 54 } 55 } 56 57 this.callhandleradded0(newctx); 58 return this; 59 }
还是间接调用最后一种:
对比addfirst来看,只有addlast0不一样:
1 private void addlast0(abstractchannelhandlercontext newctx) { 2 abstractchannelhandlercontext prev = this.tail.prev; 3 newctx.prev = prev; 4 newctx.next = this.tail; 5 prev.next = newctx; 6 this.tail.prev = newctx; 7 }
还是非常简单的双向链表基本操作,只不过这次,是将abstractchannelhandlercontext插入到了tail之前
还有两个,addbefore和addafter方法,和上述方法类似,就不再累赘
接下来看看channelpipeline是如何完成请求的传递的:
invokehandleraddedifneeded方法:
1 final void invokehandleraddedifneeded() { 2 assert this.channel.eventloop().ineventloop(); 3 4 if (this.firstregistration) { 5 this.firstregistration = false; 6 this.callhandleraddedforallhandlers(); 7 } 8 9 }
断言判断是否处于轮询线程(channelpipeline处理请求都是在轮询线程中,都需要异步处理)
其中firstregistration成员在defaultchannelpipeline初始化时为true:
1 private boolean firstregistration = true;
此时设置为false,表示第一次调用,以后都不再调用后面的callhandleraddedforallhandlers:
1 private void callhandleraddedforallhandlers() { 2 defaultchannelpipeline.pendinghandlercallback pendinghandlercallbackhead; 3 synchronized(this) { 4 assert !this.registered; 5 6 this.registered = true; 7 pendinghandlercallbackhead = this.pendinghandlercallbackhead; 8 this.pendinghandlercallbackhead = null; 9 } 10 11 for(defaultchannelpipeline.pendinghandlercallback task = pendinghandlercallbackhead; task != null; task = task.next) { 12 task.execute(); 13 } 14 15 }
刚才说过registered初始是false,在这里判断符合,之后就令其为true,然后获取处理channelhandler的回调链表pendinghandlercallbackhead,并且将pendinghandlercallbackhead置为null
然后遍历这个单链表,处理channelhandler的handleradded和handlerremoved的回调
firechannelregistered方法,当channel完成了向selector的注册后,会由channel的unsafe进行回调,异步处理:
1 public final channelpipeline firechannelregistered() { 2 abstractchannelhandlercontext.invokechannelregistered(this.head); 3 return this; 4 }
实际上的处理由abstractchannelhandlercontext的静态方法invokechannelregistered完成,这里传递的参数head就是defaultchannelpipeline初始化时创建的headcontext:
1 static void invokechannelregistered(final abstractchannelhandlercontext next) { 2 eventexecutor executor = next.executor(); 3 if (executor.ineventloop()) { 4 next.invokechannelregistered(); 5 } else { 6 executor.execute(new runnable() { 7 public void run() { 8 next.invokechannelregistered(); 9 } 10 }); 11 } 12 13 }
可以看到实际上是异步执行head对象的invokechannelregistered方法:
1 private void invokechannelregistered() { 2 if (this.invokehandler()) { 3 try { 4 ((channelinboundhandler)this.handler()).channelregistered(this); 5 } catch (throwable var2) { 6 this.notifyhandlerexception(var2); 7 } 8 } else { 9 this.firechannelregistered(); 10 } 11 12 }
其中invokehandler是用来判断当前的handlerstate状态:
1 private boolean invokehandler() { 2 int handlerstate = this.handlerstate; 3 return handlerstate == 2 || !this.ordered && handlerstate == 1; 4 }
若是当前handlerstate状态为add_complete,或者不需要提供eventexecutor并且状态为add_pending时返回true,否则返回false
在成立的情况下,调用channelinboundhandler的channelregistered方法,由于当前是head,所以由headcontext实现了:
1 public void channelregistered(channelhandlercontext ctx) throws exception { 2 defaultchannelpipeline.this.invokehandleraddedifneeded(); 3 ctx.firechannelregistered(); 4 }
首先调用invokehandleraddedifneeded,处理channelhandler的handleradded和handlerremoved的回调
然后调用ctx的firechannelregistered方法:
1 public channelhandlercontext firechannelregistered() { 2 invokechannelregistered(this.findcontextinbound()); 3 return this; 4 }
findcontextinbound方法,用来找出下一个channelinboundinvoker:
1 private abstractchannelhandlercontext findcontextinbound() { 2 abstractchannelhandlercontext ctx = this; 3 4 do { 5 ctx = ctx.next; 6 } while(!ctx.inbound); 7 8 return ctx; 9 }
从当前节点向后遍历,inbound之前说过,该方法就是找到下一个channelinboundinvoker的类型的abstractchannelhandlercontext,然后调用静态方法invokechannelregistered,重复上述操作,若是在channelinboundhandler中没有重写channelregistered方法,会一直执直到完所有channelhandler的channelregistered方法。
channelinboundhandleradapter中的默认channelregistered方法:
1 public void channelregistered(channelhandlercontext ctx) throws exception { 2 ctx.firechannelregistered(); 3 }
比headcontext中的实现还简单,直接调用firechannelregistered向后传递
firechannelread方法,是在selector轮循到读事件就绪,会由channel的unsafe进行回调,异步处理:
1 public final channelpipeline firechannelread(object msg) { 2 abstractchannelhandlercontext.invokechannelread(this.head, msg); 3 return this; 4 }
还是从head开始调用abstractchannelhandlercontext的静态方法invokechannelread:
1 static void invokechannelread(final abstractchannelhandlercontext next, object msg) { 2 final object m = next.pipeline.touch(objectutil.checknotnull(msg, "msg"), next); 3 eventexecutor executor = next.executor(); 4 if (executor.ineventloop()) { 5 next.invokechannelread(m); 6 } else { 7 executor.execute(new runnable() { 8 public void run() { 9 next.invokechannelread(m); 10 } 11 }); 12 } 13 14 }
和上面一个逻辑异步调用abstractchannelhandlercontext对象的invokechannelread方法:
1 private void invokechannelread(object msg) { 2 if (this.invokehandler()) { 3 try { 4 ((channelinboundhandler)this.handler()).channelread(this, msg); 5 } catch (throwable var3) { 6 this.notifyhandlerexception(var3); 7 } 8 } else { 9 this.firechannelread(msg); 10 } 11 12 }
这里也和上面一样,调用了headcontext的channelread方法:
1 public void channelread(channelhandlercontext ctx, object msg) throws exception { 2 ctx.firechannelread(msg); 3 }
这里直接不处理,调用channelhandlercontext 的firechannelread方法:
1 public channelhandlercontext firechannelread(object msg) { 2 invokechannelread(this.findcontextinbound(), msg); 3 return this; 4 }
和之前注册一样,选择下一个channelinboundhandler,重复执行上述操作。
再来看到writeandflush方法,和上面的就不太一样,这个发生在轮询前,用户通过channel来间接调用,在abstractchannel中实现:
1 public channelfuture writeandflush(object msg) { 2 return this.pipeline.writeandflush(msg); 3 }
实际上直接调用了defaultchannelpipeline的writeandflush方法:
1 public final channelfuture writeandflush(object msg) { 2 return this.tail.writeandflush(msg); 3 }
这里又有些不一样了,调用了tail的writeandflush方法,即tailcontext的writeandflush,在abstractchannelhandlercontext中实现:
1 public channelfuture writeandflush(object msg) { 2 return this.writeandflush(msg, this.newpromise()); 3 }
newpromise产生了一个channelpromise,用来处理异步事件的;实际上调用了writeandflush的重载:
1 public channelfuture writeandflush(object msg, channelpromise promise) { 2 if (msg == null) { 3 throw new nullpointerexception("msg"); 4 } else if (this.isnotvalidpromise(promise, true)) { 5 referencecountutil.release(msg); 6 return promise; 7 } else { 8 this.write(msg, true, promise); 9 return promise; 10 } 11 }
继续调用write方法:
1 private void write(object msg, boolean flush, channelpromise promise) { 2 abstractchannelhandlercontext next = this.findcontextoutbound(); 3 object m = this.pipeline.touch(msg, next); 4 eventexecutor executor = next.executor(); 5 if (executor.ineventloop()) { 6 if (flush) { 7 next.invokewriteandflush(m, promise); 8 } else { 9 next.invokewrite(m, promise); 10 } 11 } else { 12 object task; 13 if (flush) { 14 task = abstractchannelhandlercontext.writeandflushtask.newinstance(next, m, promise); 15 } else { 16 task = abstractchannelhandlercontext.writetask.newinstance(next, m, promise); 17 } 18 19 safeexecute(executor, (runnable)task, promise, m); 20 } 21 22 }
还是很相似,只不过先调用findcontextoutbound找到下一个channeloutboundinvoker类型的channelhandlercontext,而且这里是从尾部往前遍历的,这样来看前面所给的图是没有任何问题的
在找到channeloutboundinvoker后,调用invokewriteandflush或者invokewrite方法:
invokewriteandflush方法:
1 private void invokewriteandflush(object msg, channelpromise promise) { 2 if (this.invokehandler()) { 3 this.invokewrite0(msg, promise); 4 this.invokeflush0(); 5 } else { 6 this.writeandflush(msg, promise); 7 } 8 9 } 10 11 private void invokewrite0(object msg, channelpromise promise) { 12 try { 13 ((channeloutboundhandler)this.handler()).write(this, msg, promise); 14 } catch (throwable var4) { 15 notifyoutboundhandlerexception(var4, promise); 16 } 17 18 } 19 20 private void invokeflush0() { 21 try { 22 ((channeloutboundhandler)this.handler()).flush(this); 23 } catch (throwable var2) { 24 this.notifyhandlerexception(var2); 25 } 26 27 }
可以看到invokewriteandflush回调了channeloutboundhandler的write和flush方法
最终会调用headcontext的write和flush方法:
1 public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception { 2 this.unsafe.write(msg, promise); 3 } 4 5 public void flush(channelhandlercontext ctx) throws exception { 6 this.unsafe.flush(); 7 }
可以看到调用了unsafe的write和flush方法,向unsafe缓冲区写入了消息,当selector轮询到写事件就绪时,就会通过unsafe将刚才写入的内容交由jdk的socketchannel完成最终的write操作。
channelpipeline的分析到此全部结束。