欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty中的ChannelFuture和ChannelPromise

程序员文章站 2023-01-01 08:07:05
在Netty使用ChannelFuture和ChannelPromise进行异步操作的处理 这是官方给出的ChannelFutur描述 由图可以知道ChannelFutur有四种状态:Uncompleted、Completed successfully、Completed with failure、 ......

在netty使用channelfuture和channelpromise进行异步操作的处理

这是官方给出的channelfutur描述

 1  *                                      | completed successfully    |
 2  *                                      +---------------------------+
 3  *                                 +---->      isdone() = true      |
 4  * +--------------------------+    |    |   issuccess() = true      |
 5  * |        uncompleted       |    |    +===========================+
 6  * +--------------------------+    |    | completed with failure    |
 7  * |      isdone() = false    |    |    +---------------------------+
 8  * |   issuccess() = false    |----+---->      isdone() = true      |
 9  * | iscancelled() = false    |    |    |       cause() = non-null  |
10  * |       cause() = null     |    |    +===========================+
11  * +--------------------------+    |    | completed by cancellation |
12  *                                 |    +---------------------------+
13  *                                 +---->      isdone() = true      |
14  *                                      | iscancelled() = true      |
15  *                                      +---------------------------+

由图可以知道channelfutur有四种状态:uncompleted、completed successfully、completed with failure、completed by cancellation,这几种状态是由isdone、issuccess、iscancelled、cause这四种方法的返回值决定的。

 

channelfutur接口的定义如下:

 1 public interface channelfuture extends future<void> {
 2     channel channel();
 3 
 4     channelfuture addlistener(genericfuturelistener<? extends future<? super void>> var1);
 5 
 6     channelfuture addlisteners(genericfuturelistener... var1);
 7 
 8     channelfuture removelistener(genericfuturelistener<? extends future<? super void>> var1);
 9 
10     channelfuture removelisteners(genericfuturelistener... var1);
11 
12     channelfuture sync() throws interruptedexception;
13 
14     channelfuture syncuninterruptibly();
15 
16     channelfuture await() throws interruptedexception;
17 
18     channelfuture awaituninterruptibly();
19 
20     boolean isvoid();
21 }

继承自netty的future:

 1 public interface future<v> extends java.util.concurrent.future<v> {
 2     boolean issuccess();
 3 
 4     boolean iscancellable();
 5 
 6     throwable cause();
 7 
 8     future<v> addlistener(genericfuturelistener<? extends future<? super v>> var1);
 9 
10     future<v> addlisteners(genericfuturelistener... var1);
11 
12     future<v> removelistener(genericfuturelistener<? extends future<? super v>> var1);
13 
14     future<v> removelisteners(genericfuturelistener... var1);
15 
16     future<v> sync() throws interruptedexception;
17 
18     future<v> syncuninterruptibly();
19 
20     future<v> await() throws interruptedexception;
21 
22     future<v> awaituninterruptibly();
23 
24     boolean await(long var1, timeunit var3) throws interruptedexception;
25 
26     boolean await(long var1) throws interruptedexception;
27 
28     boolean awaituninterruptibly(long var1, timeunit var3);
29 
30     boolean awaituninterruptibly(long var1);
31 
32     v getnow();
33 
34     boolean cancel(boolean var1);
35 }

 

netty的future又继承自jdk的future:

 1 public interface future<v> {
 2 
 3     boolean cancel(boolean mayinterruptifrunning);
 4     
 5     boolean iscancelled();
 6 
 7     boolean isdone();
 8     
 9     v get() throws interruptedexception, executionexception;
10 
11     v get(long timeout, timeunit unit)
12         throws interruptedexception, executionexception, timeoutexception;
13 }


channelpromise继承了channelfuture:

 1 public interface channelpromise extends channelfuture, promise<void> {
 2     channel channel();
 3 
 4     channelpromise setsuccess(void var1);
 5 
 6     channelpromise setsuccess();
 7 
 8     boolean trysuccess();
 9 
10     channelpromise setfailure(throwable var1);
11 
12     channelpromise addlistener(genericfuturelistener<? extends future<? super void>> var1);
13 
14     channelpromise addlisteners(genericfuturelistener... var1);
15 
16     channelpromise removelistener(genericfuturelistener<? extends future<? super void>> var1);
17 
18     channelpromise removelisteners(genericfuturelistener... var1);
19 
20     channelpromise sync() throws interruptedexception;
21 
22     channelpromise syncuninterruptibly();
23 
24     channelpromise await() throws interruptedexception;
25 
26     channelpromise awaituninterruptibly();
27 
28     channelpromise unvoid();
29 }

