springboot @Async 注解如何实现方法异步
@async注解如何实现方法异步
处理大批量数据的时候,效率很慢。所以考虑一下使用多线程。
刚开始自己手写的一套,用了线程池启动固定的线程数进行跑批。但是后来老大考虑到自己手写的风险不好控制,所以使用spring的方法。
这里没有详细介绍,只有简单的demo,只会用,不懂原理:
一、springboot的app类需要的注解
package com.xxx.xxx.xxx; import java.util.concurrent.threadpoolexecutor; import org.springframework.boot.web.support.springbootservletinitializer; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.core.task.taskexecutor; import org.springframework.scheduling.annotation.enableasync; import org.springframework.scheduling.concurrent.threadpooltaskexecutor; /** * 类功能说明:服务生产者启动类 * <p> * <strong></strong> * </p> * * @version * @author * @since 1.8 */ @configuration @enableasync public class application extends springbootservletinitializer { @bean public taskexecutor taskexecutor() { threadpooltaskexecutor executor = new threadpooltaskexecutor(); // 设置核心线程数 executor.setcorepoolsize(5); // 设置最大线程数 executor.setmaxpoolsize(60); // 设置队列容量 executor.setqueuecapacity(20); // 设置线程活跃时间(秒) executor.setkeepaliveseconds(60); // 设置默认线程名称 executor.setthreadnameprefix("what-"); // 设置拒绝策略 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); // 等待所有任务结束后再关闭线程池 executor.setwaitfortaskstocompleteonshutdown(true); return executor; } }
springboot的app类,很简单,就能使用很多东西。
二、service层的注解
package com.xxx.xxx.service.impl; import java.util.concurrent.future; import org.springframework.scheduling.annotation.async; import org.springframework.scheduling.annotation.asyncresult; import org.springframework.stereotype.service; import com.xxx.xxx.service.xxxasyncservice ; @service public class xxxasyncserviceimpl implements xxxasyncservice { @async public future<long> rtn1() throws exception { //do something //有返回值的时候,可以返回string,long之类的。 return new asyncresult<>(1); } @async public void rtn2() throws exception { //do something //这个可以没有返回值. } }
三、调用层
package com.xxx.xxx.controller; import java.util.concurrent.future; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.requestmethod; import org.springframework.web.bind.annotation.responsebody; import org.springframework.web.bind.annotation.restcontroller; import com.xxx.xxx.service.xxxasyncservice; @restcontroller @requestmapping(value="/xxx") public class xxxasynccontroller { @autowired private xxxasyncservice xxxasyncservice; /** * 这里调用异步方法 */ @requestmapping(value = "/xxx") public void dodo() throws exception { int threads = 10;//十个线程 list<future<long>> list = new arraylist<>(); for(int i = 0;i < threads; i++){ //这里循环调用异步方法。 //如果存在大量数据,可以在这里把数据切片,然后循环调用,分批处理数据。效率杠杠的。 list .add(xxxasyncservice.rtn1()); } long count = 0; for(future<long> l : tsfcountlist) { //异步调用需要返回值的时候,这里可以把返回值都放入到list集合中,然后可以统一处理。 这里的get()是阻塞的,因为需要所以异步方法返回,在继续执行。 count += l.get(); } system.out.println("调用次数:" + count); } }
这些代码全是手写,记录下来,以后用的时候,省的忘了,查起来麻烦。。
异步注解@async的使用以及注意事项
第一步开启异步
@configuration @enableasync public class springasyncconfig { ... }
默认情况下,@enableasync检测spring的@async注释和ejb 3.1 javax. ejb .异步;此选项还可用于检测其他用户定义的注释类型。(也可以在springboot的启动类上直接加@enableasync注解)
在 spring 中,用 @async 注解指定的方法,该方法被调用时会以异步的方式执行。而如果没有在 @async 注解中指定线程池,就会使用默认的线程池。默认的线程池为 simpleasynctaskexecutor 。
该线程池不会复用线程,每有一个新任务被提交,该线程池就会创建一个新的线程实例用于执行任务。下面为相关的代码:
protected void doexecute(runnable task) { thread thread = (this.threadfactory != null ? this.threadfactory.newthread(task) : createthread(task)); thread.start(); }
而如果想要指定线程池,可以通过在 @async 注解中的 value 参数中指定所要使用的线程池的 bean name 。另一种方法是是一个实现了 asyncconfigurer 接口或是继承其默认适配器类 asyncconfigurersupport 的配置类,这样 @async 注解的方法就会使用指定的自定义的线程池。
使用@async注解的话采用的是springboot默认的线程池,不过一般我们会自定义线程池(因为比较灵活),配置方式有:
- 使用 xml 文件配置的方式
- 使用java代码结合@configuration进行配置(推荐使用)
下面显示配置线程的代码实现
package com.deppon.ptos.load.config; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.scheduling.annotation.enableasync; import org.springframework.scheduling.concurrent.threadpooltaskexecutor; import java.util.concurrent.executor; import java.util.concurrent.threadpoolexecutor; /** * @description: 异步线程管理 * @author: lyh * @createdate: 2019/6/27 8:54 * @version: 1.0 * @jdk: 1.8 */ @configuration @enableasync @slf4j public class executorconfig { @value("${async.executor.thread.core_pool_size}") private int corepoolsize; @value("${async.executor.thread.max_pool_size}") private int maxpoolsize; @value("${async.executor.thread.queue_capacity}") private int queuecapacity; @value("${async.executor.thread.name.prefix}") private string nameprefix; @bean(name = "asyncserviceexecutor") public executor asyncserviceexecutor() { log.info("start asyncserviceexecutor"); threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor(); //配置核心线程数 executor.setcorepoolsize(corepoolsize); //配置最大线程数 executor.setmaxpoolsize(maxpoolsize); //配置队列大小 executor.setqueuecapacity(queuecapacity); //配置线程池中的线程的名称前缀 executor.setthreadnameprefix(nameprefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); //执行初始化 executor.initialize(); return executor; } }
package com.deppon.ptos.load.config; import lombok.extern.slf4j.slf4j; import org.springframework.scheduling.concurrent.threadpooltaskexecutor; import org.springframework.stereotype.component; import org.springframework.util.concurrent.listenablefuture; import java.util.concurrent.callable; import java.util.concurrent.future; import java.util.concurrent.threadpoolexecutor; /** * @description: 打印异步线程的执行情况 使用callbale future 来返回线程的信息 * @author: 633805 lyh * @createdate: 2019/6/27 8:59 * @version: 1.0 * @jdk: 1.8 */ @component @slf4j public class visiablethreadpooltaskexecutor extends threadpooltaskexecutor { private void showthreadpoolinfo(string prefix) { threadpoolexecutor threadpoolexecutor = getthreadpoolexecutor(); if (null == threadpoolexecutor) { return; } log.info("{}, {},taskcount [{}], completedtaskcount [{}], activecount [{}], queuesize [{}]", this.getthreadnameprefix(), prefix, threadpoolexecutor.gettaskcount(), threadpoolexecutor.getcompletedtaskcount(), threadpoolexecutor.getactivecount(), threadpoolexecutor.getqueue().size()); } @override public void execute(runnable task) { showthreadpoolinfo("1. do execute"); super.execute(task); } @override public void execute(runnable task, long starttimeout) { showthreadpoolinfo("2. do execute"); super.execute(task, starttimeout); } @override public future<?> submit(runnable task) { showthreadpoolinfo("1. do submit"); return super.submit(task); } @override public <t> future<t> submit(callable<t> task) { showthreadpoolinfo("2. do submit"); return super.submit(task); } @override public listenablefuture<?> submitlistenable(runnable task) { showthreadpoolinfo("1. do submitlistenable"); return super.submitlistenable(task); } @override public <t> listenablefuture<t> submitlistenable(callable<t> task) { showthreadpoolinfo("2. do submitlistenable"); return super.submitlistenable(task); } }
使用:
@async("asyncserviceexecutor")
到这一步,异步就算开启了。
下面主要说一说错误的
使用@async导致异步不成功的情况
如下方式会使@async失效
- 异步方法使用static修饰
- 异步类没有使用@component注解(或其他注解)导致spring无法扫描到异步类
- 异步方法不能与被调用的异步方法在同一个类中
- 类中需要使用@autowired或@resource等注解自动注入,不能自己手动new对象
- 如果使用springboot框架必须在启动类中增加@enableasync注解
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
推荐阅读
-
小程序开发中如何使用async-await并封装公共异步请求的方法
-
SpringBoot使用AOP+注解实现简单的权限验证的方法
-
JS中async/await实现异步调用的方法
-
.NET Core控制台应用程序如何使用异步(Async)Main方法详解
-
Spring @async方法如何添加注解实现异步调用
-
springboot @Async 注解如何实现方法异步
-
SpringBoot @Validated注解实现参数分组校验的方法实例
-
Docker如何给Springboot项目动态传参的实现方法
-
在SpringBoot中,如何使用Netty实现远程调用方法总结
-
SpringBoot如何通过自定义注解实现权限检查详解