欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

《CLR Via C#》读书笔记:27.计算限制的异步操作

程序员文章站 2022-07-01 23:05:06
一、CLR 线程池基础 一般来说如果计算机的 CPU 利用率没有 100% ,那么说明很多进程的部分线程没有运行。可能在等待 文件/网络/数据库等设备读取或者写入数据,又可能是等待按键、鼠标移动等事件。 执行 I/O 限制的操作时,操作系统通过设备驱动程序通知硬件干活,而 CPU 处于一种空闲状态。 ......

一、clr 线程池基础

一般来说如果计算机的 cpu 利用率没有 100% ,那么说明很多进程的部分线程没有运行。可能在等待 文件/网络/数据库等设备读取或者写入数据,又可能是等待按键、鼠标移动等事件。

执行 i/o 限制的操作时,操作系统通过设备驱动程序通知硬件干活,而 cpu 处于一种空闲状态。而在现代应用程序当中,使用线程池来执行计算限制的操作,而不是手动创建线程。

每个 clr 都有自己独立的线程池,并且由各自 clr 控制的所有 appdomain 所共享。

线程池本身维护了一个请求队列,当程序需要执行一个异步操作的时候,会将一个记录项追加到队列之中,然后由线程池将该记录项分派给线程池线程,如果没有线程则创建一个新的线程。线程任务处理完整之后,将该线程放入线程池中等待以后进行复用。

线程池本身是启发式的,结合程序负载,他会自己根据当前线程池内线程的状态销毁/新增线程。

二、执行简单的计算限制操作

通过 threadpool 静态类,我们可以方便地使用线程池中的线程为我们执行一些计算限制的异步操作。只需要调用 threadpoolqueueuserworkitem(waitcallback callback) 方法,或者是他的另一个重载方法,接收一个 state 值作为参数。

他的两个方法都是非阻塞的,调用之后会立即返回。

waitcallback 的方法签名如下:

delegate void waitcallback(object state);

在 clr 的线程池中,将 callback 委托作为工作项添加到队列当中,然后由线程池分发线程进行处理。

【注意】

一旦回调方法抛出了未处理的异常,clr 会立即终止进程。

三、执行上下文

每个线程都有一个执行上下文的数据结构,包含由安全设置,宿主设置和逻辑调用上下文数据(asynclocal 与 callcontext)。

当在某个线程(例如主线程)使用了另外一个线程(辅助线程),就会产生执行上下文由调用者线程流向被调用线程。这会对性能造成一定的影响,这是因为执行上下文包含有大量地信息。而如果辅助线程又调用了更多的辅助线程,这个时候执行上下问的复制开销就非常大。

我们可以通过 executioncontext 类控制线程的执行上下文是否流向辅助线程,只有辅助线程不需要访问执行上下文时可以阻止执行上下文流动。当阻止了执行上下文流动时,辅助线程会使用最后一次与其关联的任意执行上下文,这个时候对于安全设置等就不可信,不应执行任何依赖于执行上下文的操作。

一般来说在主线程,可以通过 executioncontext.suppressflow(); 方法阻止执行上下文流动,然后再通过 executioncontext.restoreflow(); 恢复流动。

四、协作式取消和超时

.net 提供了标准的取消操作模式,这个模式是协作式的,也就是你要取消的操作必须显式声明自己可以被取消。这是因为用户在执行某些长耗时的计算限制操作的时候,可能会因为等待时间太长或者其他原因需要取消这个操作。

首先我们通过 system.threading.cancellationtokensource 对象管理或者取消对象状态,使用时直接 new 一个即可,而该对象拥有一个 cancellationtoken 对象。

这个 token 对象用于传递给执行计算限制操作的方法,通过该 token 的 iscancellationrequested 属性你可以在方法内部确认任务是否被取消,如果被取消你就可以进行返回操作。

例子如下:

static void main(string[] args)
{
    var tokensource = new cancellationtokensource();

    threadpool.queueuserworkitem(z => calculate(cancellationtoken.none, 10000));

    console.readkey();
    tokensource.cancel();

    console.readline();
}

