欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java线程池ThreadPoolExecutor类使用详解

程序员文章站 2022-06-13 11:29:19
一、Executors创建线程池 二、ThreadPoolExecutor类 三、ThreadPoolExecutor类扩展 一、Executors创建线程池 Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,如Executors.newFixedThreadPool() ......

一、executors创建线程池

二、threadpoolexecutor类

三、threadpoolexecutor类扩展

 

一、executors创建线程池

  java中创建线程池很简单,只需要调用executors中相应的便捷方法即可,如executors.newfixedthreadpool()、executors.newsinglethreadexecutor()、executors.newcachedthreadpool()等方法。这些方法虽然便捷,但是也有其局限性,如:oom,线程耗尽。

  小程序使用这些快捷方法没什么问题,对于服务端需要长期运行的程序,创建线程池应该直接使用threadpoolexecutor进行创建。上述便捷方法的创建也是通过threadpoolexecutor实现的。

二、threadpoolexecutor类

1、线程池工作顺序

  线程池的工作顺序为:corepoolsize -> 任务队列 -> maximumpoolsize -> 拒绝策略

2、threadpoolexecutor构造函数

  executors中创建线程池的快捷方法,实际上是调用了threadpoolexecutor的构造方法,定时任务线程池便捷方法executors.newscheduledthreadpool()内部使用的是scheduledthreadpoolexecutor。threadpoolexecutor构造函数参数列表如下:

1 public threadpoolexecutor(int corepoolsize,                     //线程池核心线程数量
2                           int maximumpoolsize,                  //线程池最大线程数量
3                           long keepalivetime,                   //超过corepoolesize的空闲线程的存活时长
4                           timeunit unit,                        //空闲线程存活时长单位
5                           blockingqueue<runnable> workqueue,    //任务的排队队列
6                           threadfactory threadfactory,          //新线程的线程工厂
7                           rejectedexecutionhandler handler)     //拒绝策略

  比较容易出问题的参数有corepoolsize、maximumpoolsize、workqueue以及handler:

  • corepoolsize和maximumpoolsize设置不当会影响效率,甚至耗尽线程
  • workqueue设置不当容易导致oom
  • handler设置不当会导致提交任务时抛出异常

3、workqueue任务队列

  任务队列一般分为直接提交队列、有界任务队列、无解任务队列、优先任务队列。

  • 直接任务队列:设置为synchronousqueue队列。synchronousqueue是一个特殊的blockingqueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒;反之,每一个删除操作也要等待对应的插入操作。
 1 public class synchronousqueuetest {
 2 
 3     private static executorservice pool;
 4     
 5     public static void main(string[] args) {
 6         
 7         //核心线程数设为1,最大线程数设为2,任务队列为synchronousqueue,拒绝策略为abortpolicy,直接抛出异常
 8         pool = new threadpoolexecutor(1, 2, 0, timeunit.seconds, new synchronousqueue<runnable>(), executors.defaultthreadfactory(), new threadpoolexecutor.abortpolicy());
 9         for(int i = 0;i < 3;i++){
10             pool.execute(new threadtask());
11         }
12     }
13 }
14 
15 class threadtask implements runnable{
16     
17     @override
18     public void run() {
19         system.out.println(thread.currentthread().getname());
20     }
21 }

  执行结果如下:

pool-1-thread-1
pool-1-thread-2
exception in thread "main" java.util.concurrent.rejectedexecutionexception: 
task com.aisino.threadpool.threadtask@2f0e140b rejected from java.util.concurrent.threadpoolexecutor@7440e464[running,
pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2] at java.util.concurrent.threadpoolexecutor$abortpolicy.rejectedexecution(threadpoolexecutor.java:2063) at java.util.concurrent.threadpoolexecutor.reject(threadpoolexecutor.java:830) at java.util.concurrent.threadpoolexecutor.execute(threadpoolexecutor.java:1379) at com.aisino.threadpool.synchronousqueuetest.main(synchronousqueuetest.java:18)

   由执行结果可知,当任务队列为synchronousqueue,创建的线程数大于maximumpoolsize时,直接执行拒绝策略抛出异常。

  使用synchronousqueue队列时,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数小于maximumpoolsize,则尝试创建新的线程,如果达到maximumpoolsize设置的最大值,则根据设置的handler执行对应的拒绝策略。因此使用synchronousqueue队列时,任务不会被缓存起来,而是马上执行,在这种情况下,需要对程序的并发量有个准确的评估,才能设置合适的maximumpoolsize数量,否则很容易执行拒绝策略。

  • 有界任务队列:可使用arrayblockingqueue实现,如下所示:
pool = new threadpoolexecutor(1, 2, 1000, timeunit.milliseconds, new arrayblockingqueue<runnable>(10), 
executors.defaultthreadfactory(), new threadpoolexecutor.abortpolicy());

   使用arrayblockingqueue有界任务队列时,如果有新的任务需要执行,线程池会创建新的线程,直到创建的线程数量达到corepoolsize,之后新的任务会被加入到等待队列中。若等待队列已满,即超过arrayblockingqueue初始化的容量,则继续创建线程,直到线程数量达到maximumpoolsize设置的最大线程数量,若大于maximumpoolsize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界任务队列初始量较大或者没有达到超负荷的状态,线程数将一直维持在corepoolsize及以下;反之,当任务队列已满时,则会以maximumpoolsize为最大线程数上限。

  • *的任务队列:可使用linkedblockingqueue实现,如下所示:
pool = new threadpoolexecutor(1, 2, 1000, timeunit.milliseconds, new linkedblockingqueue<runnable>(),
executors.defaultthreadfactory(),new threadpoolexecutor.abortpolicy());

   使用*任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corepoolsize。在这种情况下maximumpoolsize参数是无效的,哪怕任务队列中缓存了很多未执行的任务,当线程池的线程数达到corepoolsize后,线程数也不会再增加了。若后续有新的任务加入,则直接进入队列等待。使用这种任务队列模式时,要注意任务提交与处理之间的协调控制,不然会出现队列中的任务由于无法及时处理导致的一直增长,直到最后资源耗尽的问题。

  • 优先任务队列:通过priorityblockingqueue实现,如下所示:
 1 public class priorityblockingqueuetest {
 2 
 3     private static executorservice pool;
 4     public static void main(string[] args) {
 5         
 6         //使用优先任务队列
 7         pool = new threadpoolexecutor(1, 2, 1000, timeunit.milliseconds, new priorityblockingqueue<runnable>(), executors.defaultthreadfactory(), new threadpoolexecutor.abortpolicy());
 8         for(int i = 0;i < 10;i++){
 9             pool.execute(new prioritythreadtask(i));
10         }
11     }
12     
13 }
14 
15 class prioritythreadtask implements runnable, comparable<prioritythreadtask>{
16 
17     private int priority;
18     
19     public int getpriority(){
20         return priority;
21     }
22     public void setpriority(int priority){
23         this.priority = priority;
24     }
25     
26     public prioritythreadtask(){}
27     
28     public prioritythreadtask(int priority){
29         this.priority = priority;
30     }
31 
32     @override
33     public void run() {
34         
35         try{
36             //让线程阻塞,使后续任务进入缓存队列
37             thread.sleep(1000);
38             
39             system.out.println("priority:" + this.priority + ", threadname:" + thread.currentthread().getname());
40         }catch(interruptedexception e){
41             e.printstacktrace();
42         }
43     }
44 
45     //当前对象和其他对象作比较,当前优先级大就返回-1,当前优先级小就返回1,值越小优先级越高
46     @override
47     public int compareto(prioritythreadtask o) {
48         return this.priority > o.priority ? -1 : 1;
49     }
50 }

  执行结果如下:

priority:0, threadname:pool-1-thread-1
priority:19, threadname:pool-1-thread-1
priority:18, threadname:pool-1-thread-1
priority:17, threadname:pool-1-thread-1
priority:16, threadname:pool-1-thread-1
priority:15, threadname:pool-1-thread-1
priority:14, threadname:pool-1-thread-1
priority:13, threadname:pool-1-thread-1
priority:12, threadname:pool-1-thread-1
priority:11, threadname:pool-1-thread-1
priority:10, threadname:pool-1-thread-1
priority:9, threadname:pool-1-thread-1
priority:8, threadname:pool-1-thread-1
priority:7, threadname:pool-1-thread-1
priority:6, threadname:pool-1-thread-1
priority:5, threadname:pool-1-thread-1
priority:4, threadname:pool-1-thread-1
priority:3, threadname:pool-1-thread-1
priority:2, threadname:pool-1-thread-1
priority:1, threadname:pool-1-thread-1

   由执行结果可看出,除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列priorityblockingqueue中,按优先级进行了重新排列执行,且线程池的线程数一直为corepoolsize,在本例中corepoolsize为1,即线程数一直为1。

  priorityblockingqueue其实是一个特殊的*队列,它其中无论添加了多少个任务,线程池创建的线程数量也不会超过corepoolsize。其他队列一般是按照先进先出的规则处理任务,而priorityblockingqueue队列可以自定义规则根据任务的优先级顺序先后执行。

