Netty中的ChannelFuture和ChannelPromise
在netty使用channelfuture和channelpromise进行异步操作的处理
这是官方给出的channelfutur描述
1 * | completed successfully | 2 * +---------------------------+ 3 * +----> isdone() = true | 4 * +--------------------------+ | | issuccess() = true | 5 * | uncompleted | | +===========================+ 6 * +--------------------------+ | | completed with failure | 7 * | isdone() = false | | +---------------------------+ 8 * | issuccess() = false |----+----> isdone() = true | 9 * | iscancelled() = false | | | cause() = non-null | 10 * | cause() = null | | +===========================+ 11 * +--------------------------+ | | completed by cancellation | 12 * | +---------------------------+ 13 * +----> isdone() = true | 14 * | iscancelled() = true | 15 * +---------------------------+
由图可以知道channelfutur有四种状态:uncompleted、completed successfully、completed with failure、completed by cancellation,这几种状态是由isdone、issuccess、iscancelled、cause这四种方法的返回值决定的。
channelfutur接口的定义如下:
1 public interface channelfuture extends future<void> { 2 channel channel(); 3 4 channelfuture addlistener(genericfuturelistener<? extends future<? super void>> var1); 5 6 channelfuture addlisteners(genericfuturelistener... var1); 7 8 channelfuture removelistener(genericfuturelistener<? extends future<? super void>> var1); 9 10 channelfuture removelisteners(genericfuturelistener... var1); 11 12 channelfuture sync() throws interruptedexception; 13 14 channelfuture syncuninterruptibly(); 15 16 channelfuture await() throws interruptedexception; 17 18 channelfuture awaituninterruptibly(); 19 20 boolean isvoid(); 21 }
继承自netty的future:
1 public interface future<v> extends java.util.concurrent.future<v> { 2 boolean issuccess(); 3 4 boolean iscancellable(); 5 6 throwable cause(); 7 8 future<v> addlistener(genericfuturelistener<? extends future<? super v>> var1); 9 10 future<v> addlisteners(genericfuturelistener... var1); 11 12 future<v> removelistener(genericfuturelistener<? extends future<? super v>> var1); 13 14 future<v> removelisteners(genericfuturelistener... var1); 15 16 future<v> sync() throws interruptedexception; 17 18 future<v> syncuninterruptibly(); 19 20 future<v> await() throws interruptedexception; 21 22 future<v> awaituninterruptibly(); 23 24 boolean await(long var1, timeunit var3) throws interruptedexception; 25 26 boolean await(long var1) throws interruptedexception; 27 28 boolean awaituninterruptibly(long var1, timeunit var3); 29 30 boolean awaituninterruptibly(long var1); 31 32 v getnow(); 33 34 boolean cancel(boolean var1); 35 }
netty的future又继承自jdk的future:
1 public interface future<v> { 2 3 boolean cancel(boolean mayinterruptifrunning); 4 5 boolean iscancelled(); 6 7 boolean isdone(); 8 9 v get() throws interruptedexception, executionexception; 10 11 v get(long timeout, timeunit unit) 12 throws interruptedexception, executionexception, timeoutexception; 13 }
channelpromise继承了channelfuture:
1 public interface channelpromise extends channelfuture, promise<void> { 2 channel channel(); 3 4 channelpromise setsuccess(void var1); 5 6 channelpromise setsuccess(); 7 8 boolean trysuccess(); 9 10 channelpromise setfailure(throwable var1); 11 12 channelpromise addlistener(genericfuturelistener<? extends future<? super void>> var1); 13 14 channelpromise addlisteners(genericfuturelistener... var1); 15 16 channelpromise removelistener(genericfuturelistener<? extends future<? super void>> var1); 17 18 channelpromise removelisteners(genericfuturelistener... var1); 19 20 channelpromise sync() throws interruptedexception; 21 22 channelpromise syncuninterruptibly(); 23 24 channelpromise await() throws interruptedexception; 25 26 channelpromise awaituninterruptibly(); 27 28 channelpromise unvoid(); 29 }
其中promise接口定义如下:
1 public interface promise<v> extends future<v> { 2 promise<v> setsuccess(v var1); 3 4 boolean trysuccess(v var1); 5 6 promise<v> setfailure(throwable var1); 7 8 boolean tryfailure(throwable var1); 9 10 boolean setuncancellable(); 11 12 promise<v> addlistener(genericfuturelistener<? extends future<? super v>> var1); 13 14 promise<v> addlisteners(genericfuturelistener... var1); 15 16 promise<v> removelistener(genericfuturelistener<? extends future<? super v>> var1); 17 18 promise<v> removelisteners(genericfuturelistener... var1); 19 20 promise<v> await() throws interruptedexception; 21 22 promise<v> awaituninterruptibly(); 23 24 promise<v> sync() throws interruptedexception; 25 26 promise<v> syncuninterruptibly(); 27 }
在netty中,无论是服务端还是客户端,在channel注册时都会为其绑定一个channelpromise,默认实现是defaultchannelpromise
defaultchannelpromise定义如下:
1 public class defaultchannelpromise extends defaultpromise<void> implements channelpromise, flushcheckpoint { 2 3 private final channel channel; 4 private long checkpoint; 5 6 public defaultchannelpromise(channel channel) { 7 this.channel = checknotnull(channel, "channel"); 8 } 9 10 public defaultchannelpromise(channel channel, eventexecutor executor) { 11 super(executor); 12 this.channel = checknotnull(channel, "channel"); 13 } 14 15 @override 16 protected eventexecutor executor() { 17 eventexecutor e = super.executor(); 18 if (e == null) { 19 return channel().eventloop(); 20 } else { 21 return e; 22 } 23 } 24 25 @override 26 public channel channel() { 27 return channel; 28 } 29 30 @override 31 public channelpromise setsuccess() { 32 return setsuccess(null); 33 } 34 35 @override 36 public channelpromise setsuccess(void result) { 37 super.setsuccess(result); 38 return this; 39 } 40 41 @override 42 public boolean trysuccess() { 43 return trysuccess(null); 44 } 45 46 @override 47 public channelpromise setfailure(throwable cause) { 48 super.setfailure(cause); 49 return this; 50 } 51 52 @override 53 public channelpromise addlistener(genericfuturelistener<? extends future<? super void>> listener) { 54 super.addlistener(listener); 55 return this; 56 } 57 58 @override 59 public channelpromise addlisteners(genericfuturelistener<? extends future<? super void>>... listeners) { 60 super.addlisteners(listeners); 61 return this; 62 } 63 64 @override 65 public channelpromise removelistener(genericfuturelistener<? extends future<? super void>> listener) { 66 super.removelistener(listener); 67 return this; 68 } 69 70 @override 71 public channelpromise removelisteners(genericfuturelistener<? extends future<? super void>>... listeners) { 72 super.removelisteners(listeners); 73 return this; 74 } 75 76 @override 77 public channelpromise sync() throws interruptedexception { 78 super.sync(); 79 return this; 80 } 81 82 @override 83 public channelpromise syncuninterruptibly() { 84 super.syncuninterruptibly(); 85 return this; 86 } 87 88 @override 89 public channelpromise await() throws interruptedexception { 90 super.await(); 91 return this; 92 } 93 94 @override 95 public channelpromise awaituninterruptibly() { 96 super.awaituninterruptibly(); 97 return this; 98 } 99 100 @override 101 public long flushcheckpoint() { 102 return checkpoint; 103 } 104 105 @override 106 public void flushcheckpoint(long checkpoint) { 107 this.checkpoint = checkpoint; 108 } 109 110 @override 111 public channelpromise promise() { 112 return this; 113 } 114 115 @override 116 protected void checkdeadlock() { 117 if (channel().isregistered()) { 118 super.checkdeadlock(); 119 } 120 } 121 122 @override 123 public channelpromise unvoid() { 124 return this; 125 } 126 127 @override 128 public boolean isvoid() { 129 return false; 130 } 131 }
可以看到这个defaultchannelpromise仅仅是将channel封装了,而且其基本上所有方法的实现都依赖于父类defaultpromise
defaultpromise中的实现是整个channelfuture和channelpromise的核心所在:
defaultpromise中有如下几个状态量:
1 private static final int max_listener_stack_depth = math.min(8, 2 systempropertyutil.getint("io.netty.defaultpromise.maxlistenerstackdepth", 8)); 3 private static final object success = new object(); 4 private static final object uncancellable = new object(); 5 private static final causeholder cancellation_cause_holder = new causeholder(throwableutil.unknownstacktrace( 6 new cancellationexception(), defaultpromise.class, "cancel(...)")); 7 private static final atomicreferencefieldupdater<defaultpromise, object> result_updater = 8 atomicreferencefieldupdater.newupdater(defaultpromise.class, object.class, "result");
max_listener_stack_depth: 表示最多可执行listeners的数量,默认是8
success :表示异步操作正常完成
uncancellable:表示异步操作不可取消,并且尚未完成
cancellation_cause_holder:表示异步操作取消监听,用于cancel操作,
而causeholder 的实例对象是用来表示异步操作异常结束,同时保存异常信息:
1 private static final class causeholder { 2 final throwable cause; 3 causeholder(throwable cause) { 4 this.cause = cause; 5 } 6 }
result_updater:是一个原子更新器,通过cas操作,原子化更新 defaultpromise对象的名为result的成员,这个result成员是其异步操作判断的关键所在
defaultpromise的成员及构造方法定义:
1 public class defaultpromise<v> extends abstractfuture<v> implements promise<v> { 2 private volatile object result; 3 private final eventexecutor executor; 4 private object listeners; 5 private short waiters; 6 private boolean notifyinglisteners; 7 8 public defaultpromise(eventexecutor executor) { 9 this.executor = checknotnull(executor, "executor"); 10 } 11 }
result:就是前面说的,判断异步操作状态的关键
result的取值有:success 、uncancellable、causeholder以及null (其实还可以是泛型v类型的任意对象,这里暂不考虑)
executor:就是channel绑定的nioeventloop,在我之前的博客说过,channel的异步操作都是在nioeventloop的线程中完成的([netty中nioeventloopgroup的创建源码分析](https://blog.csdn.net/z_chenchen/article/details/90567863))
listeners:通过一个object保存所有对异步操作的监听,用于异步操作的回调
waiters:记录阻塞中的listeners的数量
notifyinglisteners:是否需要唤醒的标志
首先来看isdone方法,通过之前的图可以知道,
isdone为false对应了uncompleted状态,即异步操作尚未完成;
isdone为true则代表了异步操作完成,但是还是有三种完成情况,需要结合别的判断方法才能具体知道是哪种情况;
isdone方法:
1 @override 2 public boolean isdone() { 3 return isdone0(result); 4 }
调用isdone0:
1 private static boolean isdone0(object result) { 2 return result != null && result != uncancellable; 3 }
有如下几种情况:
result等于null,result没有赋值,表示异步操作尚未完成(从这里就能想到异步操作完成,需要调用某个set方法来改变result的状态)
result是uncancellable状态,表示执行中的异步操作不可取消,当然也就是异步操作尚未完成
result不等于null,且不等于uncancellable,就表示异步操作完成(包括正常完成,以及异常结束,需要由cause方法进一步判断)
issuccess方法:
1 @override 2 public boolean issuccess() { 3 object result = this.result; 4 return result != null && result != uncancellable && !(result instanceof causeholder); 5 }
由这里可以知道当且仅当result 为success状态时,才返回true(其余除uncancellable和null的值其实也可以,这里暂不考虑)
iscancelled方法:
1 @override 2 public boolean iscancelled() { 3 return iscancelled0(result); 4 }
调用iscancelled0方法:
1 private static boolean iscancelled0(object result) { 2 return result instanceof causeholder && ((causeholder) result).cause instanceof cancellationexception; 3 }
只有当result是cancellationexception的实例时,表示取消异步操作
接着来看cause方法:
1 @override 2 public throwable cause() { 3 object result = this.result; 4 return (result instanceof causeholder) ? ((causeholder) result).cause : null; 5 }
和上面同理,通过判别resul是否是causeholder的实现类,若是,将causeholder保存的异常返回。
几种状态的判别说完了,下面看一下如何设置这几种状态的:
setsuccess方法:
1 @override 2 public promise<v> setsuccess(v result) { 3 if (setsuccess0(result)) { 4 notifylisteners(); 5 return this; 6 } 7 throw new illegalstateexception("complete already: " + this); 8 }
首先调用setsuccess0方法,其中result的泛型通过defaultchannelpromise可知是void,在defaultchannelpromise中所有的set和try操作参数都是null,这里的result也就不去考虑:
1 private boolean setsuccess0(v result) { 2 return setvalue0(result == null ? success : result); 3 }
继续调用setvalue0方法:
1 private boolean setvalue0(object objresult) { 2 if (result_updater.compareandset(this, null, objresult) || 3 result_updater.compareandset(this, uncancellable, objresult)) { 4 checknotifywaiters(); 5 return true; 6 } 7 return false; 8 }
通过cas操作,将result状态变为success
其中checknotifywaiters方法:
1 private synchronized void checknotifywaiters() { 2 if (waiters > 0) { 3 notifyall(); 4 } 5 }
检查waiters的个数,唤醒所有阻塞中的this,sync方法会引起阻塞
回到setsuccess方法中,setsuccess0通过cas操作,将result状态更新为success成功后,调用
notifylisteners方法,唤醒所有listener完成对异步操作的回调
listeners是通过addlistener方法添加的,用来对异步操作进行侦听:
看到addlistener方法:
1 @override 2 public promise<v> addlistener(genericfuturelistener<? extends future<? super v>> listener) { 3 checknotnull(listener, "listener"); 4 5 synchronized (this) { 6 addlistener0(listener); 7 } 8 9 if (isdone()) { 10 notifylisteners(); 11 } 12 13 return this; 14 } 15 16 @override 17 public promise<v> addlisteners(genericfuturelistener<? extends future<? super v>>... listeners) { 18 checknotnull(listeners, "listeners"); 19 20 synchronized (this) { 21 for (genericfuturelistener<? extends future<? super v>> listener : listeners) { 22 if (listener == null) { 23 break; 24 } 25 addlistener0(listener); 26 } 27 } 28 29 if (isdone()) { 30 notifylisteners(); 31 } 32 33 return this; 34 }
其中genericfuturelistener接口定义如下:
1 public interface genericfuturelistener<f extends future<?>> extends eventlistener { 2 /** 3 * invoked when the operation associated with the {@link future} has been completed. 4 * 5 * @param future the source {@link future} which called this callback 6 */ 7 void operationcomplete(f future) throws exception; 8 }
可以看到listener其实就是通过operationcomplete方法,来完成回调,对future对象进行处理,由注释可知operationcomplete方法是在future操作完成时调用
addlisteners方法的实现比较简单,实现核心是在addlistener0中:
1 private void addlistener0(genericfuturelistener<? extends future<? super v>> listener) { 2 if (listeners == null) { 3 listeners = listener; 4 } else if (listeners instanceof defaultfuturelisteners) { 5 ((defaultfuturelisteners) listeners).add(listener); 6 } else { 7 listeners = new defaultfuturelisteners((genericfuturelistener<?>) listeners, listener); 8 } 9 }
其中defaultfuturelisteners是将genericfuturelistener对象封装的一个数组:
1 final class defaultfuturelisteners { 2 3 private genericfuturelistener<? extends future<?>>[] listeners; 4 private int size; 5 private int progressivesize; 6 7 @suppresswarnings("unchecked") 8 defaultfuturelisteners( 9 genericfuturelistener<? extends future<?>> first, genericfuturelistener<? extends future<?>> second) { 10 listeners = new genericfuturelistener[2]; 11 listeners[0] = first; 12 listeners[1] = second; 13 size = 2; 14 if (first instanceof genericprogressivefuturelistener) { 15 progressivesize ++; 16 } 17 if (second instanceof genericprogressivefuturelistener) { 18 progressivesize ++; 19 } 20 } 21 22 public void add(genericfuturelistener<? extends future<?>> l) { 23 genericfuturelistener<? extends future<?>>[] listeners = this.listeners; 24 final int size = this.size; 25 if (size == listeners.length) { 26 this.listeners = listeners = arrays.copyof(listeners, size << 1); 27 } 28 listeners[size] = l; 29 this.size = size + 1; 30 31 if (l instanceof genericprogressivefuturelistener) { 32 progressivesize ++; 33 } 34 } 35 36 public void remove(genericfuturelistener<? extends future<?>> l) { 37 final genericfuturelistener<? extends future<?>>[] listeners = this.listeners; 38 int size = this.size; 39 for (int i = 0; i < size; i ++) { 40 if (listeners[i] == l) { 41 int listenerstomove = size - i - 1; 42 if (listenerstomove > 0) { 43 system.arraycopy(listeners, i + 1, listeners, i, listenerstomove); 44 } 45 listeners[-- size] = null; 46 this.size = size; 47 48 if (l instanceof genericprogressivefuturelistener) { 49 progressivesize --; 50 } 51 return; 52 } 53 } 54 } 55 56 public genericfuturelistener<? extends future<?>>[] listeners() { 57 return listeners; 58 } 59 60 public int size() { 61 return size; 62 } 63 64 public int progressivesize() { 65 return progressivesize; 66 } 67 }
size:记录listeners的个数
progressivesize:记录genericprogressivefuturelistener类型的listeners的个数
defaultfuturelisteners 中对数组的操作比较简单,
add方法,当size达到数组长度时,进行二倍扩容,
其中genericprogressivefuturelistener继承自genericfuturelistener:
1 public interface genericprogressivefuturelistener<f extends progressivefuture<?>> extends genericfuturelistener<f> { 2 /** 3 * invoked when the operation has progressed. 4 * 5 * @param progress the progress of the operation so far (cumulative) 6 * @param total the number that signifies the end of the operation when {@code progress} reaches at it. 7 * {@code -1} if the end of operation is unknown. 8 */ 9 void operationprogressed(f future, long progress, long total) throws exception; 10 }
由注释可知operationprogressed是在future操作进行时调用,这里不对genericprogressivefuturelistener过多讨论
回到addlistener0方法,由defaultfuturelisteners就可以知道,实际上通过defaultfuturelisteners管理的一维数组来保存listeners
在addlistener方法完成对listener的添加后,还会调用isdone方法检查当前异步操作是否完成,若是完成需要调用notifylisteners,直接唤醒所有listeners完后对异步操作的回调
有add就有remove,removelistener方法:
1 @override 2 public promise<v> removelistener(final genericfuturelistener<? extends future<? super v>> listener) { 3 checknotnull(listener, "listener"); 4 5 synchronized (this) { 6 removelistener0(listener); 7 } 8 9 return this; 10 } 11 12 @override 13 public promise<v> removelisteners(final genericfuturelistener<? extends future<? super v>>... listeners) { 14 checknotnull(listeners, "listeners"); 15 16 synchronized (this) { 17 for (genericfuturelistener<? extends future<? super v>> listener : listeners) { 18 if (listener == null) { 19 break; 20 } 21 removelistener0(listener); 22 } 23 } 24 25 return this; 26 }
还是由removelistener0来实现:
1 private void removelistener0(genericfuturelistener<? extends future<? super v>> listener) { 2 if (listeners instanceof defaultfuturelisteners) { 3 ((defaultfuturelisteners) listeners).remove(listener); 4 } else if (listeners == listener) { 5 listeners = null; 6 } 7 }
看过之前的内容在看这里就比较简单了,通过defaultfuturelisteners去删除
notifylisteners方法:
1 private void notifylisteners() { 2 eventexecutor executor = executor(); 3 if (executor.ineventloop()) { 4 final internalthreadlocalmap threadlocals = internalthreadlocalmap.get(); 5 final int stackdepth = threadlocals.futurelistenerstackdepth(); 6 if (stackdepth < max_listener_stack_depth) { 7 threadlocals.setfuturelistenerstackdepth(stackdepth + 1); 8 try { 9 notifylistenersnow(); 10 } finally { 11 threadlocals.setfuturelistenerstackdepth(stackdepth); 12 } 13 return; 14 } 15 } 16 17 safeexecute(executor, new runnable() { 18 @override 19 public void run() { 20 notifylistenersnow(); 21 } 22 }); 23 }
其中executor方法:
1 protected eventexecutor executor() { 2 return executor; 3 }
用来获取executor轮询线程对象
判断executor是否处于轮询,否则需要通过safeexecute方法处理listeners的侦听,
safeexecute方法:
1 private static void safeexecute(eventexecutor executor, runnable task) { 2 try { 3 executor.execute(task); 4 } catch (throwable t) { 5 rejectedexecutionlogger.error("failed to submit a listener notification task. event loop shut down?", t); 6 } 7 }
这里保证了listeners的侦听回调是异步执行
internalthreadlocalmap在我之前的博客中说过,是netty使用的threadlocal (netty中fastthreadlocal源码分析)
去线程本地变量中找futurelistenerstackdepth(默认为0),判断stackdepth是否小于max_listener_stack_depth,否则也要通过safeexecute方法处理listeners的侦听
核心都是调用notifylistenersnow方法:
1 private void notifylistenersnow() { 2 object listeners; 3 synchronized (this) { 4 // only proceed if there are listeners to notify and we are not already notifying listeners. 5 if (notifyinglisteners || this.listeners == null) { 6 return; 7 } 8 notifyinglisteners = true; 9 listeners = this.listeners; 10 this.listeners = null; 11 } 12 for (;;) { 13 if (listeners instanceof defaultfuturelisteners) { 14 notifylisteners0((defaultfuturelisteners) listeners); 15 } else { 16 notifylistener0(this, (genericfuturelistener<?>) listeners); 17 } 18 synchronized (this) { 19 if (this.listeners == null) { 20 // nothing can throw from within this method, so setting notifyinglisteners back to false does not 21 // need to be in a finally block. 22 notifyinglisteners = false; 23 return; 24 } 25 listeners = this.listeners; 26 this.listeners = null; 27 } 28 } 29 }
先检查是否需要监听,满足条件后,判断listeners是否是defaultfuturelisteners,即包装后的数组
notifylisteners0方法:
1 private void notifylisteners0(defaultfuturelisteners listeners) { 2 genericfuturelistener<?>[] a = listeners.listeners(); 3 int size = listeners.size(); 4 for (int i = 0; i < size; i ++) { 5 notifylistener0(this, a[i]); 6 } 7 }
遍历这个数组,实则调用notifylistener0方法:
1 private static void notifylistener0(future future, genericfuturelistener l) { 2 try { 3 l.operationcomplete(future); 4 } catch (throwable t) { 5 if (logger.iswarnenabled()) { 6 logger.warn("an exception was thrown by " + l.getclass().getname() + ".operationcomplete()", t); 7 } 8 } 9 }
这里就可以看到,完成了对operationcomplete的回调,处理future
setsuccess结束,再来看trysuccess方法:
1 @override 2 public boolean trysuccess(v result) { 3 if (setsuccess0(result)) { 4 notifylisteners(); 5 return true; 6 } 7 return false; 8 }
对比setsuccess来看,只有返回值不一样
setfailure方法:
1 @override 2 public promise<v> setfailure(throwable cause) { 3 if (setfailure0(cause)) { 4 notifylisteners(); 5 return this; 6 } 7 throw new illegalstateexception("complete already: " + this, cause); 8 } 9 10 private boolean setfailure0(throwable cause) { 11 return setvalue0(new causeholder(checknotnull(cause, "cause"))); 12 } 13 14 private boolean setvalue0(object objresult) { 15 if (result_updater.compareandset(this, null, objresult) || 16 result_updater.compareandset(this, uncancellable, objresult)) { 17 checknotifywaiters(); 18 return true; 19 } 20 return false; 21 }
和setsuccess逻辑一样,只不过cas操作将状态变为了causeholder对象,成功后唤醒listeners对异步操作的回调
tryfailure方法:
1 @override 2 public boolean tryfailure(throwable cause) { 3 if (setfailure0(cause)) { 4 notifylisteners(); 5 return true; 6 } 7 return false; 8 }
也都是一个逻辑
还有一个setuncancellable方法:
1 @override 2 public boolean setuncancellable() { 3 if (result_updater.compareandset(this, null, uncancellable)) { 4 return true; 5 } 6 object result = this.result; 7 return !isdone0(result) || !iscancelled0(result); 8 }
若是result状态为null,异步操作尚未结束,直接通过cas操作将状态变为uncancellable
否则若是根据状态来判断
下来看到cancel方法:
1 /** 2 * {@inheritdoc} 3 * 4 * @param mayinterruptifrunning this value has no effect in this implementation. 5 */ 6 @override 7 public boolean cancel(boolean mayinterruptifrunning) { 8 if (result_updater.compareandset(this, null, cancellation_cause_holder)) { 9 checknotifywaiters(); 10 notifylisteners(); 11 return true; 12 } 13 return false; 14 }
mayinterruptifrunning正如注释中所说,在这里没有什么作用
还是通过cas操作,将状态变为cancellation_cause_holder,调用checknotifywaiters唤醒因sync阻塞的线程,notifylisteners方法回调listeners的侦听
最后看到sync方法:
1 @override 2 public promise<v> sync() throws interruptedexception { 3 await(); 4 rethrowiffailed(); 5 return this; 6 }
先调用await方法:
1 @override 2 public promise<v> await() throws interruptedexception { 3 if (isdone()) { 4 return this; 5 } 6 7 if (thread.interrupted()) { 8 throw new interruptedexception(tostring()); 9 } 10 11 checkdeadlock(); 12 13 synchronized (this) { 14 while (!isdone()) { 15 incwaiters(); 16 try { 17 wait(); 18 } finally { 19 decwaiters(); 20 } 21 } 22 } 23 return this; 24 }
先判断能否执行(异步操作尚未结束,当前线程没有被中断),然后调用checkdeadlock方法:
1 protected void checkdeadlock() { 2 eventexecutor e = executor(); 3 if (e != null && e.ineventloop()) { 4 throw new blockingoperationexception(tostring()); 5 } 6 }
检查轮询线程是否在工作
在synchronized块中以自身为锁,自旋等待异步操作的完成,若是没完成,调用incwaiters方法:
1 private void incwaiters() { 2 if (waiters == short.max_value) { 3 throw new illegalstateexception("too many waiters: " + this); 4 } 5 ++waiters; 6 }
在小于short.max_value的情况下,对waiters自增,
然后使用wait将自身阻塞,等待被唤醒
所以在之前setvalue0时,checknotifywaiters操作会notifyall,
由此可以知道sync方法的作用:在某一线程中调用sync方法会使得当前线程被阻塞,只有当异步操作执完毕,通过上面的set方法改变状态后,才会调用checknotifywaiters方法唤醒当前线程。
当从阻塞中被唤醒后调用decwaiters方法:
1 private void decwaiters() { 2 --waiters; 3 }
使得waiters自减
通过这样一种自旋方式,一直等到isdone成立,结束自旋,进而结束await方法,然后调用rethrowiffailed方法:
1 private void rethrowiffailed() { 2 throwable cause = cause(); 3 if (cause == null) { 4 return; 5 } 6 7 platformdependent.throwexception(cause); 8 }
根据异步操作是否有异常,进而使用platformdependent抛出异常。
至此netty中的channelfuture和channelpromise分析到此全部结束。
推荐阅读