private static void calculate(cancellationtoken token, int count)
{
    for (int i = 0; i < count; i++)
    {
        if (token.iscancellationrequested)
        {
            console.writeline("用户提前终止操作,退出线程..");
            break;
        }
        
        console.writeline(count);
        thread.sleep(200);
    }
    
    console.writeline("计数完成.");
}

【注意】

如果你要执行一个不允许被取消的操作,可以为方法传递一个 cancellationtoken.none 对象,因为该对象没有 source 源,则不可能会被调用 cancel() 进行取消。

注册取消事件

cancellationtoken 允许我们通过 register() 方法注册多个委托,这些被注册了的委托会在 tokensource 调用 cancel 取消的时候优先调用,其调用的先后顺序为注册时的顺序。

【注意】

调用 register() 方法的时候,他有两个 bool 类型的参数,分别是 usesynccontextuseexecutioncontext。这两个参数用于指定,是否要用调用线程的同步上下文或者执行上下文来调用回调函数。

同时在注册成功之后会返回一个 cancellationtokenregistration 对象,通过调用该对象的 dispose 方法可以删除已经注册的委托回调,这样在取消时就不会调用该回调。

tokensource 链接

可以通过 cancellationtokensource.createlinkedtokensource() 链接两个或多个对象,链接成功后会返回一个单独的 tokensource 对象。

一旦这个新对象链接的任何一个 tokensource 对象被取消的时候,该对象也会被取消掉。

cancel 的异常处理

在调用 tokensource 的 cancel() 方法时(默认为 false),该方法还有另外一个重载传入 bool 值,如果为 true 的时候,有多个注册的回调委托,一旦某个出现异常直接会被抛出该异常,不会等待其他回调执行完毕。

如果为 false,则会等到所有回调方法执行完成时,抛出一个 aggregateexception 异常,内部的 innerexceptions 包含有所有在执行过程中产生的异常信息集合。

超时取消

除了直接调用 cancel() 立即取消操作之外,还有一个延迟取消的方法 cancelafter(),通过传递具体的延迟时间,我们可以在指定的之间之后取消某个任务。(ps:有点像 polly 的 timeout )

五、任务

为啥使用任务,虽然通过 threadpool 可以很方便地发起一次计算限制的操作,但是你不知道你的方法啥时候执行完成,也无法在操作完成之后获得返回值。

使用任务执行一个计算限制操作有两种方式,两者也一样的可以传递 cancellationtoken 进行取消操作:

  1. new task(sum,20).start();
  2. task.run(()=>sum(20));

除此之外还可以在构造 task 时 传递一些标志位,用于任务调度器进行一些特殊处理。

等待任务完成并获取结果

任务除了标准的无返回值的 task 类型之外,还有一个包含有泛型参数的 task<tresult> 类型,其中 tresult 参数就是任务的返回值类型。

在创建好 task<tresult> 对象之后,可以通过 task.wait() 等待任务执行完成,task 的 wait() 方法会阻塞调用者线程直到任务执行完成。执行完成之后,可以通过 task.reuslt 获取任务执行之后的返回值。

ps:

这里获取 result 属性值时,其内部也会调用 wait() 方法。

如果该 task 内的计算限制操作抛出了未经处理的异常,这个异常会被吞噬掉,调用 wait() 方法或者使用 result 属性的时候,这些异常信息会被包裹在 aggregateexception 内部并返回给调用者线程。

【注意】

不推荐直接调用 wait() ,如果 task 已经开始执行,该方法会阻塞调用者线程,直到执行完成。第二种情况是任务还没有开始执行的时候,调用者线程不会被阻塞,task 立即执行并返回。而调度器可能会使用调用者线程来执行这些操作,这个时候,如果调用者线程获取了一个线程同步锁,而 task 因为也是在调用者线程执行,尝试获取锁的时候,就会产生死锁。

aggregateexception 可能会包含有多个异常,这个时候可以使用该对象的 handle(func<exception,bool> predicate) 方法来为每一个异常进行处理,处理返回 true,未处理返回 false。

