欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发编程JUC源码学习之ThreadPoolExecutor

程序员文章站 2022-05-06 14:33:32
...

 ThreadPool的优点,比如资源的控制以及不用频繁的创建线程等就不用多说了。主要来讨论一下ThreadPoolExecutor的几个关键参数以及对task的添加以及线程的管理。它有这么个重要的参数corePoolSize、maximumPoolSize、keepAliveTime和taskqueue。

corePoolSize   线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。
maximumPoolSize   线程池中可维持线程的最大值。
keepAliveTime   当线程池中线程数量大于 corePoolSize  时,如果某些线程的空闲时间超过该值就会终止,直到线程数小于等于corePoolSize。
    1. 添加一个task的过程
    当要添加一个新的task,如果当前线程数小于 corePoolSize,直接添加一个线程,即使当前有空闲的线程。否则添加队列中。如果队列满了呢?则会判断当前线程数是否小于maximumPoolSize,如是则添加一个新的线程用来执行该task。如果超出最大线程数,那就只能reject了。
       
 int c = ctl.get();
        // 当前线程数小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //队列满时添加新的线程,如果线程数超过maximumPoolSize则reject
        else if (!addWorker(command, false))
            reject(command);
 
     使用addWorker(),来创建新的线程并传入的task作为第一个task执行。由此来看只有队列满时才会创建大于corePoolSize的线程。
 
    2.   KeepAliveTime是如何实现的呢?
       keepAliveTime的职责时,当线程池的队列满了,创建了多于corePoolSize的线程,这时处于节省资源的目的,会杀死多于corePoolSize空闲时间大于keepAliveTime的线程。它是如何做到呢?我们知道Woker时从taskQueue不断地轮询获取task并执行。如果获取不到task取到null时则退出循环结束线程。大概源码是这样的。
     
 try {
           //当取到task为null,跳出循环结束线程
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                 //此处省略,主要是执行task.run
             }
 
      关键是看看getTask()如何返回null,该方法的comments列出几种返回null并减少当前线程数的情景。我们在这里只关心keeAliveTime的实现。
      
boolean timedOut = false; // Did the last poll() time out? 该变量标识从taskqueue中取任务是否超时,如果超时则返回null
  
       retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);


            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }


            boolean timed;      // Are workers subject to culling?


            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;//线程数大于corePoolSize满足了杀死空闲线程的条件
                //第一次执行到这里,由于timeOut为false,跳出内循环执行从queue取任务
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                //第二次执行的时候由于timedOut为true,不会跳出内循环。执行下面的代码
               //将当前线程数减1并返回null致使该线程终止
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }


            try {
               //timed为true,这段code是实现空闲时间超过keepalivetime就被终止的精华所在。
              // 在给定的keepalivetime时间内从阻塞队列中取任务。如timeout则返回null
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;//如从queue中取任务超时则为true
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }