欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#8.0: 在 LINQ 中支持异步的 IAsyncEnumerable

程序员文章站 2023-08-29 17:26:04
C# 8.0中,提供了一种新的IAsyncEnumerable接口,在对集合进行迭代时,支持异步操作。比如在读取文本中的多行字符串时,如果读取每行字符串的时候使用同步方法,那么会导致线程堵塞。IAsyncEnumerable可以解决这种情况,在迭代的时候支持使用异步方法。也就是说,之前我 ......

c# 8.0中,提供了一种新的iasyncenumerable<t>接口,在对集合进行迭代时,支持异步操作。比如在读取文本中的多行字符串时,如果读取每行字符串的时候使用同步方法,那么会导致线程堵塞。iasyncenumerable<t>可以解决这种情况,在迭代的时候支持使用异步方法。也就是说,之前我们使用foreach来对ienumerable进行迭代,现在可以使用await foreach来对iasyncenumerable<t>进行迭代,每个项都是可等待的。这种新的接口称为async-streams,将会随.net core 3发布。我们来看一下如何在linq中实现异步的迭代。

使用常规的ienumerable<t>

首先我们创建一个新的console项目,基于.net core 3

namespace asynclinqdemo
{
   class program
  {
       static void main(string[] args)
      {
           console.writeline("input the file path:");
           var file = console.readline();
           var lines = readalllines(file);
           foreach (var line in lines)
          {
               console.writeline(line);
          }
      }

       static ienumerable<string> readalllines(string file)
      {
           using (var fs = file.openread(file))
          {
               using (var sr = new streamreader(fs))
              {
                   while (true)
                  {
                       string line = sr.readline();
                       if(line == null)
                      {
                           break;
                      }
                       yield return line;
                  }
              }
          }
      }
  }
}

 

这是一个很简单的console程序,实现了一个简单的返回类型为ienumerable<string>readalllines(string file)方法,从文本文件中逐行读取文本,并逐行输出。如果文本内容较少的话,没什么问题。但如果我们使用过aync/await,就会了解,在io操作如读取或写入文件的时候,最好使用异步方法以避免线程阻塞。让我们来改进一下。

使用异步的iasyncenumerable<t>

可以优化的是下面这句:

string line = sr.readline();

 

对于io操作,最好使用异步方式。这里可使用相应的异步方法:

string line = await sr.readlineasync();

 

我们说“异步是传染的”,如果这里使用异步,那么相应的该方法的返回值也要使用异步,所以需要将返回值改为static async task<ienumerable<string>>,但这样会得到一个错误:

errorcs1624the body of 'program.readalllines(string)' cannot be an iterator block because 'task<ienumerable<string>>' is not an iterator interface typeasynclinqdemoc:\source\workspaces\console\asynclinqdemo\asynclinqdemo\program.cs23active

 

因为task<ienumerable<string>>并不是一个可以迭代的接口类型,所以我们无法在方法内部使用yield关键字。解决问题的办法是使用新的iasyncenumerable接口:

static async iasyncenumerable<string> readalllines(string file)
{
   using (var fs = file.openread(file))
  {
       using (var sr = new streamreader(fs))
      {
           while (true)
          {
               string line = await sr.readlineasync();
               if(line == null)
              {
                   break;
              }
               yield return line;
          }

      }
  }
}

 

f12查看该接口的定义:

namespace system.collections.generic
{
   public interface iasyncenumerable<out t>
  {
       iasyncenumerator<t> getasyncenumerator(cancellationtokencancellationtoken = default);
  }
}

 

这是一个异步的迭代器,并提供了cancellationtoken。再按f12查看iasyncenumerator<t>的定义,可发现里面是这样的:

namespace system.collections.generic
{
   public interface iasyncenumerator<out t> : iasyncdisposable
  {
       t current { get; }
       valuetask<bool> movenextasync();
  }
}

 

这里movenextasync()方法实际是返回了一个结果类型为booltask,每次迭代都是可等待的,从而实现了迭代器的异步。

使用await foreach消费iasyncenumerable<t>

当我们做了以上改动之后,readalllines()方法返回的是一个支持异步的iasyncenumerable,那么在使用的时候,也不能简单的使用foreach了。修改main方法如下:

static async task main(string[] args)
{
   console.writeline("input the file path:");
   var file = console.readline();
   var lines = readalllines(file);
   await foreach (var line in lines)
  {
       console.writeline(line);
  }
}

 

首先在foreach之前添加await关键字,还要需要将main方法由void改为async task。这样整个程序都是异步执行了,不会再导致堵塞了。这个例子只是一个简单的demo,是否使用异步并不会感觉到明显的区别。如果在迭代内部需要比较重的操作,如从网络获取大量数据或读取大量磁盘文件,异步的优势还是会比较明显的。

使用linq消费iasyncenumerable<t>

使用linq来操作集合是常用的功能。如果使用ienumberable,在main方法中可以做如下改动:

var lines = readalllines(file);
var res = from line in lines where line.startswith("error: ") selectline.substring("error: ".length);
foreach (var line in res)
{
   console.writeline(line);
}

 

或:

var res = lines.where(x => x.startswith("error: ")).select(x => x.substring("error: ".length));

 

如果使用了新的iasyncenumerable,你会发现无法使用where等操作符了:

errorcs1936could not find an implementation of the query pattern for source type 'iasyncenumerable<string>'. 'where' not found.asynclinqdemoc:\source\workspaces\console\asynclinqdemo\asynclinqdemo\program.cs16active

 

目前linq还没有提供对iasyncenumerable的原生支持,不过微软提供了一个nuget包来实现此功能。在项目中打开nuget package manger搜索安装system.linq.async,注意该包目前还是预览版,所以要勾选include prerelease才能看到。安装该nuget包后,linq查询语句中的错误就消失了。

system.linq.async这个包中,对每个同步的linq方法都做了相应的扩展。所以基本上代码无需什么改动即可正常编译。

对于linq中的条件语句,也可以使用whereawait()方法来支持await

public static iasyncenumerable<tsource> whereawait<tsource>(thisiasyncenumerable<tsource> source, func<tsource, int, valuetask<bool>>predicate);

 

如需要在条件语句中进行io或网络请求等异步操作,可以这样用:

var res = lines.whereawait(async x => await dosomeheavyoperationsasync(x));

 

dosomeheavyoperationsasync方法的签名如下:

private static valuetask<bool> dosomeheavyoperationsasync(string x)
{
   //do some works...
}

 

小结

通过以上的示例,我们简要了解了如何使用iasyncenumerable接口以及如何在linq中实现异步查询。在使用该接口时,我们需要创建一个自定义方法返回iasyncenumerable<t>来代替ienumberable<t>,这个方法可称为async-iterator方法,需要注意以下几点:

  • 该方法应该被声明为async

  • 返回iasyncenumerable<t>

  • 同时使用awaityield。如await foreachyield returnyield break等。

例如:

async iasyncenumerable<int> getvaluesfromserver()
{
   while (true)
  {
       ienumerable<int> batch = await getnextbatch();
       if (batch == null) yield break;

       foreach (int item in batch)
      {
           yield return item;
      }
  }
}

 

此外还有一些限制:

  • 无法在tryfinally块中使用任何形式的yield语句。

  • 无法在包含任何catch语句的try语句中使用yield return语句。

     

期待.net core 3的正式发布!

 

C#8.0: 在 LINQ 中支持异步的 IAsyncEnumerable

了解新西兰it行业真实码农生活

请长按上方二维码关注“程序员在新西兰”