Java提高——JUC线程线程池和线程调度
线程池
- 第四种获取线程的方法:线程池,一个ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用Executors工厂方法配置 。
- 线程池可以解决两个不同的问题:
1)、由于减少了每个任务调用的开销,他们通常可以在执行大量异步任务的时候提供增强性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
2)、每个ThreadPoolExecutor还维护着一些基本的统计数据,如完成的任务数。 - 为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子(hook)。但是强烈建议程序员使用较为方便的Executors工厂方法:
1)Executors.newCachedThreadPool( ):*线程池,可以进行自动线程回收
2)Executors.newFixedThreadPool( ):固定大小线程池
3)Executors.newSingleThreadExecutor( ):单个后台线程
/**
* @author chenpeng
* @date 2018/7/11 9:27
*
* 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。
* 避免了频繁的创建和销毁的额外开销,提高了响应速度
* 二、线程池的体系结构:
* java.util.concurrent.Executor:负责线程的使用和调度的根接口
* |--ExecutorService 子接口:线程池的主要接口
* |--ThreadPoolExecutor:线程池的实现类
* |--ScheduledExecutorService子接口:负责线程的调度
* |--ScheduledThreadPoolExecutor实现类:继承了ThreadPoolExecutor,
* 实现了ScheduledExecutorService子接口
* 三、工具类:Executors
* ExecutorService newFixedThreadPool( ):创建固定大小的线程池
* ExecutorService newCacheThreadPool( ):缓存线程池,线程池的数量不固定,可以根据需求自动更改数量。
* ExecutorService newSingleThreadExecutor( ):创建单个线程池。线程池中只有一个线程。
*
* ScheduledExecutorService newScheduledThreadPool( ):创建固定大小的线程,可以延迟或定时的执行任务
*
*/
public class TestThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
/**==============使用Callable的方式================*/
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<Integer> futureTask = pool.submit(new Callable<Integer>(){
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
});
list.add(futureTask);
}
Integer result = null;
//Integer result = futureTask.get();
for (Future<Integer> f:list) {
result = f.get();
System.out.println(result);
}
pool.shutdown();
/**==============使用Runnable的方式================*/
/* ThreadPoolDemo tp = new ThreadPoolDemo();
//之前——使用多个线程都要创建,然后销毁掉,耗费资源
*//*new Thread(tp).start();
new Thread(tp).start();*//*
//2、为线程池中的线程分配任务
for (int i = 0; i < 10; i++) {
pool.submit(tp);
}
//3、关闭线程池
pool.shutdown();*/
}
}
class ThreadPoolDemo implements Runnable{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
}
线程调度
一个ExecutorService,可安排在给定延迟后运行或定期执行的命令。
/**
* @author chenpeng
* @date 2018/7/11 16:13
* 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。
* 避免了频繁的创建和销毁的额外开销,提高了响应速度
* 二、线程池的体系结构:
* java.util.concurrent.Executor:负责线程的使用和调度的根接口
* |--ExecutorService 子接口:线程池的主要接口
* |--ThreadPoolExecutor:线程池的实现类
* |--ScheduledExecutorService子接口:负责线程的调度
* |--ScheduledThreadPoolExecutor实现类:继承了ThreadPoolExecutor,
* 实现了ScheduledExecutorService子接口
* 三、工具类:Executors
* ExecutorService newFixedThreadPool( ):创建固定大小的线程池
* ExecutorService newCacheThreadPool( ):缓存线程池,线程池的数量不固定,可以根据需求自动更改数量。
* ExecutorService newSingleThreadExecutor( ):创建单个线程池。线程池中只有一个线程。
*
* ScheduledExecutorService newScheduledThreadPool( ):创建固定大小的线程,可以延迟或定时的执行任务
*/
public class TestScheduledThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 10; i++) {
Future<Integer> result = pool.schedule(new Callable<Integer>() {
int num = (int) (Math.random()*101);
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+" "+num);
return num;
}
},2, TimeUnit.SECONDS);
System.out.println(result.get());
}
pool.shutdown();
}
}
线程池类的结构
这张图基本简单的的表达了线程池的结构:
- 最*的结构是Executor,不过Executor严格意义上来说不算是一个线程池而是提供一种任务如何运行的机制。
- ExecutorService才可以认为是真正的线程池接口,接口提供了管理线程池的方法。
- 下面两个分支:AbstractExecutorService就是普通的线程池分支,ScheduledExecutorService是用来创建定时任务的。
ThreadPoolExecutor六个核心参数
这篇文章重点讲的就是线程池 ThreadPoolExecutor。下面看看关于ThreadPoolExecutor完整的构造方法的签名,签名中包含了六个参数,是 ThreadPoolExecutor的核心,对这些参数的理解,配置,调优对于使用好线程池也是非常重要的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1、corePoolSize
核心池的大小。在创建了线程池之后,默认情况下,线程池中没有任何线程,而是等待有任务来才去创建线程去执行任务。默认情况下,在创建了线程池之后,线程池中线程数为0,当有任务到来之后就会创建一个线程去执行任务。
2、maximumPoolSize
线程池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务,maximumPoolSize表示就是workQueue满了,线程池中最多可以创建的线程数量。
3、keepAliveTime
只有当线程池中的线程数大于 corePoolSize 时,这个参数才会起作用。当线程数大于 corePoolSize 时,终止前多余的空闲线程等待新任务的最长时间。
4、unit
keepAliveTime的时间单位
5、workQueue
存储还没来得及执行的任务
6、threadFactory
执行程序创建新线程的时候使用的工厂
7、handler
由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
corePoolSize与maximumPoolSize举例理解
上面的内容其他都很好理解只有corePoolSize与maximumPoolSize需要多思考:
- 池中数目小于corePoolSize,新任务不用排队直接添加新线程。
- 池中线程数大于等于corePoolSize,workQueue未满,首选将新任务加入workQueue而不是添加新线程。
- 池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于 maximumPoolSize,添加新的线程来处理被添加的任务
- 池中线程数大于 corePoolSize,workQueue已满,并且线程数大于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务, ThreadPoolExecutor 的使用很简单。通过execute( Runnable command)方法来发起一个任务的执行。通过shutDown( )来对已经提交的任务做一个有效的关闭。尽管线程池很好,但我们要注意JDK API的一句话:
强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(*线程池,可以进行线程自动回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
Executors
线程的重点是在合适的场景下使用合适的线程,所谓 “合适的线程池” 的意思就是,ThreadPoolExecutor的构造方法传入不同的参数,构造出不同的线程池,以满足使用的需要。
下面是Executors为用户提供的几种线程池:
1、newSingleThreadExecutors( ) 单线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
单线程线程池,那么线程池中运行的线程数肯定是1。workQueue选择了*的 LinkedBlockingQueue,那么不管来多少任务都排队,前面一个任务执行完毕,再执行队列中的线程。从这个角度来讲,第二个参数 maximumPoolSize 是没有意义的,因为maximumPoolSize 描述的是排队的任务多过workQueue的容量,线程池中最多只能容纳 maximumPoolSize 个任务,现在workQueue是*的,那 maximum 其实设置多少都无所谓了。
2、newFixedThreadPool( int nThreads) 固定大小线程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定大小的线程池和单线程线程池异曲同工,无非是让线程池中能运行的线程变成了手动指定的 nThreads 罢了。同样,由于是选择了 LinkedBlockingQueue,因此第二个参数 maximumPoolSize 同样也是无意义的
3、newCacheThreadPool( ) *线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
*线程池意思是无论多少任务提交进来,都直接运行。*线程池采用了 SynchronousQueue,采用这个线程池就没有workQueue容量一说了,只要添加进去的线程就会被拿去用。既然是*线程池,那线程数肯定没有上线,所以 以maximumPoolSize为主了,设置一个近似的无限大 Integer.MAX_VALUE。另外注意一下,单线程线程池和固定大小线程池都不会进行自动回收的,也就是保证最终提交进来的任务都是被处理,但至于什么时候处理,就要看处理能力了。但是*线程池是设置了回收时间的,由于corePoolSize为0 ,所以只要60秒没有被用到的线程都会被直接移除。
关于workQueue
上面提到了关于workQueue,也就是排队策略。排队策略描述的是,当前线程大于corePoolSize时,线程以什么样的方式排队等待被运行。
排队有三种策略:直接提交、有界队列、*队列。
java的JDK使用了*队列LinkedBlockingQueue作为workQueue而不是有界队列ArrayBlockingQueue,尽管后者可以对资源进行控制,但是相比*有三个缺点:
- 使用有界队列,corePoolSize、maximumPoolSize两个参数势必要根据不同的场景不断调整以达到一个最佳,这势必给开发带来极大的麻烦,必须经过大量的性能测试。所以干脆就使用*队列,任务永远添加到队列中,不会溢出,自然maximumPoolSize也灭有什么用了,只需要根据系统的能力调整corePoolSize就可以了。
- 防止业务突刺。尤其在Web应用中,突然大量的请求到来都是很正常的。这时候使用*队列,不管请求的早晚,至少保证了所有任务都能被处理到。但是使用有界队列那些超出了maximumPoolSize的任务直接被丢掉了,处理的慢还可以忍受,但是直接被丢弃了似乎有些糟糕。
- 不仅仅是corePoolSize和maximumPoolSize需要相互调整,有界队列的队列大小和maximumPoolSize也需要相互折中,这也是比较难以控制和调整的方面
但是就像Comparator和Comparable的对比、synchronized和ReentrantLock,再到这里的*队列和有界队列的对比,看似都有一个优点稍微突出一些,但是这绝对不是鼓励只使用其中一个而不使用另一个,任何都需要根据实际情况来看,当然开始可以重点考虑那些优点明显一点的。
四种拒绝策略
拒绝策略就是任务太多,超过了maximumPoolSize了,接收不下来,只能拒绝了。拒绝的时候,可以指定拒绝策略,也就是一段处理程序。
拒绝策略的父接口是RejectedExecutionHandler,JDK本身在ThreadPoolExecutor里给用户提供了四种拒绝策略:
1、AbortPolicy
直接抛出一个RejectedExecutionExeception,这也是JDK默认的拒绝策略
2、CallerRunsPolicy
直接尝试运行被拒绝的任务,但是如果线程池已经被关闭了,任务就被丢弃了
3、DiscardOldestPolicy
移除最晚的那个没有被处理的任务,然后执行被拒绝的任务。同样,如果线程池已经被关闭了,任务就丢弃了
4、DiscardPolicy
不能执行的任务将被移除
线程池的处理请求流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//1.当前池中线程比核心数少,新建一个线程执行任务
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.核心池已满,但任务队列未满,添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //如果这时被关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)//如果之前的线程已被销毁完,新建一个线程
addWorker(null, false);
}
//3.核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
}
上一篇: 流数据处理与分析
推荐阅读
-
Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析
-
Java concurrency线程池之线程池原理(四)_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(三)_动力节点Java学院整理
-
Java concurrency线程池之Callable和Future_动力节点Java学院整理
-
JAVA线程池原理实例详解
-
Java多线程之显示锁和内置锁总结详解
-
深入理解Java编程线程池的实现原理
-
浅谈Android 的线程和线程池的使用
-
Java多线程模拟售票程序和线程安全问题
-
Java终止线程实例和stop()方法源码阅读