Netty源码分析之ChannelPipeline(一)—ChannelPipeline的构造与初始化
netty中channelpipeline实际上类似与一条数据管道,负责传递channel中读取的消息,它本质上是基于责任链模式的设计与实现,无论是io事件的拦截器,还是用户自定义的channelhandler业务逻辑都做为一个个节点被添加到任务链上。
一、channelpipeline的设计与构成
channelpipeline中作为netty中的数据管道,作用就是通过控制与联通不同的channelhandler,传递channel中的消息。每一个channel,都对应一个channelpipeline作为channelhandler的容器,而channelhandlercontext则把channelhandler的封装成每个节点,以双向链表方式在容器中存在;我们可以通过下图简单看下它们之间的关系。
1、channelhandler
使用过netty的朋友们都清楚,channelhandler就是作为拦截器和业务处理逻辑的存在,它会处理channel中读写的消息;
首先看下channelhandler接口的定义
public interface channelhandler { //当channelhandler添加到pipeline中时调用 void handleradded(channelhandlercontext ctx) throws exception; //当channelhandler在pipeline中被移除时调用 void handlerremoved(channelhandlercontext ctx) throws exception; //在运行过程中 发生异常时调用 @deprecated void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception; @inherited @documented @target(elementtype.type) @retention(retentionpolicy.runtime) @interface sharable { // no value } }
channelhandler在处理或拦截io操作时,分为出站和入站两个方向,对应channelde读写两个操作,所以netty中又从channelhandler中派生出入站channelinboundhandler和出站channeloutboundhandler两个接口
channelinboundhandler处理入站数据以及各种状态的变化,下面列出了channelinboundhandler中数据被接收或者channel状态发生变化时被调用的方法,这些方法和channel的生命周期密切相关
public interface channelinboundhandler extends channelhandler { //当channel注册对应的eventloop并且能够处理i/o操作时被调用 void channelregistered(channelhandlercontext ctx) throws exception; //当channel从它对应的eventloop上注销,并且无法处理i/o操作时被调用 void channelunregistered(channelhandlercontext ctx) throws exception; //当channel已经连接时被调用 void channelactive(channelhandlercontext ctx) throws exception; //当channel为非活动状态,也就是断开时被调用 void channelinactive(channelhandlercontext ctx) throws exception; //当从channel读取数据时被调用 void channelread(channelhandlercontext ctx, object msg) throws exception; //当channel的上一个读数据完成后被调用 void channelreadcomplete(channelhandlercontext ctx) throws exception; //当调用fireusereventtriggered方法时被调用 void usereventtriggered(channelhandlercontext ctx, object evt) throws exception; //当channel的可写状态发生改变时被调用。用户可以确保写操作不会完成太快 void channelwritabilitychanged(channelhandlercontext ctx) throws exception; @override @suppresswarnings("deprecation") //入站操作发生异常时调用 void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception; }
channeloutboundhandler处理出站操作和数据
public interface channeloutboundhandler extends channelhandler { //当请求将channel绑定到本地地址时被调用 void bind(channelhandlercontext ctx, socketaddress localaddress, channelpromise promise) throws exception; //当请求将channel连接到远程节点时被调用 void connect( channelhandlercontext ctx, socketaddress remoteaddress, socketaddress localaddress, channelpromise promise) throws exception; //当请求将channel从远程节点断开时被调用 void disconnect(channelhandlercontext ctx, channelpromise promise) throws exception; //当请求关闭channel时被调用 void close(channelhandlercontext ctx, channelpromise promise) throws exception; //当请求从对应的eventloop中注销时被调用 void deregister(channelhandlercontext ctx, channelpromise promise) throws exception; //当请求从channel读取数据时被调用 void read(channelhandlercontext ctx) throws exception; //当请求通过channel将数据写到远程节点时被调用 void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception; //当请求通过channel将入队列数据冲刷到远程节点时被调用 void flush(channelhandlercontext ctx) throws exception; }
2、channelhandlercontext
channelhandlercontext可以说是channelpipeline的核心,它代表了channelhandler和channelpipeline之间的关联,我们首先要知道一个channelpipeline内部会维护一个双向链表,每当一个channelhandler被添加到channelpipeline中时,它都会被包装成为一个channelhandlercontext,组成链表的各个节点。
我们看下channelhandlercontext接口中定义的api接口
public interface channelhandlercontext extends attributemap, channelinboundinvoker, channeloutboundinvoker { /** * return the {@link channel} which is bound to the {@link channelhandlercontext}. */ //每个channelhandlercontext都会对一个channel channel channel(); /** * returns the {@link eventexecutor} which is used to execute an arbitrary task. */ //返回用于执行的eventexecutor任务 eventexecutor executor(); /** * the unique name of the {@link channelhandlercontext}.the name was used when then {@link channelhandler} * was added to the {@link channelpipeline}. this name can also be used to access the registered * {@link channelhandler} from the {@link channelpipeline}. */ //返回定义的name名称 string name(); /** * the {@link channelhandler} that is bound this {@link channelhandlercontext}. */ channelhandler handler(); /** * return {@code true} if the {@link channelhandler} which belongs to this context was removed * from the {@link channelpipeline}. note that this method is only meant to be called from with in the * {@link eventloop}. */ //如果绑定到channelpipeline的channelhandler被删除,返回true boolean isremoved(); //触发下一个channelinboundhandler中firechannelregistered方法 @override channelhandlercontext firechannelregistered(); //触发下一个channelinboundhandler中firechannelunregistered方法 @override channelhandlercontext firechannelunregistered(); //触发下一个channelinboundhandler中firechannelactive方法 @override channelhandlercontext firechannelactive(); //触发下一个channelinboundhandler中firechannelinactive方法 @override channelhandlercontext firechannelinactive(); //触发下一个channelinboundhandler中fireexceptioncaught方法 @override channelhandlercontext fireexceptioncaught(throwable cause); //触发下一个channelinboundhandler中fireusereventtriggered方法 @override channelhandlercontext fireusereventtriggered(object evt); //触发下一个channelinboundhandler中firechannelread方法 @override channelhandlercontext firechannelread(object msg); //触发下一个channelinboundhandler中firechannelreadcomplete方法 @override channelhandlercontext firechannelreadcomplete(); //触发下一个channelinboundhandler中firechannelwritabilitychanged方法 @override channelhandlercontext firechannelwritabilitychanged(); //触发下一个channelinboundhandler中channelread方法,如果是最后一个channelinboundhandler,则读取完成后触发channelreadcomplete @override channelhandlercontext read(); //触发下一个channeloutboundhandler中flush方法 @override channelhandlercontext flush(); /** * return the assigned {@link channelpipeline} */ channelpipeline pipeline(); /** * return the assigned {@link bytebufallocator} which will be used to allocate {@link bytebuf}s. */ //返回绑定该channel 的 bytebufallocator bytebufallocator alloc(); /** * @deprecated use {@link channel#attr(attributekey)} */ @deprecated @override //返回attribute <t> attribute<t> attr(attributekey<t> key); /** * @deprecated use {@link channel#hasattr(attributekey)} */ @deprecated @override //是否包含指定的attributekey <t> boolean hasattr(attributekey<t> key); }
二、channelpipeline的初始化
在abstractchannel的构造函数中我们可以看到对channelpipeline的初始化
protected abstractchannel(channel parent) { this.parent = parent; id = newid(); unsafe = newunsafe(); pipeline = newchannelpipeline();//初始化channelpipeline }
看下newchannelpipeline()内部的实现
protected defaultchannelpipeline newchannelpipeline() { return new defaultchannelpipeline(this); }
在这里创建了一个defaultchannelpipeline 对象,并传入channel对象。defaultchannelpipeline 实现了channelpipeline的接口
进入defaultchannelpipeline类内部,看下其具体构造
protected defaultchannelpipeline(channel channel) { this.channel = objectutil.checknotnull(channel, "channel"); succeededfuture = new succeededchannelfuture(channel, null); voidpromise = new voidchannelpromise(channel, true); tail = new tailcontext(this);//定义一个头部节点 head = new headcontext(this);//定义一个尾部节点 //连接头尾节点,构成双向链表 head.next = tail; tail.prev = head; }
在这里我们可以看到defaultchannelpipeline内部通过声明头尾两个context节点对象,构建了一个双向链表结构我们;其实这里的tailcontext与headcontext都是channelhandlercontext接口的具体实现;
三、总结
通过上面的内容,我们可以看出channelpipeline就是一个用于拦截channel入站和出站事件的channelhandler实例链,而channelhandlercontext就是这个实例链上的节点,每一个新创建的channel都会被分配一个新的channelpipeline。这篇文章我们对channelpipeline的构造和设计进行了大概的总结,其中如有不足与不正确的地方还望指出与海涵。后面我会对channelpipeline中channelhandler的添加、删除等具体操作与事件如何在管道中流通传递进行具体的分析。
关注微信公众号,查看更多技术文章。