欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析 (五)----- 数据如何在 pipeline 中流动

程序员文章站 2022-05-03 22:25:41
在上一篇文章中,我们已经了解了pipeline在netty中所处的角色,像是一条流水线,控制着字节流的读写,本文,我们在这个基础上继续深挖pipeline在事件传播 Unsafe 顾名思义,unsafe是不安全的意思,就是告诉你不要在应用程序里面直接使用Unsafe以及他的衍生类对象。 netty官 ......

在上一篇文章中,我们已经了解了pipeline在netty中所处的角色,像是一条流水线,控制着字节流的读写,本文,我们在这个基础上继续深挖pipeline在事件传播

unsafe

顾名思义,unsafe是不安全的意思,就是告诉你不要在应用程序里面直接使用unsafe以及他的衍生类对象。

netty官方的解释如下

unsafe operations that should never be called from user-code. these methods are only provided to implement the actual transport, and must be invoked from an i/o thread

unsafe 在channel定义,属于channel的内部类,表明unsafe和channel密切相关

下面是unsafe接口的所有方法

interface unsafe {
   recvbytebufallocator.handle recvbufallochandle();
   
   socketaddress localaddress();
   socketaddress remoteaddress();

   void register(eventloop eventloop, channelpromise promise);
   void bind(socketaddress localaddress, channelpromise promise);
   void connect(socketaddress remoteaddress, socketaddress localaddress, channelpromise promise);
   void disconnect(channelpromise promise);
   void close(channelpromise promise);
   void closeforcibly();
   void beginread();
   void write(object msg, channelpromise promise);
   void flush();
   
   channelpromise voidpromise();
   channeloutboundbuffer outboundbuffer();
}

按功能可以分为分配内存,socket四元组信息,注册事件循环,绑定网卡端口,socket的连接和关闭,socket的读写,看的出来,这些操作都是和jdk底层相关

unsafe 继承结构

Netty源码分析 (五)----- 数据如何在 pipeline 中流动

 

 

niounsafe 在 unsafe基础上增加了以下几个接口

public interface niounsafe extends unsafe {
    selectablechannel ch();
    void finishconnect();
    void read();
    void forceflush();
}

从增加的接口以及类名上来看,niounsafe 增加了可以访问底层jdk的selectablechannel的功能,定义了从selectablechannel读取数据的read方法

unsafe的分类

从以上继承结构来看,我们可以总结出两种类型的unsafe分类,一个是与连接的字节数据读写相关的niobyteunsafe,一个是与新连接建立操作相关的niomessageunsafe

niobyteunsafe中的读:委托到外部类niosocketchannel

protected int doreadbytes(bytebuf bytebuf) throws exception {
    final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle();
    allochandle.attemptedbytesread(bytebuf.writablebytes());
    return bytebuf.writebytes(javachannel(), allochandle.attemptedbytesread());
}

最后一行已经与jdk底层以及netty中的bytebuf相关,将jdk的 selectablechannel的字节数据读取到netty的bytebuf

niomessageunsafe中的读:委托到外部类niosocketchannel

protected int doreadmessages(list<object> buf) throws exception {
    socketchannel ch = javachannel().accept();

    if (ch != null) {
        buf.add(new niosocketchannel(this, ch));
        return 1;
    }
    return 0;
}

niomessageunsafe 的读操作很简单,就是调用jdk的accept()方法,新建立一条连接

niobyteunsafe中的写:委托到外部类niosocketchannel

@override
protected int dowritebytes(bytebuf buf) throws exception {
    final int expectedwrittenbytes = buf.readablebytes();
    return buf.readbytes(javachannel(), expectedwrittenbytes);
}

最后一行已经与jdk底层以及netty中的bytebuf相关,将netty的bytebuf中的字节数据写到jdk的 selectablechannel

pipeline中的head

nioeventloop

private void processselectedkey(selectionkey k, abstractniochannel ch) {
     final abstractniochannel.niounsafe unsafe = ch.unsafe();
     //新连接的已准备接入或者已存在的连接有数据可读
     if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
         unsafe.read();
     }
}

niobyteunsafe

@override
public final void read() {
    final channelconfig config = config();
    final channelpipeline pipeline = pipeline();
    // 创建bytebuf分配器
    final bytebufallocator allocator = config.getallocator();
    final recvbytebufallocator.handle allochandle = recvbufallochandle();
    allochandle.reset(config);

    bytebuf bytebuf = null;
    do {
        // 分配一个bytebuf
        bytebuf = allochandle.allocate(allocator);
        // 将数据读取到分配的bytebuf中去
        allochandle.lastbytesread(doreadbytes(bytebuf));
        if (allochandle.lastbytesread() <= 0) {
            bytebuf.release();
            bytebuf = null;
            close = allochandle.lastbytesread() < 0;
            break;
        }

        // 触发事件,将会引发pipeline的读事件传播
        pipeline.firechannelread(bytebuf);
        bytebuf = null;
    } while (allochandle.continuereading());
    pipeline.firechannelreadcomplete();
}

同样,我抽出了核心代码,细枝末节先剪去,niobyteunsafe 要做的事情可以简单地分为以下几个步骤

  1. 拿到channel的config之后拿到bytebuf分配器,用分配器来分配一个bytebuf,bytebuf是netty里面的字节数据载体,后面读取的数据都读到这个对象里面
  2. 将channel中的数据读取到bytebuf
  3. 数据读完之后,调用 pipeline.firechannelread(bytebuf); 从head节点开始传播至整个pipeline
  4. 最后调用firechannelreadcomplete();

这里,我们的重点其实就是 pipeline.firechannelread(bytebuf);

defaultchannelpipeline

final abstractchannelhandlercontext head;
//...
head = new headcontext(this);

public final channelpipeline firechannelread(object msg) {
    abstractchannelhandlercontext.invokechannelread(head, msg);
    return this;
}

结合这幅图

Netty源码分析 (五)----- 数据如何在 pipeline 中流动

 

 

可以看到,数据从head节点开始流入,在进行下一步之前,我们先把head节点的功能过一遍

headcontext

final class headcontext extends abstractchannelhandlercontext
        implements channeloutboundhandler, channelinboundhandler {

    private final unsafe unsafe;

    headcontext(defaultchannelpipeline pipeline) {
        super(pipeline, null, head_name, false, true);
        unsafe = pipeline.channel().unsafe();
        setaddcomplete();
    }

    @override
    public channelhandler handler() {
        return this;
    }

    @override
    public void handleradded(channelhandlercontext ctx) throws exception {
        // noop
    }

    @override
    public void handlerremoved(channelhandlercontext ctx) throws exception {
        // noop
    }

    @override
    public void bind(
            channelhandlercontext ctx, socketaddress localaddress, channelpromise promise)
            throws exception {
        unsafe.bind(localaddress, promise);
    }

    @override
    public void connect(
            channelhandlercontext ctx,
            socketaddress remoteaddress, socketaddress localaddress,
            channelpromise promise) throws exception {
        unsafe.connect(remoteaddress, localaddress, promise);
    }

    @override
    public void disconnect(channelhandlercontext ctx, channelpromise promise) throws exception {
        unsafe.disconnect(promise);
    }

    @override
    public void close(channelhandlercontext ctx, channelpromise promise) throws exception {
        unsafe.close(promise);
    }

    @override
    public void deregister(channelhandlercontext ctx, channelpromise promise) throws exception {
        unsafe.deregister(promise);
    }

    @override
    public void read(channelhandlercontext ctx) {
        unsafe.beginread();
    }

    @override
    public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception {
        unsafe.write(msg, promise);
    }

    @override
    public void flush(channelhandlercontext ctx) throws exception {
        unsafe.flush();
    }

    @override
    public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
        ctx.fireexceptioncaught(cause);
    }

    @override
    public void channelregistered(channelhandlercontext ctx) throws exception {
        invokehandleraddedifneeded();
        ctx.firechannelregistered();
    }

    @override
    public void channelunregistered(channelhandlercontext ctx) throws exception {
        ctx.firechannelunregistered();

        // remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isopen()) {
            destroy();
        }
    }

    @override
    public void channelactive(channelhandlercontext ctx) throws exception {
        ctx.firechannelactive();

        readifisautoread();
    }

    @override
    public void channelinactive(channelhandlercontext ctx) throws exception {
        ctx.firechannelinactive();
    }

    @override
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        ctx.firechannelread(msg);
    }

    @override
    public void channelreadcomplete(channelhandlercontext ctx) throws exception {
        ctx.firechannelreadcomplete();

        readifisautoread();
    }

    private void readifisautoread() {
        if (channel.config().isautoread()) {
            channel.read();
        }
    }

    @override
    public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
        ctx.fireusereventtriggered(evt);
    }

    @override
    public void channelwritabilitychanged(channelhandlercontext ctx) throws exception {
        ctx.firechannelwritabilitychanged();
    }
}

从head节点继承的两个接口看,ta既是一个channelhandlercontext,同时又属于inbound和outbound handler

在传播读写事件的时候,head的功能只是简单地将事件传播下去,如ctx.firechannelread(msg);

在真正执行读写操作的时候,例如在调用writeandflush()等方法的时候,最终都会委托到unsafe执行,而当一次数据读完,channelreadcomplete方法会被调用

pipeline中的inbound事件传播

我们接着上面的 abstractchannelhandlercontext.invokechannelread(head, msg); 这个静态方法看,参数传入了 head,我们知道入站数据都是从 head 开始的,以保证后面所有的 handler 都由机会处理数据流。

我们看看这个静态方法内部是怎么样的:

static void invokechannelread(final abstractchannelhandlercontext next, object msg) {
    final object m = next.pipeline.touch(objectutil.checknotnull(msg, "msg"), next);
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        next.invokechannelread(m);
    } else {
        executor.execute(new runnable() {
            public void run() {
                next.invokechannelread(m);
            }
        });
    }
}

调用这个 context (也就是 head) 的 invokechannelread 方法,并传入数据。我们再看看head中 invokechannelread 方法的实现,实际上是在headcontext的父类abstractchannelhandlercontext中:

abstractchannelhandlercontext

private void invokechannelread(object msg) {
    if (invokehandler()) {
        try {
            ((channelinboundhandler) handler()).channelread(this, msg);
        } catch (throwable t) {
            notifyhandlerexception(t);
        }
    } else {
        firechannelread(msg);
    }
}

public channelhandler handler() {
    return this;
}

上面 handler()就是headcontext中的handler,也就是headcontext自身,也就是调用 head 的 channelread 方法。那么这个方法是怎么实现的呢?

@override
public void channelread(channelhandlercontext ctx, object msg) throws exception {
    ctx.firechannelread(msg);
}

什么都没做,调用 context 的 fire 系列方法,将请求转发给下一个节点。我们这里是 firechannelread 方法,注意,这里方法名字都挺像的。需要细心区分。下面我们看看 context 的成员方法 firechannelread:

abstractchannelhandlercontext

@override
public channelhandlercontext firechannelread(final object msg) {
    invokechannelread(findcontextinbound(), msg);
    return this;
}

这个是 head 的抽象父类 abstractchannelhandlercontext 的实现,该方法再次调用了静态 fire 系列方法,但和上次不同的是,不再放入 head 参数了,而是使用 findcontextinbound 方法的返回值。从这个方法的名字可以看出,是找到入站类型的 handler。我们看看方法实现:

private abstractchannelhandlercontext findcontextinbound() {
    abstractchannelhandlercontext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

该方法很简单,找到当前 context 的 next 节点(inbound 类型的)并返回。这样就能将请求传递给后面的 inbound handler 了。我们来看看 invokechannelread(findcontextinbound(), msg);

static void invokechannelread(final abstractchannelhandlercontext next, object msg) {
    final object m = next.pipeline.touch(objectutil.checknotnull(msg, "msg"), next);
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        next.invokechannelread(m);
    } else {
        executor.execute(new runnable() {
            public void run() {
                next.invokechannelread(m);
            }
        });
    }

}