其中promise接口定义如下:

 1 public interface promise<v> extends future<v> {
 2     promise<v> setsuccess(v var1);
 3 
 4     boolean trysuccess(v var1);
 5 
 6     promise<v> setfailure(throwable var1);
 7 
 8     boolean tryfailure(throwable var1);
 9 
10     boolean setuncancellable();
11 
12     promise<v> addlistener(genericfuturelistener<? extends future<? super v>> var1);
13 
14     promise<v> addlisteners(genericfuturelistener... var1);
15 
16     promise<v> removelistener(genericfuturelistener<? extends future<? super v>> var1);
17 
18     promise<v> removelisteners(genericfuturelistener... var1);
19 
20     promise<v> await() throws interruptedexception;
21 
22     promise<v> awaituninterruptibly();
23 
24     promise<v> sync() throws interruptedexception;
25 
26     promise<v> syncuninterruptibly();
27 }


在netty中,无论是服务端还是客户端,在channel注册时都会为其绑定一个channelpromise,默认实现是defaultchannelpromise

defaultchannelpromise定义如下:

  1 public class defaultchannelpromise extends defaultpromise<void> implements channelpromise, flushcheckpoint {
  2 
  3     private final channel channel;
  4     private long checkpoint;
  5 
  6     public defaultchannelpromise(channel channel) {
  7         this.channel = checknotnull(channel, "channel");
  8     }
  9     
 10     public defaultchannelpromise(channel channel, eventexecutor executor) {
 11         super(executor);
 12         this.channel = checknotnull(channel, "channel");
 13     }
 14 
 15     @override
 16     protected eventexecutor executor() {
 17         eventexecutor e = super.executor();
 18         if (e == null) {
 19             return channel().eventloop();
 20         } else {
 21             return e;
 22         }
 23     }
 24 
 25     @override
 26     public channel channel() {
 27         return channel;
 28     }
 29 
 30     @override
 31     public channelpromise setsuccess() {
 32         return setsuccess(null);
 33     }
 34 
 35     @override
 36     public channelpromise setsuccess(void result) {
 37         super.setsuccess(result);
 38         return this;
 39     }
 40 
 41     @override
 42     public boolean trysuccess() {
 43         return trysuccess(null);
 44     }
 45 
 46     @override
 47     public channelpromise setfailure(throwable cause) {
 48         super.setfailure(cause);
 49         return this;
 50     }
 51 
 52     @override
 53     public channelpromise addlistener(genericfuturelistener<? extends future<? super void>> listener) {
 54         super.addlistener(listener);
 55         return this;
 56     }
 57 
 58     @override
 59     public channelpromise addlisteners(genericfuturelistener<? extends future<? super void>>... listeners) {
 60         super.addlisteners(listeners);
 61         return this;
 62     }
 63 
 64     @override
 65     public channelpromise removelistener(genericfuturelistener<? extends future<? super void>> listener) {
 66         super.removelistener(listener);
 67         return this;
 68     }
 69 
 70     @override
 71     public channelpromise removelisteners(genericfuturelistener<? extends future<? super void>>... listeners) {
 72         super.removelisteners(listeners);
 73         return this;
 74     }
 75 
 76     @override
 77     public channelpromise sync() throws interruptedexception {
 78         super.sync();
 79         return this;
 80     }
 81 
 82     @override
 83     public channelpromise syncuninterruptibly() {
 84         super.syncuninterruptibly();
 85         return this;
 86     }
 87 
 88     @override
 89     public channelpromise await() throws interruptedexception {
 90         super.await();
 91         return this;
 92     }
 93 
 94     @override
 95     public channelpromise awaituninterruptibly() {
 96         super.awaituninterruptibly();
 97         return this;
 98     }
 99 
100     @override
101     public long flushcheckpoint() {
102         return checkpoint;
103     }
104 
105     @override
106     public void flushcheckpoint(long checkpoint) {
107         this.checkpoint = checkpoint;
108     }
109 
110     @override
111     public channelpromise promise() {
112         return this;
113     }
114 
115     @override
116     protected void checkdeadlock() {
117         if (channel().isregistered()) {
118             super.checkdeadlock();
119         }
120     }
121 
122     @override
123     public channelpromise unvoid() {
124         return this;
125     }
126 
127     @override
128     public boolean isvoid() {
129         return false;
130     }
131 }

