Java多线程之异步Future机制的原理和实现
项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
import java.util.concurrent.callable; import java.util.concurrent.executionexception; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.future; public class addtask implements callable<integer> { private int a,b; public addtask(int a, int b) { this.a = a; this.b = b; } @override public integer call throws exception { integer result = a + b; return result; } public static void main(string[] args) throws interruptedexception, executionexception { executorservice executor = executors.newsinglethreadexecutor; //jdk目前为止返回的都是futuretask的实例 future<integer> future = executor.submit(new addtask(1, 2)); integer result = future.get;// 只有当future的状态是已完成时(future.isdone = true),get方法才会返回 } }
虽然可以实现获取异步执行结果的需求,但是我们发现这个future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isdone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.future 的接口方法:
public interface future<v> { boolean cancel(boolean mayinterruptifrunning); boolean iscancelled; boolean isdone; v get throws interruptedexception, executionexception; v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; }
由此可见jdk的future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个ifuture:
package future; import java.util.concurrent.cancellationexception; import java.util.concurrent.future; import java.util.concurrent.timeunit; /** * the result of an asynchronous operation. * * @author lixiaohui * @param <v> 执行结果的类型参数 */ public interface ifuture<v> extends future<v> { boolean issuccess; // 是否成功 v getnow; //立即返回结果(不管future是否处于完成状态) throwable cause; //若执行失败时的原因 boolean iscancellable; //是否可以取消 ifuture<v> await throws interruptedexception; //等待future的完成 boolean await(long timeoutmillis) throws interruptedexception; // 超时等待future的完成 boolean await(long timeout, timeunit timeunit) throws interruptedexception; ifuture<v> awaituninterruptibly; //等待future的完成,不响应中断 boolean awaituninterruptibly(long timeoutmillis);//超时等待future的完成,不响应中断 boolean awaituninterruptibly(long timeout, timeunit timeunit); ifuture<v> addlistener(ifuturelistener<v> l); //当future完成时,会通知这些加进来的监听器 ifuture<v> removelistener(ifuturelistener<v> l); }
接下来就一起来实现这个ifuture,在这之前要说明下object.wait,object.notifyall方法,因为整个future实现的原���的核心就是这两个方法.看看jdk里面的解释:
public class object { /** * causes the current thread to wait until another thread invokes the * {@link java.lang.object#notify} method or the * {@link java.lang.object#notifyall} method for this object. * in other words, this method behaves exactly as if it simply * performs the call {@code wait(0)}. * 调用该方法后,当前线程会释放对象监视器锁,并让出cpu使用权。直到别的线程调用notify/notifyall */ public final void wait throws interruptedexception { wait(0); } /** * wakes up all threads that are waiting on this object's monitor. a * thread waits on an object's monitor by calling one of the * {@code wait} methods. * <p> * the awakened threads will not be able to proceed until the current * thread relinquishes the lock on this object. the awakened threads * will compete in the usual manner with any other threads that might * be actively competing to synchronize on this object; for example, * the awakened threads enjoy no reliable privilege or disadvantage in * being the next thread to lock this object. */ public final native void notifyall; }
知道这个后,我们要自己实现future也就有了思路,当线程调用了ifuture.await等一系列的方法时,如果future还未完成,那么就调用future.wait 方法使线程进入waiting状态。而当别的线程设置future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyall方法来唤醒之前因为调用过wait方法而处于waiting状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的future机制的。有兴趣的可以去看看netty的源码):
package future; import java.util.collection; import java.util.concurrent.cancellationexception; import java.util.concurrent.copyonwritearraylist; import java.util.concurrent.executionexception; import java.util.concurrent.timeunit; import java.util.concurrent.timeoutexception; /** * <pre> * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link abstractfuture#success_signal} * 异常结束时, result为 {@link causeholder} 的实例;若是被取消而导致的异常结束, 则result为 {@link cancellationexception} 的实例, 否则为其它异常的实例 * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyall方法: * <ul> * <li>异步操作被取消时(cancel方法)</li> * <li>异步操作正常结束时(setsuccess方法)</li> * <li>异步操作异常结束时(setfailure方法)</li> * </ul> * </pre> * * @author lixiaohui * * @param <v> * 异步执行结果的类型 */ public class abstractfuture<v> implements ifuture<v> { protected volatile object result; // 需要保证其可见性 /** * 监听器集 */ protected collection<ifuturelistener<v>> listeners = new copyonwritearraylist<ifuturelistener<v>>; /** * 当任务正常执行结果为null时, 即客户端调用{@link abstractfuture#setsuccess(null)}时, * result引用该对象 */ private static final successsignal success_signal = new successsignal; @override public boolean cancel(boolean mayinterruptifrunning) { if (isdone) { // 已完成了不能取消 return false; } synchronized (this) { if (isdone) { // double check return false; } result = new causeholder(new cancellationexception); notifyall; // isdone = true, 通知等待在该对象的wait的线程 } notifylisteners; // 通知监听器该异步操作已完成 return true; } @override public boolean iscancellable { return result == null; } @override public boolean iscancelled { return result != null && result instanceof causeholder && ((causeholder) result).cause instanceof cancellationexception; } @override public boolean isdone { return result != null; } @override public v get throws interruptedexception, executionexception { await; // 等待执行结果 throwable cause = cause; if (cause == null) { // 没有发生异常,异步操作正常结束 return getnow; } if (cause instanceof cancellationexception) { // 异步操作被取消了 throw (cancellationexception) cause; } throw new executionexception(cause); // 其他异常 } @override public v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception { if (await(timeout, unit)) {// 超时等待执行结果 throwable cause = cause; if (cause == null) {// 没有发生异常,异步操作正常结束 return getnow; } if (cause instanceof cancellationexception) {// 异步操作被取消了 throw (cancellationexception) cause; } throw new executionexception(cause);// 其他异常 } // 时间到了异步操作还没有结束, 抛出超时异常 throw new timeoutexception; } @override public boolean issuccess { return result == null ? false : !(result instanceof causeholder); } @suppresswarnings("unchecked") @override public v getnow { return (v) (result == success_signal ? null : result); } @override public throwable cause { if (result != null && result instanceof causeholder) { return ((causeholder) result).cause; } return null; } @override public ifuture<v> addlistener(ifuturelistener<v> listener) { if (listener == null) { throw new nullpointerexception("listener"); } if (isdone) { // 若已完成直接通知该监听器 notifylistener(listener); return this; } synchronized (this) { if (!isdone) { listeners.add(listener); return this; } } notifylistener(listener); return this; } @override public ifuture<v> removelistener(ifuturelistener<v> listener) { if (listener == null) { throw new nullpointerexception("listener"); } if (!isdone) { listeners.remove(listener); } return this; } @override public ifuture<v> await throws interruptedexception { return await0(true); } private ifuture<v> await0(boolean interruptable) throws interruptedexception { if (!isdone) { // 若已完成就直接返回了 // 若允许终端且被中断了则抛出中断异常 if (interruptable && thread.interrupted) { throw new interruptedexception("thread " + thread.currentthread.getname + " has been interrupted."); } boolean interrupted = false; synchronized (this) { while (!isdone) { try { wait; // 释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyall方法 } catch (interruptedexception e) { if (interruptable) { throw e; } else { interrupted = true; } } } } if (interrupted) { // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的, // 这里重新设置以便让其它代码知道这里被中断了。 thread.currentthread.interrupt; } } return this; } @override public boolean await(long timeoutmillis) throws interruptedexception { return await0(timeunit.milliseconds.tonanos(timeoutmillis), true); } @override public boolean await(long timeout, timeunit unit) throws interruptedexception { return await0(unit.tonanos(timeout), true); } private boolean await0(long timeoutnanos, boolean interruptable) throws interruptedexception { if (isdone) { return true; } if (timeoutnanos <= 0) { return isdone; } if (interruptable && thread.interrupted) { throw new interruptedexception(tostring); } long starttime = timeoutnanos <= 0 ? 0 : system.nanotime; long waittime = timeoutnanos; boolean interrupted = false; try { synchronized (this) { if (isdone) { return true; } if (waittime <= 0) { return isdone; } for (;;) { try { wait(waittime / 1000000, (int) (waittime % 1000000)); } catch (interruptedexception e) { if (interruptable) { throw e; } else { interrupted = true; } } if (isdone) { return true; } else { waittime = timeoutnanos - (system.nanotime - starttime); if (waittime <= 0) { return isdone; } } } } } finally { if (interrupted) { thread.currentthread.interrupt; } } } @override public ifuture<v> awaituninterruptibly { try { return await0(false); } catch (interruptedexception e) { // 这里若抛异常了就无法处理了 throw new java.lang.internalerror; } } @override public boolean awaituninterruptibly(long timeoutmillis) { try { return await0(timeunit.milliseconds.tonanos(timeoutmillis), false); } catch (interruptedexception e) { throw new java.lang.internalerror; } } @override public boolean awaituninterruptibly(long timeout, timeunit unit) { try { return await0(unit.tonanos(timeout), false); } catch (interruptedexception e) { throw new java.lang.internalerror; } } protected ifuture<v> setfailure(throwable cause) { if (setfailure0(cause)) { notifylisteners; return this; } throw new illegalstateexception("complete already: " + this); } private boolean setfailure0(throwable cause) { if (isdone) { return false; } synchronized (this) { if (isdone) { return false; } result = new causeholder(cause); notifyall; } return true; } protected ifuture<v> setsuccess(object result) { if (setsuccess0(result)) { // 设置成功后通知监听器 notifylisteners; return this; } throw new illegalstateexception("complete already: " + this); } private boolean setsuccess0(object result) { if (isdone) { return false; } synchronized (this) { if (isdone) { return false; } if (result == null) { // 异步操作正常执行完毕的结果是null this.result = success_signal; } else { this.result = result; } notifyall; } return true; } private void notifylisteners { for (ifuturelistener<v> l : listeners) { notifylistener(l); } } private void notifylistener(ifuturelistener<v> l) { try { l.operationcompleted(this); } catch (exception e) { e.printstacktrace; } } private static class successsignal { } private static final class causeholder { final throwable cause; causeholder(throwable cause) { this.cause = cause; } } }
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
package future.test; import future.ifuture; import future.ifuturelistener; /** * 延时加法 * @author lixiaohui * */ public class delayadder { public static void main(string[] args) { new delayadder.add(3 * 1000, 1, 2).addlistener(new ifuturelistener<integer> { @override public void operationcompleted(ifuture<integer> future) throws exception { system.out.println(future.getnow); } }); } /** * 延迟加 * @param delay 延时时长 milliseconds * @param a 加数 * @param b 加数 * @return 异步结果 */ public delayadditionfuture add(long delay, int a, int b) { delayadditionfuture future = new delayadditionfuture; new thread(new delayadditiontask(delay, a, b, future)).start; return future; } private class delayadditiontask implements runnable { private long delay; private int a, b; private delayadditionfuture future; public delayadditiontask(long delay, int a, int b, delayadditionfuture future) { super; this.delay = delay; this.a = a; this.b = b; this.future = future; } @override public void run { try { thread.sleep(delay); integer i = a + b; // todo 这里设置future为完成状态(正常执行完毕) future.setsuccess(i); } catch (interruptedexception e) { // todo 这里设置future为完成状态(异常执行完毕) future.setfailure(e.getcause); } } } } package future.test; import future.abstractfuture; import future.ifuture; //只是把两个方法对外暴露 public class delayadditionfuture extends abstractfuture<integer> { @override public ifuture<integer> setsuccess(object result) { return super.setsuccess(result); } @override public ifuture<integer> setfailure(throwable cause) { return super.setfailure(cause); } }
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。