在调用了 handle 方法之后,仍然有异常没有处理,这些没有处理的异常又会造成一个新的 aggregateexception 被抛出。

【注意】

如果不知道有哪些 task 内部未处理的异常,可以通过象任务调度器的 unobservedtaskexception 事件登记一个回调方法,如果存在一个没有处理到的异常,则会调用你的回调方法,并将异常传递给你。

除了 task.wait() 方法,还有等待一组任务的 task.waitany()task.waitall()。几个方法都会阻塞调用者线程,前者当传递的一组任务有任意一个完成则立即返回该任务的索引,后者则是要等待这一组任务全部完成之后才会唤醒调用线程。

这两个方法一旦被取消,都会抛出 operationcanceledexception 异常。

取消任务

可以通过一个 cancellationtokensource 来取消 task,一样的需要传入的计算限制方法添加 cancellationtoken 参数。

只不过呢,在 task 任务内部我们不通过 iscancellationrequested 来判断任务是否取消,而是通过调用 token 的 throwifcancellationrequested() 方法来抛出异常。

该方法会判断当前任务是否被取消,如果被取消了,则抛出异常。这是因为与直接通过线程池添加任务不同,线程池无法知道任务何时完成,而任务则可以表示是否完成,并且还能返回值。

任务完成时启动新任务

之前说过通过调用 task.wait() 或者在任务尚未完成的时候调用 task.result 属性,都会造成线程池创建新的线程。而我们可以通过在任务完成之后,立即开启一个新的任务,这样我们就可以通过新的任务知道前一个任务是否已经完成了。

创建一个的计算限制任务对象,我们在启动了该任务对象之后,调用 task.continuewith() 方法来创建一个延续任务,新的延续性任务会有一个 task 参数,该参数就是最开始的任务。

而在使用 task.continuewith() 时,他还可以传递一个标识位。这个标识位用于表明这个延续性任务是在第一个任务什么情况下才会执行,一般有三种:onlyoncanceled(第一个任务取消时才被执行)、onlyonfaulted(第一个任务抛出未处理异常时执行)、onlyonrantocompletion(第一个任务顺利完成时执行)。

启动子任务

一个任务在其内部可以创建其子任务,只需要在内部构造 task 对象的时候,传递一个标识位 taskcreationoptions.attachedtoparent 将其与父任务关联。这样的话,除非其所有子任务执行完成,父任务不会被认为已经完成。

延续性任务也可以作为第一个任务的子任务,指定对应的标识位即可。

任务的内部构造

任务主要由以下几部分构成:

  1. 任务唯一的 task id。
  2. 调度器的引用。
  3. 回调方法的引用。
  4. 执行上下文的引用。
  5. 其他...

可以看到构造一个 task 还是需要比较大的开销的,如果你不需要 task 的附加特性,完全可以使用 taskpool.queueuserworkitem 来获得更好的性能与效率。

通过 task 的只读属性 task.status,我们可以知道任务目前处于哪种状态,其最终的状态主要有 3 种,分别是:rantocompletion(已完成)、canceled(被取消)、faulted(出现异常失败),这三种状态都属于任务完成状态。

另外值得注意的是,通过 continuewith()continuewhenall()continuewhenany() 等方法创建的任务状态都为 waitingforactivation,这个状态代表任务会自动开始。

任务工厂

如果你需要在执行多个相同配置的 task 对象,可以通过 taskfactorytaskfactory<tresult>,其大概含义与 task 的含义相同。

在创建工厂时,可以传递一些常用的配置标识位和 cancellationtoken 对象,之后我们可以通过 startnew() 方法来统一执行一堆任务。

任务调度器

任务调度器一般有两种,第一种是线程池任务调度器,一般用于服务端程序。还有一种是同步上下文任务调度器,一般用于 gui 程序。

六、parallel 的 for、foreach、invoke

for 与 foreach 基本用于操作一个集合,然后循环处理其值。而如果在某个方法内部需要执行多个方法,则可以通过 invoke 来进行执行。使用 parallel 类可以让 cpu 最大化地利用起来而不会阻塞主线程。

