欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析 (四)----- ChannelPipeline

程序员文章站 2023-11-18 19:14:34
netty在服务端端口绑定和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出 pipeline 初始化 在上一篇文章中,我们已经知道了创建NioSocke ......

netty在服务端端口绑定和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出

pipeline 初始化

在上一篇文章中,我们已经知道了创建niosocketchannel的时候会将netty的核心组件创建出来

Netty源码分析 (四)----- ChannelPipeline

pipeline是其中的一员,在下面这段代码中被创建

protected abstractchannel(channel parent) {
    this.parent = parent;
    id = newid();
    unsafe = newunsafe();
    pipeline = newchannelpipeline();
}
protected defaultchannelpipeline newchannelpipeline() {
    return new defaultchannelpipeline(this);
}

niosocketchannel中保存了pipeline的引用

defaultchannelpipeline

protected defaultchannelpipeline(channel channel) {
    this.channel = objectutil.checknotnull(channel, "channel");
    tail = new tailcontext(this);
    head = new headcontext(this);

    head.next = tail;
    tail.prev = head;
}

pipeline中保存了channel的引用,创建完pipeline之后,整个pipeline是这个样子的

Netty源码分析 (四)----- ChannelPipeline

 

pipeline中的每个节点是一个channelhandlercontext对象,每个context节点保存了它包裹的执行器 channelhandler 执行操作所需要的上下文,其实就是pipeline,因为pipeline包含了channel的引用,可以拿到所有的context信息

pipeline添加节点

下面是一段非常常见的客户端代码

bootstrap.childhandler(new channelinitializer<socketchannel>() {
     @override
     public void initchannel(socketchannel ch) throws exception {
         channelpipeline p = ch.pipeline();
         p.addlast(new spliter())
         p.addlast(new decoder());
         p.addlast(new businesshandler())
         p.addlast(new encoder());
     }
});

首先,用一个spliter将来源tcp数据包拆包,然后将拆出来的包进行decoder,传入业务处理器businesshandler,业务处理完encoder,输出

整个pipeline结构如下

Netty源码分析 (四)----- ChannelPipeline

 

我用两种颜色区分了一下pipeline中两种不同类型的节点,一个是 channelinboundhandler,处理inbound事件,最典型的就是读取数据流,加工处理;还有一种类型的handler是 channeloutboundhandler, 处理outbound事件,比如当调用writeandflush()类方法时,就会经过该种类型的handler

不管是哪种类型的handler,其外层对象 channelhandlercontext 之间都是通过双向链表连接,而区分一个 channelhandlercontext到底是in还是out,在添加节点的时候我们就可以看到netty是怎么处理的

defaultchannelpipeline

@override
public final channelpipeline addlast(channelhandler... handlers) {
    return addlast(null, handlers);
}
@override
public final channelpipeline addlast(eventexecutorgroup executor, channelhandler... handlers) {
    for (channelhandler h: handlers) {
        addlast(executor, null, h);
    }
    return this;
}
public final channelpipeline addlast(eventexecutorgroup group, string name, channelhandler handler) {
    final abstractchannelhandlercontext newctx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkmultiplicity(handler);
        // 2.创建节点
        newctx = newcontext(group, filtername(name, handler), handler);
        // 3.添加节点
        addlast0(newctx);
    }
   
    // 4.回调用户方法
    callhandleradded0(handler);
    
    return this;
}

这里简单地用synchronized方法是为了防止多线程并发操作pipeline底层的双向链表

我们还是逐步分析上面这段代码

检查是否有重复handler

在用户代码添加一条handler的时候,首先会查看该handler有没有添加过

