Netty源码分析 (四)----- ChannelPipeline
netty在服务端端口绑定和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出
pipeline 初始化
在上一篇文章中,我们已经知道了创建niosocketchannel
的时候会将netty的核心组件创建出来
pipeline是其中的一员,在下面这段代码中被创建
protected abstractchannel(channel parent) { this.parent = parent; id = newid(); unsafe = newunsafe(); pipeline = newchannelpipeline(); }
protected defaultchannelpipeline newchannelpipeline() { return new defaultchannelpipeline(this); }
niosocketchannel中保存了pipeline的引用
defaultchannelpipeline
protected defaultchannelpipeline(channel channel) { this.channel = objectutil.checknotnull(channel, "channel"); tail = new tailcontext(this); head = new headcontext(this); head.next = tail; tail.prev = head; }
pipeline中保存了channel的引用,创建完pipeline之后,整个pipeline是这个样子的
pipeline中的每个节点是一个channelhandlercontext
对象,每个context节点保存了它包裹的执行器 channelhandler
执行操作所需要的上下文,其实就是pipeline,因为pipeline包含了channel的引用,可以拿到所有的context信息
pipeline添加节点
下面是一段非常常见的客户端代码
bootstrap.childhandler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel ch) throws exception { channelpipeline p = ch.pipeline(); p.addlast(new spliter()) p.addlast(new decoder()); p.addlast(new businesshandler()) p.addlast(new encoder()); } });
首先,用一个spliter将来源tcp数据包拆包,然后将拆出来的包进行decoder,传入业务处理器businesshandler,业务处理完encoder,输出
整个pipeline结构如下
我用两种颜色区分了一下pipeline中两种不同类型的节点,一个是 channelinboundhandler
,处理inbound事件,最典型的就是读取数据流,加工处理;还有一种类型的handler是 channeloutboundhandler
, 处理outbound事件,比如当调用writeandflush()
类方法时,就会经过该种类型的handler
不管是哪种类型的handler,其外层对象 channelhandlercontext
之间都是通过双向链表连接,而区分一个 channelhandlercontext
到底是in还是out,在添加节点的时候我们就可以看到netty是怎么处理的
defaultchannelpipeline
@override public final channelpipeline addlast(channelhandler... handlers) { return addlast(null, handlers); }
@override public final channelpipeline addlast(eventexecutorgroup executor, channelhandler... handlers) { for (channelhandler h: handlers) { addlast(executor, null, h); } return this; }
public final channelpipeline addlast(eventexecutorgroup group, string name, channelhandler handler) { final abstractchannelhandlercontext newctx; synchronized (this) { // 1.检查是否有重复handler checkmultiplicity(handler); // 2.创建节点 newctx = newcontext(group, filtername(name, handler), handler); // 3.添加节点 addlast0(newctx); } // 4.回调用户方法 callhandleradded0(handler); return this; }
这里简单地用synchronized
方法是为了防止多线程并发操作pipeline底层的双向链表
我们还是逐步分析上面这段代码
检查是否有重复handler
在用户代码添加一条handler的时候,首先会查看该handler有没有添加过
private static void checkmultiplicity(channelhandler handler) { if (handler instanceof channelhandleradapter) { channelhandleradapter h = (channelhandleradapter) handler; if (!h.issharable() && h.added) { throw new channelpipelineexception( h.getclass().getname() + " is not a @sharable handler, so can't be added or removed multiple times."); } h.added = true; } }
netty使用一个成员变量added
标识一个channel是否已经添加,上面这段代码很简单,如果当前要添加的handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加
由此可见,一个handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个handler被共用,只需要加一个@sharable标注即可,如下
@sharable public class businesshandler { }
而如果handler是sharable的,一般就通过spring的注入的方式使用,不需要每次都new 一个
issharable()
方法正是通过该handler对应的类是否标注@sharable来实现的
channelhandleradapter
public boolean issharable() { class<?> clazz = getclass(); map<class<?>, boolean> cache = internalthreadlocalmap.get().handlersharablecache(); boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isannotationpresent(sharable.class); cache.put(clazz, sharable); } return sharable; }
通过反射判断是否有sharable.class注解
创建节点
回到主流程,看创建上下文这段代码
newctx = newcontext(group, filtername(name, handler), handler);
这里我们需要先分析 filtername(name, handler)
这段代码,这个函数用于给handler创建一个唯一性的名字
private string filtername(string name, channelhandler handler) { if (name == null) { return generatename(handler); } checkduplicatename(name); return name; }
显然,我们传入的name为null,netty就给我们生成一个默认的name,否则,检查是否有重名,检查通过的话就返回
netty创建默认name的规则为 简单类名#0
,下面我们来看些具体是怎么实现的
private static final fastthreadlocal<map<class<?>, string>> namecaches = new fastthreadlocal<map<class<?>, string>>() { @override protected map<class<?>, string> initialvalue() throws exception { return new weakhashmap<class<?>, string>(); } }; private string generatename(channelhandler handler) { // 先查看缓存中是否有生成过默认name map<class<?>, string> cache = namecaches.get(); class<?> handlertype = handler.getclass(); string name = cache.get(handlertype); // 没有生成过,就生成一个默认name,加入缓存 if (name == null) { name = generatename0(handlertype); cache.put(handlertype, name); } // 生成完了,还要看默认name有没有冲突 if (context0(name) != null) { string basename = name.substring(0, name.length() - 1); for (int i = 1;; i ++) { string newname = basename + i; if (context0(newname) == null) { name = newname; break; } } } return name; }
netty使用一个 fastthreadlocal
(后面的文章会细说)变量来缓存handler的类和默认名称的映射关系,在生成name的时候,首先查看缓存中有没有生成过默认name(简单类名#0
),如果没有生成,就调用generatename0()
生成默认name,然后加入缓存
接下来还需要检查name是否和已有的name有冲突,调用context0()
,查找pipeline里面有没有对应的context
private abstractchannelhandlercontext context0(string name) { abstractchannelhandlercontext context = head.next; while (context != tail) { if (context.name().equals(name)) { return context; } context = context.next; } return null; }
context0()
方法链表遍历每一个 channelhandlercontext
,只要发现某个context的名字与待添加的name相同,就返回该context,最后抛出异常,可以看到,这个其实是一个线性搜索的过程
如果context0(name) != null
成立,说明现有的context里面已经有了一个默认name,那么就从 简单类名#1
往上一直找,直到找到一个唯一的name,比如简单类名#3
如果用户代码在添加handler的时候指定了一个name,那么要做到事仅仅为检查一下是否有重复
private void checkduplicatename(string name) { if (context0(name) != null) { throw new illegalargumentexception("duplicate handler name: " + name); } }
处理完name之后,就进入到创建context的过程,由前面的调用链得知,group
为null,因此childexecutor(group)
也返回null
defaultchannelpipeline
private abstractchannelhandlercontext newcontext(eventexecutorgroup group, string name, channelhandler handler) { return new defaultchannelhandlercontext(this, childexecutor(group), name, handler); } private eventexecutor childexecutor(eventexecutorgroup group) { if (group == null) { return null; } //.. }
defaultchannelhandlercontext
defaultchannelhandlercontext( defaultchannelpipeline pipeline, eventexecutor executor, string name, channelhandler handler) { super(pipeline, executor, name, isinbound(handler), isoutbound(handler)); if (handler == null) { throw new nullpointerexception("handler"); } this.handler = handler; }
构造函数中,defaultchannelhandlercontext
将参数回传到父类,保存handler的引用,进入到其父类
abstractchannelhandlercontext
abstractchannelhandlercontext(defaultchannelpipeline pipeline, eventexecutor executor, string name, boolean inbound, boolean outbound) { this.name = objectutil.checknotnull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; }
netty中用两个字段来表示这个channelhandlercontext
属于inbound
还是outbound
,或者两者都是,两个boolean是通过下面两个小函数来判断(见上面一段代码)
defaultchannelhandlercontext
private static boolean isinbound(channelhandler handler) { return handler instanceof channelinboundhandler; } private static boolean isoutbound(channelhandler handler) { return handler instanceof channeloutboundhandler; }
通过instanceof
关键字根据接口类型来判断,因此,如果一个handler实现了两类接口,那么他既是一个inbound类型的handler,又是一个outbound类型的handler,比如下面这个类
常用的,将decode操作和encode操作合并到一起的codec,一般会继承 messagetomessagecodec
,而messagetomessagecodec
就是继承channelduplexhandler
messagetomessagecodec
public abstract class messagetomessagecodec<inbound_in, outbound_in> extends channelduplexhandler { protected abstract void encode(channelhandlercontext ctx, outbound_in msg, list<object> out) throws exception; protected abstract void decode(channelhandlercontext ctx, inbound_in msg, list<object> out) throws exception; }
context 创建完了之后,接下来终于要将创建完毕的context加入到pipeline中去了
添加节点
private void addlast0(abstractchannelhandlercontext newctx) { abstractchannelhandlercontext prev = tail.prev; newctx.prev = prev; // 1 newctx.next = tail; // 2 prev.next = newctx; // 3 tail.prev = newctx; // 4 }
用下面这幅图可见简单的表示这段过程,说白了,其实就是一个双向链表的插入操作
操作完毕,该context就加入到pipeline中
到这里,pipeline添加节点的操作就完成了,你可以根据此思路掌握所有的addxxx()系列方法
回调用户方法
abstractchannelhandlercontext
private void callhandleradded0(final abstractchannelhandlercontext ctx) { ctx.handler().handleradded(ctx); ctx.setaddcomplete(); }
到了第四步,pipeline中的新节点添加完成,于是便开始回调用户代码 ctx.handler().handleradded(ctx);
,常见的用户代码如下
public class demohandler extends simplechannelinboundhandler<...> { @override public void handleradded(channelhandlercontext ctx) throws exception { // 节点被添加完毕之后回调到此 // do something } }
接下来,设置该节点的状态
abstractchannelhandlercontext
final void setaddcomplete() { for (;;) { int oldstate = handlerstate; if (oldstate == remove_complete || handler_state_updater.compareandset(this, oldstate, add_complete)) { return; } } }
用cas修改节点的状态至:remove_complete(说明该节点已经被移除) 或者 add_complete
pipeline删除节点
netty 有个最大的特性之一就是handler可插拔,做到动态编织pipeline,比如在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器
下面是权限认证handler最简单的实现,第一个数据包传来的是认证信息,如果校验通过,就删除此handler,否则,直接关闭连接
public class authhandler extends simplechannelinboundhandler<bytebuf> { @override protected void channelread0(channelhandlercontext ctx, bytebuf data) throws exception { if (verify(authdatapacket)) { ctx.pipeline().remove(this); } else { ctx.close(); } } private boolean verify(bytebuf bytebuf) { //... } }
重点就在 ctx.pipeline().remove(this)
这段代码
@override public final channelpipeline remove(channelhandler handler) { remove(getcontextordie(handler)); return this; }
remove操作相比add简单不少,分为三个步骤:
1.找到待删除的节点
2.调整双向链表指针删除
3.回调用户函数
找到待删除的节点
defaultchannelpipeline
private abstractchannelhandlercontext getcontextordie(channelhandler handler) { abstractchannelhandlercontext ctx = (abstractchannelhandlercontext) context(handler); if (ctx == null) { throw new nosuchelementexception(handler.getclass().getname()); } else { return ctx; } } @override public final channelhandlercontext context(channelhandler handler) { if (handler == null) { throw new nullpointerexception("handler"); } abstractchannelhandlercontext ctx = head.next; for (;;) { if (ctx == null) { return null; } if (ctx.handler() == handler) { return ctx; } ctx = ctx.next; } }
这里为了找到handler对应的context,照样是通过依次遍历双向链表的方式,直到某一个context的handler和当前handler相同,便找到了该节点
调整双向链表指针删除
defaultchannelpipeline
private abstractchannelhandlercontext remove(final abstractchannelhandlercontext ctx) { assert ctx != head && ctx != tail; synchronized (this) { // 2.调整双向链表指针删除 remove0(ctx); } // 3.回调用户函数 callhandlerremoved0(ctx); return ctx; } private static void remove0(abstractchannelhandlercontext ctx) { abstractchannelhandlercontext prev = ctx.prev; abstractchannelhandlercontext next = ctx.next; prev.next = next; // 1 next.prev = prev; // 2 }
经历的过程要比添加节点要简单,可以用下面一幅图来表示
最后的结果为
结合这两幅图,可以很清晰地了解权限验证handler的工作原理,另外,被删除的节点因为没有对象引用到,果过段时间就会被gc自动回收
回调用户函数
private void callhandlerremoved0(final abstractchannelhandlercontext ctx) { try { ctx.handler().handlerremoved(ctx); } finally { ctx.setremoved(); } }
到了第三步,pipeline中的节点删除完成,于是便开始回调用户代码 ctx.handler().handlerremoved(ctx);
,常见的代码如下
public class demohandler extends simplechannelinboundhandler<...> { @override public void handlerremoved(channelhandlercontext ctx) throws exception { // 节点被删除完毕之后回调到此,可做一些资源清理 // do something } }
最后,将该节点的状态设置为removed
final void setremoved() { handlerstate = remove_complete; }
总结
1、在 netty 中每个 channel 都有且仅有一个 channelpipeline 与之对应。
2、channelpipeline是一个维护了一个以 abstractchannelhandlercontext 为节点的双向链表,其中此链表是 以head(headcontext)作为头,以tail(tailcontext)作为尾的双向链表.
3、pipeline中的每个节点包着具体的处理器channelhandler
,节点根据channelhandler
的类型是channelinboundhandler
还是channeloutboundhandler
来判断该节点属于in还是out或者两者都是
推荐阅读
-
Netty源码分析 (四)----- ChannelPipeline
-
[Abp 源码分析]四、模块配置
-
Netty源码分析 (三)----- 服务端启动源码分析
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
jQuery 源码分析(四) each函数 $.each和$.fn.each方法 详解
-
Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
-
netty之NioEventLoopGroup源码分析二
-
spring boot 2.0 源码分析(四)
-
Netty源码分析 (七)----- read过程 源码分析
-
Netty源码分析 (一)----- NioEventLoopGroup