Java并发编程JUC源码学习之ThreadPoolExecutor
程序员文章站
2022-05-06 14:33:32
...
ThreadPool的优点,比如资源的控制以及不用频繁的创建线程等就不用多说了。主要来讨论一下ThreadPoolExecutor的几个关键参数以及对task的添加以及线程的管理。它有这么个重要的参数corePoolSize、maximumPoolSize、keepAliveTime和taskqueue。
corePoolSize 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。
maximumPoolSize 线程池中可维持线程的最大值。
keepAliveTime 当线程池中线程数量大于 corePoolSize 时,如果某些线程的空闲时间超过该值就会终止,直到线程数小于等于corePoolSize。
1. 添加一个task的过程
当要添加一个新的task,如果当前线程数小于 corePoolSize,直接添加一个线程,即使当前有空闲的线程。否则添加队列中。如果队列满了呢?则会判断当前线程数是否小于maximumPoolSize,如是则添加一个新的线程用来执行该task。如果超出最大线程数,那就只能reject了。
int c = ctl.get(); // 当前线程数小于corePoolSize if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //添加到队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //队列满时添加新的线程,如果线程数超过maximumPoolSize则reject else if (!addWorker(command, false)) reject(command);
使用addWorker(),来创建新的线程并传入的task作为第一个task执行。由此来看只有队列满时才会创建大于corePoolSize的线程。
2. KeepAliveTime是如何实现的呢?
keepAliveTime的职责时,当线程池的队列满了,创建了多于corePoolSize的线程,这时处于节省资源的目的,会杀死多于corePoolSize空闲时间大于keepAliveTime的线程。它是如何做到呢?我们知道Woker时从taskQueue不断地轮询获取task并执行。如果获取不到task取到null时则退出循环结束线程。大概源码是这样的。
try { //当取到task为null,跳出循环结束线程 while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun(); //此处省略,主要是执行task.run }
关键是看看getTask()如何返回null,该方法的comments列出几种返回null并减少当前线程数的情景。我们在这里只关心keeAliveTime的实现。
boolean timedOut = false; // Did the last poll() time out? 该变量标识从taskqueue中取任务是否超时,如果超时则返回null retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize;//线程数大于corePoolSize满足了杀死空闲线程的条件 //第一次执行到这里,由于timeOut为false,跳出内循环执行从queue取任务 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; //第二次执行的时候由于timedOut为true,不会跳出内循环。执行下面的代码 //将当前线程数减1并返回null致使该线程终止 if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { //timed为true,这段code是实现空闲时间超过keepalivetime就被终止的精华所在。 // 在给定的keepalivetime时间内从阻塞队列中取任务。如timeout则返回null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;//如从queue中取任务超时则为true } catch (InterruptedException retry) { timedOut = false; } }
上一篇: Oracle物化视图的使用
下一篇: oracle物化视图讲解(很详细)
推荐阅读
-
并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
-
Java并发编程—ConcurrentHashMap源码学习
-
并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
-
Java并发之线程池ThreadPoolExecutor源码分析学习
-
【Java并发编程】21、线程池ThreadPoolExecutor源码解析
-
并发编程(八)—— Java 并发队列 BlockingQueue 实现之 ArrayBlockingQueue 源码分析
-
java并发编程工具类JUC之ArrayBlockingQueue
-
Java并发编程学习笔记之九:死锁(含代码)
-
Java并发编程JUC源码学习之ThreadPoolExecutor
-
JUC并发编程基石AQS源码之结构篇-ReentrantLock