欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty源码分析 (一)----- NioEventLoopGroup

程序员文章站 2022-05-20 22:09:27
提到Netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到NioEventLoopGroup这个线程池,接下来进入正题。 线程模型 首先来看一段Netty的使用示例 下面将分析第一、二行代码,看下NioEventLoopGroup类的构造函数干了些什么。其余的部分将在其 ......

提到netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到nioeventloopgroup这个线程池,接下来进入正题。

线程模型

首先来看一段netty的使用示例

package com.wrh.server;

import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.socketchannel;
import io.netty.channel.socket.nio.nioserversocketchannel;

public final class simpleserver {

    public static void main(string[] args) throws exception {
        eventloopgroup bossgroup = new nioeventloopgroup(1);
        eventloopgroup workergroup = new nioeventloopgroup();

        try {
            serverbootstrap b = new serverbootstrap();
            b.group(bossgroup, workergroup)
                    .channel(nioserversocketchannel.class)
                    .handler(new simpleserverhandler())
                    .childhandler(new channelinitializer<socketchannel>() {
                        @override
                        public void initchannel(socketchannel ch) throws exception {
                        }
                    });

            channelfuture f = b.bind(8888).sync();

            f.channel().closefuture().sync();
        } finally {
            bossgroup.shutdowngracefully();
            workergroup.shutdowngracefully();
        }
    }

    private static class simpleserverhandler extends channelinboundhandleradapter {
        @override
        public void channelactive(channelhandlercontext ctx) throws exception {
            system.out.println("channelactive");
        }

        @override
        public void channelregistered(channelhandlercontext ctx) throws exception {
            system.out.println("channelregistered");
        }

        @override
        public void handleradded(channelhandlercontext ctx) throws exception {
            system.out.println("handleradded");
        }
    }
}

下面将分析第一、二行代码,看下nioeventloopgroup类的构造函数干了些什么。其余的部分将在其他博文中分析。

eventloopgroup bossgroup = new nioeventloopgroup(1);
eventloopgroup workergroup = new nioeventloopgroup();

从代码中可以看到这里使用了两个线程池bossgroupworkergroup,那么为什么需要定义两个线程池呢?这就要说到netty的线程模型了。

Netty源码分析 (一)----- NioEventLoopGroup

 

 

netty的线程模型被称为reactor模型,具体如图所示,图上的mainreactor指的就是bossgroup,这个线程池处理客户端的连接请求,并将accept的连接注册到subreactor的其中一个线程上;图上的subreactor当然指的就是workergroup,负责处理已建立的客户端通道上的数据读写;图上还有一块threadpool是具体的处理业务逻辑的线程池,一般情况下可以复用subreactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的threadpool。

nioeventloopgroup构造函数

nioeventloopgroup的构造函数的代码如下

public nioeventloopgroup() {
    this(0);
}

public nioeventloopgroup(int nthreads) {
    this(nthreads, null);
}

public nioeventloopgroup(int nthreads, threadfactory threadfactory) {
    this(nthreads, threadfactory, selectorprovider.provider());
}

public nioeventloopgroup(
        int nthreads, threadfactory threadfactory, final selectorprovider selectorprovider) {
    super(nthreads, threadfactory, selectorprovider);
} 

nioeventloopgroup类中的构造函数最终都是调用的父类multithreadeventloopgroup如下的构造函数:

protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) {
    super(nthreads == 0? default_event_loop_threads : nthreads, threadfactory, args);
}

从上面的构造函数可以得到 如果使用eventloopgroup workergroup = new nioeventloopgroup()来创建对象,即不指定线程个数,则netty给我们使用默认的线程个数,如果指定则用我们指定的线程个数。

默认线程个数相关的代码如下:

static {
    default_event_loop_threads = math.max(1, systempropertyutil.getint(
            "io.netty.eventloopthreads", runtime.getruntime().availableprocessors() * 2));

    if (logger.isdebugenabled()) {
        logger.debug("-dio.netty.eventloopthreads: {}", default_event_loop_threads);
    }
}

而systempropertyutil.getint函数的功能为:得到系统属性中指定key(这里:key=”io.netty.eventloopthreads”)所对应的value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu的核数的2倍。

结论:如果没有设置程序启动参数(或者说没有指定key=”io.netty.eventloopthreads”的属性值),那么默认情况下线程的个数为cpu的核数乘以2。

继续看,由于multithreadeventloopgroup的构造函数是调用的是其父类multithreadeventexecutorgroup的构造函数,因此,看下此类的构造函数

protected multithreadeventexecutorgroup(int nthreads, threadfactory threadfactory, object... args) {
    if (nthreads <= 0) {
        throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
    }

    if (threadfactory == null) {
        threadfactory = newdefaultthreadfactory();
    }

    children = new singlethreadeventexecutor[nthreads];
    //根据线程个数是否为2的幂次方,采用不同策略初始化chooser
    if (ispoweroftwo(children.length)) {
        chooser = new poweroftwoeventexecutorchooser();
    } else {
        chooser = new genericeventexecutorchooser();
    }
        //产生ntreads个nioeventloop对象保存在children数组中
    for (int i = 0; i < nthreads; i ++) {
        boolean success = false;
        try {
            children[i] = newchild(threadfactory, args);
            success = true;
        } catch (exception e) {
            // todo: think about if this is a good exception type
            throw new illegalstateexception("failed to create a child event loop", e);
        } finally {
                //如果newchild方法执行失败,则对前面执行new成功的几个nioeventloop进行shutdown处理
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdowngracefully();
                }

                for (int j = 0; j < i; j ++) {
                    eventexecutor e = children[j];
                    try {
                        while (!e.isterminated()) {
                            e.awaittermination(integer.max_value, timeunit.seconds);
                        }
                    } catch (interruptedexception interrupted) {
                        thread.currentthread().interrupt();
                        break;
                    }
                }
            }
        }
    }
}

