Spring Boot 异步线程
一般的后台管理系统都有导出报表的功能,对于大数据量的报表导出,通常比较耗时,比如管理员点击一个导出按钮,往往要等待很长的时间直到报表成功导出才可以进行下一步操作,显然这种同步的方式已经满足不了需求了。现在实际开发中常用的方式是采用JMS消息队列方式,发送消息到其他的系统中进行导出,或者是在项目中开启异步线程来完成耗时的导出工作。本文将结合报表导出的场景,来讲解一些Spring Boot中如何开启异步线程。
定义线程池和开启异步可用
Spring中存在一个接口AsyncConfigurer接口,该接口就是用来配置异步线程池的接口,它有两个方法,getAsyncExecutor和getAsyncUncaughtExceptionHandler,第一个方法是获取一个线程池,第二个方法是用来处理异步线程中发生的异常。它的源码如下所示:
package org.springframework.scheduling.annotation;
import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.lang.Nullable;
public interface AsyncConfigurer {
// 获取线程池
@Nullable
default Executor getAsyncExecutor() {
return null;
}
// 异步异常处理器
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
这里的接口提供的都是空实现,所以想要开启异步线程机制,那么就需要我们手动实现这个接口,将实现该接口的类标注为Spring的配置类,那么就开启了Spring的异步可用,那么Spring就会通过getAsyncExecutor来获取一个可用的线程来执行某项异步操作,当然,整个异步的开启还需要结合两个注解,一个是@EnableAsync,另外一个是@Async,第一个是标注在配置类中,用来告诉Spring异步可用,第二个注解通常标注在某个方法中,当调用这个方法的时候,就会从线程池中获取新的线程来执行它。
现在我们来定义线程池并开启异步可用,这里写一个配置类AsyncConfig来实现AsyncConfigurer,代码如下所示:
package cn.itlemon.springboot.async.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @author jiangpingping
* @date 2018/10/30 19:28
*/
@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
// 自定义线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(10);
// 最大线程数
taskExecutor.setMaxPoolSize(30);
// 线程队列最大线程数
taskExecutor.setQueueCapacity(2000);
// 初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
log.error("Error Occurs in async method:{}", ex.getMessage());
};
}
}
第一个方法我们定义了一个线程池,并设置了一些基本参数,比如核心线程数、最大线程数、线程队列最大线程数等,第二个方法是处理异步线程中发生的异常,它是一个异常处理器,返回AsyncUncaughtExceptionHandler接口的实现类对象,由于AsyncUncaughtExceptionHandler是一个函数式接口(只有一个抽象方法的接口,通常使用@FunctionalInterface注解标注的接口),所以这里使用了Lambda表达式来简写它的实现类对象,这里的异步异常处理就是记录一下日志,并没有做其他的逻辑操作,如果对Lambda表达式不熟悉,也可以直接使用匿名内部类的方式来创建AsyncUncaughtExceptionHandler的实现类对象,如下所示:
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("Error Occurs in async method:{}", ex.getMessage());
}
};
}
需要注意的一点的是,我们在上面的配置类中加入了@EnableAsync注解,那么在Spring注册该配置类为Spring Bean的时候,就会开启异步可用机制。
测试异步可用机制
写一个Service层接口,用来表明生成报表:
package cn.itlemon.springboot.async.service;
import java.util.concurrent.Future;
/**
* @author jiangpingping
* @date 2018/10/30 19:32
*/
public interface AsyncService {
/**
* 模拟生成报表的异步方法
*/
void generateReport();
}
它的实现类是:
package cn.itlemon.springboot.async.service.impl;
import cn.itlemon.springboot.async.service.AsyncService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
/**
* @author jiangpingping
* @date 2018/10/30 19:33
*/
@Service
public class AsyncServiceImpl implements AsyncService {
@Override
@Async
public void generateReport() {
// 模拟异步生成报表代码,这里设置为打印
System.out.println("报表线程名称:【" + Thread.currentThread().getName() + "】");
}
}
这里假设进行了报表的导出工作,所以使用打印语句来进行简单的模拟,并在方法中标注了@Async注解,那么当调用该方法的时候,Spring会获取一个新的线程来执行这个方法,所以这里打印出执行当前方法的线程名称。我们在写一个控制器,代码如下:
package cn.itlemon.springboot.async.controller;
import cn.itlemon.springboot.async.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @author jiangpingping
* @date 2018/10/30 19:36
*/
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
private final AsyncService asyncService;
@Autowired
public AsyncController(AsyncService asyncService) {
this.asyncService = asyncService;
}
@GetMapping("/page")
public String asyncPage() {
System.out.println("当前请求线程名称为:【" + Thread.currentThread().getName() + "】");
// 异步调用
asyncService.generateReport();
// 返回结果
return "async";
}
}
我们在当前Controller方法中也打印了当前的线程,运行项目,访问指定的URL,就可以对比在调用generateReport方法的时候是否启用了新的线程。我们启动Spring Boot应用,在浏览器地址栏输入:http://localhost:8080/async/page,在控制台打印的结果是:
当前请求线程名称为:【http-nio-8080-exec-1】
报表线程名称:【ThreadPoolTaskExecutor-1】
很明显,这不是同一个线程,说明我们开启异步线程成功。
处理异步线程中的异常
一般在Spring中处理异步线程异常分成两类,一类是异步方法没有返回值,另一类是异步方法有返回值。
第一类无返回值方法
对于第一类无返回值情况,我们已经在AsyncConfig配置类中进行了配置,即实现getAsyncUncaughtExceptionHandler方法,也就是当异步线程中的代码发生了异常,就会调用这个方法来进行异常处理,为了检验,我们在AsyncServiceImpl的方法generateReport中手动加一行代码System.out.println(1 / 0);,从而导致其出除零异常,代码如下所示:
@Override
@Async
public void generateReport() {
// 模拟异步生成报表代码,这里设置为打印
System.out.println("报表线程名称:【" + Thread.currentThread().getName() + "】");
System.out.println(1 / 0);
}
当再次启动Spring Boot应用,在浏览器地址栏输入:http://localhost:8080/async/page,那么将在异步流程中发生异常,由于是在不同线程中发生的异常,所以它并不会影响主线程的执行,且发生异常后,由配置了getAsyncUncaughtExceptionHandler方法,那么该异常将会被处理,处理的方式就是使用日志进行了记录:
2018-10-31 10:57:09.952 ERROR 2391 --- [lTaskExecutor-1] c.i.springboot.async.config.AsyncConfig : Error Occurs in async method:/ by zero
1
第二类有返回值方法
对于第二种情况,即异步方法会有返回值,那么我们如何获取到异步线程处理后的返回值呢,通常的方法是将异步方法的返回值使用接口Future、ListenableFuture或者类AsyncResult进行包装,即将返回值作为泛型传入到上述接口或者类中。这里我们来简要分析一下它们的源码中的常用方法。
Future接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
方法分析:
cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true。
isDone方法表示任务是否已经完成,若任务完成,则返回true;
get方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
ListenableFuture接口:
public interface ListenableFuture<T> extends Future<T> {
void addCallback(ListenableFutureCallback<? super T> callback);
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
ListenableFuture继承了Future接口,它还额外添加了三个方法,主要用来添加异步现场的回调,可以用来处理异常和获取异步方法的返回值的。AsyncResult类实现了ListenableFuture接口,也实现了它所有的方法。接下来,我们将分别介绍如何获取异步处理后的返回值和异常处理。
使用Future接口
我们在AsyncService接口中添加一个方法:returnMessage(),并使用Future接口来进行包装,代码如下:
/**
* 异步回调消息方法
*
* @return 字符串
*/
Future<String> returnMessage();
实现类中的代码如下:
@Override
@Async
public Future<String> returnMessage() {
System.out.println(Thread.currentThread().getName());
String message = "Async Method Result";
return new AsyncResult<>(message);
}
那么在Controller层,就可以获取到Future的实现类对象,代码如下:
@GetMapping("/page1")
public String asyncPage1() {
try {
System.out.println(Thread.currentThread().getName());
Future<String> result = asyncService.returnMessage();
System.out.println(result.get());
} catch (ExecutionException | InterruptedException e) {
log.error("发生了异常:{}", e.getMessage());
}
return "async";
}
这里对异步进行了try...catch异常处理,也使用了Future的get方法获取了异步方法的返回值,但是这种获取返回值的方式会阻塞当前线程,也就是说调用了get方法之后,会等待异步线程执行完毕后才进行下一行代码的执行。
使用ListenableFuture接口
我们在AsyncService接口中添加一个方法:returnMsg(),并使用ListenableFuture接口来进行包装,代码如下:
/**
* 异步回调消息方法
*
* @return 字符串
*/
ListenableFuture<String> returnMsg();
实现类中的代码如下:
@Override
@Async
public ListenableFuture<String> returnMsg() {
System.out.println(Thread.currentThread().getName());
String message = "Async Method Result";
return new AsyncResult<>(message);
}
那么在Controller层,就可以获取到ListenableFuture的实现类对象,代码如下:
@GetMapping("/page2")
public String asyncPage2() {
System.out.println(Thread.currentThread().getName());
ListenableFuture<String> result = asyncService.returnMsg();
result.addCallback(new SuccessCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("返回的结果是:" + result);
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("发生了异常:{}", ex.getMessage());
}
});
return "async";
}
从上面的代码中可以看出,在返回的结果中添加了两个回调,分别是异步处理成功的回调SuccessCallback接口的实现类对象和异步处理失败发生异常的回调FailureCallback接口的实现类对象。ListenableFuture接口是对Future接口的扩展,支持回调,有效的避免了线程阻塞问题,也就是说,它会监听Future接口的执行情况,一旦完成,就会调用onSuccess方法进行成功后的处理,一旦发生异常,就会调用onFailure方法进行异常处理。相比较而言,更加推荐使用ListenableFuture来进行有返回值的异步处理。对于Java1.8,其实更加推荐使用CompletableFuture或者guava的ListenableFuture,感兴趣的同学可以进行深入研究,他们的处理异步能力会更加强悍
上一篇: spring boot 线程池
推荐阅读
-
Spring Boot整合Spring Security的示例代码
-
Spring boot工具类静态属性注入及多环境配置详解
-
干货分享:ASP.NET CORE(C#)与Spring Boot MVC(JAVA)异曲同工的编程方式总结
-
Spring Boot 2 - 初识与新工程的创建
-
.NET进阶篇06-async异步、thread多线程4
-
聊聊多线程那一些事儿(task)之 三 异步取消和异步方法
-
九、Spring Boot 优雅的实现CORS跨域
-
spring Boot环境下dubbo+zookeeper的一个基础讲解与示例
-
spring boot 2 全局统一返回RESTful风格数据、统一异常处理
-
在 Spring Boot 项目中使用 activiti