欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springboot @Async 注解如何实现方法异步

程序员文章站 2022-03-10 13:19:36
目录@async注解如何实现方法异步一、springboot的app类需要的注解二、service层的注解三、调用层异步注解@async的使用以及注意事项第一步开启异步下面显示配置线程的代码实现使用@...

@async注解如何实现方法异步

处理大批量数据的时候,效率很慢。所以考虑一下使用多线程。

刚开始自己手写的一套,用了线程池启动固定的线程数进行跑批。但是后来老大考虑到自己手写的风险不好控制,所以使用spring的方法。

这里没有详细介绍,只有简单的demo,只会用,不懂原理:

一、springboot的app类需要的注解

package com.xxx.xxx.xxx;
import java.util.concurrent.threadpoolexecutor;
import org.springframework.boot.web.support.springbootservletinitializer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.core.task.taskexecutor;
import org.springframework.scheduling.annotation.enableasync;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
/**
 * 类功能说明:服务生产者启动类
 * <p>
 * <strong></strong>
 * </p>
 *
 * @version
 * @author
 * @since 1.8
 */
@configuration
@enableasync
public class application extends springbootservletinitializer { 
    @bean
    public taskexecutor taskexecutor() {
        threadpooltaskexecutor executor = new threadpooltaskexecutor();
        // 设置核心线程数
        executor.setcorepoolsize(5);
        // 设置最大线程数
        executor.setmaxpoolsize(60);
        // 设置队列容量
        executor.setqueuecapacity(20);
        // 设置线程活跃时间(秒)
        executor.setkeepaliveseconds(60);
        // 设置默认线程名称
        executor.setthreadnameprefix("what-");
        // 设置拒绝策略
        executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setwaitfortaskstocompleteonshutdown(true);        
        return executor;
    }    
}

springboot的app类,很简单,就能使用很多东西。

二、service层的注解

package com.xxx.xxx.service.impl; 
import java.util.concurrent.future; 
import org.springframework.scheduling.annotation.async;
import org.springframework.scheduling.annotation.asyncresult;
import org.springframework.stereotype.service; 
import com.xxx.xxx.service.xxxasyncservice ;
 
@service
public class xxxasyncserviceimpl implements xxxasyncservice { 
    @async
    public future<long> rtn1() throws exception {
        //do something
        //有返回值的时候,可以返回string,long之类的。
        return new asyncresult<>(1);
    } 
    @async
    public void rtn2() throws exception {
        //do something
        //这个可以没有返回值.
    }
}

三、调用层

package com.xxx.xxx.controller; 
import java.util.concurrent.future; 
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestmethod;
import org.springframework.web.bind.annotation.responsebody;
import org.springframework.web.bind.annotation.restcontroller; 
import com.xxx.xxx.service.xxxasyncservice;
 
@restcontroller
@requestmapping(value="/xxx") 
public class xxxasynccontroller { 
 @autowired
 private xxxasyncservice xxxasyncservice; 
 /**
  * 这里调用异步方法
  */
    @requestmapping(value = "/xxx")
 public void dodo() throws exception {     
  int threads = 10;//十个线程
        list<future<long>> list = new arraylist<>();
        for(int i = 0;i < threads; i++){
         //这里循环调用异步方法。
      //如果存在大量数据,可以在这里把数据切片,然后循环调用,分批处理数据。效率杠杠的。
   list .add(xxxasyncservice.rtn1());
        }
        long count = 0;
        for(future<long> l : tsfcountlist) {
         //异步调用需要返回值的时候,这里可以把返回值都放入到list集合中,然后可以统一处理。 这里的get()是阻塞的,因为需要所以异步方法返回,在继续执行。
         count += l.get();
        }
        system.out.println("调用次数:" + count);
 }
}

这些代码全是手写,记录下来,以后用的时候,省的忘了,查起来麻烦。。

异步注解@async的使用以及注意事项

第一步开启异步

@configuration
@enableasync
public class springasyncconfig { ... }

默认情况下,@enableasync检测spring的@async注释和ejb 3.1 javax. ejb .异步;此选项还可用于检测其他用户定义的注释类型。(也可以在springboot的启动类上直接加@enableasync注解)

在 spring 中,用 @async 注解指定的方法,该方法被调用时会以异步的方式执行。而如果没有在 @async 注解中指定线程池,就会使用默认的线程池。默认的线程池为 simpleasynctaskexecutor 。

该线程池不会复用线程,每有一个新任务被提交,该线程池就会创建一个新的线程实例用于执行任务。下面为相关的代码:

protected void doexecute(runnable task) {
    thread thread = (this.threadfactory != null ? this.threadfactory.newthread(task) : createthread(task));
    thread.start();
}

