血的教训--如何正确使用线程池submit和execute方法
血的教训之背景:使用线程池对存量数据进行迁移,但是总有一批数据迁移失败,无异常日志打印
凶案起因
听说parallelstream
并行流是个好东西,由于日常开发stream
串行流的场景比较多,这次需要写迁移程序刚好可以用得上,那还不赶紧拿来装*一下,此时不装更待何时。机智的我还知道在 jvm 的后台,使用通用的 fork/join 池来完成上述功能,该池是所有并行流共享的,默认情况,fork/join 池会为每个处理器分配一个线程,对应的变通方案就是创建自己的线程池如
forkjoinpool pool = new forkjoinpool(runtime.getruntime().availableprocessors()); pool.submit(() -> { list.parallelstream().collect(collectors.tolist()); });
于是地雷就是从这里埋下的。
submit还是execute
public static void main(string[] args) throws interruptedexception, executionexception { final executorservice pool = new forkjoinpool(runtime.getruntime().availableprocessors()); list<integer> list = lists.newarraylist(1, 2, 3, null); //1.使用submit pool.submit(() -> { list.parallelstream().map(a -> a.tostring()).collect(collectors.tolist()); }); timeunit.seconds.sleep(3); //2.使用 execute pool.execute(() -> { list.parallelstream().map(a -> a.tostring()).collect(collectors.tolist()); }); //3.使用submit,调用get() pool.submit(() -> { list.parallelstream().map(a -> a.tostring()).collect(collectors.tolist()); }).get(); timeunit.seconds.sleep(3); }
读者自行跑一下上面的用例,会发现单独使用submit
方法的并不会打印出错误日志,而使用execute
方法打印出了错误日志,但是对submit
返回的futurejointask
调用get()
方法,又会抛出异常。于是真相大白,部分批次中的数据存在脏数据,为null值,遍历到该null值的时候出现了异常,但是异常日志在submit
方法中给catch住,没有打印出来(心痛的感觉),而被捕获的异常,被包装在返回的结果类futurejointask
中,并没有再次抛出。
如果不需要异步返回结果,请不要用submit
方法
结论先行,我犯的错误就是,浅显的认为submit
和execute
的区别就只是一个有返回异步结果,一个没有返回一步结果,但是事实是残酷的。在submit()
中逻辑一定包含了将异步任务抛出的异常捕获,而因为使用方法不当而导致该异常没有再次抛出。
现在提出一个问题,forkjoinpool#submit()
中返回的forkjointask
可以获取异步任务的结果,现这个异步抛出了异常,我们尝试获取该任务的结果会是如何? 我们直接看forkjointask#get()
的源码。
public final v get() throws interruptedexception, executionexception { int s = (thread.currentthread() instanceof forkjoinworkerthread) ? dojoin() : externalinterruptibleawaitdone(); throwable ex; if ((s &= done_mask) == cancelled) throw new cancellationexception(); //这里可以直接看到,异步任务出现异常会在调用get()获取结果的时候,会被包装成executionexception再次抛出 if (s == exceptional && (ex = getthrowableexception()) != null) throw new executionexception(ex); return getrawresult(); }
异步任务出现异常会在调用get()获取结果的时候,会被包装成executionexception
再次抛出,但是异常是在哪里被捕获的呢?万变不离其宗,所有线程的线程都需要重写thread#run()
方法, 投递到forkjoinpool
的线程会被包装成forkjoinworkerthread
,因此我们看一下forkjoinworkerthread#run()
的实现.
public void run() { if (workqueue.array == null) { // only run once throwable exception = null; try { onstart(); pool.runworker(workqueue); } catch (throwable ex) { //出现异常,捕获,再次抛出会在调用forkjointask#get()的时候 exception = ex; } finally { try { ontermination(exception); } catch (throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterworker(this, exception); } } } }
上面的分析是基于forkjoinpool
的,是不是所有的线程池的submit
和execute
方法的实现都是类似这样,我们常用的线程池threadpoolthread
实现会是怎样的,同样的思路,我们需要找到投递到threadpoolthread
的异步任务最终被包装为哪个thread
的子类或者是实现java.lang.runnable#run
,答案就是java.util.concurrent.futuretask
public void run() { ... try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); ran = true; } catch (throwable ex) { //捕获异常 result = null; ran = false; setexception(ex); } if (ran) set(result); } } .... }
总结
java.util.concurrent.executorservice#submit(java.lang.runnable)
为何线程池会有这种设定,实际上我们的思路不应该局限于线程池,而是放在获取异步任务结果,异常是否也是属于异步结果,futuretask
作为jdk提供的并发工具类的实现中,已经给出了很好的答案,即获取异步任务结果,异常也是属于异步结果,如果异步任务出现运行时异常,那么在获取该任务的结果时,该异常会被重新包装抛出
作者:plz叫我红领巾
出处:
本博客欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。码子不易,您的点赞是我习作最大的动力
上一篇: 聪明的泰迪
下一篇: 【第三篇】Python流程控制和运算符