欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

juc-ThreadPoolExecutor线程池总结

程序员文章站 2022-04-19 11:01:36
...

之前对线程池的理解很是主观,我的理解是在线程池初始化的时候就生成指定的数量的线程,然后将一些任务添加到一个阻塞队列中,然后多个线程同时从阻塞队列中取任务执行,当没有任务时线程阻塞,今天下午看了下大神的博客以及源码,发现我自己的理解有些偏颇,所以今天再总结一下。

我们创建线程池时一般都是通过使用Executors.newXXX来创建,所以我们从这个方法入手,无论是创建Fixed还是cached,还是single的线程池,都是调用的同一个方法——ThreadPoolExecutor的构造方法,只不过是参数不同。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,//这两二个参数都是整数,用于限制线程的数量,在后面会用到
                       0L, TimeUnit.MILLISECONDS,//这两个参数都是用来指定从阻塞队列中获取任务的时候的超时时间,more on this later。
                       new LinkedBlockingQueue<Runnable>());//用来保存任务的阻塞队列
}

 上面的构造方法最终调用的构造方法为:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,//上面这5个已经说过了。
                              ThreadFactory threadFactory,//这个用来产生thread,默认是 Executors.defaultThreadFactory()这个的返回值,他负责产生thread,设置thread的name,优先级,设置为daemon,没有别的作用。
                              RejectedExecutionHandler handler) {//这个用来处理当过多的任务添加到队列时,如果队列无法添加的时候的操作,默认的是AbortPolicy,当发生上述情况时,会抛一个异常。
。。。。//复制操作省略了
    }

我们看一下提交任务的方法execute:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//如果当前的worker(也即是工作的线程数量)小于corePoolSize,进入if,这里的corePoolSize也就是我们在构造方法中穿入的第一个参数,很重要,他表示一个线程池只少要保存的线程的个数,这些线程是不会过期的,如果阻塞队列中没有任务,则会
在阻塞队列的poll方法中阻塞,而超过这个值的线程在没有任务的时候就要被回收掉。
            if (addWorker(command, true))//添加一个worker,即一个线程,more on this later,
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//如果上面的没有return,则进入这个if的判断,条件是正在运行,且这个任务能添加到队列中,则进入if
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//重新检查,虽然上面应有个isRunning的判断,但是可能有时间差。进入这个if,如果不是running的话
                reject(command);//拒绝这个任务,调用rejectPolicy,默认是抛个异常。
            else if (workerCountOf(recheck) == 0)//如果当前的工作线程个数为0,则调用addWorker添加一个工作线程,如果不为0的话,那么就会有线程不停的从阻塞队列中获得任务,所以一定会有线程执行这个任务的。
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//如果添加到队列中失败,则判断是否可以添加工作线程,如果不可以,则拒绝这个任务。
            reject(command);
    }

 经过上面,我们知道了corePoolSize,他是要保存的最小的工作的线程数(more on this later),但是还没有涉及到maximumPoolSize,我们趁热打铁看看addWorder方法

 private boolean addWorker(Runnable firstTask, boolean core) {//第一个参数表示如果要创建工作线程的话,他要执行的第一个任务,第二个参数是用来做判断的,如果是true,表示和corePoolSize判断,false表示和maximumPoolSize判断。
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&//已经shutdown,不能继续接受任务。
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//工作线程的个数
                if (wc >= CAPACITY ||//如果现在已经创建的线程的数量太多,
                    wc >= (core ? corePoolSize : maximumPoolSize))//这里显示了core的作用,如果是true,则根据第一个判断,
                    return false;//如果创建的worker太多,则返回false,表示不能继续创建。
                if (compareAndIncrementWorkerCount(c))//使用cas增加c,退出循环,进行创建worker
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);//创建worker
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将创建的worker添加到集合中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//调用创建的worker中的Thread的start方法,
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 下面我们看一下Worker这个类:

Worker(Runnable firstTask) {//这个参数表示当这个worker创建成功后执行的第一个任务。
     setState(-1); // inhibit interrupts until runWorker
     this.firstTask = firstTask;//第一个任务
     this.thread = getThreadFactory().newThread(this);//worker中封装了一个线程,并且这个线程执行的任务是this,因为Worker也是一个runnable,也就是说调用的worker类的run方法
}

 同时我们发现Worker也是一个runnable,他的run方法是调用的ThreadPoolExecutor的runWorker方法,也就是当调用worker.thread.start方法的时候是执行的ThreadPoolExecutor的runWorker方法,参数是worker自己,我们看一下

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//穿入的第一个任务
        w.firstTask = null;
        w.unlock(); // worker继承了aqs,实现了lock方法,他的目的是害怕并发执行造成不确定性的后果
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//或者是第一个任务不为null或者是从队列中获得任务不是null,这个getTask方法就是从队列中获得任务,比较重要,等会看
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行获取的任务,执行这个任务的是worker中封装的线程。从这里基本就可以总结出大概了:当线程池添加任务时,会判断worker的数量,如果过小,就会创建worker,添加到集合,用这个worker执行第一个任务,执行完了再从阻塞队列中获取任务。
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//这个方法也很重要,它用来处理在所有的任务都处理完成之后的操作。
        }
    }

看到这里有两个方法比较重要,一个是getTask,一个是processWorderExit,

getTask方法:

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;  // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果现在工作的线程多于corePoolSize就会是true

                if (wc <= maximumPoolSize && ! (timedOut && timed))//刚上来timed是false,跳出第二个for
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?//如果当前的工作线程多余corePoolSize,则是true
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://如果是true,表示有超时时间的等待。从这里就能看出corePoolSize的用处了,如果有超过corePoolSize的线程,那么再从队列中获取任务时就会有时间的等待,而且我们之前传的keepAliveTime是0,也就是说
如果当前队列中没有任务的话,超过corePoolSize那些线程就立即返回,而如果不是超过corePoolSize的线程则会一直等待,
                    workQueue.take();//没有超时时间的等待,会一直阻塞。
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

 在这个getTask方法中,我们看到了corePoolSize的重要作用。

processWorderExit方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
   try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);//这里可以发现,对于那些在获取任务时没有阻塞的线程,在没有任务后就会被移除集合,也就是被垃圾回收掉。但是对于那些阻塞的就不会被回收掉。
    } finally {
        mainLock.unlock();
    }
。。。。。//下面的没看。
}

 

再看一下在Executors中的静态方法:

1、创建一个fixedThreadPool的话,corePoolSize和maximumPoolSize是一样的,因为创建的worker的数量不会超过maximumPoolSize,因此worker的数量最多是corePoolSize,即getTask方法中的timed一定是true 所以所有的线程在从阻塞队列中获取任务时都是会永远阻塞的,不会丢掉worker。并且传入的等待时间是0,所以在从阻塞队列中获得任务时不会阻塞,worker直接直接返回。

 2、创建一个cachedThreadPool,

new ThreadPoolExecutor(0, //线程最小的数量是0,即getTask中所有的timed都是false,所以所有的worker在获取不到任务时都会被抛弃,再有任务时就会创建建立worker
     Integer.MAX_VALUE,//最大值超级大,也就是说创建woker的个数几乎是不受限制的。
     60L, TimeUnit.SECONDS,//每个worker在从阻塞队列中获得任务时可以阻塞60秒。
     new SynchronousQueue<Runnable>()   );//一个同步的queue,比较特殊,插入的线程会阻塞直到一个获取的线程的到来,同样获取的线程也会阻塞直到插入的线程到来。

 可以看出,cachedThreadPool几乎可以被认为是不断创建worker的,worker从阻塞队列中获取任务时如果没有任务时就会阻塞60秒,然后被抛弃。

3、创建一个单线程的线程池:

new ThreadPoolExecutor(1, 1,//核心的数量和最大的数量都是1,也就是永远是1,
                                    0L, TimeUnit.MILLISECONDS,//阻塞直到有任务
                                    new LinkedBlockingQueue<Runnable>())

 

很好的纠正了对线程池的理解。 

 

相关标签: ThreadPoolExecutor