ChannelPipeline----贯穿io事件处理的大动脉
channelpipeline贯穿io事件处理的大动脉
上一篇,我们分析了nioeventloop及其相关类的主干逻辑代码,我们知道netty采用线程封闭的方式来避免多线程之间的资源竞争,最大限度地减少并发问题,减少锁的使用,因而能够有效减低线程切换的开销,减少cpu的使用时间。此外,我们还简单分析了netty对于线程组的封装eventloopgroup,目前一般采用roundrobin的方式在多个线程上均匀地分配channel。通过前面几篇文章的分析,我们已经对channel的初始化,注册到eventloop上,singlethreadeventloop的线程启动过程以及线程中运行的代码逻辑有了一些了解,此外我们也分析了用于处理基于tcp协议的io事件的nioeventloop类的具体的循环逻辑,通过对代码的详细分析,我们了解了对于connect,write,read,accept事件的不同处理逻辑,但是对于write和read事件的处理逻辑我们并没有分析的很详细,因为这些事件的处理涉及到netty中另一个很重要的模块,channelpipeline以及一系列相关的类如channel, channelhandler, channelhandlercontext等的理解,netty中的事件处理采用了经典的责任链(responsbility chain)的的设计模式,这种设计模式使得netty的io事件处理框架易于扩展,并且为业务逻辑提供了一个很好的抽象模型,大大降低了netty的使用难度,使得io事件的处理变得更符合思维习惯。
好了,废话了那么多,其实主要是想把前面分析的几篇的文章做一个小结和回顾,然后引出本篇的主题--netty的io事件处理链模式。
因为netty的代码结构相对来说还是很规整,它的模块之间的边界划分比较明确,eventloop作为io事件的“发源地”,与其交互的对象是channel类,而channelpipeline,channelhandlercontext, channelhandler等几个类则是与channel交互,他们并不直接与eventloop交互。
channelpipeline的结构图
首先每一个channel在初始化的时候就会创建一个channelpipeline,这点我们在前面分析niosocketchannel的初始化时也分析到了。目前channelpipeline的实现只有defaultchannelpipeline一种,所以我们也以defaultchannelpipeline来分析。defaultchannelpipeline内部有一个双向链表结构,这个链表的每个节点都是一个abstractchannelhandlercontext类型的节点,defaultchannelpipeline刚初始化时就会创建两个初始节点,分别是headcontext和tailcontext,这两个节点也并不完全是标记节点,他们都有各自实际的作用,
- headcontext,实现了bind,connect,disconnect,close,write,flush等等几个方法,基本都是通过直接调用unsafe的相关方法实现的。而对于其他的方法基本都是通过调用abstractchannelhandlercontext的fire方法将事件传给下一个节点。
- tailcontext, 主要用于处理写数据几乎没有实现任何逻辑,它的功能几乎全部继承自abstractchannelhandlercontext,而abstractchannelhandlercontext对于大部分事件处理的实现都是简单地将事件向下一个节点传递。注意,这里下一个节点不一定是前一个还是后一个,要根据具体事件类型或者具体的操作而定,对于channeloutboundinvoker接口中的方法都是从尾节点向首节点传递事件,而对于channelinboundinvoker接口中的方法都是从首节点往尾节点传递。我们可以形象地理解为,首节点是最靠近socket的,而尾节点是最原理socket的,所以有数据进来时,产生的读事件最先从首节点开始向后传递,当有写数据的动作时,则会从尾节点向头结点传递。
下面,我们以两个最重要的事件读事件和写事件,来分析netty的这种链式处理结构到底是怎么运转的。
读事件
首先,我们需要找到一个产生读事件并调用相关方法使得读事件开始传递的例子,很自然我们应该想到在eventloop中会产生读事件。
如下,就是nioeventloop中对于读事件的处理,通过调用niounsafe.read方法
// 处理read和accept事件 if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); }
我们继续看niobyteunsafe.read方法,这个方法我们之前在分析nioeventloop事件处理逻辑时提到过,这个方法首先会通过缓冲分配器分配一个缓冲,然后从channel(也就是socket)中将数据读到缓冲中,每读一个缓冲,就会触发一个读事件,我们看具体的触发读事件的调用:
do { // 分配一个缓冲 bytebuf = allochandle.allocate(allocator); // 将通道的数据读取到缓冲中 allochandle.lastbytesread(doreadbytes(bytebuf)); // 如果没有读取到数据,说明通道中没有待读取的数据了, if (allochandle.lastbytesread() <= 0) { // nothing was read. release the buffer. // 因为没读取到数据,所以应该释放缓冲 bytebuf.release(); bytebuf = null; // 如果读取到的数据量是负数,说明通道已经关闭了 close = allochandle.lastbytesread() < 0; if (close) { // there is nothing left to read as we received an eof. readpending = false; } break; } // 更新handle内部的簿记量 allochandle.incmessagesread(1); readpending = false; // 向channel的处理器流水线中触发一个事件, // 让取到的数据能够被流水线上的各个channelhandler处理 pipeline.firechannelread(bytebuf); bytebuf = null; // 这里根据如下条件判断是否继续读: // 上一次读取到的数据量大于0,并且读取到的数据量等于分配的缓冲的最大容量, // 此时说明通道中还有待读取的数据 } while (allochandle.continuereading());
为了代码逻辑的完整性,我这里把整个循环的代码都贴上来,其实我们要关注的仅仅是pipeline.firechannelread(bytebuf)这一句,好了,现在我们找到channelpipeline触发读事件的入口方法,我们顺着这个方法,顺藤摸瓜就能一步步理清事件的传递过程了。
defaultchannelpipeline.firechannelread
如果我们看一下channelpipeline接口,这里面的方法名都是以fire开头的,实际就是想表达这些方法都是触发了一个事件,然后这个事件就会在内部的处理器链表中传递。
我们看到这里调用了一个静态方法,并且以头结点为参数,也就是说事件传递是从头结点开始的。
public final channelpipeline firechannelread(object msg) { abstractchannelhandlercontext.invokechannelread(head, msg); return this; }
abstractchannelhandlercontext.invokechannelread(final abstractchannelhandlercontext next, object msg)
可以看到,这个方法中通过调用invokechannelread执行处理逻辑
static void invokechannelread(final abstractchannelhandlercontext next, object msg) { // 维护引用计数,主要是为了侦测资源泄漏问题 final object m = next.pipeline.touch(objectutil.checknotnull(msg, "msg"), next); eventexecutor executor = next.executor(); if (executor.ineventloop()) { // 调用invokechannelread执行处理逻辑 next.invokechannelread(m); } else { executor.execute(new runnable() { @override public void run() { next.invokechannelread(m); } }); } }
abstractchannelhandlercontext.invokechannelread(object msg)
这里可以看到,abstractchannelhandlercontext通过自己内部的handler对象来实现读数据的逻辑。这也体现了channelhandlercontext在整个结构中的作用,其实它是起到了在channelpipeline和handler之间的一个中间人的角色,那我们要问:既然channelhandlercontext不起什么实质性的作用,那为什么要多这一个中间层呢,这样设计的好处是什么?我认为这样设计其实是为了尽最大可能对使用者屏蔽netty框架的细节,试想如果没有这个context的中间角色,使用者必然要详细地了解channelpipeline,并且还要考虑事件传递是找下一个节点,还要考虑下一个节点应该沿着链表的正序找还是沿着链表 倒叙找,所以这里channelhandlercontext的角色我认为最大的作用就是封装了链表的逻辑,并且封装了不同类型操作的传播方式。当然也起到了一些引用传递的作用,如channel引用可以简介地传递给用户。
好了,回到正题,从前面的方法中我们知道读事件最先是从headcontext节点开始的,所以我们看一下headcontext的channelread方法(因为headcontext也实现了handler方法,并且返回的就是自身)
private void invokechannelread(object msg) { // 如果这个handler已经准备就绪,那么就执行处理逻辑 // 否则将事件传递给下一个处理器节点 if (invokehandler()) { try { // 调用内部的handler的channelread方法 ((channelinboundhandler) handler()).channelread(this, msg); } catch (throwable t) { notifyhandlerexception(t); } } else { firechannelread(msg); } }
headcontext.channelread
这里的调用也是一个重要的注意点,这里调用了channelhandlercontext.firechannelread方法,这正是事件传播的方法,fire开头的方法的作用就是将当前的操作(或者叫事件)从当前处理节点传递给下一个处理节点。这样就实现了事件在链表中的传播。
public void channelread(channelhandlercontext ctx, object msg) { ctx.firechannelread(msg); }
小结
到这里我们先暂停一下,总结一下读事件(或者是读操作)在channelpipeline内部的传播机制,其实很简单,
- 首先外部调用者会通过unsafe最终调用channelpipeline.firechannelread方法,并将从channel中读取到的数据作为参数传进来
- 以头结点作为参数调用静态方法abstractchannelhandlercontext.firechannelread
- 然后头结点headcontext开始调用节点的invokechannelread方法(即channelhandlercontext的invokechannelread方法),
- invokechannelread方法会调用当前节点的handler对象的channelread方法执行处理逻辑
- handler对象的channelread方法中可以调用abstractchannelhandlercontext.firechannelread将这个事件传递到下一个节点
- 这样事件就能够沿着链条不断传递下去,当然如果业务处理需要,完全可以在某个节点将事件的传递终止,也就是在这个节点不调用channelhandlercontext.firechannelread
写事件
此外,我们分析一下写数据的操作是怎么传播的。分析写数据操作的入口并不想读事件那么好找,在netty中用户的代码中写数据最终都是被放到内部的缓冲中,当nioeventloop中监听到底层的socket可以写数据的事件时,实际上是吧当前缓冲中的数据发送到socket中,而对于用户来讲,是接触不到socketchannel这一层的。
根据前面的分析,我们知道,用户一般都会与channel,channelhandler, channelhandlercontext这几种类打交道,写数据的操作也是通过channel的write和writeandflush触发的,这两个方法区别在于writeandflush在写完数据后还会触发一次刷写操作,将缓冲中的数据实际写入到socket中。
abstractchannel.write
仍然是将操作交给内部的channelpipeline,触发流水线操作
public channelfuture write(object msg, channelpromise promise) { return pipeline.write(msg, promise); }
defaultchannelpipeline.write
这里可以很清楚地看出来,写数据的操作从为节点开始,但是tailcontext并未重写write方法,所以最终调用的还是abstractchannelhandlercontext中的相应方法。
我们沿着调用链往下走,发现write系列的方法其实是将写操作传递给了下一个channeloutboundhandler类型的处理节点,注意这里是从尾节点向前找,遍历链表的顺序和读数据正好相反。
真正调用
public final channelfuture write(object msg, channelpromise promise) { return tail.write(msg, promise); }
abstractchannelhandlercontext.write
从这个方法可以明显地看出来,write方法将写操作交给了下一个channeloutboundhandler类型的处理器节点。
private void write(object msg, boolean flush, channelpromise promise) { objectutil.checknotnull(msg, "msg"); try { if (isnotvalidpromise(promise, true)) { referencecountutil.release(msg); // cancelled return; } } catch (runtimeexception e) { referencecountutil.release(msg); throw e; } // 沿着链表向前遍历,找到下一个channeloutboundhandler类型的处理器节点 final abstractchannelhandlercontext next = findcontextoutbound(flush ? (mask_write | mask_flush) : mask_write); final object m = pipeline.touch(msg, next); eventexecutor executor = next.executor(); if (executor.ineventloop()) { if (flush) { // 调用abstractchannelhandlercontext.invokewriteandflush方法执行真正的写入逻辑 next.invokewriteandflush(m, promise); } else { next.invokewrite(m, promise); } } else { // 如果当前是异步地写入数据,那么需要将写入的逻辑封装成一个任务添加到eventloop的任务对队列中 final abstractwritetask task; if (flush) { task = writeandflushtask.newinstance(next, m, promise); } else { task = writetask.newinstance(next, m, promise); } if (!safeexecute(executor, task, promise, m)) { // we failed to submit the abstractwritetask. we need to cancel it so we decrement the pending bytes // and put it back in the recycler for re-use later. // // see https://github.com/netty/netty/issues/8343. task.cancel(); } } }
abstractchannelhandlercontext.invokewrite
我们接着看invokewrite0方法
private void invokewrite(object msg, channelpromise promise) { if (invokehandler()) { invokewrite0(msg, promise); } else { write(msg, promise); } }
abstractchannelhandlercontext.invokewrite0
这里可以清楚地看到,最终是调用了handler的write方法执行真正的写入逻辑,这个逻辑实际上就是有用户自己实现的。
private void invokewrite0(object msg, channelpromise promise) { try { // 调用当前节点的handler的write方法执行真正的写入逻辑 ((channeloutboundhandler) handler()).write(this, msg, promise); } catch (throwable t) { notifyoutboundhandlerexception(t, promise); } }
到这里,我们已经知道写入的操作是怎么从尾节点开始,也知道了通过调用当前处理节点的abstractchannelhandlercontext.write方法可以将写入操作传递给下一个节点,那么数据经过层层传递后,最终是怎么写到socket中的呢?回答这个问题,我们需要看一下headcontext的代码!我们知道写入的操作是从尾节点向前传递的,那么头节点headcontext就是传递的最后一个节点。
headcontext.write
最终调用了unsafe.write方法。
在abstractchannel.abstractunsafe的实现中,write方法将经过前面一系列处理器处理过的数据存放到内部的缓冲中。
public void write(channelhandlercontext ctx, object msg, channelpromise promise) { unsafe.write(msg, promise); }
刷写操作的传递
前面我们提到,写数据的操作除了write还有writeandflush,这个操作除了写数据,还会紧接着执行一次刷写操作。刷写操作也会从尾节点向前传递,最终传递到头结点headcontext,其中的flush方法如下:
public void flush(channelhandlercontext ctx) { unsafe.flush(); }
在abstractchannel.abstractunsafe的实现中,flush操作会将前面存储在内部缓冲区中的数据吸入到socket中,从而完成刷写。
总结
本节,我们主要通过io事件处理中最重要的两种事件,即读事件和写事件为切入点 详细分析了netty中对于这两种事件的处理方法。其中写数据的事件与我们之前在jdk nio中建立起的印象差别还是不大的,都是对从socket中读取的数据进行处理,但是写事件跟jdk nio中的概念就有较大差别了,因为netty对数据的写入做了很大的改变和优化,用户代码中通过channel调用相关的写数据的方法,这个方法会触发处理器链条上的所有相关的处理器对待写入的数据进行加工,最后在头结点headcontext中被写入channel内部的缓冲区,通过flush操作将缓冲的数据写入socket中。
这里面最重要的也是最值得我们学习的一点就是责任链模式,显然,这又是一次对责任链模式的成功运用,是的框架的扩展性大大增强,而且面向用户的接口更加容易理解,简单易用,向用户屏蔽了大部分框架实现细节。
下一篇: oracle数据库用户之间授权