How does a Callable task run, and how is its result retrieved?
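When a Callable task is submitted to a thread pool, one of the pool's worker threads (created on demand if necessary) executes it, yet the result is retrieved with Future#get on the caller thread that submitted the task. Question one: how does the caller thread obtain the result produced by the worker thread?
The JDK has two kinds of tasks, Runnable and Callable, but the method the pool actually uses to run tasks, java.util.concurrent.ThreadPoolExecutor#execute(Runnable), accepts only a Runnable. Question two: how is a Callable executed once it has been submitted to the pool?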
How does a Callable task run?
    import java.util.concurrent.*;

    public class FutureTest {
        public static void main(String[] args) {
            Callable<Integer> callable = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    // the sleep is only there to make debugging easier
                    TimeUnit.SECONDS.sleep(4);
                    return 3;
                }
            };
            // creates a ThreadPoolExecutor
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<Integer> future = executorService.submit(callable);
            try {
                Integer i = future.get();
                System.out.println(i);
            } catch (Exception e) {
                System.out.println(e);
            } finally {
                // without shutdown() the non-daemon worker thread keeps the JVM alive
                executorService.shutdown();
            }
        }
    }
Start from the submit call:

    Future<Integer> future = executorService.submit(callable);

    // java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // FutureTask is a RunnableFuture, and a RunnableFuture is a Runnable.
        // Key point: the Runnable#run that the pool invokes is FutureTask#run.
        RunnableFuture<T> ftask = newTaskFor(task);
        // java.util.concurrent.ThreadPoolExecutor#execute
        execute(ftask);
        return ftask;
    }
    RunnableFuture<T> ftask = newTaskFor(task);

    // java.util.concurrent.AbstractExecutorService#newTaskFor(java.util.concurrent.Callable<T>)
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
When a Callable is submitted, a RunnableFuture object is created; by default it is a FutureTask. The class-level Javadoc of java.util.concurrent.AbstractExecutorService notes that we can override newTaskFor to produce our own RunnableFuture. For a concrete example, see org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor#newTaskFor(java.util.concurrent.Callable<T>) in the Elasticsearch source, which overrides newTaskFor to implement result retrieval for prioritized tasks.
The implementation of submit(Runnable) creates an associated RunnableFuture that is executed and returned. Subclasses may override the newTaskFor methods to return RunnableFuture implementations other than FutureTask.
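To make the newTaskFor hook concrete, here is a minimal sketch. The class names TracedFutureTask and TracingExecutor and the createdAtNanos field are made up for illustration; only ThreadPoolExecutor, FutureTask and newTaskFor come from the JDK.

```java
import java.util.concurrent.*;

class NewTaskForDemo {
    public static void main(String[] args) throws Exception {
        TracingExecutor executor = new TracingExecutor();
        try {
            Callable<Integer> callable = () -> 3;
            Future<Integer> future = executor.submit(callable);
            // submit() returns exactly the object that newTaskFor produced
            System.out.println(future instanceof TracedFutureTask); // true
            System.out.println(future.get());                       // 3
        } finally {
            executor.shutdown();
        }
    }
}

// Hypothetical FutureTask subclass that records when it was created.
class TracedFutureTask<T> extends FutureTask<T> {
    final long createdAtNanos = System.nanoTime();

    TracedFutureTask(Callable<T> callable) {
        super(callable);
    }
}

// Hypothetical executor that swaps in TracedFutureTask through newTaskFor.
class TracingExecutor extends ThreadPoolExecutor {
    TracingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new TracedFutureTask<>(callable);
    }
}
```

Because submit() only ever talks to the RunnableFuture interface, the rest of the pool does not need to know that the task type changed.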
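Next, look at FutureTask's run() method, java.util.concurrent.FutureTask#run. It is what invokes the Callable#call() method we defined. Once you understand how java.util.concurrent.FutureTask#run gets called, you understand how the thread pool executes a Callable task. The method does two main things:
- Runs Callable#call, that is, the logic we defined in FutureTest.java, which returns the Integer 3.
- Stores the task's result: set(result)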
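The two steps can be observed without any thread pool. The sketch below (the class name FutureTaskRunDemo is made up) calls FutureTask#run directly on the current thread and then reads the stored result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class FutureTaskRunDemo {
    public static void main(String[] args) throws Exception {
        Callable<Integer> callable = () -> 3;
        FutureTask<Integer> task = new FutureTask<>(callable);

        System.out.println(task.isDone()); // false: nothing has run yet
        task.run();                        // calls callable.call(), then set(result)
        System.out.println(task.isDone()); // true
        System.out.println(task.get());    // 3, returned at once because the result is stored
    }
}
```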
Inside java.util.concurrent.AbstractExecutorService#submit, the call execute(ftask) hands the task to the pool (note: FutureTask implements Runnable).
ThreadPoolExecutor is a concrete implementation of AbstractExecutorService, so the call ends up in java.util.concurrent.ThreadPoolExecutor#execute.
    // java.util.concurrent.ThreadPoolExecutor#execute; the part to look at is addWorker()
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
Two lines in java.util.concurrent.ThreadPoolExecutor#addWorker are key:

    // java.util.concurrent.ThreadPoolExecutor#addWorker
    try {
        w = new Worker(firstTask); // key line 1: firstTask is really a FutureTask
        final Thread t = w.thread;
        if (t != null) {
            // ... non-essential code omitted
            if (workerAdded) {
                t.start(); // key line 2
                workerStarted = true;
            }
        }
    } // ... finally block omitted
w = new Worker(firstTask) creates a new thread. The Worker passes itself (this) to the thread factory, because Worker implements Runnable and provides its own java.lang.Runnable#run implementation.
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
What does this mean? When that thread's java.lang.Runnable#run executes, the method that actually runs is java.util.concurrent.ThreadPoolExecutor.Worker#run. So who calls java.lang.Runnable#run?
As you have probably guessed: when new Thread(runnable).start() executes, the JVM calls java.lang.Runnable#run on the new thread.
So key line 2 in java.util.concurrent.ThreadPoolExecutor#addWorker, t.start();, is what triggers the call to java.util.concurrent.ThreadPoolExecutor.Worker#run.
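The same wiring can be reproduced in a few lines. MiniWorker below is a hypothetical stand-in for ThreadPoolExecutor.Worker, not the JDK class: it hands itself to a Thread in its constructor, so starting that thread ends up in MiniWorker#run, which in turn runs the first task.

```java
class MiniWorkerDemo {
    // Hypothetical, heavily simplified stand-in for ThreadPoolExecutor.Worker.
    static class MiniWorker implements Runnable {
        final Thread thread;
        final Runnable firstTask;
        volatile String ranOn; // name of the thread that executed run()

        MiniWorker(Runnable firstTask) {
            this.firstTask = firstTask;
            // pass "this", as Worker does with getThreadFactory().newThread(this)
            this.thread = new Thread(this, "mini-worker");
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
            firstTask.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniWorker w = new MiniWorker(() -> System.out.println("task running"));
        w.thread.start();           // the JVM calls w.run() on the new thread
        w.thread.join();
        System.out.println(w.ranOn); // mini-worker
    }
}
```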
java.util.concurrent.ThreadPoolExecutor.Worker#run does nothing except call runWorker(this).
    // java.util.concurrent.ThreadPoolExecutor.Worker#run
    /** Delegates main run loop to outer runWorker. */
    public void run() {
        runWorker(this);
    }
Now for the key part. Step into runWorker and see what it does:
    // java.util.concurrent.ThreadPoolExecutor#runWorker
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // task is actually a FutureTask
        w.firstTask = null;
        try {
            while (task != null || (task = getTask()) != null) {
                // ... non-essential code omitted
                try {
                    beforeExecute(wt, task);
                    try {
                        // key line: this triggers java.util.concurrent.FutureTask#run
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        // The afterExecute Javadoc says the hook runs whether or not the
                        // task throws. The source shows why: the catch block calls it too.
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
        } finally {
            // ... worker exit handling omitted
        }
    }
Once you understand java.util.concurrent.ThreadPoolExecutor#runWorker, the role of the beforeExecute and afterExecute hooks is mostly clear (for example, afterExecute is often used to collect statistics on how long tasks run in the pool).
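As an illustration of the timing use case, here is a minimal sketch. TimingExecutor and its fields are hypothetical names; the only JDK pieces are the beforeExecute and afterExecute hooks, both of which run on the worker thread, so a ThreadLocal can carry the start time between them.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

class TimingDemo {
    public static void main(String[] args) throws Exception {
        TimingExecutor executor = new TimingExecutor();
        try {
            Callable<Integer> callable = () -> {
                TimeUnit.MILLISECONDS.sleep(50);
                return 3;
            };
            executor.submit(callable).get();
        } finally {
            executor.shutdown();
            // get() can return before afterExecute has run, so wait for the worker to finish
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
        System.out.println(executor.completed.get() + " task(s), "
                + executor.totalNanos.get() + " ns in total");
    }
}

// Hypothetical executor that measures how long each task runs.
class TimingExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    final AtomicLong totalNanos = new AtomicLong();
    final AtomicLong completed = new AtomicLong();

    TimingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            completed.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }
}
```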
To summarize:
- When a Callable is submitted, a FutureTask is created to wrap it. FutureTask's run method calls Callable#call and then calls java.util.concurrent.FutureTask#set to store the result (the result is kept in an Object-typed instance field of FutureTask: private Object outcome;). Future<Integer> future = executorService.submit(callable); returns a Future that is actually a FutureTask, and java.util.concurrent.FutureTask#get() retrieves the Callable's result from it.
- java.util.concurrent.FutureTask#run is invoked by java.util.concurrent.ThreadPoolExecutor#runWorker; runWorker is invoked by java.util.concurrent.ThreadPoolExecutor.Worker#run; and Worker#run is triggered by the t.start(); statement in java.util.concurrent.ThreadPoolExecutor#addWorker, because t.start(); causes Runnable#run to execute. This is the principle mentioned earlier: with new Thread(runnable).start(), the JVM calls Runnable#run. In one word: polymorphism.
- When extending ThreadPoolExecutor to build a custom pool, you can override afterExecute() to implement exception handling. afterExecute() is called whether the task completes normally or throws; see the JDK source comments on ThreadPoolExecutor#runWorker. If you are interested, study the Elasticsearch search thread pool, which uses afterExecute to measure the wait time and execution time of every submitted task and, based on Little's Law, automatically adjusts the length of the pool's task queue: org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor#afterExecute
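One detail matters when afterExecute is used for exception handling with submit(): FutureTask#run catches whatever Callable#call throws and stores it, so the Throwable argument of afterExecute is null for such tasks and the failure has to be read back from the Future. The sketch below follows the pattern shown in the ThreadPoolExecutor#afterExecute Javadoc; ReportingExecutor and lastFailure are hypothetical names.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteDemo {
    public static void main(String[] args) throws Exception {
        ReportingExecutor executor = new ReportingExecutor();
        Callable<Integer> failing = () -> {
            throw new IllegalStateException("boom");
        };
        executor.submit(failing);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(executor.lastFailure.get()); // java.lang.IllegalStateException: boom
    }
}

// Hypothetical executor that remembers the most recent task failure.
class ReportingExecutor extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    ReportingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submitted tasks t is null: the exception lives inside the FutureTask.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            lastFailure.set(t);
        }
    }
}
```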
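Finally, the point to take away: at the level of ThreadPoolExecutor, a Callable task is actually executed as a Runnable task. When an ExecutorService submits a Callable, it wraps the Callable in a FutureTask/RunnableFuture, and because RunnableFuture extends Runnable, the wrapper can be handed to the pool's java.util.concurrent.ThreadPoolExecutor#execute(Runnable command). This answers the second question raised at the start of the article.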
    // java.util.concurrent.RunnableFuture
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
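Setting and retrieving the Callable's result both happen at the FutureTask level: the Callable is wrapped in a FutureTask, and because FutureTask implements Runnable, the work turns into a Runnable task executed through ThreadPoolExecutor#execute.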
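To see that submit is little more than wrapping plus execute, the sketch below (the class name ExecuteFutureTaskDemo is made up) builds the FutureTask by hand, passes it to execute, and then checks what submit returns for a pool created by Executors.newFixedThreadPool.

```java
import java.util.concurrent.*;

class ExecuteFutureTaskDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> callable = () -> 3;

            // Do by hand what submit() does: wrap, execute, keep the wrapper as the Future.
            FutureTask<Integer> task = new FutureTask<>(callable);
            pool.execute(task);             // accepted because FutureTask is a Runnable
            System.out.println(task.get()); // 3

            // The Future handed back by submit() is a FutureTask as well.
            Future<Integer> future = pool.submit(callable);
            System.out.println(future instanceof FutureTask); // true
        } finally {
            pool.shutdown();
        }
    }
}
```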
How is the result of a Callable task retrieved? Why does Future.get block?
The answer lies in a field of java.util.concurrent.FutureTask, private volatile int state;:
    // java.util.concurrent.FutureTask#run
    public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // Callable#call completed normally, so ran = true
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // the result is stored only when ran is true
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
When set stores the Callable's result, it also changes the value of FutureTask's state field.
    // java.util.concurrent.FutureTask#set
    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }
java.util.concurrent.FutureTask#get(), in turn, checks the value of state to decide whether the Callable's result can be returned yet.
    // java.util.concurrent.FutureTask#get()
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // state is still NEW or COMPLETING, so there is no result yet and get() waits.
            // This is why java.util.concurrent.Future#get() blocks.
            s = awaitDone(false, 0L); // calls Thread.yield() or LockSupport.park(this) inside
        return report(s);
    }
java.util.concurrent.FutureTask#awaitDone

    // java.util.concurrent.FutureTask#awaitDone
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        WaitNode q = null;
        // ... some code omitted (including the declarations of startTime and queued)
        for (;;) { // keep checking the task state until the wait can end
            int s = state;
            // state > COMPLETING means the task has reached a final state.
            // java.util.concurrent.FutureTask#set stored the result and changed state.
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // COMPLETING: the outcome is being written right now, so it will be ready very soon
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield(); // give up the CPU briefly instead of parking
            else if (Thread.interrupted()) {
                removeWaiter(q); // the waiting thread was interrupted, so it stops waiting
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            // how java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) implements the timed wait
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                // suspends the calling thread; this is where Future#get actually blocks
                LockSupport.park(this);
        }
    }
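The timed branch can be observed from the outside. In the sketch below (TimedGetDemo and timesOut are made-up names) the task is held back by a CountDownLatch, so a get with a 100 ms timeout gives up with a TimeoutException while the untimed get succeeds once the latch is released.

```java
import java.util.concurrent.*;

class TimedGetDemo {
    // Returns true if the future did not complete within the given time.
    static boolean timesOut(Future<?> future, long millis) throws Exception {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Callable<Integer> callable = () -> {
                release.await(); // keeps the task unfinished until the caller allows it
                return 3;
            };
            Future<Integer> future = pool.submit(callable);

            System.out.println(timesOut(future, 100)); // true: state is still NEW
            release.countDown();
            System.out.println(future.get());          // 3: blocks until set() has run
        } finally {
            pool.shutdown();
        }
    }
}
```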
To summarize: the state field tells whether the Callable's result has been produced. Once the task has produced a result, java.util.concurrent.FutureTask#set stores it in the field private Object outcome; and then sets state to NORMAL, so java.util.concurrent.FutureTask#get() can check state and return the result. If the result has not been produced yet, java.util.concurrent.FutureTask#awaitDone makes get block.
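The interplay of state, outcome and parking can be condensed into a toy class. MiniFuture below is a simplified sketch written for this article, not the JDK implementation: it has no cancellation, no exception handling and no timed wait, and it uses an AtomicInteger and a ConcurrentLinkedQueue where FutureTask uses VarHandles and its own WaitNode stack.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class MiniFutureDemo {
    public static void main(String[] args) throws Exception {
        MiniFuture<Integer> future = new MiniFuture<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                // fall through and publish the result anyway
            }
            future.set(3);
        });
        producer.start();
        System.out.println(future.get()); // 3, after parking until set() runs
    }
}

// Hypothetical, heavily simplified future that mirrors the state/outcome idea.
class MiniFuture<V> {
    private static final int NEW = 0, COMPLETING = 1, NORMAL = 2;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
    private V outcome; // plain field, published by the later write to state

    void set(V v) {
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = v;
            state.set(NORMAL);
            Thread t;
            while ((t = waiters.poll()) != null) {
                LockSupport.unpark(t); // wake every thread parked in get()
            }
        }
    }

    V get() throws InterruptedException {
        Thread me = Thread.currentThread();
        boolean queued = false;
        while (state.get() != NORMAL) {
            if (Thread.interrupted()) {
                waiters.remove(me);
                throw new InterruptedException();
            }
            if (!queued) {
                waiters.add(me); // register first, then re-check state before parking
                queued = true;
            } else {
                LockSupport.park(this);
            }
        }
        if (queued) {
            waiters.remove(me);
        }
        return outcome;
    }
}
```

Registering the waiter before re-checking state is what avoids a lost wake-up: either the waiter sees NORMAL on the next loop iteration, or set() finds it in the queue and unparks it.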
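One last question to leave you with: since Future#get in the JDK always blocks, is there a way to obtain the result of a Callable task without blocking?
Take a look at the Netty source and borrow its listener callback mechanism.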
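One possible direction that stays inside the JDK, offered as a sketch rather than as how Netty does it: FutureTask has a protected done() hook that is invoked once the task reaches a final state, so a subclass can push the result to a callback instead of making the caller pull it with get. CallbackFutureTask and onSuccess are hypothetical names.

```java
import java.util.concurrent.*;
import java.util.function.Consumer;

class CallbackDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch printed = new CountDownLatch(1);
        Callable<Integer> callable = () -> 3;

        pool.execute(new CallbackFutureTask<Integer>(callable, v -> {
            System.out.println("result = " + v); // runs on the worker thread
            printed.countDown();
        }));

        // The caller never calls get(); it waits here only so the demo exits cleanly.
        printed.await(1, TimeUnit.SECONDS);
        pool.shutdown();
    }
}

// Hypothetical FutureTask subclass that hands a successful result to a callback.
class CallbackFutureTask<V> extends FutureTask<V> {
    private final Consumer<V> onSuccess;

    CallbackFutureTask(Callable<V> callable, Consumer<V> onSuccess) {
        super(callable);
        this.onSuccess = onSuccess;
    }

    @Override
    protected void done() {
        if (isCancelled()) {
            return;
        }
        try {
            // done() runs after the state is final, so this get() returns immediately
            onSuccess.accept(get());
        } catch (ExecutionException e) {
            // the task failed; a fuller version would accept a failure callback too
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The callback runs on the pool's worker thread, so it should be short or hand its work to another executor.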