private static void checkmultiplicity(channelhandler handler) {
    if (handler instanceof channelhandleradapter) {
        channelhandleradapter h = (channelhandleradapter) handler;
        if (!h.issharable() && h.added) {
            throw new channelpipelineexception(
                    h.getclass().getname() +
                    " is not a @sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

netty使用一个成员变量added标识一个channel是否已经添加,上面这段代码很简单,如果当前要添加的handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加

由此可见,一个handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个handler被共用,只需要加一个@sharable标注即可,如下

@sharable
public class businesshandler {
    
}

而如果handler是sharable的,一般就通过spring的注入的方式使用,不需要每次都new 一个

issharable() 方法正是通过该handler对应的类是否标注@sharable来实现的

channelhandleradapter

public boolean issharable() {
   class<?> clazz = getclass();
    map<class<?>, boolean> cache = internalthreadlocalmap.get().handlersharablecache();
    boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isannotationpresent(sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

通过反射判断是否有sharable.class注解

创建节点

回到主流程,看创建上下文这段代码

newctx = newcontext(group, filtername(name, handler), handler);

这里我们需要先分析 filtername(name, handler) 这段代码,这个函数用于给handler创建一个唯一性的名字

private string filtername(string name, channelhandler handler) {
    if (name == null) {
        return generatename(handler);
    }
    checkduplicatename(name);
    return name;
}

显然,我们传入的name为null,netty就给我们生成一个默认的name,否则,检查是否有重名,检查通过的话就返回

netty创建默认name的规则为 简单类名#0,下面我们来看些具体是怎么实现的

private static final fastthreadlocal<map<class<?>, string>> namecaches =
        new fastthreadlocal<map<class<?>, string>>() {
    @override
    protected map<class<?>, string> initialvalue() throws exception {
        return new weakhashmap<class<?>, string>();
    }
};

private string generatename(channelhandler handler) {
    // 先查看缓存中是否有生成过默认name
    map<class<?>, string> cache = namecaches.get();
    class<?> handlertype = handler.getclass();
    string name = cache.get(handlertype);
    // 没有生成过,就生成一个默认name,加入缓存 
    if (name == null) {
        name = generatename0(handlertype);
        cache.put(handlertype, name);
    }

    // 生成完了,还要看默认name有没有冲突
    if (context0(name) != null) {
        string basename = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            string newname = basename + i;
            if (context0(newname) == null) {
                name = newname;
                break;
            }
        }
    }
    return name;
}

netty使用一个 fastthreadlocal(后面的文章会细说)变量来缓存handler的类和默认名称的映射关系,在生成name的时候,首先查看缓存中有没有生成过默认name(简单类名#0),如果没有生成,就调用generatename0()生成默认name,然后加入缓存

接下来还需要检查name是否和已有的name有冲突,调用context0(),查找pipeline里面有没有对应的context

private abstractchannelhandlercontext context0(string name) {
    abstractchannelhandlercontext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

context0()方法链表遍历每一个 channelhandlercontext,只要发现某个context的名字与待添加的name相同,就返回该context,最后抛出异常,可以看到,这个其实是一个线性搜索的过程

如果context0(name) != null 成立,说明现有的context里面已经有了一个默认name,那么就从 简单类名#1 往上一直找,直到找到一个唯一的name,比如简单类名#3

如果用户代码在添加handler的时候指定了一个name,那么要做到事仅仅为检查一下是否有重复

private void checkduplicatename(string name) {
    if (context0(name) != null) {
        throw new illegalargumentexception("duplicate handler name: " + name);
    }
}

处理完name之后,就进入到创建context的过程,由前面的调用链得知,group为null,因此childexecutor(group)也返回null

defaultchannelpipeline

private abstractchannelhandlercontext newcontext(eventexecutorgroup group, string name, channelhandler handler) {
    return new defaultchannelhandlercontext(this, childexecutor(group), name, handler);
}

private eventexecutor childexecutor(eventexecutorgroup group) {
    if (group == null) {
        return null;
    }
    //..
}

defaultchannelhandlercontext

defaultchannelhandlercontext(
        defaultchannelpipeline pipeline, eventexecutor executor, string name, channelhandler handler) {
    super(pipeline, executor, name, isinbound(handler), isoutbound(handler));
    if (handler == null) {
        throw new nullpointerexception("handler");
    }
    this.handler = handler;
}

构造函数中,defaultchannelhandlercontext将参数回传到父类,保存handler的引用,进入到其父类

abstractchannelhandlercontext

abstractchannelhandlercontext(defaultchannelpipeline pipeline, eventexecutor executor, string name,
                              boolean inbound, boolean outbound) {
    this.name = objectutil.checknotnull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}

netty中用两个字段来表示这个channelhandlercontext属于inbound还是outbound,或者两者都是,两个boolean是通过下面两个小函数来判断(见上面一段代码)

defaultchannelhandlercontext

private static boolean isinbound(channelhandler handler) {
    return handler instanceof channelinboundhandler;
}

private static boolean isoutbound(channelhandler handler) {
    return handler instanceof channeloutboundhandler;
}

通过instanceof关键字根据接口类型来判断,因此,如果一个handler实现了两类接口,那么他既是一个inbound类型的handler,又是一个outbound类型的handler,比如下面这个类

Netty源码分析 (四)----- ChannelPipeline

 

常用的,将decode操作和encode操作合并到一起的codec,一般会继承 messagetomessagecodec,而messagetomessagecodec就是继承channelduplexhandler

messagetomessagecodec

public abstract class messagetomessagecodec<inbound_in, outbound_in> extends channelduplexhandler {

    protected abstract void encode(channelhandlercontext ctx, outbound_in msg, list<object> out)
        throws exception;

    protected abstract void decode(channelhandlercontext ctx, inbound_in msg, list<object> out)
        throws exception;
}

context 创建完了之后,接下来终于要将创建完毕的context加入到pipeline中去了

添加节点

private void addlast0(abstractchannelhandlercontext newctx) {
    abstractchannelhandlercontext prev = tail.prev;
    newctx.prev = prev; // 1
    newctx.next = tail; // 2
    prev.next = newctx; // 3
    tail.prev = newctx; // 4
}

用下面这幅图可见简单的表示这段过程,说白了,其实就是一个双向链表的插入操作

Netty源码分析 (四)----- ChannelPipeline

操作完毕,该context就加入到pipeline中

Netty源码分析 (四)----- ChannelPipeline

到这里,pipeline添加节点的操作就完成了,你可以根据此思路掌握所有的addxxx()系列方法

回调用户方法

abstractchannelhandlercontext

private void callhandleradded0(final abstractchannelhandlercontext ctx) {
    ctx.handler().handleradded(ctx);
    ctx.setaddcomplete();
}

到了第四步,pipeline中的新节点添加完成,于是便开始回调用户代码 ctx.handler().handleradded(ctx);,常见的用户代码如下

public class demohandler extends simplechannelinboundhandler<...> {
    @override
    public void handleradded(channelhandlercontext ctx) throws exception {
        // 节点被添加完毕之后回调到此
        // do something
    }
}

接下来,设置该节点的状态

abstractchannelhandlercontext

final void setaddcomplete() {
    for (;;) {
        int oldstate = handlerstate;
        if (oldstate == remove_complete || handler_state_updater.compareandset(this, oldstate, add_complete)) {
            return;
        }
    }
}

用cas修改节点的状态至:remove_complete(说明该节点已经被移除) 或者 add_complete

pipeline删除节点

netty 有个最大的特性之一就是handler可插拔,做到动态编织pipeline,比如在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器

下面是权限认证handler最简单的实现,第一个数据包传来的是认证信息,如果校验通过,就删除此handler,否则,直接关闭连接

public class authhandler extends simplechannelinboundhandler<bytebuf> {
    @override
    protected void channelread0(channelhandlercontext ctx, bytebuf data) throws exception {
        if (verify(authdatapacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(bytebuf bytebuf) {
        //...
    }
}

重点就在 ctx.pipeline().remove(this) 这段代码

@override
public final channelpipeline remove(channelhandler handler) {
    remove(getcontextordie(handler));
    
    return this;
}

remove操作相比add简单不少,分为三个步骤:

1.找到待删除的节点
2.调整双向链表指针删除
3.回调用户函数

找到待删除的节点

defaultchannelpipeline

private abstractchannelhandlercontext getcontextordie(channelhandler handler) {
    abstractchannelhandlercontext ctx = (abstractchannelhandlercontext) context(handler);
    if (ctx == null) {
        throw new nosuchelementexception(handler.getclass().getname());
    } else {
        return ctx;
    }
}

@override
public final channelhandlercontext context(channelhandler handler) {
    if (handler == null) {
        throw new nullpointerexception("handler");
    }

    abstractchannelhandlercontext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

这里为了找到handler对应的context,照样是通过依次遍历双向链表的方式,直到某一个context的handler和当前handler相同,便找到了该节点

调整双向链表指针删除

defaultchannelpipeline

private abstractchannelhandlercontext remove(final abstractchannelhandlercontext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.调整双向链表指针删除
        remove0(ctx);
    }
    // 3.回调用户函数
    callhandlerremoved0(ctx);
    return ctx;
}

private static void remove0(abstractchannelhandlercontext ctx) {
    abstractchannelhandlercontext prev = ctx.prev;
    abstractchannelhandlercontext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

经历的过程要比添加节点要简单,可以用下面一幅图来表示

Netty源码分析 (四)----- ChannelPipeline

 

最后的结果为

 Netty源码分析 (四)----- ChannelPipeline

结合这两幅图,可以很清晰地了解权限验证handler的工作原理,另外,被删除的节点因为没有对象引用到,果过段时间就会被gc自动回收

回调用户函数

private void callhandlerremoved0(final abstractchannelhandlercontext ctx) {
    try {
        ctx.handler().handlerremoved(ctx);
    } finally {
        ctx.setremoved();
    }
}

到了第三步,pipeline中的节点删除完成,于是便开始回调用户代码 ctx.handler().handlerremoved(ctx);,常见的代码如下

public class demohandler extends simplechannelinboundhandler<...> {
    @override
    public void handlerremoved(channelhandlercontext ctx) throws exception {
        // 节点被删除完毕之后回调到此,可做一些资源清理
        // do something
    }
}

最后,将该节点的状态设置为removed

final void setremoved() {
    handlerstate = remove_complete;
}

总结

1、在 netty 中每个 channel 都有且仅有一个 channelpipeline 与之对应。

2、channelpipeline是一个维护了一个以 abstractchannelhandlercontext 为节点的双向链表,其中此链表是 以head(headcontext)作为头,以tail(tailcontext)作为尾的双向链表.

3、pipeline中的每个节点包着具体的处理器channelhandler,节点根据channelhandler的类型是channelinboundhandler还是channeloutboundhandler来判断该节点属于in还是out或者两者都是