欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring(28)——Task抽象

程序员文章站 2022-04-17 23:19:46
...

Spring为异步执行任务和定时任务抽象了TaskExecutor接口和TaskScheduler接口,Spring之所以进行这样的抽象是为了在其内部统一任务调度的接口。TaskExecutor和JDK自带的Executor有点类似,只定义了一个execute(),用来执行一个任务,至于对应的任务怎么调度的,则由具体的实现类来实现,比如可以使用一个新的线程,或者使用一个线程池来调度。而实际上TaskExecutor接口也是继承了JDK的Executor接口的。

常见的TaskExecutor实现类

  • SimpleAsyncTaskExecutor 每次调度的时候都会启用一个新的线程执行任务。
  • ConcurrentTaskExecutor 接收一个java.util.concurrent.Executor对象作为参数,然后执行任务的时候会使用内部的java.util.concurrent.Executor调度。
  • ThreadPoolTaskExecutor TaskExecutor的线程池实现,类似于JDK的ThreadPoolExecutor,可以进行线程池的大小定义等。
  • ThreadPoolTaskScheduler 可以定时执行任务的实现,同时实现了TaskExecutor接口和TaskScheduler接口。

使用ThreadPoolTaskExecutor

使用ThreadPoolTaskExecutor时可以自己在程序中new对象,这样我们可以对它进行充分的自定义,比如:

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(20);
taskExecutor.initialize();
for (int i=0; i<100; i++) {
    TimeUnit.MILLISECONDS.sleep(500);
    taskExecutor.execute(() -> {
        System.out.println(Thread.currentThread() + "--------" + LocalDateTime.now());
    });
}
TimeUnit.SECONDS.sleep(20);

也可以把它定义为Spring的一个bean:

<bean id="myExecutor" 
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
    p:core-pool-size="10"
    p:maxPoolSize="50"
    p:queueCapacity="10000"
    />

也可以通过task这个namespace定义,这需要先引入task命名空间。

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd">

然后就可以通过<task:executor/>来定义一个ThreadPoolTaskExecutor对象了。下面的代码就定义了一个id为myExecutor的bean,其类型是ThreadPoolTaskExecutor。keep-alive指定当创建的线程超过了核心线程数之后,线程在销毁前的最大闲置时间,单位是秒。也就是说超过了核心线程数后创建的线程在闲置了120秒后就将被销毁,不指定keep-alive时默认的最大闲置时间是60秒。通过pool-size指定了核心线程数是10,最大线程数也是10。如果需要指定最大线程数,则可以采用min-max的形式指定,比如10-50则表示核心线程数是10,最大线程数是50。如果不指定pool-size则默认的核心线程数是1,最大线程数是不限制。最大线程数只有在队列容量不是无限制的时候才有用。 通过queue-capacity指定任务队列的大小为10000,不指定queue-capacity则表示队列大小无限制,可以任意增长。通过rejection-policy指定了当线程池的任务队列满了以后将使用当前线程来调用任务,可选值还有ABORT(抛出异常)、DISCARD(忽略当前任务)和DISCARD_OLDEST(忽略在队列中最老的任务,即存放最久的任务)。

<task:executor 
    id="myExecutor" 
    keep-alive="120" 
    pool-size="10" 
    queue-capacity="10000" 
    rejection-policy="CALLER_RUNS"
    />

使用ThreadPoolTaskScheduler

ThreadPoolTaskScheduler类似于JDK的ScheduledThreadPoolExecutor,也是可以直接在程序中使用的。比如下面就定义了一个ThreadPoolTaskScheduler,其接收一个定时任务,每秒打印一次当前时间。

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.initialize();
scheduler.scheduleAtFixedRate(() -> {
    System.out.println(LocalDateTime.now());
}, 1000);
TimeUnit.SECONDS.sleep(30);

ThreadPoolTaskScheduler也可以通过task命名空间来定义,下面的代码就定义了一个id为myScheduler的ThreadPoolTaskScheduler,对应的线程池大小是10。pool-size不指定时默认是1。

<task:scheduler id="myScheduler" pool-size="10"/>

通过<task:scheduled-tasks/>定义定时任务

可以通过<task:scheduled-tasks/>来定义定时任务调度器,执行定时任务的ThreadPoolTaskScheduler可以通过属性scheduler来指定,当未指定scheduler属性时,默认将创建一个线程池大小为1的ThreadPoolTaskScheduler来执行任务。需要执行的任务可以通过<task:scheduled/>来定义,其必须指定的参数是ref和method,ref用来指定定时任务方法对应的对象,method用来指定定时任务调度的方法。然后可以选择性的通过cron、fixed-delay、fixed-rate来定义任务的调度时机,fixed-delay和fixed-rate还可以配合initial-delay一起使用,用来指定初次任务调度的延迟时间。如果有需要也可以自己通过trigger属性,指定触发任务调度的org.springframework.scheduling.Trigger对象。实际上最底层调度的时候都是转换为Trigger对象进行调度的,比如CronTrigger或PeriodicTrigger。如果有需要也可以实现自己的Trigger对象。