4、handler拒绝策略

  在创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列。在创建有界任务队列模式下,当任务队列已满且线程池创建的线程数达到最大线程数时,需要指定threadpoolexecutor的rejectedexecutionhandler参数来处理线程池"超载"的情况。threadpoolexecutor自带的拒绝策略如下:

  • abortpolicy策略:该策略会直接会直接抛出异常,阻止系统正常工作
  • discardpolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。使用此策略时,业务场景中需允许任务的丢失
  • discardoldestpolicy策略:该策略会丢弃任务队列中最老的一个任务,即任务队列中最先被添加进去的、马上要被执行的任务,并尝试再次提交任务(重复此过程)
  • callerrunspolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行

  以上内置的拒绝策略均实现了rejectedexecutionhandler接口,也可自己扩展rejectedexecutionhandler接口,定义自己的拒绝策略。示例代码如下:

 1 /**
 2  * 自定义拒绝策略
 3  */
 4 public class customrejectedexecutionhandlertest {
 5 
 6     private static executorservice pool;
 7 
 8     public static void main(string[] args) {
 9 
10         //自定义拒绝策略
11         pool = new threadpoolexecutor(1, 2, 1000, timeunit.milliseconds,
12                 new arrayblockingqueue<>(5),
13                 executors.defaultthreadfactory(),
14                 new rejectedexecutionhandler() {
15                     @override
16                     public void rejectedexecution(runnable r, threadpoolexecutor executor) {
17                         system.out.println(r.tostring() + " 执行了拒绝策略");
18                     }
19                 });
20 
21         for (int i = 0; i < 10; i++) {
22             pool.execute(new customrejectedexecutionhandlerthreadtask());
23         }
24     }
25 }
26 
27 class customrejectedexecutionhandlerthreadtask implements runnable {
28 
29     @override
30     public void run() {
31         try {
32             //让线程阻塞,使后续任务机进入缓存队列
33             thread.sleep(1000);
34             system.out.println("线程名称:" + thread.currentthread().getname());
35         } catch (interruptedexception e) {
36             e.printstacktrace();
37         }
38     }
39 }

  执行结果如下:

com.aisino.threadpool.customrejectedexecutionhandlerthreadtask@3cd1a2f1 执行了拒绝策略
com.aisino.threadpool.customrejectedexecutionhandlerthreadtask@2f0e140b 执行了拒绝策略
com.aisino.threadpool.customrejectedexecutionhandlerthreadtask@7440e464 执行了拒绝策略
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2

   由执行结果可看出,由于任务添加了休眠阻塞,执行任务需要花费一定时间,导致有一定数量的任务被丢弃,从而执行自定义的拒绝策略。

5、threadfactory自定义线程创建

  线程池中的线程是通过threadpoolexecutor中的线程工厂threadfactory创建的。可自定义threadfactory对线程池中的线程进行一些特殊的设置(命名、设置优先级等)。示例代码如下:

 1 public class customthreadfactorytest {
 2     
 3     private static executorservice pool;
 4 
 5     public static void main(string[] args) {
 6         //自定义线程工厂
 7         pool = new threadpoolexecutor(2, 4, 1000, timeunit.milliseconds, new arrayblockingqueue<>(5), new threadfactory() {
 8             @override
 9             public thread newthread(runnable r) {
10                 system.out.println("创建线程:" + r.hashcode());
11                 //线程名称
12                 thread th = new thread(r, "threadpool-" + r.hashcode());
13                 return th;
14             }
15         }, new threadpoolexecutor.callerrunspolicy());
16         
17         for(int i = 0;i < 10;i++){
18             pool.execute(new customthreadfactorythreadtask());
19         }
20     }
21 }
22 
23 class customthreadfactorythreadtask implements runnable{
24     
25     @override
26     public void run(){
27         //输出执行线程的名称
28         system.out.println("线程名称:" + thread.currentthread().getname());
29     }
30 }

  执行结果如下:

创建线程:1259475182
创建线程:1300109446
创建线程:1020371697
线程名称:threadpool-1259475182
线程名称:threadpool-1300109446
线程名称:threadpool-1259475182
创建线程:789451787
线程名称:threadpool-1020371697
线程名称:threadpool-1259475182
线程名称:threadpool-1300109446
线程名称:threadpool-1259475182
线程名称:threadpool-1020371697
线程名称:threadpool-789451787
线程名称:threadpool-1300109446

   由执行结果可看出,每个线程的创建都进行了记录输出与命名。

6、正确构造线程池

