JUC之线程池
程序员文章站
2022-07-13 14:42:47
...
创建线程池由工厂类Executors完成。其中newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor对应的实现类是ThreadPoolExecutor,newScheduledThreadPool对应的实现类是ScheduledThreadPoolExecutor。
ThreadPoolExecutor
- int corePoolSize 线程池的最小线程数目,即使线程闲置,这些线程也一直保留
- int maximumPoolSize 线程池中最大线程数目
- long keepAliveTime 当线程池中的线程超过corePoolSize时,当超过该事件设定的时长线程一直处于闲置状态时,线程将被回收
- TimeUnit unit 上述值对应的时间单位
- BlockingQueue workQueue 一个保留Runable对象的阻塞队列。
- ThreadFactory threadFactory 用于创建线程的工厂。该值可以缺省,此时使用的是默认的线程创建工厂。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);//设置为非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);//设置为正常优先级
return t;
}
}
- RejectedExecutionHandler handler 当线程池中的线程已经达到maximumPoolSize同时BlockingQueue也已经满员时,调用该handler。该值可以缺省,将采用如下默认实现
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
ThreadPoolExecutor该类中,最核心的方法应该就是execute()了。
- 如果当前线程数少于corePoolSize,那么就直接新建线程。无非是JDK基本的线程操作代码。
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
将命令加入队列。在加入队列后,我们仍然需要确认一下是否还有存活线程。如果没有了,还需要重新创建线程。
如果我们不能成功的把命令加入队列,也不能新建线程,那么就只能reject这个命令
完整逻辑如下
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//isRunning(c)用于检查是否处于shutdown状态
if (isRunning(c) && workQueue.offer(command)) {
//再次确认
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
ScheduledThreadPoolExecutor
构造函数如下:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor。核心是使用DelayedWorkQueue。DelayedWorkQueue是在该类内部定义的一个静态内部类,实现了BlockingQueue接口。
首先你需要知道算法,堆可以用来实现优先队列。DelayedWorkQueue就是使用了堆,插入元素使用siftUp方法,移除元素使用siftDown方法。在该对象中,维护了一个Condition。通过available.awaitNanos(delay)来消耗时间。
private final Condition available = lock.newCondition();
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
//如果为空则等待通知
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
//到时则出对
if (delay <= 0)
//finishPoll用于在出队时调整堆结构
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}