而如果想要指定线程池,可以通过在 @async 注解中的 value 参数中指定所要使用的线程池的 bean name 。另一种方法是是一个实现了 asyncconfigurer 接口或是继承其默认适配器类 asyncconfigurersupport 的配置类,这样 @async 注解的方法就会使用指定的自定义的线程池。

使用@async注解的话采用的是springboot默认的线程池,不过一般我们会自定义线程池(因为比较灵活),配置方式有:

  • 使用 xml 文件配置的方式
  • 使用java代码结合@configuration进行配置(推荐使用)

下面显示配置线程的代码实现

package com.deppon.ptos.load.config; 
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.scheduling.annotation.enableasync;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor; 
import java.util.concurrent.executor;
import java.util.concurrent.threadpoolexecutor;
 
/**
 * @description: 异步线程管理
 * @author:   lyh
 * @createdate: 2019/6/27 8:54
 * @version: 1.0
 * @jdk: 1.8
 */
@configuration
@enableasync
@slf4j
public class executorconfig {  
    @value("${async.executor.thread.core_pool_size}")
    private int corepoolsize;
    @value("${async.executor.thread.max_pool_size}")
    private int maxpoolsize;
    @value("${async.executor.thread.queue_capacity}")
    private int queuecapacity;
    @value("${async.executor.thread.name.prefix}")
    private string nameprefix;
 
    @bean(name = "asyncserviceexecutor")
    public executor asyncserviceexecutor() {
        log.info("start asyncserviceexecutor");
        threadpooltaskexecutor executor = new visiablethreadpooltaskexecutor();
        //配置核心线程数
        executor.setcorepoolsize(corepoolsize);
        //配置最大线程数
        executor.setmaxpoolsize(maxpoolsize);
        //配置队列大小
        executor.setqueuecapacity(queuecapacity);
        //配置线程池中的线程的名称前缀
        executor.setthreadnameprefix(nameprefix);
 
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // caller_runs:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}
package com.deppon.ptos.load.config;  
import lombok.extern.slf4j.slf4j;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
import org.springframework.stereotype.component;
import org.springframework.util.concurrent.listenablefuture; 
import java.util.concurrent.callable;
import java.util.concurrent.future;
import java.util.concurrent.threadpoolexecutor;
 
/**
 * @description: 打印异步线程的执行情况   使用callbale future 来返回线程的信息
 * @author: 633805  lyh
 * @createdate: 2019/6/27 8:59
 * @version: 1.0
 * @jdk: 1.8
 */
@component
@slf4j
public class visiablethreadpooltaskexecutor extends threadpooltaskexecutor {  
    private void showthreadpoolinfo(string prefix) {
        threadpoolexecutor threadpoolexecutor = getthreadpoolexecutor(); 
        if (null == threadpoolexecutor) {
            return;
        }
 
        log.info("{}, {},taskcount [{}], completedtaskcount [{}], activecount [{}], queuesize [{}]",
                this.getthreadnameprefix(),
                prefix,
                threadpoolexecutor.gettaskcount(),
                threadpoolexecutor.getcompletedtaskcount(),
                threadpoolexecutor.getactivecount(),
                threadpoolexecutor.getqueue().size());
    }
 
    @override
    public void execute(runnable task) {
        showthreadpoolinfo("1. do execute");
        super.execute(task);
    }
 
    @override
    public void execute(runnable task, long starttimeout) {
        showthreadpoolinfo("2. do execute");
        super.execute(task, starttimeout);
    }
 
    @override
    public future<?> submit(runnable task) {
        showthreadpoolinfo("1. do submit");
        return super.submit(task);
    }
 
    @override
    public <t> future<t> submit(callable<t> task) {
        showthreadpoolinfo("2. do submit");
        return super.submit(task);
    }
 
    @override
    public listenablefuture<?> submitlistenable(runnable task) {
        showthreadpoolinfo("1. do submitlistenable");
        return super.submitlistenable(task);
    }
 
    @override
    public <t> listenablefuture<t> submitlistenable(callable<t> task) {
        showthreadpoolinfo("2. do submitlistenable");
        return super.submitlistenable(task);
    }
}

使用:

@async("asyncserviceexecutor")

到这一步,异步就算开启了。

下面主要说一说错误的

使用@async导致异步不成功的情况

如下方式会使@async失效

  • 异步方法使用static修饰
  • 异步类没有使用@component注解(或其他注解)导致spring无法扫描到异步类
  • 异步方法不能与被调用的异步方法在同一个类中
  • 类中需要使用@autowired或@resource等注解自动注入,不能自己手动new对象
  • 如果使用springboot框架必须在启动类中增加@enableasync注解

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。