不过一般不会将所有 for 与 foreach 都替换为并行化的查询,这是因为某些循环会修改共享数据,这个时候使用 parallel 的操作则会破坏数据,虽然可以通过增加线程同步锁来解决,不过这样会造成单线程访问,无法享受并行处理的好处。

同时 parallel 方法本身也会有开销,当然在大量重复性任务中这种开销可以忽略不计,但是如果仅为几十个短耗时的计算限制任务启用 parallel 就会得不偿失。

这三种操作都接受一个paralleloptions 对象用于配置最大并行的工作项数目与调度器。

parallel 的 for 与 foreach 的一个重载方法允许传入 3 个委托,他们分别是:

  • 任务局部初始化委托(localinit):该委托是在每次工作项处理之前被调用。
  • 任务主体委托(body):具体的工作项处理逻辑,参与工作的各个线程都会调用一次。
  • 任务局部终结器委托(localfinally):本委托是在每个工作项处理完成之后都会被调用。

从上述逻辑来看,可以看作局部初始化委托为一个父任务,后面两个为子级连续任务的构造。

实例:

static void main(string[] args)
{
    var files = new list<string>();
    files.addrange(directory.getfiles(@"e:\program files","*.*",searchoption.alldirectories));
    files.addrange(directory.getfiles(@"e:\program files (x86)","*.*",searchoption.alldirectories));
    files.addrange(directory.getfiles(@"e:\project","*.*",searchoption.alldirectories));
    files.addrange(directory.getfiles(@"e:\cache","*.*",searchoption.alldirectories));
    files.addrange(directory.getfiles(@"e:\windows kits","*.*",searchoption.alldirectories));
    files.addrange(directory.getfiles(@"c:\program files\dotnet","*.*",searchoption.alldirectories));
    
    console.writeline($"总文件数量:{files.count}");
    long allfilecount = 0;

    var watch = new stopwatch();
    watch.start();
    parallel.foreach<string, long>(files,
        localinit: () =>
        {
            // 初始化文件大小为 0,
            // 这里的参数类型取决于任务返回的参数
            return 0;
        },
        body: (filename, parallelstatus, index, filecount) =>
        {
            // 计算文件大小并返回
            long count = 0;
            try
            {
                var info = new fileinfo(filename);
                count = info.length;
            }
            catch (exception e)
            {
            }
            
            // 这里因为该任务会被线程池复用,所以要进行累加
            return count + filecount;
        },
        localfinally: filecount => { interlocked.add(ref allfilecount, filecount); }
    );
    
    watch.stop();
    console.writeline($"并行效率:{watch.elapsedmilliseconds} ms");
    console.writeline($"文件总大小:{allfilecount / 1024 / 1024 / 1024} gb");


    allfilecount = 0;
    watch.reset();
    
    watch.start();
    foreach (var file in files)
    {
        long count = 0;
        
        try
        {
            var info = new fileinfo(file);
            count = info.length;
        }
        catch (exception e)
        {
        }
        
        allfilecount+=count;
    }
    
    watch.stop();
    console.writeline($"单线程效率:{watch.elapsedmilliseconds} ms");
    console.writeline($"文件总大小:{allfilecount / 1024 / 1024 / 1024} gb");


    console.readline();
}

性能提升:

《CLR Via C#》读书笔记:27.计算限制的异步操作

通过 parallel 的 foreach 与普通的 foreach 遍历计算,性能总体提升了约 56%,越耗时的操作提升的效果就越明显。

在 body 的主体委托当中,传入了一个 parallelloopstate 对象,该对象用于每个线程与其他任务进行交互。主要有两个方法 stop()break(),前者用于停止循环,后者用于跳出循环,并且跳出循环之后,其 lowestbreakiteration 会返回调用过 break() 方法的最低项。

并且 parallel 还会返回一个 parallelloopresult 对象,该通过该对象我们可以得知这些循环是否正常完成。

七、并行语言集成查询 plinq