上面我们找到了next节点(inbound类型的),然后直接调用 next.invokechannelread(m);如果这个next是我们自定义的handler,此时我们自定义的handler的父类是abstractchannelhandlercontext,则又回到了abstractchannelhandlercontext中实现的invokechannelread,代码如下:

abstractchannelhandlercontext

private void invokechannelread(object msg) {
    if (invokehandler()) {
        try {
            ((channelinboundhandler) handler()).channelread(this, msg);
        } catch (throwable t) {
            notifyhandlerexception(t);
        }
    } else {
        firechannelread(msg);
    }
}

public channelhandler handler() {
    return this;
}

此时的handler()就是我们自定义的handler了,然后调用我们自定义handler中的 channelread(this, msg);

请求进来时,pipeline 会从 head 节点开始输送,通过配合 invoker 接口的 fire 系列方法,实现 context 链在 pipeline 中的完美传递。最终到达我们自定义的 handler。

注意:此时如果我们想继续向后传递该怎么办呢?我们前面说过,可以调用 context 的 fire 系列方法,就像 head 的 channelread 方法一样,调用 fire 系列方法,直接向后传递就 ok 了。

如果所有的handler都调用了fire系列方法,则会传递到最后一个inbound类型的handler,也就是——tail节点,那我们就来看看tail节点

pipeline中的tail

final class tailcontext extends abstractchannelhandlercontext implements channelinboundhandler {

    tailcontext(defaultchannelpipeline pipeline) {
        super(pipeline, null, tail_name, true, false);
        setaddcomplete();
    }

    @override
    public channelhandler handler() {
        return this;
    }

    @override
    public void channelregistered(channelhandlercontext ctx) throws exception { }

    @override
    public void channelunregistered(channelhandlercontext ctx) throws exception { }

    @override
    public void channelactive(channelhandlercontext ctx) throws exception { }

    @override
    public void channelinactive(channelhandlercontext ctx) throws exception { }

    @override
    public void channelwritabilitychanged(channelhandlercontext ctx) throws exception { }

    @override
    public void handleradded(channelhandlercontext ctx) throws exception { }

    @override
    public void handlerremoved(channelhandlercontext ctx) throws exception { }

    @override
    public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
        // this may not be a configuration error and so don't log anything.
        // the event may be superfluous for the current pipeline configuration.
        referencecountutil.release(evt);
    }

    @override
    public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
        onunhandledinboundexception(cause);
    }

    @override
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        onunhandledinboundmessage(msg);
    }

    @override
    public void channelreadcomplete(channelhandlercontext ctx) throws exception { }
}

正如我们前面所提到的,tail节点的大部分作用即终止事件的传播(方法体为空)

channelread

protected void onunhandledinboundmessage(object msg) {
    try {
        logger.debug(
                "discarded inbound message {} that reached at the tail of the pipeline. " +
                        "please check your pipeline configuration.", msg);
    } finally {
        referencecountutil.release(msg);
    }
}

tail节点在发现字节数据(bytebuf)或者decoder之后的业务对象在pipeline流转过程中没有被消费,落到tail节点,tail节点就会给你发出一个警告,告诉你,我已经将你未处理的数据给丢掉了

总结一下,tail节点的作用就是结束事件传播,并且对一些重要的事件做一些善意提醒

pipeline中的outbound事件传播

上一节中,我们在阐述tail节点的功能时,忽略了其父类abstractchannelhandlercontext所具有的功能,这一节中,我们以最常见的writeandflush操作来看下pipeline中的outbound事件是如何向外传播的

典型的消息推送系统中,会有类似下面的一段代码

channel channel = getchannel(userinfo);
channel.writeandflush(pushinfo);

这段代码的含义就是根据用户信息拿到对应的channel,然后给用户推送消息,跟进 channel.writeandflush

niosocketchannel

public channelfuture writeandflush(object msg) {
    return pipeline.writeandflush(msg);
}

从pipeline开始往外传播

public final channelfuture writeandflush(object msg) {
    return tail.writeandflush(msg);
}

channel 中大部分outbound事件都是从tail开始往外传播, writeandflush()方法是tail继承而来的方法,我们跟进去

abstractchannelhandlercontext

public channelfuture writeandflush(object msg) {
    return writeandflush(msg, newpromise());
}

public channelfuture writeandflush(object msg, channelpromise promise) {
    write(msg, true, promise);

    return promise;
}

abstractchannelhandlercontext

private void write(object msg, boolean flush, channelpromise promise) {
    abstractchannelhandlercontext next = findcontextoutbound();
    final object m = pipeline.touch(msg, next);
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        if (flush) {
            next.invokewriteandflush(m, promise);
        } else {
            next.invokewrite(m, promise);
        }
    } else {
        abstractwritetask task;
        if (flush) {
            task = writeandflushtask.newinstance(next, m, promise);
        }  else {
            task = writetask.newinstance(next, m, promise);
        }
        safeexecute(executor, task, promise, m);
    }
}

先调用findcontextoutbound()方法找到下一个outbound()节点

abstractchannelhandlercontext

private abstractchannelhandlercontext findcontextoutbound() {
    abstractchannelhandlercontext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

找outbound节点的过程和找inbound节点类似,反方向遍历pipeline中的双向链表,直到第一个outbound节点next,然后调用next.invokewriteandflush(m, promise)

abstractchannelhandlercontext

private void invokewriteandflush(object msg, channelpromise promise) {
    if (invokehandler()) {
        invokewrite0(msg, promise);
        invokeflush0();
    } else {
        writeandflush(msg, promise);
    }
}

调用该节点的channelhandler的write方法,flush方法我们暂且忽略,后面会专门讲writeandflush的完整流程

abstractchannelhandlercontext

private void invokewrite0(object msg, channelpromise promise) {
    try {
        ((channeloutboundhandler) handler()).write(this, msg, promise);
    } catch (throwable t) {
        notifyoutboundhandlerexception(t, promise);
    }
}

可以看到,数据开始出站,从后向前开始流动,和入站的方向是反的。那么最后会走到哪里呢,当然是走到 head 节点,因为 head 节点就是 outbound 类型的 handler。

headcontext

public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception {
    unsafe.write(msg, promise);
}

调用了 底层的 unsafe 操作数据,这里,加深了我们对head节点的理解,即所有的数据写出都会经过head节点

当执行完这个 write 方法后,方法开始退栈。逐步退到 unsafe 的 read 方法,回到最初开始的地方,然后继续调用 pipeline.firechannelreadcomplete() 方法

Netty源码分析 (五)----- 数据如何在 pipeline 中流动

总结

总结一下一个请求在 pipeline 中的流转过程:

  1. 调用 pipeline 的 fire 系列方法,这些方法是接口 invoker 设计的,pipeline 实现了 invoker 的所有方法,inbound 事件从 head 开始流入,outbound 事件从 tail 开始流出。
  2. pipeline 会将请求交给 context,然后 context 通过抽象父类 abstractchannelhandlercontext 的 invoke 系列方法(静态和非静态的)配合 abstractchannelhandlercontext 的 fire 系列方法再配合 findcontextinbound 和 findcontextoutbound 方法完成各个 context 的数据流转。
  3. 当入站过程中,调用 了出站的方法,那么请求就不会向后走了。后面的处理器将不会有任何作用。想继续相会传递就调用 context 的 fire 系列方法,让 netty 在内部帮你传递数据到下一个节点。如果你想在整个通道传递,就在 handler 中调用 channel 或者 pipeline 的对应方法,这两个方法会将数据从头到尾或者从尾到头的流转一遍。