Spring boot多线程异步查询
程序员文章站
2022-05-01 14:18:54
...
Spring boot 多线程异步查询步骤
1、创建线程池
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ThreadPoolConfig {
/** 核心线程数(默认线程数) */
private static final int corePoolSize = 5;
/** 最大线程数 */
private static final int maxPoolSize = 8;
/** 允许线程空闲时间(单位:默认为秒) */
private static final int keepAliveTime = 60;
/** 缓冲队列大小 */
private static final int queueCapacity = 100;
/** 线程池名前缀 */
private static final String threadNamePrefix = "DirectSearchEsService-";
@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}
2、SpringBoot启动类开启异步
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableSwagger2Doc
@EnableAsync
public class BootstrapApplication {
public static void main(String[] args) {
SpringApplication.run(BootstrapApplication.class, args);
}
}
3、编写异步方法
@Service
public class DirectSearchEsService {
//线程池bean名称,可不填。
@Async("taskExecutor")
public Future<JsonObject> queryApiDirectIndex(String sysCode, String modelCode){
JsonObject result= new JsonObject();
//查询等业务逻辑,返回值必须为 Future类型
return new AsyncResult<JsonObject>(result);
}
}
4、调用异步方法
//按天查询数据方法
private JsonObject queryApiDirect(String sysCode, String modelCode) {
//只保留了有用代码,直接复制会报错
BlockingQueue<Future<JsonObject>> queue = new LinkedBlockingQueue();
Future<JsonObject> future;
//循环调用查询 queryApiDirectIndex
for (int i = 0; i <= between; i++) {
if(i==0) {//第一天
//本方法和异步方法尽量不要放在同一个类,否则有不生效问题,需要特殊处理。
future = directSearchEsService.queryApiDirectIndex(sysCode,modelCodeTime);
}else if(i == between) {//最后一天
future = directSearchEsService.queryApiDirectIndex(sysCode,modelCodeTime);
}else {//中间
future = directSearchEsService.queryApiDirectIndex(sysCode,modelCodeTime);
}
queue.add(future);
}
int queueSize = queue.size();
JsonArray dataArray = new JsonArray();
LOG.info("queue size:" + queueSize);
//拼装结果
for (int i = 0; i < queueSize; i++) {
JsonObject data = queue.take().get();
JsonArray queryArray = data.getAsJsonArray("data");
dataArray.addAll(data.getAsJsonArray("data"));
total+=queryArray.size();
}
result.add("data", dataArray);
result.addProperty("total", total);
return result;
}
上一篇: 线程池
下一篇: 线程池(Executors)