欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ExecutorService

程序员文章站 2022-04-04 09:09:13
...

一.ExecutorService:

它也是一个接口,它扩展自Executor接口,Executor接口更像一个抽象的命令模式,仅有一个方法:execute(runnable);Executor接口简单,但是很重要,重要在这种设计的模式上。。

 

ExecutorService在Executor的基础上增加了“service”特性的方法:

  • shutdown()、shutdownNow():都是关闭当前service服务,释放Executor的所有资源(参见实现类);它所触发的动作就是取消队列中任务的执行。shutdown是一种“友好”的关闭,它将不再(事实上是不能)接受新的任务提交,同时把已经提交到队列中的任务执行完毕。shutdownNow更加直接一些,它将会把尚未执行的任务不再执行,正在执行的任务,通过“线程中断”(thread.interrupt),如果线程无法响应“中断”,那么将不会通过此方式被立即结束。shutdowNow是个有返回类型的方法,它返回那些等待执行的任务列表(List<Runnable>)
  • isShutdown:程序是否已经关闭,1)方法将导致其返回true。
  • isTerminated:是否已经结束,如果关闭后,所有的任务都执行完成,将返回true,否则其他情况均返回false。
  • awaitTermination(timeout):会抛出interruptException,此方法就是个废柴,大概意思是等待一段之间直到“任务全部结束”,如果超时就返回false。
  • Future submit(callable/runnale):向Executor提交任务,并返回一个结果未定的Future。
  • List<Future> invokeAll(Collection<Callable>):一个废柴方法,同步的方法,执行所有的任务列表,当所有任务都执行完成后,返回Future列表。这方法有啥用??貌似,可以对一批任务进行批量跟踪。此方法会抛出interruptException。
  • T invokeAny(Collection<Callable>): 任务集合中,任何一个任务完成就返回。

这些方法都会被ExecutorService的子类实现,其实Executor的子类的实现原理,才是最有意义的。其实基于Executor接口自己也能创造世界。

 

二.ScheduledExecutorService:

ExecutorService接口有一个非常重要的子接口: ScheduledExecutorService,从它的名字,我们就能看出此service是为了支持时间可控的任务执行而设计,其中包括:固定延迟执行,周期性执行;不过他还不支持制定特定date执行,这个工作可以交给Timer来做(稍后参看timer讲解)

ScheduledExecutorService提供了如下3个核心方法:

  • schedule(Callable<V> callable, long delay, TimeUnit unit)
  • schedule(Runnable command, long delay, TimeUnit unit)
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):不再赘述每个方法的意义.

ScheduledExecutorService的唯一子类(基于线程池)ScheduledThreadPoolExecutor(稍后参看其内部原理)

上述4个方法均会返回一个ScheduleFuture,这接口并没有什么特殊的地方,和Future接口相比仅仅多扩展了一个Delay接口(此接口仅作标记特性),像Future接口一样,这个接口也有着"操蛋的"继承:

ScheduledFuture + RunnableFuture  --> RunnableScheduledFuture(接口) --> ScheduledFutureTask(一个内部类同时继承了FutureTask)

Future + Runnable -->RunnableFuture -- > FutureTask(子类)

 

这么"奇怪"的接口复合,以及最终的ScheduledFutureTask类和FutureTask类,只是为了一个目的,归一化Callable和runnable这2种可执行任务,并可以获取(探测)任务执行的结果,并可以cancel任务,以及在任务执行完成后做辅助的操作(比如加入队列).

 

三.ThreadFactory:

ThreadFactory接口,明摆着,就是工厂模式,其只有一个方法:Thread newThread(runnable),她的作用,就是把任何runnable类型的任务,最终生成一个Thread..因为他是一个工厂方法,任何其实现类都可以在"构建"线程时做更多的辅助行为,比如如果线程构建行为被拒绝返回null等..(稍后参见具体实现):

Java代码  ExecutorService
            
    
    博客分类: 并发编程 ExecutorService 
  1. /////sample//  
  2.   
  3. public SsampleThreadFactory implements ThreadFactory {  
  4.     private Queue<Thread> inner = new LinkedList<Thread>();  
  5.     public Thread newThread(Runnable runnable){  
  6.         Thread t = new Thread(runnable);  
  7.         t.setDaemon(true);  
  8.         boolean isFull = inner.offer(t);  
  9.         return isFull == true ? t : null;  
  10.     }  
  11. }  

 
四.ThreadPoolExecutor:

ExecutorService其中最重要的实现类,就是ThreadPoolExecutor(线程池执行器),我们使用Executor,无非也就是使用线程池Executor。

 