该构造函数干了如下三件事:

1、产生了一个线程工场:threadfactory = newdefaultthreadfactory();

multithreadeventexecutorgroup.java
protected threadfactory newdefaultthreadfactory() {
    return new defaultthreadfactory(getclass());//getclass()为:nioeventloopgroup.class
}

defaultthreadfactory.java    
public defaultthreadfactory(class<?> pooltype) {
    this(pooltype, false, thread.norm_priority);
}

2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser

private static boolean ispoweroftwo(int val) {
    return (val & -val) == val;
}

3、 产生ntreads个nioeventloop对象保存在children数组中 ,线程都是通过调用newchild方法来产生的。

@override
protected eventexecutor newchild(
        threadfactory threadfactory, object... args) throws exception {
    return new nioeventloop(this, threadfactory, (selectorprovider) args[0]);
}

这里传给nioeventloop构造函数的参数为:nioeventloopgroup、defaultthreadfactory、selectorprovider。

nioeventloop构造函数分析

既然上面提到来new一个nioeventloop对象,下面我们就看下这个类以及其父类。

nioeventloop(nioeventloopgroup parent, threadfactory threadfactory, selectorprovider selectorprovider) {
    super(parent, threadfactory, false);
    if (selectorprovider == null) {
        throw new nullpointerexception("selectorprovider");
    }
    provider = selectorprovider;
    selector = openselector();
}

继续看父类 singlethreadeventloop的构造函数

protected singlethreadeventloop(eventloopgroup parent, threadfactory threadfactory, boolean addtaskwakesup) {
    super(parent, threadfactory, addtaskwakesup);
}

又是直接调用来父类singlethreadeventexecutor的构造函数,继续看

protected singlethreadeventexecutor(
        eventexecutorgroup parent, threadfactory threadfactory, boolean addtaskwakesup) {

    if (threadfactory == null) {
        throw new nullpointerexception("threadfactory");
    }

    this.parent = parent;
    this.addtaskwakesup = addtaskwakesup;//false

    thread = threadfactory.newthread(new runnable() {
        @override
        public void run() {
            boolean success = false;
            updatelastexecutiontime();
            try {
            //调用nioeventloop类的run方法
                singlethreadeventexecutor.this.run();
                success = true;
            } catch (throwable t) {
                logger.warn("unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldstate = state_updater.get(singlethreadeventexecutor.this);
                    if (oldstate >= st_shutting_down || state_updater.compareandset(
                            singlethreadeventexecutor.this, oldstate, st_shutting_down)) {
                        break;
                    }
                }
                // check if confirmshutdown() was called at the end of the loop.
                if (success && gracefulshutdownstarttime == 0) {
                    logger.error(
                            "buggy " + eventexecutor.class.getsimplename() + " implementation; " +
                            singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmshutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        state_updater.set(singlethreadeventexecutor.this, st_terminated);
                        threadlock.release();
                        if (!taskqueue.isempty()) {
                            logger.warn(
                                    "an event executor terminated with " +
                                    "non-empty task queue (" + taskqueue.size() + ')');
                        }

                        terminationfuture.setsuccess(null);
                    }
                }
            }
        }
    });

    taskqueue = newtaskqueue();
} 
protected queue<runnable> newtaskqueue() {
    return new linkedblockingqueue<runnable>();
}

主要干如下两件事:

1、利用threadfactory创建来一个thread,传入了一个runnable对象,该runnable重写的run代码比较长,不过重点仅仅是调用nioeventloop类的run方法。

2、使用linkedblockingqueue类初始化taskqueue 。

其中,newthread方法的代码如下:

defaultthreadfactory.java

@override
public thread newthread(runnable r) {
    thread t = newthread(new defaultrunnabledecorator(r), prefix + nextid.incrementandget());

    try {
    //判断是否是守护线程,并进行设置
        if (t.isdaemon()) {
            if (!daemon) {
                t.setdaemon(false);
            }
        } else {
            if (daemon) {
                t.setdaemon(true);
            }
        }
            //设置其优先级
        if (t.getpriority() != priority) {
            t.setpriority(priority);
        }
    } catch (exception ignored) {
        // doesn't matter even if failed to set.
    }
    return t;
}

protected thread newthread(runnable r, string name) {
    return new fastthreadlocalthread(r, name);
}

fastthreadlocalthread.java

public fastthreadlocalthread(runnable target, string name) {
    super(target, name);// fastthreadlocalthread extends thread 
} 

到这里,可以看到底层还是借助于类似于thread thread = new thread(r)这种方式来创建线程。

关于nioeventloop对象可以得到的点有,初始化了如下4个属性。

1、nioeventloopgroup (在父类singlethreadeventexecutor中)

2、selector

3、provider

4、thread (在父类singlethreadeventexecutor中)

总结

关于nioeventloopgroup,总结如下

1、 如果不指定线程数,则线程数为:cpu的核数*2

2、根据线程个数是否为2的幂次方,采用不同策略初始化chooser

3、产生nthreads个nioeventloop对象保存在children数组中。

可以理解nioeventloop就是一个线程,线程nioeventloop中里面有如下几个属性:

1、nioeventloopgroup (在父类singlethreadeventexecutor中)

2、selector

3、provider

4、thread (在父类singlethreadeventexecutor中)

更通俗点就是:nioeventloopgroup就是一个线程池,nioeventloop就是一个线程。nioeventloopgroup线程池中有n个nioeventloop线程。