int poolsize = runtime.getruntime().availableprocessors() * 2;
blockingqueue<runnable> queue = new arrayblockingqueue<>(512);
rejectedexecutionhandler policy = new threadpoolexecutor.discardpolicy();
executorservice = new threadpoolexecutor(poolsize, poolsize,
    0, timeunit.seconds,
            queue,
            policy);

 

三、threadpoolexecutor类扩展

  threadpoolexecutor类扩展主要是围绕beforeexecute()、afterexecute()和terminated()三个接口。

  • beforeexecute:线程池中任务运行前执行
  • afterexecute:线程池中任务运行完毕后执行
  • terminated:线程池退出后执行

  通过这三个接口可以监控每个任务的开始时间和结束时间,或者其他功能。示例代码如下:

 1 public class threadpoolexecutorextensiontest {
 2     
 3     private static executorservice pool;
 4 
 5     public static void main(string[] args) {
 6         
 7         //自定义线程,为线程重命名
 8         pool = new threadpoolexecutor(1, 4, 1000, timeunit.milliseconds, new arrayblockingqueue<>(5), new threadfactory() {
 9             @override
10             public thread newthread(runnable r) {
11                 system.out.println("创建线程:" + r.hashcode());
12                 thread th = new thread(r, "threadpool-" + r.hashcode());
13                 return th;
14             }
15         }, new threadpoolexecutor.callerrunspolicy()){
16             
17             protected void beforeexecute(thread t,runnable r) {
18                 system.out.println("准备执行的任务名称:"+ ((threadpoolexecutorextensionthreadtask)r).gettaskname());
19             }
20 
21             protected void afterexecute(runnable r,throwable t) {
22                 system.out.println("执行完毕的任务名称:"+((threadpoolexecutorextensionthreadtask)r).gettaskname());
23             }
24 
25             protected void terminated() {
26                 system.out.println("线程池退出");
27             }
28         };
29         
30         for(int i = 0;i < 10;i++){
31             pool.execute(new threadpoolexecutorextensionthreadtask("task-" + i));
32         }
33         pool.shutdown();
34     }
35 }
36 
37 class threadpoolexecutorextensionthreadtask implements runnable{
38     
39     private string taskname;
40     
41     public string gettaskname(){
42         return taskname;
43     }
44     
45     public void settaskname(string taskname){
46         this.taskname = taskname;
47     }
48     
49     public threadpoolexecutorextensionthreadtask(){}
50     
51     public threadpoolexecutorextensionthreadtask(string taskname){
52         this.taskname = taskname;
53     }
54 
55     @override
56     public void run() {
57         //输出任务名称以及对应的执行线程名称
58         system.out.println("任务名称:" + this.taskname + ", 执行线程名称:" + thread.currentthread().getname());
59     }
60 }

  执行结果如下:

创建线程:1259475182
创建线程:1300109446
创建线程:1020371697
准备执行的任务名称:task-0
创建线程:789451787
任务名称:task-0, 执行线程名称:threadpool-1259475182
准备执行的任务名称:task-6
任务名称:task-9, 执行线程名称:main
执行完毕的任务名称:task-0
准备执行的任务名称:task-7
准备执行的任务名称:task-8
任务名称:task-6, 执行线程名称:threadpool-1300109446
任务名称:task-8, 执行线程名称:threadpool-789451787
执行完毕的任务名称:task-8
准备执行的任务名称:task-1
任务名称:task-7, 执行线程名称:threadpool-1020371697
执行完毕的任务名称:task-7
任务名称:task-1, 执行线程名称:threadpool-1259475182
执行完毕的任务名称:task-1
准备执行的任务名称:task-2
任务名称:task-2, 执行线程名称:threadpool-789451787
执行完毕的任务名称:task-2
执行完毕的任务名称:task-6
准备执行的任务名称:task-5
任务名称:task-5, 执行线程名称:threadpool-789451787
执行完毕的任务名称:task-5
准备执行的任务名称:task-4
准备执行的任务名称:task-3
任务名称:task-4, 执行线程名称:threadpool-1259475182
执行完毕的任务名称:task-4
任务名称:task-3, 执行线程名称:threadpool-1020371697
执行完毕的任务名称:task-3
线程池退出

   由执行结果可看出,通过对beforeexecute()、afterexecute()和terminated()的实现,可以对线程池中线程的状态进行监控,在线程执行前后输出了相关的打印信息。另外,使用shutdown()方法可以比较安全的关闭线程池,当线程池调用该方法后,线程池将不再接受后续添加的任务。但是,线程池不会立刻退出,而是等到添加到线程池中的任务都处理完成,才会退出。