因为对于线程池服务,可以再整个生命周期中接受大量的可执行任务,但是线程池服务底层通过队列来保存"任务"集合,毕竟对于有界队列,当队列满时,将不能接受新的任务提交操作,那么线程池服务也将处于一个"模糊"的状态,后来,有了RejectedExecutionHandler(拒绝策略)接口,来告知ThreadPoolExectuor,当出现意外情况时,如何处理("拒绝")任务.此接口提供了一个方法:void rejectedExecution(Runnable r, ThreadPoolExecutor executor) ,这个方法需要提供被拒绝的"任务"以及受理此任务的"线程池服务",之说以如此设计,很明显,此接口和ThreadPoolExecutor需要协作才能做一些事情..此接口的实现类,全部在ThreadPoolExecutor中,作为static的内部类而设计,使用者可以再构建ThreadPoolExecutor时指定"拒绝策略".

 

  1. AbortPolicy:ThreadPoolExecutor默认的拒绝策略,如果队列已满,直接终止(被拒绝提交),抛出RejiectedExecutionException;这个在ThreadPoolExecutor.execute/submit方法都会抛出。
  2. CallerRunsPolicy:直接在ThreadPoolEexucutor中被执行,此举措,可以阻塞其他任务的继续提交。方法实现为直接在regjectedExecution方法内调用runnbale.run()
  3. DiscardOldestPolicy:丢失队列头部的尚未执行的任务,以确保新任务被加入队列。方法实现为:从executor队列中poll一个任务,然后再把此runnable加入队列
  4. DiscardPolicy:直接丢弃此任务。方法为空方法实现。
  • ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler):全参数构造器,指定线程池中核心线程数(即最小线程数)、最大线程数,以及线程空闲的时间,同时可支持外部指定任务队列,此队列需要为blockingQueue。

ThreadPoolExecutor中规中矩的实现了ExecutorService的所有接口,同时还增加了几个特殊的方法:

  • void finalize():重写了Object.finalize方法,其做的一件事情就是在此方法中调用shutdown().
  • void execute(Runnable):提交任务,将在稍后的某个时间执行;execute方法内部就是整个线程池模型的核心;比较复杂:
  1. 如果任务队列未满,则直接将任务加入队列
  2. 如果队列已满,检测线程池的线程数是否达到最大值,如果未达到,则创建新线程并立即执行此任务;否则则将任务交付给RejectExecutionHandler。注意:线程池中的线程,将会不断从任务队列中take出任务并执行(while循环)。因为线程池的模型存在一种情况,就是线程被创建后,在while开始前可以指定一个firstTask,所以对任务的执行做了一个特殊的封装。(参见代码)
  • void shutdown():关闭ThreadPoolExecutor;将executor的runState置为shutdown;退出所有的尚未执行任务的线程(take队列被阻塞的,直接interrupt);如果队列中,还有任务,则分配线程执行任务;任务全部结束,将runState置为terminated(已完毕),同时向awaitTermination()的阻塞,发送信号。
  • void shutdownNow():立即关闭,此方法和shutdown的区别是:将runState置为stop,它将直接interrupt所有的线程,然后将线程池中剩余的任务返回。在返回之前,也执行termination操作。
  • boolean awaitTermination(timeout):此方法只是作为状态校验功能,检测当前ThreadPoolExecutor是否已经执行结束了所有的任务。阻塞指定的时间。
  • private void tryTerminate():这是一个私有方法,但是其非常有特点;尝试去结束此Executor,在线程池的线程执行完一个任务后、调用shutdown/shutdownNow之后,都会调用此方法;当线程池中的线程数量为0且任务队列为空且目前Executor的状态为stop或者shutdown时,将会触发Eexcutor的状态置为terminated;此后ThreadPoolExecutor生命周期消亡。
  • 为了开放性,ThreadPoolExecutor也提供了获取queue集合已经外部remove任务的方法。
  • void purge():清除已经取消的任务,此任务需要扩展自Future接口,此中任务来自submit方法(以及execute提交的RunnableFuture任务);此方法将会遍历整个队列,将Future状态为isCancelled的任务,移除队列。。
  • protected void beforeExecute(Thread t,Runnable r):两个奇怪的参数,此设计可能为了更方便的继承重写;此方法主要作用是自定义操作,在每个runnable交给线程执行前,做一些工作。在ThreadPoolExecutor中,此方法为空实现。
  • protected void afterExecute(Runnable t,Throwable t):同9),表示任务执行结束或者异常终止时(t),执行自定义操作。
  • Future<T> submit(Runnable/Callable command):这个方法是由其父类abstractExecutorService继承而来,其父类也就提供了这么一个有用的方法。此方法,和execute的功能一直,它将传递的runnable/callable任务封装成一个FutureTask,然后交付给execute方法去执行,并返回一个Future存根。
  • boolean prestartCoreThread/prestartAllCoreThreads:这两个方法是一个辅助方法,预启动一个或者所有的核心线程,corePoolSize用来设置核心线程数;默认情况下,线程是按需创建的(对于新任务过来,如果当前线程数小于corePoolSize,会创建一个新线程来执行新任务;直到达到corePoolSize数量;)。
  • 线程池中,具备2个基本的数据结构:workQueue/workers;其中workQueue为所有的任务队列,所有的任务存消均需要操作workQueue;workers在API中实现为HashSet,用来保存线程池中的线程;同线程池还有个指标是keepAliveTime,即“线程空闲时间”,API中对此参数的使用非常巧妙:
Java代码  ExecutorService
            
    
    博客分类: 并发编程 ExecutorService 
  1. if (state == SHUTDOWN)    
  2.     // Help drain queue  
  3.     r = workQueue.poll();  
  4. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
  5.     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
  6. else  
  7.     r = workQueue.take();  
  8. if (r != null)  
  9.     return r;  
  10. if (workerCanExit()) {  
  11.     if (runState >= SHUTDOWN) // Wake up others  
  12.         interruptIdleWorkers();  
  13.     return null;  
  14. }  
  15. ....  
  16. ///即当workQueue获取任务阻塞keepAliveTime,仍然没有得到任务,此时就去interrupt那些空闲的线程。  

 

 线程池中线程创建条件

  1.  如果运行的线程数少于corePoolSize,则Executor将会首先创建线程并执行任务
  2. 如果运行线程数>= corePoolSize,则Executor将会把任务加入到workQueue中。
  3. 如果任务offer到队列失败(offer是个同步方法,和add一样,只是不抛出异常),并且线程池中线程数量达到maxPoolSize,则Reject。

   

 

五.ScheduledThreadPoolExecutor:

是ThreadPoolExecutor的子类,但是ThreadPoolExecutor的主要几个方法都被重写,也增加了多个特性的方法;此类和Timer(单线程后台线程)有点像,但是它主要是服务于多线程模式下的定时/延迟任务执行。切具有更高的灵活性。

  •  public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler):全参数构造函数,由此可见ScheduledThreadPoolExecutor内部是一个*队列,默人maxPoolSize为Integer.MAX_VALUE;内部队列为DelayedWorkQueue;DelayedWorkQueue是个内部类,基于BlockingQueue;这个内部类并没有什么新奇,其内部持有了一个DelayQueue作为数据支撑;只是队列中接受的数据类型必须是RunnableScheduledFuture。
  • protected <V>  RunnableScheduledFuture<V> decorateTask(Runnable/Callable runnable,RunnableScheduledFuture<V> task):此方法,是个扩展方法,以便子类修改task做自定义的操作。默认直接返回task。
  • public ScheduledFuture<?> schedule(Runnable/Callable command,long delay,TimeUnit unit):提交任务,单次执行。并返回一个ScheduledFuture句柄。此方法主要是通过delay和unit计算出触发的时间,并将一个command封装成一个ScheduledFutureTask,然后将任务添加到队列中(此队列使用了ThreadPoolExecutor的队列,super.getQueue(),当然此队列也是ScheduledThreadPoolExecutor构造器指定的DelayedWorkQueue);
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,long period, TimeUnit unit):提交任务,频率执行。此方法比2)多了一个参数period,即执行周期间隔,ScheduledFutureTask将会保存period参数,并在isPeriodic()方法中检测,如果period>0,表示此task是频率执行的,isPeriodic方法扩展自RunnableScheduledFuture接口。ScheduledFutureTask的run方法,将会把“单次执行”和“频率执行”任务,分离开来。对于“单次执行”(即period ==0)的任务,将会直接运行;对于“频率执行”的任务,执行完之后,会重置执行结果,并计算下一次被trigger的时间,然后把任务再次添加到队列。

需要声明一下,ScheduledFutureTask扩展了Delayed接口(这也是此Task能被放入DelayQueue的原因),此接口只有一个方法就是getDelay(TimeUnit),此方法约定返回已经“过期”的剩余时间,如果返回的结果> 0,则表示还未过期,否则表示已过期;对于DelayQueue而言,poll时将会校验“过期时间”,如果还没有过期,将会返回null,只有在过期时,才会被poll出队列;对于take,队列头元素尚未过期时,将会被阻塞一段时间,此时间长度为“剩余时间”。其实,Delayed接口是Comparable接口的子接口,DelayQueue的底层数据结构又是PriorityQueue,权重比较方式,就是"剩余时间"(time - now);一切真相大白。

相关标签: ExecutorService