linq 默认查询的方式是一个线程顺序处理数据集合中的所有项,称之为顺序查询。而 plinq 就是将这些操作分散到各个 cpu 并行执行,通过 asparallel() 扩展方法可以将 ienumerable<tsource> 转换为 parallelquery<tsource>

而从并行操作切换回顺序操作,只需要调用 parallelenumableassequential() 方法即可。

经过 plinq 处理后的数据项其结果是无序的,如果需要有序结果,可以调用 asordered() 方法。但是该方法比较损耗性能,一般不推荐使用,如果需要排序应该直接使用与 linq 同名的 plinq 扩展方法。

plinq 一般会自己分析使用最好的查询方式进行查询,有时候使用顺序查询性能更好。

  • withcancellation() :允许取消某个 plinq 查询。
  • withexecutionmode():允许配置 plinq 执行模式,是否强制并行查询。
  • withmergeoptions():允许配置结果的缓冲与合并方式。
  • withdegreeofparallelism():允许配置查询的最大并行数。

ps:

不建议在多线程环镜中使用 console.write() 进行输出,因为 console 类内部会对线程进行同步,确保只有一个线程可以访问控制台窗口,这样会损害性能。

八、定时计算限制操作

通过 clr 提供的 timer 定时器,我们可以传入一个回调方法。这样的话计时器会可以根据传入的周期,来定时将我们的回调方法通过线程池线程进行调用。

同时计时器还允许传入一个 duetime 参数来指定这个计时器首次调用回调方法时需要等待多久(立即执行可以传入 0),而 period 可以指定 timer 调用回调方法的周期。

【原理】

在线程池内部所有的 timer 对象只使用了一个线程,当某个 timer 到期的时候,这个线程就会被唤醒。该线程通过 threadpool.queueuserworkitem() 方法将一个工作项添加到线程池队列,这样你的回调方法就会得到执行。

【注意】

如果回调方法执行的时常超过了你设置的周期时常,这样会造成多个线程都在执行你的回调。因为 timer 不知道你的回调执行完成没有,他只会到期执行你的回调方法。

解决措施是构造一个 timer 的时候,为 period 指定一个 timeout.infinite 常量,这样计时器只会触发一次。之后在你的回调方法执行完成之后,在其内部通过 timer.change() 方法指定一个执行周期,并且设置其 duetime 为立即执行。

这样做了之后,你的 timer 就会确保你的回调被执行完成之后再开始下一个周期。

这一点可以参考 abp 实现的 abptimer 对象。

九、线程池如何管理线程

clr 允许开发人员设置线程池最大工作者线程数,但是一般不要轻易设置该值,但你可以通过 threadpool.getmaxthreads()threadpool.getminthreads()getavailablethreads() 方法来获取一些相关信息。

通过 threadpool.queueuserworkitem() 方法和 timer 类处理的工作项总是存储到 clr 线程池的 全局队列 中。工作者线程采用一个 fifo 算法将工作项从 全局队列 取出,因为所有工作者线程都有可能去这个队列拿去工作项,这个时候会使用 线程同步锁 以确保工作项只会被工作者线程处理一次。这可能会造成性能瓶颈,对伸缩性和性能会造成某些限制。

默认的任务调度器中,非工作者线程调度 task 时都是存放在全局队列,而工作者线程调度 task 则是存放在他自己的本地队列。

工作者线程处理 task 的步骤:

  • 首先从本地队列采用 lifo 算法取得一个 task 进行处理。
  • 如果本地队列没有 task,则从其他的工作者线程本地队列拿一个 task 自己来处理。(会使用线程同步锁)
  • 所有本地队列都为空,则工作者线程会使用 fifo 算法去全局队列拿一个 task 进行处理。
  • 如果全局队列为空,则线程处于休眠状态,时间过长则销毁自身。

ps:

结合上下文,说明工作项首先被添加到了全局队列,然后由工作者线程取到自己的本地队列进行处理。

线程池会动态地根据工作项的多少动态地调整工作者线程的数量,一般不需要开发人员进行管控。