C#8.0——异步流(AsyncStream)
异步流(asyncstream)
原文地址:
注意:以下内容最好能根据反编译工具查看异步流相关类生成的代码效果最佳
异步流是可枚举类(enumerable)的异步变体,它会在遍历下一个元素的时候(next)会涉及异步操作。只要继承自 iasyncenumerable
首先我们来看下这些在 .netcore3.0 新增的异步流 api 当有异步流时,你可以使用异步形式的 同样,当你有可异步释放的对象时,你就可以通过 使用者也可以自己手动实现这些接口,或者使用编译器的高级特性从使用着(程序员)定义的方法(调用一个异步迭代器方法)自动生成状态机。异步迭代器方法特征如下: 举个例子: 就像在迭代方法里一样,这里有几个限制,在 yield 声明语句出现的附近: 异步 using 与 常规 using 是一样低(lowered)的,只是用 disposeasync() 代替 dispose() 方法。 要注意,基于模式查找 disposeasync 绑定到一个实例方法,它能够在被没有参数下被调用。 拓展方法是不可用在异步流上的,disposeasync 的结果必须是可等待的。 await foreach 就如常规 foreach 一样,只是: 注意,基于模式查找的 异步 foreach 迭代是不允许集合类型为 dynamic 类型,因为没有等价的异步非泛型 ienumerable 接口。 包装器类型能传递非默认值(查看 api
异步迭代器方法被初始化的状态机的启动(kick-off)方法替换。它不会在开始时就运行状态机(不像常规的 async 方法的启动方法)。这个启动方法的方法被标记为 对于可枚举的异步迭代器方法的状态机首先就要实现 状态机运行过程可详见 《async in c#》。 这里也提供我尝试理解翻译的地址 但是对于新的异步内容,这里添加了新的状态: 状态机主要的方法是 value-or-end 状态从 与常规的异步方法的状态机相比,在异步迭代器方法的 这反映在实现中,它将异步方法的降低机制(lowering machinery)拓展如下: 异步迭代器方法的启动方法和状态机的初始化都遵循常规迭代器方法。尤其是 对于被标记为 [enumeratorcancellation] 的参数,getenumerator 通过结合两个可用的 token 初始化: 对于线程 id 的检查,可查看 类似地,启动方法与常规迭代器方法非常相似: 迭代器和异步迭代器方法都需要处理,因为他们执行的步骤都被调用者控制,它可以选择在得到所有的元素之前释放迭代器。例如, 总之,异步迭代器的释放基于四个设计元素: 异步迭代器方法的调用者应该在当一个方法完成或被 到达 从给定的 finally 块来看处理,可以执行块中的代码: a yield return is lowered as: a yield break is lowered as: 当 finally 块内不含 await 表达式,try/catch is lowered as: 当 finally 块内包含 await 表达式时,在异步重写之前被提取(通过 asyncexeptionhandlerrewriter)。 在这种情况下,我们将知道: 这两种情况下,我们将在 finally 逻辑块后添加 可枚举从状态 -2 开始。调用 getasyncenumerator 来设置状态为 -3,或返回一个新的枚举器(状态也为 -3)。 从这里开始,movenext 将会做: 从暂停状态 n 或 -n,movenext 将恢复执行(-1)。但是如果暂停 yield return(-n),你也能调用 disposeasync,它在 dispose mode 中恢复执行(-1)。 当在 dispose mode 下,movenext 会继续暂停(n)和恢复(-1)直到到达方法结束(-2)。 从状态 -1 或 n 调用 disposeasync 的结果未指明。编译器在这种情况下会生成 注意:a namespace system.collections.generic
{
public interface iasyncenumerable<out t>
{
iasyncenumerator<t> getasyncenumerator(cancellationtoken cancellationtoken = default);
}
public interface iasyncenumerator<out t> : iasyncdisposable
{
t current { get; }
valuetask<bool> movenextasync();
}
}
namespace system
{
public interface iasyncdisposable
{
valuetask disposeasync();
}
}
foreach
代码段迭代可枚举的类:await foreach(var item in asyncstream){...}
。await foreach
语句与 foreach
语句是一样的,只是它使用了 iasyncenumerable
代替 enumerable
,在每次迭代都会去执行调用 await movenextasync()
,并且这些枚举器的释放都是异步的。using
语句使用并异步的处理它:await using(var response = asyncdisposable){...}
。await using
跟 平常用的 using
是一样的,就如异步流对象一样,只是用了 iasyncdisposable
代替 idisposable
,disposeasync()
代替方法 dispose()
。
async iasyncenumerable<int> getvaluesfromserver()
{
while (true)
{
ienumerable<int> batch = await getnextbatch();
if (batch == null) yield break;
foreach (int item in batch)
{
yield return item;
}
}
}
await using 语句的详细设计
await foreach 语句的详细设计
getasyncenumerator
,movenextasync
以及 disposeasync
绑定到一个没有参数就能被调用的实例方法。拓展方法无效。movenextasync
以及 disposeasync
同样必须是可等待的。释放清理 await foreach
不包含回掉函数来检查这个接口的实现。.withcancellation(cancellationtoken)
拓展方法),因此允许消费者控制异步流的取消。异步流的生产者也能够使用取消令牌通过在自定义类写 iasyncenumeratore e = ((c)(x)).getasyncenumerator(default);
try
{
while(await e.movenextasync())
{
v v = (v)(t)e.current; -or- (d1 d1, ...) = (v)(t)e.current;
//body
}
}
finally
{
await e.disposeasync();
}
异步迭代器方法的详细设计
asynciteratorstatemachineattribute
。iasyncenumerable<t>
以及 iasyncenumerator<t>
。对于可枚举异步迭代器,它只需要实现 iasyncenumerator<t>
。它与状态机生成异步方法是相似的。它包含生成器(builder)以及等待者(awaiter)字段,用于在后台运行状态机(当 await 在异步迭代器到达的时候)。它也能捕捉参数值(如果有)或者当前对象 this
(如果需要)。
movenext()
。它能被通过 movenextasync()
获得运行,或者从 await
方法中作为一个后台继续等待启动(continuation)。movenextasync
返回。下面提到的都能满足:
manualresetvaluetasksourcecore<bool>
实现(它是可重用的,并且无需再分配的方式来生成和实现 valuetask<bool>
或者 valuetask
实例)。关于这些类的更多细节请访问 movenext()
增加如下逻辑:
yield return
语句,它保存了当前值并实现 结果 true 的状态(promise with result true)yield break
语句,它设置释放模式为 on,并跳转到封闭的(enclosing)finally 或 exit 块。cancellationtokensource
(如果存在)并且实现结果为 false 的状态cancellationtokensource
(如果存在)并且在状态里设置异常信息
visityieldreturnstatement
方法以及 visityieldbreakstatement
方法到 asynciteratormethodtostatemachinerewriter
)asynciteratormethodtostatemachinerewriter
里的 visittrystatement
和 visitextractedfinallyblock
方法)asynciteratorrewriter
方法,它产生其他的成员变量:movenextasync
,currenct
,disposeasync
以及一些支持可重置的 valuetask
行为的成员,也就是 getresult
,setstatus
,oncompleted
)valuetask<bool> movenextasync()
{
if(state == statemachinestates.finishedstatemachine)
{
return default(valuetask<bool>);
}
valueorendpromise.reset();
var inst = this;
builder.start(ref inst);
var version = valueorendpromise.version;
if(valueorendpromise.getstatus(version) == valuetasksourcestatus.succeeded)
{
return new valuetask<bool>(valueorendpromise.getresult(version));
}
return new valuetask<bool>(this,version);//注意,这里用到了状态机的 ivaluetasksource 实现
}
t current => current;
getasyncenumerator()
方法就像 getenumerator()
只是它设置初始化状态为 statemachinestates.notstartedstatemachine (-1):iasyncenumerator<t> getasyncenumerator(cancellationtoken token)
{
{statemachinetype} result;
if(initialthreadid == /*托管线程id*/ && state == statemachinestates.finishedstatemachine)
{
state = initialstate; //-3
disposemode = false;
result = this;
}
else
{
result = new {statemachinetype}(initialstate);
}
/*复制每个参数代理,或者在每个参数在被标记为[enumeratorcancellation]的情况下和 getasyncenumerator 的 token 参数结合使用*/
}
if(this.parameterproxy.equals(default))
{
result.parameter = token;
}
else if(token.equals(this.parameterproxy) || token.equals(default))
{
result.paramter = this.parameterproxy;
}
else
{
result.combinedtokens = cancellationtokensource.createdlinkedtokensource(this.parameterproxy,token);
result.parameter = combinedtokens.token;
}
{
{statemachinetype} result = new {statemachinetype}(statemachinestatus.finishedstatemachine);//-2
/* 保存参数到参数代理中 */
return result;
}
disposal
foreach(...){if(...) break;}
。相比之下,异步方法会持续自动运行直到结束。在调用者角度来说,它们不会在执行过程中挂起(暂停),所以它们不需要被释放。
yield return
(当恢复至 dispose 模式时,跳转到 finally )yield break
(进入 dispose 模式并且跳转到封闭的 finally 块)finally
(finally
后我们会跳转到下一个)disposeasync
(进入 dispose 模式并恢复执行)yield return
挂起时,只调用 disposeasync()
。disposeasync
会在状态机("dispose mode")设置一个标识以及(如果这个方法没有完成)从当前状态恢复执行。状态机能够在被给定的状态恢复执行(甚至那些分配到 try 中的)。在 dispose 模式下恢复执行时,它会直接跳转到 finally
。finally
块只在 await 表达式下可能包含暂停以及恢复。由于在 yield return
上的限制(上面描述的),dispose 模式不会运行进入 yield return
。一旦 finally 块完成,在 dispose 模式中执行跳转到下一个 finally 块,或是在到达了方法的最顶层结尾。yield break
(或是方法结束)会设置 dispose 模式标识以及跳转到 finally 块。当我们返回控制权给调用者时(通过到达方法尾部来完成 promise false)所有的处理都会被完成以及状态机也处于完成状态。所以 disposeasync()
不需要做其他事了。
disposeasync()
(它在 dispose 模式恢复执行并且跳转到 finally 块)_current = expression;
_state = <next_state>;
goto <exprreturntrueable>;// 它执行了 _valueorendpromise.setresult(true);return;
//从 state = <next_state> 恢复,将执行到这个标签
<next_state_label>:;
this.state = cachedstate = notstartedstatemachine;
if(disposemodel)/* 跳转到 finally 或退出*/
disposemodel = true;
/* 跳转到 finally 或退出 */
valuetask iasyncdisposable.disposeasync()
{
if(state >= statemachinestates.notstartedstatemachine /* -1 */)
{
throw new notsupportedexception();
}
if(state == statemachinestates.finishedstatemachine /* -2 */)
{
return default;
}
disposemodel = true;
_valueorendpromise.reset();
var inst = this;
_builder.start(ref inst);
return new valuetask(this,_valueorendpromise.version);//注意,这里用到了状态机的 ivaluetasksource 实现
}
与通常提取 finally 的对比
try
{
...
finallyentrylabel:
}
finally
{
...
}
if(disposemode) /* 跳转 finally 或退出 */
try
{
...
goto finallyentrylabel:
}
catch(exception e)
{
... save exception ...
}
finallyentrylabel:
{
... 从 finally 的原始代码和异常的附加处理
}
if(disposemode) /* 跳转到 finally 或退出 */
状态值和转换
yield break
(状态不变,dispose mode = true)yield return
(-n,从 -4 递减)await
(n,从 0 递增)throw new notsupportedexception
。 disposeasync await
+------------------------+ +------------------------> n
| | | |
v getasyncenumerator | | resuming |
-2 --------------------> -3 --------> -1 <-------------------------+ dispose mode = false
^ | | |
| done and disposed | | yield return |
+-----------------------------------+ +-----------------------> -n
| or exception thrown | |
| | |
| yield | |
| break | disposeasync |
| | +--------------------------+
| | |
| | |
| done and disposed v v suspension (await)
+----------------------------------- -1 ------------------------> n
^ | dispose mode = true
| resuming |
+--------------------------+
yield return
is lowered as 我实在是不知道 is lowerd as 到底是什么意思,单个单词都会,拼在一起怎么读都不顺
上一篇: python 常用的标准库