可以看到这个defaultchannelpromise仅仅是将channel封装了,而且其基本上所有方法的实现都依赖于父类defaultpromise

defaultpromise中的实现是整个channelfuture和channelpromise的核心所在:

defaultpromise中有如下几个状态量:

1 private static final int max_listener_stack_depth = math.min(8,
2             systempropertyutil.getint("io.netty.defaultpromise.maxlistenerstackdepth", 8));
3 private static final object success = new object();
4 private static final object uncancellable = new object();
5 private static final causeholder cancellation_cause_holder = new causeholder(throwableutil.unknownstacktrace(
6             new cancellationexception(), defaultpromise.class, "cancel(...)"));
7 private static final atomicreferencefieldupdater<defaultpromise, object> result_updater =
8             atomicreferencefieldupdater.newupdater(defaultpromise.class, object.class, "result");         

max_listener_stack_depth: 表示最多可执行listeners的数量,默认是8
success :表示异步操作正常完成
uncancellable:表示异步操作不可取消,并且尚未完成
cancellation_cause_holder:表示异步操作取消监听,用于cancel操作,
而causeholder 的实例对象是用来表示异步操作异常结束,同时保存异常信息:

1 private static final class causeholder {
2     final throwable cause;
3     causeholder(throwable cause) {
4         this.cause = cause;
5     }
6 }


result_updater:是一个原子更新器,通过cas操作,原子化更新 defaultpromise对象的名为result的成员,这个result成员是其异步操作判断的关键所在

defaultpromise的成员及构造方法定义:

 1 public class defaultpromise<v> extends abstractfuture<v> implements promise<v> {
 2     private volatile object result;
 3     private final eventexecutor executor;
 4     private object listeners;
 5     private short waiters;
 6     private boolean notifyinglisteners;
 7     
 8     public defaultpromise(eventexecutor executor) {
 9         this.executor = checknotnull(executor, "executor");
10     }
11 }

