
Netty Source Code Analysis (6): The Client accept Process

程序员文章站, 2022-03-07 16:05:00

After reading this article, you will understand:
1. How Netty accepts new connections
2. How Netty assigns a reactor thread to each new connection
3. How Netty adds ChannelHandlers to each new connection

Reactor threads in Netty

The most central pieces of Netty are its two types of reactor threads. Think of them as Netty's two engines, driving the whole framework.

One type is the boss thread group, dedicated to accepting new connections; each accepted connection is wrapped into a Channel object and handed off to the worker thread group. The other type is the worker thread group, dedicated to handling reads and writes on established connections.

Whether boss or worker, every reactor thread does its work in the same three steps:

  1. Poll the I/O events registered on its Selector
  2. Process those I/O events
  3. Run asynchronous tasks

For a boss thread, step one mostly turns up ACCEPT events, signaling new connections; for a worker thread it mostly turns up READ/WRITE events, signaling network reads and writes.
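The three steps above map directly onto a plain-JDK selector loop. As a simplified illustration (single thread, loopback only, no task queue; the class and method names `MiniBossLoop`/`acceptOne` are invented for this sketch and are not Netty code), a boss-style loop that polls for and handles one ACCEPT event might look like:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MiniBossLoop {
    // Accepts a single connection via a selector loop, mirroring steps 1 and 2 above.
    public static SocketChannel acceptOne() throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        server.register(selector, SelectionKey.OP_ACCEPT);

        // Drive one client connection over loopback so the loop has something to accept
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));

        SocketChannel accepted = null;
        while (accepted == null) {
            selector.select();                               // step 1: poll registered I/O events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove();
                if (k.isAcceptable()) {                      // step 2: handle the ACCEPT event
                    accepted = ((ServerSocketChannel) k.channel()).accept();
                }
            }
        }
        client.close();
        server.close();
        selector.close();
        return accepted;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(acceptOne() != null);
    }
}
```

Netty's boss thread does the same thing at its core, but adds step 3 (the async task queue) and hands the accepted channel to a worker group instead of keeping it.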

Establishing a new connection

In short, establishing a new connection involves three steps:
1. Detect the new connection
2. Register the new connection with the worker thread group
3. Register the new connection's read event

Detecting a new connection

We already know that once the server is bound and started, the server channel is registered with a boss reactor thread, which keeps polling for events until it detects an ACCEPT event.

NioEventLoop.java

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    // Check whether the SelectionKey is still valid; if not, close the channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // If READ or ACCEPT is ready, trigger unsafe.read(). Also check for readyOps of 0
        // to work around a possible JDK bug which may otherwise lead to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) { // already closed, so no need to handle this channel's other events
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // If WRITE is ready, flush the outbound buffer; forceFlush also clears the
        // previously registered OP_WRITE interest once there is nothing left to write
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // OP_CONNECT must be removed, as otherwise Selector.select(..) will always return
        // without blocking, which can spin the CPU at 100%
        // See https://github.com/netty/netty/issues/924
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

This method inspects the SelectionKey k and distinguishes the following cases:

1) OP_ACCEPT: accept a client connection

2) OP_READ: readable event, i.e. the channel has received new data for the upper layers to read.

3) OP_WRITE: writable event, i.e. the upper layers may write data to the channel.

4) OP_CONNECT: connection-established event, i.e. the TCP connection is up and the channel is active.
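These cases are all bit tests against the readyOps mask. As a toy illustration (the class `ReadyOpsDemo` and its string labels are invented here, and it returns at the first match, whereas processSelectedKey checks each mask in sequence), the branching logic can be sketched as:

```java
import java.nio.channels.SelectionKey;

public class ReadyOpsDemo {
    // Classify a readyOps bitmask the way processSelectedKey branches on it.
    public static String classify(int readyOps) {
        // OP_ACCEPT fires on a boss thread, OP_READ on a worker; 0 hits the JDK-bug workaround
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            return "read";
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            return "write";
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            return "connect";
        }
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(classify(SelectionKey.OP_ACCEPT)); // read
        System.out.println(classify(SelectionKey.OP_WRITE));  // write
    }
}
```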

This post focuses on what happens internally when the boss thread's selector detects an OP_ACCEPT event.

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) { // already closed, so no need to handle this channel's other events
        // Connection already closed - no need to handle write.
        return;
    }
}

The boss reactor thread has polled a SelectionKey.OP_ACCEPT event, meaning a new connection has arrived. It now calls the channel's unsafe to do the actual work; here the channel is a NioServerSocketChannel, so the unsafe is its NioMessageUnsafe.

Let's step into its read() method: the second stage of handling a new connection.

Registering with a reactor thread

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}

It calls doReadMessages repeatedly, using readBuf as the container; as you might guess, what gets "read" here are the new connections themselves. Each new connection is then passed through the server channel's pipeline via pipeline.fireChannelRead(), after which the container is cleared and pipeline.fireChannelReadComplete() is fired.

Let's look at these two calls in detail:

1. doReadMessages(List)
2. pipeline.fireChannelRead(NioSocketChannel)

doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

Here we finally reach the boundary where Netty calls into the underlying JDK NIO: javaChannel().accept(). Since the reactor thread's first step already detected an ACCEPT event, this accept() call returns immediately, yielding the channel created by the JDK's NIO layer.

A ServerSocketChannel has two modes, blocking and non-blocking:

a. Blocking mode: ServerSocketChannel.accept() listens for incoming connections; when it returns, it yields a SocketChannel for the new connection. In blocking mode, accept() blocks until a new connection arrives.

b. Non-blocking mode: accept() returns immediately; if no connection has arrived yet, it returns null, so the returned SocketChannel must be null-checked.

From our earlier analysis of the NioServerSocketChannel constructor, we know it sets the ServerSocketChannel to non-blocking via ch.configureBlocking(false);
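The null-return behavior of non-blocking accept is easy to demonstrate with plain JDK NIO. A minimal sketch (the class and method names `NonBlockingAcceptDemo`/`acceptOnce` are invented for this example; no client ever connects, so accept() has nothing pending):

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingAcceptDemo {
    // Bind a server socket, switch it to non-blocking (the same call Netty makes),
    // and call accept() with no pending connection: it returns null instead of blocking.
    public static SocketChannel acceptOnce() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port on loopback
            server.configureBlocking(false);
            return server.accept(); // no client connected -> null immediately
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(acceptOnce() == null); // true: nothing to accept yet
    }
}
```

This is why Netty's doReadMessages null-checks the result of javaChannel().accept() before wrapping it.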

Netty wraps the JDK SocketChannel into its own NioSocketChannel and adds it to the list, so the outer loop can iterate over the list for further processing.

From the previous article, we know that creating the server channel also creates a series of Netty's core components, including the pipeline, the unsafe, and so on. Does accepting a new connection create the same set of components?

With that question in mind, let's dig in.

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

We focus on super(parent, socket); the parent class of NioSocketChannel is AbstractNioByteChannel.

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

Here we see a familiar face from JDK NIO: SelectionKey.OP_READ. In plain JDK NIO programming you would register this same event to express read interest in a channel.

Continuing up the hierarchy to AbstractNioByteChannel's parent, AbstractNioChannel: if you read the previous article, this code should look familiar.

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

Creating the server channel also ends up in this method. super(parent) creates, in AbstractChannel, the set of components bound to this channel:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Here readInterestOp indicates that the event this channel cares about is SelectionKey.OP_READ, which will later be registered with the selector. The constructor then sets the channel to non-blocking mode, and an unsafe and a pipeline are created for the channel.

pipeline.fireChannelRead(NioSocketChannel)

We have covered the pipeline before: every type of Netty channel contains one. A pipeline is literally an assembly line: it has a start, an end, and various stations in between. An item enters at the start, is processed at each station in turn, and comes out at the end.

In Netty, the start of the assembly line is HeadContext and the end is TailContext. HeadContext delegates to the unsafe for the concrete operations, while TailContext surfaces unhandled pipeline exceptions to the user and warns about unhandled messages.

From earlier articles, we know that when the server channel is initialized, a pipeline handler named ServerBootstrapAcceptor is automatically added to its pipeline, with the parameters set in user code passed into its constructor. Let's now look at ServerBootstrapAcceptor.

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

The earlier pipeline.fireChannelRead(NioSocketChannel); eventually reaches, via the head -> unsafe -> ServerBootstrapAcceptor call chain, the channelRead method of ServerBootstrapAcceptor shown here, which starts by casting msg to Channel.

It then takes that channel — the NioSocketChannel we just created — gets its pipeline, and adds the childHandler from user code to it. In user code, the childHandler looks like this:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });

It corresponds to the ChannelInitializer. At this point, the NioSocketChannel's pipeline holds the handlers head -> ChannelInitializer -> tail. Keep this in mind; we will come back to it!

Next, the NioSocketChannel's attrs and options are set, and we enter childGroup.register(child); this childGroup is the NioEventLoopGroup we created in the bootstrap code.

Stepping into NioEventLoopGroup's register method, it delegates to its parent class, MultithreadEventLoopGroup.

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

This pulls in a next() method; let's follow it.

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

Back in its parent class:

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}

The chooser here is an EventExecutorChooser — literally, an event-executor selector. In our context, its job is to pick one reactor thread from the worker reactor thread group.

public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

There are two chooser implementations:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

By default, the chooser is created by DefaultEventExecutorChooserFactory. When building the reactor-thread chooser, it checks the number of reactor threads: if the count is a power of two it creates a PowerOfTwoEventExecutorChooser, otherwise a GenericEventExecutorChooser.

Both choosers pick reactor threads in round-robin fashion; the only difference is that PowerOfTwoEventExecutorChooser uses a bitwise AND while GenericEventExecutorChooser uses a modulo, and the AND is cheaper than the modulo.
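The two tricks above are easy to verify directly: `(val & -val) == val` detects a power of two, and for a power-of-two length, `idx & (length - 1)` produces exactly the same round-robin index as `idx % length`. A small sketch (the class `ChooserMath` and the worker-group size 8 are made up for illustration; only the two expressions come from the Netty code above):

```java
public class ChooserMath {
    // The isPowerOfTwo trick from DefaultEventExecutorChooserFactory:
    // in two's complement, -val keeps only the lowest set bit of val,
    // so (val & -val) == val iff val has exactly one bit set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Check that AND and modulo agree for a power-of-two length over many indices.
    static boolean andMatchesMod(int length, int rounds) {
        for (int n = 0; n < rounds; n++) {
            if ((n & (length - 1)) != n % length) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(8));       // true
        System.out.println(isPowerOfTwo(6));       // false
        System.out.println(andMatchesMod(8, 100)); // true: same round-robin sequence either way
    }
}
```

This is why NioEventLoopGroup defaults its thread count to a power of two whenever it can pick freely: it lets register() land on the cheaper chooser.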

Having chosen a reactor thread, i.e. a NioEventLoop, we return to the registration call:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

From here on, the flow is the same as during server startup; see my earlier articles for the details.

AbstractChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

As during server startup, doRegister(); performs the actual registration:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

This binds the channel to a selector. Each selector is driven by exactly one reactor thread, which from then on is responsible for the channel's event polling, event processing, and asynchronous task execution.

With the reactor thread bound, pipeline.invokeHandlerAddedIfNeeded() is called.

As noted earlier, the NioSocketChannel's pipeline currently holds three handlers — head -> ChannelInitializer -> tail — so this eventually calls ChannelInitializer's handlerAdded method.

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

handlerAdded calls initChannel, and then remove(ctx); deletes the initializer from the pipeline:

ChannelInitializer.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

And what is this inner initChannel? Let's go back to user code, for example:

User code

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

So it ends up running our own code! Afterwards, the NioSocketChannel's pipeline holds the handlers head -> LoggingHandler -> EchoServerHandler -> tail.
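The transformation of the handler chain can be modeled with a plain list, without any Netty API. In this toy sketch (the class `PipelineInitSketch` is invented here; it models handlers as strings, not real ChannelHandlers), the "initializer" inserts the user's handlers at its own position and then removes itself, exactly as ChannelInitializer does:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelineInitSketch {
    // head -> initializer -> tail becomes head -> LoggingHandler -> EchoServerHandler -> tail
    public static List<String> initPipeline() {
        List<String> pipeline = new ArrayList<>(Arrays.asList("head", "initializer", "tail"));
        int pos = pipeline.indexOf("initializer");
        // The user's initChannel() adds handlers after the initializer's position
        pipeline.addAll(pos + 1, Arrays.asList("LoggingHandler", "EchoServerHandler"));
        // remove(ctx): the initializer deletes itself once its job is done
        pipeline.remove("initializer");
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println(initPipeline()); // [head, LoggingHandler, EchoServerHandler, tail]
    }
}
```

The self-removal is the design point: ChannelInitializer is one-shot setup code, so leaving it in the pipeline would only add a useless hop to every subsequent event.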

Registering the read event

We still have this code left to analyze:

AbstractChannel.java

private void register0(ChannelPromise promise) {
    // ...
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

pipeline.fireChannelRegistered(); does nothing dramatic: ultimately it just invokes the channelRegistered callback of each handler in the pipeline.

isActive() returns true since the connection is already established, so we enter the branch and reach pipeline.fireChannelActive(); I'll skip the intermediate steps here and jump straight to the key part.

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

This registers the SelectionKey.OP_READ event with the selector, signaling that the channel is now ready to handle read events.
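The interest-ops arithmetic in doBeginRead() is just two bit operations: test whether OP_READ is already present, and OR it in if not. A standalone sketch (the class `InterestOpsDemo` and its helper `beginRead` are invented names; the bitmask logic mirrors the method above, with readInterestOp fixed to OP_READ as it is for a NioSocketChannel):

```java
import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    // Mirror of doBeginRead()'s core: add OP_READ to the interest set only if it is absent.
    static int beginRead(int interestOps) {
        if ((interestOps & SelectionKey.OP_READ) == 0) {
            interestOps |= SelectionKey.OP_READ;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        // Freshly registered channels start with interestOps = 0 (see doRegister above)
        System.out.println(beginRead(0) == SelectionKey.OP_READ);   // true: read interest added

        // If OP_READ is already set, the interest set is left unchanged
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(beginRead(ops) == ops);                  // true: unchanged
    }
}
```

Note that doRegister() registered the channel with interest ops 0; doBeginRead() is the moment the channel actually starts listening for reads.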

Summary

That completes our tour of how Netty handles new connections. To recap:

1. The boss reactor thread polls and detects a new incoming connection
2. The underlying JDK channel is wrapped into a NioSocketChannel, along with a full set of Netty's core components
3. The chooser selects a worker reactor thread and binds the connection to it
4. The read event is registered, and the new connection's reads and writes begin