<bean id="scheduledTasks" class="com.elim.spring.task.ScheduledTasks"/>
<task:scheduled-tasks scheduler="myScheduler">
    <!-- 每5秒中执行一次 -->
    <task:scheduled ref="scheduledTasks" method="printTime" cron="*/5 * * * * ?"/>
    <!-- 每秒执行一次,相当于scheduleWithFixedDelay() -->
    <task:scheduled ref="scheduledTasks" method="printTime" fixed-delay="1000"/>
    <!-- 每秒钟执行一次,初次调度将在5秒后进行,相当于scheduleAtFixedRate() -->
    <task:scheduled ref="scheduledTasks" method="printTime" fixed-rate="1000" initial-delay="5000"/>
</task:scheduled-tasks>

基于注解的支持

可以通过<task:annotation-driven/>启用基于注解的支持,这样就可以在程序中使用@Async标注方法将进行异步调用,使用@Scheduled标注方法将进行定时调度。可以通过executor属性指定在进行@Async标注的方法时默认将使用的TaskExecutor,如果不指定默认将使用一个SimpleAsyncTaskExecutor实例。可以通过scheduler属性指定在进行@Scheduled标注的方法进行调度时将使用的TaskScheduler,如果不指定时默认将使用只有一个线程的TaskScheduler进行调度。

<task:annotation-driven executor="myExecutor" scheduler="myScheduler" />

由于在进行基于注解的异步调用时,对应的方法的调用完全由Spring来接管,异步调用方法中抛出的异常是不会被调用程序捕获到的,Spring默认情况下会将捕获的异常进行异常日志输出,由SimpleAsyncUncaughtExceptionHandler处理。如果有需要程序可以通过实现AsyncUncaughtExceptionHandler接口实现自定义的异常处理器,然后通过<task:annotation-driven/>exception-handler属性指定AsyncUncaughtExceptionHandler对应的bean。

<task:annotation-driven 
    executor="myExecutor" 
    scheduler="myScheduler" 
    exception-handler="myAsyncExceptionHandler"/>

如果是使用基于Java类的配置,则要启用基于注解的异步调用时通过@EnableAsync启用,通过@EnableScheduling启用基于注解的定时调度。需要自定义默认的异常处理器和TaskExecutor,则可以通过使@Configuration类实现AsyncConfigurer接口进行相应的自定义。

@Async

使用@Async标注的方法的异步调用的功能由Spring AOP实现,由AsyncAnnotationBeanPostProcessor定义。所以@Async标注的方法必须是public方法,且必须是外部调用。比如下面代码中的AnnotationTasks这个bean,如果在外部调用其testAsync()方法,则该方法将由配置的默认的TaskExecutor发起异步调用,但是testAsync()内部调用的async()方法则不会发起异步调用,而是对于testAsync()而言的同步调用。

@Component
public class AnnotationTasks {

    @Async
    public void testAsync() {
        long t1 = System.currentTimeMillis();
        this.async();
        long t2 = System.currentTimeMillis();
        System.out.println(Thread.currentThread() + " 耗时:" + (t2 - t1));
    }
    
    @Async
    private void async() {
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread() + "-----------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

默认的异步调用是基于Spring Aop进行的,可以通过<task:annotation-driven/>的mode属性指定是否需要使用Aspectj的支持。使用Spring Aop代理时又可以通过proxy-target-class属性指定是否需要强制使用CGLIB做基于Class的代理。

<task:annotation-driven 
    executor="myExecutor" 
    scheduler="myScheduler" 
    exception-handler="myAsyncExceptionHandler" 
    mode="proxy"
    proxy-target-class="false"/>

@Async标注的方法默认会被<task:annotation-driven/>指定的TaskExecutor执行,如果需要针对某个特定的异步执行使用一个特定的TaskExecutor,则可以通过@Async的value属性指定需要使用的TaskExecutor对应的bean名称。如下代码则指定了在异步调用该方法时将使用名称为executor2的TaskExecutor。

@Async("executor2")
public void testAsync2() {
    try {
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread() + "-----------");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

@Async也可以是定义在Class上的,定义在Class上时表示该Class中所有的方法在被外部调用时都将异步调用。

@Component
@Async
public class AsyncClass {

    public void print() {
        System.out.println(Thread.currentThread());
    }
    
}

@Scheduled

需要指定一个方法为定时调度时可以通过使用Scheduled标注,该注解将由ScheduledAnnotationBeanPostProcessor负责解析并发起定时调度,其内部会由ScheduledTaskRegistrar定义任务,如有需要我们也可以使用它来进行自定义定时任务调度的封装。Scheduled的定义如下,可以指定的参数的含义都非常清晰,就不做过多的介绍了。与XML定义不同的是,fixedDelay、fixedRate等多了可以使用字符串定义的形式,这就允许我们可以通过使用占位符来进行定义。

public @interface Scheduled {

    String cron() default "";

    String zone() default "";

    long fixedDelay() default -1;

    String fixedDelayString() default "";

    long fixedRate() default -1;

    String fixedRateString() default "";

    long initialDelay() default -1;

    String initialDelayString() default "";

}

如下代码就定义了该方法将每5秒钟执行一次。

@Scheduled(fixedRate = 5000)
public void schdule() {
    System.out.println("当前时间是:" + LocalDateTime.now());
}

(注:本文是基于Spring4.1.0所写)