java 多线程分页读取放入队列,多线程写入EasyExcel
程序员文章站
2022-05-02 13:12:37
...
@Test
public void test() throws InterruptedException, ExecutionException {
String resStr = "{\"languageCode\":\"en-US\",\"categoryType\":\"\",\"categoryLevel\":\"\"}";
GoodsPriceSellListDTO condition = JSON.parseObject(resStr, GoodsPriceSellListDTO.class);
ExecutorService executorService = Executors.newFixedThreadPool(5);
int pageSize = 20;
AtomicInteger pageNumber = new AtomicInteger(1);
BlockingQueue<List<GoodsPriceSellListDTO>> queue = new ArrayBlockingQueue<>(1000);
List<GoodsPriceSellListDTO> goodsPriceSellListDTOList = null;
while (true) {
if (null != goodsPriceSellListDTOList) {
goodsPriceSellListDTOList.clear();
}
System.out.println("遍历查询:" + pageNumber.get());
// 线程返回值
Future<Result<PagerResult<GoodsPriceSellListDTO>>> query = executorService.submit(() -> {
Pager<GoodsPriceSellListDTO> pager = new Pager<>();
pager.setPageNumber(pageNumber.get());
pager.setPageSize(pageSize);
pager.setCondition(condition);
System.out.println("查询线程》销售价目导出while查询入参:" + JSON.toJSONString(pager));
Result<PagerResult<GoodsPriceSellListDTO>> pageResult = goodsPriceService.pageSellResult(pager);
return pageResult;
});
Result<PagerResult<GoodsPriceSellListDTO>> pageResult = query.get();
try {
if (null != pageResult && null != pageResult.getData() && !JtmmCollectionUtil.isEmpty(pageResult.getData().getData())) {
goodsPriceSellListDTOList = pageResult.getData().getData();
}
if (!CollectionUtils.isEmpty(goodsPriceSellListDTOList)) {
System.out.println("查询线程》查询结果不为空:while查询出参" + goodsPriceSellListDTOList.size());
queue.put(goodsPriceSellListDTOList);
} else {
queue.put(Collections.EMPTY_LIST);
System.out.println("遍历查询-查询结果为空:" + pageNumber.get());
}
if (CollectionUtils.isEmpty(goodsPriceSellListDTOList) || (null != pageResult && null != pageResult.getData() && pageResult.getData().getTotal() < pageSize)) {
System.out.println("遍历查询-退出:" + pageNumber.get());
break;
}
pageNumber.getAndAdd(1);
Thread.sleep(5);
} catch (Exception e) {
System.out.println("销售价目导出while查询异常 {}");
}
}
Future<?> submit = executorService.submit(() -> {
int count = 0;
while (true) {
List<GoodsPriceSellListDTO> list = null;
try {
list = queue.take();
System.out.println("第" + count + "次消费:" + list.size());
} catch (InterruptedException e) {
Thread.interrupted();
System.out.println("异常退出消费"+count);
break;
}
count++;
}
});
// 关闭线程池
try {
// 向学生传达“问题解答完毕后请举手示意!”
executorService.shutdown();
final long awaitTime = 5 * 1000;
// 向学生传达“XX分之内解答不完的问题全部带回去作为课后作业!”后老师等待学生答题
// (所有的任务都结束的时候,返回TRUE)
if(!executorService.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){
// 超时的时候向线程池中所有的线程发出中断(interrupted)。
executorService.shutdownNow();
}
} catch (InterruptedException e) {
// awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。
executorService.shutdownNow();
}
}
上一篇: 多线程-共享全局变量问题
下一篇: java多线程成员变量共享问题