result:就是前面说的,判断异步操作状态的关键
result的取值有:success 、uncancellable、causeholder以及null (其实还可以是泛型v类型的任意对象,这里暂不考虑)
executor:就是channel绑定的nioeventloop,在我之前的博客说过,channel的异步操作都是在nioeventloop的线程中完成的([netty中nioeventloopgroup的创建源码分析](https://blog.csdn.net/z_chenchen/article/details/90567863))
listeners:通过一个object保存所有对异步操作的监听,用于异步操作的回调
waiters:记录阻塞中的listeners的数量
notifyinglisteners:是否需要唤醒的标志

首先来看isdone方法,通过之前的图可以知道,
isdone为false对应了uncompleted状态,即异步操作尚未完成;
isdone为true则代表了异步操作完成,但是还是有三种完成情况,需要结合别的判断方法才能具体知道是哪种情况;

isdone方法:

1 @override
2 public boolean isdone() {
3     return isdone0(result);
4 }

调用isdone0:

1 private static boolean isdone0(object result) {
2     return result != null && result != uncancellable;
3 }

有如下几种情况:
result等于null,result没有赋值,表示异步操作尚未完成(从这里就能想到异步操作完成,需要调用某个set方法来改变result的状态)
result是uncancellable状态,表示执行中的异步操作不可取消,当然也就是异步操作尚未完成
result不等于null,且不等于uncancellable,就表示异步操作完成(包括正常完成,以及异常结束,需要由cause方法进一步判断)

issuccess方法:

1 @override
2 public boolean issuccess() {
3     object result = this.result;
4     return result != null && result != uncancellable && !(result instanceof causeholder);
5 }

由这里可以知道当且仅当result 为success状态时,才返回true(其余除uncancellable和null的值其实也可以,这里暂不考虑)

iscancelled方法:

1 @override
2 public boolean iscancelled() {
3     return iscancelled0(result);
4 }

调用iscancelled0方法:

1 private static boolean iscancelled0(object result) {
2     return result instanceof causeholder && ((causeholder) result).cause instanceof cancellationexception;
3 }

只有当result是cancellationexception的实例时,表示取消异步操作

 

接着来看cause方法:

1 @override
2 public throwable cause() {
3     object result = this.result;
4     return (result instanceof causeholder) ? ((causeholder) result).cause : null;
5 }

和上面同理,通过判别resul是否是causeholder的实现类,若是,将causeholder保存的异常返回。

几种状态的判别说完了,下面看一下如何设置这几种状态的:
setsuccess方法:

1 @override
2 public promise<v> setsuccess(v result) {
3     if (setsuccess0(result)) {
4         notifylisteners();
5         return this;
6     }
7     throw new illegalstateexception("complete already: " + this);
8 }

首先调用setsuccess0方法,其中result的泛型通过defaultchannelpromise可知是void,在defaultchannelpromise中所有的set和try操作参数都是null,这里的result也就不去考虑:

1 private boolean setsuccess0(v result) {
2     return setvalue0(result == null ? success : result);
3 }

继续调用setvalue0方法:

1 private boolean setvalue0(object objresult) {
2     if (result_updater.compareandset(this, null, objresult) ||
3         result_updater.compareandset(this, uncancellable, objresult)) {
4         checknotifywaiters();
5         return true;
6     }
7     return false;
8 }

通过cas操作,将result状态变为success

其中checknotifywaiters方法:

1 private synchronized void checknotifywaiters() {
2     if (waiters > 0) {
3         notifyall();
4     }
5 }

检查waiters的个数,唤醒所有阻塞中的this,sync方法会引起阻塞

 

回到setsuccess方法中,setsuccess0通过cas操作,将result状态更新为success成功后,调用
notifylisteners方法,唤醒所有listener完成对异步操作的回调

listeners是通过addlistener方法添加的,用来对异步操作进行侦听:
看到addlistener方法:

 1 @override
 2 public promise<v> addlistener(genericfuturelistener<? extends future<? super v>> listener) {
 3     checknotnull(listener, "listener");
 4     
 5     synchronized (this) {
 6     addlistener0(listener);
 7     }
 8     
 9     if (isdone()) {
10     notifylisteners();
11     }
12     
13     return this;
14 }
15 
16 @override
17 public promise<v> addlisteners(genericfuturelistener<? extends future<? super v>>... listeners) {
18     checknotnull(listeners, "listeners");
19 
20     synchronized (this) {
21         for (genericfuturelistener<? extends future<? super v>> listener : listeners) {
22             if (listener == null) {
23                 break;
24             }
25             addlistener0(listener);
26         }
27     }
28 
29     if (isdone()) {
30         notifylisteners();
31     }
32 
33     return this;
34 }

其中genericfuturelistener接口定义如下:

1 public interface genericfuturelistener<f extends future<?>> extends eventlistener {
2      /**
3      * invoked when the operation associated with the {@link future} has been completed.
4      *
5      * @param future  the source {@link future} which called this callback
6      */
7     void operationcomplete(f future) throws exception;
8 }

可以看到listener其实就是通过operationcomplete方法,来完成回调,对future对象进行处理,由注释可知operationcomplete方法是在future操作完成时调用

addlisteners方法的实现比较简单,实现核心是在addlistener0中:

1 private void addlistener0(genericfuturelistener<? extends future<? super v>> listener) {
2     if (listeners == null) {
3         listeners = listener;
4     } else if (listeners instanceof defaultfuturelisteners) {
5         ((defaultfuturelisteners) listeners).add(listener);
6     } else {
7         listeners = new defaultfuturelisteners((genericfuturelistener<?>) listeners, listener);
8     }
9 }

其中defaultfuturelisteners是将genericfuturelistener对象封装的一个数组:

 1 final class defaultfuturelisteners {
 2 
 3     private genericfuturelistener<? extends future<?>>[] listeners;
 4     private int size;
 5     private int progressivesize;
 6 
 7     @suppresswarnings("unchecked")
 8     defaultfuturelisteners(
 9             genericfuturelistener<? extends future<?>> first, genericfuturelistener<? extends future<?>> second) {
10         listeners = new genericfuturelistener[2];
11         listeners[0] = first;
12         listeners[1] = second;
13         size = 2;
14         if (first instanceof genericprogressivefuturelistener) {
15             progressivesize ++;
16         }
17         if (second instanceof genericprogressivefuturelistener) {
18             progressivesize ++;
19         }
20     }
21 
22     public void add(genericfuturelistener<? extends future<?>> l) {
23         genericfuturelistener<? extends future<?>>[] listeners = this.listeners;
24         final int size = this.size;
25         if (size == listeners.length) {
26             this.listeners = listeners = arrays.copyof(listeners, size << 1);
27         }
28         listeners[size] = l;
29         this.size = size + 1;
30 
31         if (l instanceof genericprogressivefuturelistener) {
32             progressivesize ++;
33         }
34     }
35 
36     public void remove(genericfuturelistener<? extends future<?>> l) {
37         final genericfuturelistener<? extends future<?>>[] listeners = this.listeners;
38         int size = this.size;
39         for (int i = 0; i < size; i ++) {
40             if (listeners[i] == l) {
41                 int listenerstomove = size - i - 1;
42                 if (listenerstomove > 0) {
43                     system.arraycopy(listeners, i + 1, listeners, i, listenerstomove);
44                 }
45                 listeners[-- size] = null;
46                 this.size = size;
47 
48                 if (l instanceof genericprogressivefuturelistener) {
49                     progressivesize --;
50                 }
51                 return;
52             }
53         }
54     }
55 
56     public genericfuturelistener<? extends future<?>>[] listeners() {
57         return listeners;
58     }
59 
60     public int size() {
61         return size;
62     }
63 
64     public int progressivesize() {
65         return progressivesize;
66     }
67 }

size:记录listeners的个数
progressivesize:记录genericprogressivefuturelistener类型的listeners的个数
defaultfuturelisteners 中对数组的操作比较简单,
add方法,当size达到数组长度时,进行二倍扩容,

其中genericprogressivefuturelistener继承自genericfuturelistener:

 1 public interface genericprogressivefuturelistener<f extends progressivefuture<?>> extends genericfuturelistener<f> {
 2     /**
 3      * invoked when the operation has progressed.
 4      *
 5      * @param progress the progress of the operation so far (cumulative)
 6      * @param total the number that signifies the end of the operation when {@code progress} reaches at it.
 7      *              {@code -1} if the end of operation is unknown.
 8      */
 9     void operationprogressed(f future, long progress, long total) throws exception;
10 }

由注释可知operationprogressed是在future操作进行时调用,这里不对genericprogressivefuturelistener过多讨论

回到addlistener0方法,由defaultfuturelisteners就可以知道,实际上通过defaultfuturelisteners管理的一维数组来保存listeners

在addlistener方法完成对listener的添加后,还会调用isdone方法检查当前异步操作是否完成,若是完成需要调用notifylisteners,直接唤醒所有listeners完后对异步操作的回调

有add就有remove,removelistener方法:

 1 @override
 2 public promise<v> removelistener(final genericfuturelistener<? extends future<? super v>> listener) {
 3     checknotnull(listener, "listener");
 4 
 5     synchronized (this) {
 6         removelistener0(listener);
 7     }
 8 
 9     return this;
10 }
11 
12 @override
13 public promise<v> removelisteners(final genericfuturelistener<? extends future<? super v>>... listeners) {
14     checknotnull(listeners, "listeners");
15 
16     synchronized (this) {
17         for (genericfuturelistener<? extends future<? super v>> listener : listeners) {
18             if (listener == null) {
19                 break;
20             }
21             removelistener0(listener);
22         }
23     }
24 
25     return this;
26 }

还是由removelistener0来实现:

1 private void removelistener0(genericfuturelistener<? extends future<? super v>> listener) {
2     if (listeners instanceof defaultfuturelisteners) {
3         ((defaultfuturelisteners) listeners).remove(listener);
4     } else if (listeners == listener) {
5         listeners = null;
6     }
7 }

看过之前的内容在看这里就比较简单了,通过defaultfuturelisteners去删除

notifylisteners方法:

 1 private void notifylisteners() {
 2     eventexecutor executor = executor();
 3     if (executor.ineventloop()) {
 4         final internalthreadlocalmap threadlocals = internalthreadlocalmap.get();
 5         final int stackdepth = threadlocals.futurelistenerstackdepth();
 6         if (stackdepth < max_listener_stack_depth) {
 7             threadlocals.setfuturelistenerstackdepth(stackdepth + 1);
 8             try {
 9                 notifylistenersnow();
10             } finally {
11                 threadlocals.setfuturelistenerstackdepth(stackdepth);
12             }
13             return;
14         }
15     }
16 
17     safeexecute(executor, new runnable() {
18         @override
19         public void run() {
20             notifylistenersnow();
21         }
22     });
23 }

其中executor方法:

1 protected eventexecutor executor() {
2     return executor;
3 }

用来获取executor轮询线程对象

判断executor是否处于轮询,否则需要通过safeexecute方法处理listeners的侦听,
safeexecute方法:

1 private static void safeexecute(eventexecutor executor, runnable task) {
2     try {
3         executor.execute(task);
4     } catch (throwable t) {
5         rejectedexecutionlogger.error("failed to submit a listener notification task. event loop shut down?", t);
6     }
7 }

这里保证了listeners的侦听回调是异步执行

internalthreadlocalmap在我之前的博客中说过,是netty使用的threadlocal (netty中fastthreadlocal源码分析

去线程本地变量中找futurelistenerstackdepth(默认为0),判断stackdepth是否小于max_listener_stack_depth,否则也要通过safeexecute方法处理listeners的侦听
核心都是调用notifylistenersnow方法:

 1 private void notifylistenersnow() {
 2     object listeners;
 3     synchronized (this) {
 4         // only proceed if there are listeners to notify and we are not already notifying listeners.
 5         if (notifyinglisteners || this.listeners == null) {
 6             return;
 7         }
 8         notifyinglisteners = true;
 9         listeners = this.listeners;
10         this.listeners = null;
11     }
12     for (;;) {
13         if (listeners instanceof defaultfuturelisteners) {
14             notifylisteners0((defaultfuturelisteners) listeners);
15         } else {
16             notifylistener0(this, (genericfuturelistener<?>) listeners);
17         }
18         synchronized (this) {
19             if (this.listeners == null) {
20                 // nothing can throw from within this method, so setting notifyinglisteners back to false does not
21                 // need to be in a finally block.
22                 notifyinglisteners = false;
23                 return;
24             }
25             listeners = this.listeners;
26             this.listeners = null;
27         }
28     }
29 }

先检查是否需要监听,满足条件后,判断listeners是否是defaultfuturelisteners,即包装后的数组
notifylisteners0方法:

1 private void notifylisteners0(defaultfuturelisteners listeners) {
2    genericfuturelistener<?>[] a = listeners.listeners();
3    int size = listeners.size();
4    for (int i = 0; i < size; i ++) {
5        notifylistener0(this, a[i]);
6    }
7 }

遍历这个数组,实则调用notifylistener0方法:

1 private static void notifylistener0(future future, genericfuturelistener l) {
2     try {
3         l.operationcomplete(future);
4     } catch (throwable t) {
5         if (logger.iswarnenabled()) {
6             logger.warn("an exception was thrown by " + l.getclass().getname() + ".operationcomplete()", t);
7         }
8     }
9 }

这里就可以看到,完成了对operationcomplete的回调,处理future

 

setsuccess结束,再来看trysuccess方法:

1 @override
2 public boolean trysuccess(v result) {
3     if (setsuccess0(result)) {
4         notifylisteners();
5         return true;
6     }
7     return false;
8 }

对比setsuccess来看,只有返回值不一样

setfailure方法:

 1 @override
 2 public promise<v> setfailure(throwable cause) {
 3     if (setfailure0(cause)) {
 4         notifylisteners();
 5         return this;
 6     }
 7     throw new illegalstateexception("complete already: " + this, cause);
 8 }
 9 
10 private boolean setfailure0(throwable cause) {
11     return setvalue0(new causeholder(checknotnull(cause, "cause")));
12 }
13 
14 private boolean setvalue0(object objresult) {
15     if (result_updater.compareandset(this, null, objresult) ||
16         result_updater.compareandset(this, uncancellable, objresult)) {
17         checknotifywaiters();
18         return true;
19     }
20     return false;
21 }

和setsuccess逻辑一样,只不过cas操作将状态变为了causeholder对象,成功后唤醒listeners对异步操作的回调

tryfailure方法:

1 @override
2 public boolean tryfailure(throwable cause) {
3     if (setfailure0(cause)) {
4         notifylisteners();
5         return true;
6     }
7     return false;
8 }

也都是一个逻辑

还有一个setuncancellable方法:

1 @override
2 public boolean setuncancellable() {
3     if (result_updater.compareandset(this, null, uncancellable)) {
4         return true;
5     }
6     object result = this.result;
7     return !isdone0(result) || !iscancelled0(result);
8 }

若是result状态为null,异步操作尚未结束,直接通过cas操作将状态变为uncancellable
否则若是根据状态来判断


下来看到cancel方法:

 1 /**
 2  * {@inheritdoc}
 3  *
 4  * @param mayinterruptifrunning this value has no effect in this implementation.
 5  */
 6 @override
 7 public boolean cancel(boolean mayinterruptifrunning) {
 8     if (result_updater.compareandset(this, null, cancellation_cause_holder)) {
 9         checknotifywaiters();
10         notifylisteners();
11         return true;
12     }
13     return false;
14 }

mayinterruptifrunning正如注释中所说,在这里没有什么作用
还是通过cas操作,将状态变为cancellation_cause_holder,调用checknotifywaiters唤醒因sync阻塞的线程,notifylisteners方法回调listeners的侦听


最后看到sync方法:

1 @override
2 public promise<v> sync() throws interruptedexception {
3     await();
4     rethrowiffailed();
5     return this;
6 }

先调用await方法:

 1 @override
 2 public promise<v> await() throws interruptedexception {
 3     if (isdone()) {
 4         return this;
 5     }
 6 
 7     if (thread.interrupted()) {
 8         throw new interruptedexception(tostring());
 9     }
10 
11     checkdeadlock();
12 
13     synchronized (this) {
14         while (!isdone()) {
15             incwaiters();
16             try {
17                 wait();
18             } finally {
19                 decwaiters();
20             }
21         }
22     }
23     return this;
24 }

先判断能否执行(异步操作尚未结束,当前线程没有被中断),然后调用checkdeadlock方法:

1 protected void checkdeadlock() {
2     eventexecutor e = executor();
3     if (e != null && e.ineventloop()) {
4         throw new blockingoperationexception(tostring());
5     }
6 }

检查轮询线程是否在工作

在synchronized块中以自身为锁,自旋等待异步操作的完成,若是没完成,调用incwaiters方法:

1 private void incwaiters() {
2     if (waiters == short.max_value) {
3         throw new illegalstateexception("too many waiters: " + this);
4     }
5     ++waiters;
6 }

在小于short.max_value的情况下,对waiters自增,
然后使用wait将自身阻塞,等待被唤醒
所以在之前setvalue0时,checknotifywaiters操作会notifyall,
由此可以知道sync方法的作用:在某一线程中调用sync方法会使得当前线程被阻塞,只有当异步操作执完毕,通过上面的set方法改变状态后,才会调用checknotifywaiters方法唤醒当前线程。

当从阻塞中被唤醒后调用decwaiters方法:

1 private void decwaiters() {
2     --waiters;
3 }

使得waiters自减
通过这样一种自旋方式,一直等到isdone成立,结束自旋,进而结束await方法,然后调用rethrowiffailed方法:

1 private void rethrowiffailed() {
2     throwable cause = cause();
3     if (cause == null) {
4         return;
5     }
6 
7     platformdependent.throwexception(cause);
8 }

根据异步操作是否有异常,进而使用platformdependent抛出异常。


至此netty中的channelfuture和channelpromise分析到此全部结束。