netty源码解析(4.0)-29 Future模式的实现
future模式是一个重要的异步并发模式,在jdk有实现。但jdk实现的future模式功能比较简单,使用起来比较复杂。netty在jdk future基础上,加强了future的能力,具体体现在:
- 更加简单的结果返回方式。在jdk中,需要用户自己实现future对象的执行及返回结果。而在netty中可以使用promise简单地调用方法返回结果。
- 更加灵活的结果处理方式。jdk中只提供了主动得到结果的get方法,要么阻塞,要么轮询。netty除了支持主动get方法外,还可以使用listener被动监听结果。
- 实现了进度监控。netty提供了progressivefuture、progressivepromise和genericprogressivefuturelistener接口及其实现,支持对执行进程的监控。
吹了那么多牛,有一个关键问题还没弄清楚:future到底是干嘛的?io.netty.util.concurrent.future代码的第一行注释简洁第回答了这个问题:future就是异步操作的结果。这里面有三个关键字:异步,操作,结果。首先,future首先是一个“结果”;其次这个结果产生于一个“操作”,操作具体是什么可以随便定义;最后这个操作是"异步"执行的,这就意味着“操作”可能在另一个线程中并发执行,也可能随后在同一个线程中执行,什么时候产生结果是一件不确定的事。
异步调用过程的一般过程是:调用方唤起一个异步操作,在接下来的某个恰当的时间点得到的异步操作操作的结果。要正确地完成上述步骤,需要解决以下几个问题:
- 怎样维护这个调用状态?
- 如何获取异步操作的结果?
- 何时处理结果?
io.netty.util.concurrent.defaultpromise是future的默认实现,以上三个问题的答案都能在这个类的代码中找到。
defaultpromise的派生体系
下面是defaultpromis及其父类,接口的声明:
public class defaultpromise<v> extends abstractfuture<v> implements promise<v> public abstract class abstractfuture<v> implements future<v> public interface promise<v> extends future<v> public interface future<v> extends java.util.concurrent.future<v>
可以看出,defaultpromise派生自abstractfuture类,并实现了promise接口。抽象类型abstractfuture派生自future, 接口promise派生自future。future派生自jdk的future接口。
和jdk的future相比,netty的future接口增加一些自己的方法:
/** 当操作成功时返回true*/ boolean issuccess(); /**
只有当操作可以被取消时返回true
*/ boolean iscancellable(); /** 返回操作的异常*/ throwable cause(); /** 添加一个监听器到future。当操作完成(成功或失败都算完成,此事isdone()返回true)时, 会通知这个监听器。如果添加时操作已经完成,
这个监听器会立即被通知。*/ future<v> addlistener(genericfuturelistener<? extends future<? super v>> listener); /** 和上个方法一样,可以同时添加多个监听器*/ future<v> addlisteners(genericfuturelistener<? extends future<? super v>>... listeners); /** 删除指定的监听器, 如果这个监听器还没被通知的话。*/ future<v> removelistener(genericfuturelistener<? extends future<? super v>> listener); /** 功能和上个方法一样,可以同时删除多个监听器。*/ future<v> removelisteners(genericfuturelistener<? extends future<? super v>>... listeners); /** 同步等待直到操作完成。会被打断。 */ future<v> sync() throws interruptedexception; /** 同步等着知道操作完成。不会被打断。 */ future<v> syncuninterruptibly(); /** 同sync*/ future<v> await() throws interruptedexception; /** 同synuniterruptibliy*/ future<v> awaituninterruptibly(); /** 等待,直到操作完成或超过指定的时间。会被打断。*/ boolean await(long timeout, timeunit unit) throws interruptedexception; /** 同上*/ boolean await(long timeoutmillis) throws interruptedexception; /** 同上,不会被打断。*/ boolean awaituninterruptibly(long timeout, timeunit unit); /** 同上。*/ boolean awaituninterruptibly(long timeoutmillis); /** 立即得到结果,不会阻塞。如果操作没有完成或没有成功,返回null*/ v getnow();
netty的future最大的特点是增加了listener被动接收任务完成通知,下面是两个listener接口的定义:
public interface genericfuturelistener<f extends future<?>> extends eventlistener { void operationcomplete(f future) throws exception; } public interface genericprogressivefuturelistener<f extends progressivefuture<?>> extends genericfuturelistener<f> { void operationprogressed(f future, long progress, long total) throws exception; }
把一个listener添加到future之后。当异步操作完成之后,listener会被通知一次,同时会回调operationcomplete方法。参数future是当前通知的future,这意味这,一个listener可以被添加到多个future中。
当异步操作进度发送变化时,listener会被通知,同时会回调operationprogressed方法。progress是当前进度,total是总进度。progress==total表示操作完成。如果不知道何时完成操作progress=-1。
promise定义的方法:
/** 设置结果。把这个future设置为success,通知所有的listener,
如果这个future已经是success或failed(操作已经完成),会抛出illegalstateexception
*/ promise<v> setsuccess(v result); /**
同上。只有在操作没有完成的时候才会生效,且会返回true */ boolean trysuccess(v result); /** 设置异常。把这个future设置为failed状态,通知所有的listener.
如果这个future已经完成,会抛出illegalstateexception */ promise<v> setfailure(throwable cause); /** 同上。只有在操作没有完成时才会生效,且返回ture */ boolean tryfailure(throwable cause); /** 设置当前前future的操作不能被取消。这个future没有完成且可以设置成功或这个future已经完成,返回true。否则返回false */ boolean setuncancellable();
defaultpromise的设计
关键属性
volatile object result;
异步操作的结果。可以通过它的值知道当前future的状态。
final eventexecutor executor;
通知listener的线程。
object listeners;
维护添加到当前future的listener对象。
short waiters;
记录当前真正等待结果的线程数量。
boolean notifyinglisteners;
是否正在通知listener,防止多线程并发执行通知操作。
状态管理
future有4种状态: 未完成, 未完成-不能取消,完成-成功,完成-失败。使用isdone()判断是否完成,它代码如下:
1 @override 2 public boolean isdone() { 3 return isdone0(result); 4 } 5 6 private static boolean isdone0(object result) { 7 return result != null && result != uncancellable; 8 }
第7行是判断当前完成状态的。result != null 且 result != uncancellable,表示处于完成状态。
result默认是null, 此时future处于未完成状态。可以使用setuncancellable方法把它设置成为完成-不能取消状态。
1 @override 2 public boolean setuncancellable() { 3 if (result_updater.compareandset(this, null, uncancellable)) { 4 return true; 5 } 6 object result = this.result; 7 return !isdone0(result) || !iscancelled0(result); 8 }
第3行,使用原子操作设置result的值,只有result==null时才能把result设置成uncancellable。当result==uncancellable时,不允许取消异步操作。
使用issuccess方法判断future是否处于完成-成功状态。
1 @override 2 public boolean issuccess() { 3 object result = this.result; 4 return result != null && result != uncancellable && !(result instanceof causeholder); 5 }
第4行是完成-成功状态result的取值:除null, uncancellable和causeholder对象的任何值。
只有满足isdone() && !issuccess()时,future处于完成失败状态,可以使用cause方法获取异常。
调用setsuccess和trysuccess方法,能够把状态转换成完成-成功。
1 @override 2 public promise<v> setsuccess(v result) { 3 if (setsuccess0(result)) { 4 notifylisteners(); 5 return this; 6 } 7 throw new illegalstateexception("complete already: " + this); 8 } 9 10 private boolean setsuccess0(v result) { 11 return setvalue0(result == null ? success : result); 12 }
第3行尝试把状态设置成完成-成功状态。如果可以,在第4行通知所有的listener。否则第7行抛出错误。第11行给出了成功的默认值success。trysuccess少了第7行,不会抛出异常。
调用setfailure和tryfailure方法,能够包状态转换成完成-失败状态。
1 @override 2 public promise<v> setfailure(throwable cause) { 3 if (setfailure0(cause)) { 4 notifylisteners(); 5 return this; 6 } 7 throw new illegalstateexception("complete already: " + this, cause); 8 } 9 10 private boolean setfailure0(throwable cause) { 11 return setvalue0(new causeholder(checknotnull(cause, "cause"))); 12 }
第3行尝试把专题设置成完成-失败状态。如果可以,在第4行通知所有listener。否则在第7行抛出异常。第11行把异常包装成causeholder对象。tryfailure少了第7行,不会抛出异常。
获取异步操作的结果
当异步操作完成时,调用promise提供的setsuccess和trysuccess设置成功的结果,调用setfailure和tryfailure设置异常结果。不论什么结果,都会使用setvalue0方法保存到result属性上。
1 private boolean setvalue0(object objresult) { 2 if (result_updater.compareandset(this, null, objresult) || 3 result_updater.compareandset(this, uncancellable, objresult)) { 4 checknotifywaiters(); 5 return true; 6 } 7 return false; 8 }
第2,3行,使用原子操作设置result的值,只有result==null或result==uncancellable时,才能设置成功。如果设置成功,在第4行唤醒所有等待中的线程。可以使用get方法得到result值。如果issucess()==true, result的值是success或异步操作的结果。否则result的值是causeholder对象,此时可以调用cause方法得到异常对象。
使用get或cause,只有在异步操作完成后才能顺利得到结果。可以使用listener,被动等待操作完成通知。
使用listener异步通知处理结果
future的listener是必须实现genericfuturelistener接口,调用方法可以在operationcomplete方法中处理异步操作的结果。
listeners属性用来保存使用addlistener,addlisteners方法添加到future的listener。listeners可能使用一个genericfuturelistener对象,也可能是一个genericfuturelistener数组。所有添加listener方法都会调用addlistener0方法添加listener。
1 private void addlistener0(genericfuturelistener<? extends future<? super v>> listener) { 2 if (listeners == null) { 3 listeners = listener; 4 } else if (listeners instanceof defaultfuturelisteners) { 5 ((defaultfuturelisteners) listeners).add(listener); 6 } else { 7 listeners = new defaultfuturelisteners((genericfuturelistener<? extends future<v>>) listeners, listener); 8 } 9 }
这段代码中使用了一个defaultfuturelisteners类,它内部维护了一个genericfuturelistener数组。
当一次操作完成时,会调用notifylisteners方法通知listeners中所有的listener,并调用listener的operationcomplete方法。只有当isdone()==true时才会调用notifylisteners方法。触发点在下面的一些方法中:
addlistener, addlisteners。
setsuccess, trysuccess。
setfailure, tryfailure。
notifylisteners的代码如下:
1 private void notifylisteners() { 2 eventexecutor executor = executor(); 3 if (executor.ineventloop()) { 4 final internalthreadlocalmap threadlocals = internalthreadlocalmap.get(); 5 final int stackdepth = threadlocals.futurelistenerstackdepth(); 6 if (stackdepth < max_listener_stack_depth) { 7 threadlocals.setfuturelistenerstackdepth(stackdepth + 1); 8 try { 9 notifylistenersnow(); 10 } finally { 11 threadlocals.setfuturelistenerstackdepth(stackdepth); 12 } 13 return; 14 } 15 } 16 17 safeexecute(executor, new runnable() { 18 @override 19 public void run() { 20 notifylistenersnow(); 21 } 22 }); 23 }
这段代码的作用是调用notifylistenersnow。如果当前线程就是executor的线程,在第9行直接调用notifylistenernow,否则在第20行,把notifylistnernow放在executor中执行。第4-7行和11行的作用是防止递归调用导致线程栈溢出,max_listener_stack_depth就是listener递归调用的最大深度。
notifylistenernow的作用是,确保没有并发执行notifylistener0或notifylistners0方法,且所有的listener只能被通知一次。
1 private void notifylistenersnow() { 2 object listeners; 3 synchronized (this) { 4 // only proceed if there are listeners to notify and we are not already notifying listeners. 5 if (notifyinglisteners || this.listeners == null) { 6 return; 7 } 8 notifyinglisteners = true; 9 listeners = this.listeners; 10 this.listeners = null; 11 } 12 for (;;) { 13 if (listeners instanceof defaultfuturelisteners) { 14 notifylisteners0((defaultfuturelisteners) listeners); 15 } else { 16 notifylistener0(this, (genericfuturelistener<? extends future<v>>) listeners); 17 } 18 synchronized (this) { 19 if (this.listeners == null) { 20 // nothing can throw from within this method, so setting notifyinglisteners back to false does not 21 // need to be in a finally block. 22 notifyinglisteners = false; 23 return; 24 } 25 listeners = this.listeners; 26 this.listeners = null; 27 } 28 } 29 }
第3-11行的作用是防止多个线程并发执行11行之后的代码。
结合第5,9,10行可知, listeners中的所有listener只能被通知一次。
13-17行,通知所有listeners。notifylistener0通知一个listener,notifylisteners0通知所有的listener。
最后,18-27行,检查在通知listeners的过程中,是否有新的listener被添加进来。如果有,25,26行得到所有新添加的listener并清空listeners属性,13-17行继续通知新添加的listener。否则,运行22,23行结束通知过程。
1 private void notifylisteners0(defaultfuturelisteners listeners) { 2 genericfuturelistener<?>[] a = listeners.listeners(); 3 int size = listeners.size(); 4 for (int i = 0; i < size; i ++) { 5 notifylistener0(this, a[i]); 6 } 7 } 8 9 @suppresswarnings({ "unchecked", "rawtypes" }) 10 private static void notifylistener0(future future, genericfuturelistener l) { 11 try { 12 l.operationcomplete(future); 13 } catch (throwable t) { 14 logger.warn("an exception was thrown by " + l.getclass().getname() + ".operationcomplete()", t); 15 } 16 }
1-7行,notifylisteners0对每个listener调用一次notifylistener0,参数是当前的future。
10-16,调用listener的operationcomplete方法,捕获了所有的异常,确保接下来可以继续通知下一个listener。
使用await机制同步等待结果
可以使用一系列的await,awaitxxx方法同步等待结果。这些方法可以分为: 能被打断的,不能被打断的。一直等待的,有超时时间的。await0方法是最复杂的等待实现,所有带超时时间的await方法都会调用它。
1 private boolean await0(long timeoutnanos, boolean interruptable) throws interruptedexception { 2 if (isdone()) { 3 return true; 4 } 5 6 if (timeoutnanos <= 0) { 7 return isdone(); 8 } 9 10 if (interruptable && thread.interrupted()) { 11 throw new interruptedexception(tostring()); 12 } 13 14 checkdeadlock(); 15 16 long starttime = system.nanotime(); 17 long waittime = timeoutnanos; 18 boolean interrupted = false; 19 try { 20 for (;;) { 21 synchronized (this) { 22 if (isdone()) { 23 return true; 24 } 25 incwaiters(); 26 try { 27 wait(waittime / 1000000, (int) (waittime % 1000000)); 28 } catch (interruptedexception e) { 29 if (interruptable) { 30 throw e; 31 } else { 32 interrupted = true; 33 } 34 } finally { 35 decwaiters(); 36 } 37 } 38 if (isdone()) { 39 return true; 40 } else { 41 waittime = timeoutnanos - (system.nanotime() - starttime); 42 if (waittime <= 0) { 43 return isdone(); 44 } 45 } 46 } 47 } finally { 48 if (interrupted) { 49 thread.currentthread().interrupt(); 50 } 51 } 52 }
这个方法返回的条件有: (1)isdone()==true;(2)允许被打断(interrupted==true)的情况下被打断;(3)已经超时。2-12行分别检查了这3种情况。
25,35行管理waiters属性,这个属性用来记录当前正在等待的线程数。inwaiters方法正常情况下会把waiters加1,当检查到waiters==short.max_value时会抛出异常,防止过多的线程等待。
27行,调用wait等待,经历waittime后超时返回。在等待过程中,会被setvalue0方法调用notifyall唤醒。
29-33行,处理被打断的异常,如果运行被打断,在30行抛出这个异常返回。
38-45行,不论什么原因线程被唤醒,检查是否满足返回条件,如果不满足,继续循环等待。
没有超时的wait方法实现要简单一些,只需判读返回条件(1)(2)。
跟踪异步操作的执行进度
如果想要跟踪异步操作的执行进度,future需要换成defaultprogressivepromise对象,listener需要换成genericprogressivefuturelistener类型。defaultprogressivepromise派生自defaultpromise同时实现了progressivepromise接口。genericprogressivefuturelistener接口派生自genericfuturelistener接口。
progressivepromise定义了setprogress和tryprogress方法用来更新进度,是不是很眼熟,和promise接口定义返回结果的方法很类似。
progressivepromise<v> setprogress(long progress, long total); boolean tryprogress(long progress, long total);
genericprogressivefuturelistener定义了operationprogressed方法用来处理进度更新通知。
void operationprogressed(f future, long progress, long total) throws exception;
defaultprogressivepromise自己只实现了setprogress和tryprogress方法,其它都是复用了defaultpromise的实现。
1 @override 2 public progressivepromise<v> setprogress(long progress, long total) { 3 if (total < 0) { 4 // total unknown 5 total = -1; // normalize 6 if (progress < 0) { 7 throw new illegalargumentexception("progress: " + progress + " (expected: >= 0)"); 8 } 9 } else if (progress < 0 || progress > total) { 10 throw new illegalargumentexception( 11 "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))"); 12 } 13 14 if (isdone()) { 15 throw new illegalstateexception("complete already"); 16 } 17 18 notifyprogressivelisteners(progress, total); 19 return this; 20 }
3-12行,检查progress和total的合法性。
14行,如isdone()==true,抛出异常。只有在操作还没完成的是否更新进度才有意义。
18行,调用notifyprogressivelisteners触发进度更新通知,这个方法在defaultpromise中实现。
notifyprogressivelisteners实现了触发进度更新通知的主要流程:
1 void notifyprogressivelisteners(final long progress, final long total) { 2 final object listeners = progressivelisteners(); 3 if (listeners == null) { 4 return; 5 } 6 7 final progressivefuture<v> self = (progressivefuture<v>) this; 8 9 eventexecutor executor = executor(); 10 if (executor.ineventloop()) { 11 if (listeners instanceof genericprogressivefuturelistener[]) { 12 notifyprogressivelisteners0( 13 self, (genericprogressivefuturelistener<?>[]) listeners, progress, total); 14 } else { 15 notifyprogressivelistener0( 16 self, (genericprogressivefuturelistener<progressivefuture<v>>) listeners, progress, total); 17 } 18 } else { 19 if (listeners instanceof genericprogressivefuturelistener[]) { 20 final genericprogressivefuturelistener<?>[] array = 21 (genericprogressivefuturelistener<?>[]) listeners; 22 safeexecute(executor, new runnable() { 23 @override 24 public void run() { 25 notifyprogressivelisteners0(self, array, progress, total); 26 } 27 }); 28 } else { 29 final genericprogressivefuturelistener<progressivefuture<v>> l = 30 (genericprogressivefuturelistener<progressivefuture<v>>) listeners; 31 safeexecute(executor, new runnable() { 32 @override 33 public void run() { 34 notifyprogressivelistener0(self, l, progress, total); 35 } 36 }); 37 } 38 } 39 }
第3行,从listeners中选出genericprogressivefuturelistener类型的listener。
10-38行。调用notifyprogressivelisteners0, notifyprogressivelistener0通知进度跟新。11-17行,在当前线程中调用。
19-37行,在executor中调用。notifyprogressivelistener0只是简单地调用listener的operationprogressed方法。notifyprogressivelisteners0是对每个listener调用一次notifyprogressivelistener0。
和完成通知相比,进度更新通知要更加简单。进度更新通知没有处理并发问题,没有处理栈溢出问题。