并行编程(Parallel Framework)
前言
并行编程:通过编码方式利用多核或多处理器称为并行编程,多线程概念的一个子集。
并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。多线程的一种。
并行编程分为如下几个结构:
1.并行的linq或plinq
2.parallel类
3.任务并行结构
4.并发集合
5.spinlock和spinwait
这些是.net 4.0引入的功能,一般被称为pfx(parallel framework,并行框架)。
parallel类和任务并行结构称为tpl(task parallel library,任务并行库)。
并行框架(pfx)
1.并行框架基础
当前cpu技术达到瓶颈,而制造商将关注重点转移到提高内核技术上,而标准单线程代码并不会因此而自动提高运行速度。
利用多核提升程序性能通常需要对计算密集型代码进行一些处理:
1.将代码划分成块。
2.通过多线程并行执行这些代码块。
3.结果变为可用后,以线程安全和高性能的方式整合这些结果。
传统多线程结构虽然实现功能,但难度颇高且不方便,特别是划分和整理的步骤(本质问题是:多线程同时使用相同数据时,出于线程安全考虑进行锁定的常用策略会引发大量竞争)。
而并行框架(parallel framework)专门用于在这些应用场景中提供帮助。
2.并行框架组成
pfx:高层由两个数据并行api组成:plinq或parallel类。底层包含任务并行类和一组另外的结构为并行编程提供帮助。
基础并行语言集成查询(plinq)
语言集成查询(language integrated query,linq)提供了一个简捷的语法来查询数据集合。而这种由一个线程顺序处理数据集合的方式我们称为顺序查询(sequential query)。
并行语言集成查询(parallel linq)是linq的并行版。它将顺序查询转换为并行查询,在内部使用任务,将集合中数据项的处理工作分散到多个cpu上,以并发处理多个数据项。
plinq将自动并行化本地的linq查询,system.linq.parallelenumerable类(它定义在system.core.dll中,需要引用system.linq)公开了所有标准linq操作符的并行版本。这些所有方法是依据system.linq.parallelquery<t>扩展而来。
1.linq to plinq
要让linq查询调用并行版本,必须将自己的顺序查询(基于ienumerable或ienumerable<t>)转换成并行查询(基于parallelquery或parallelquery<t>),使用parallelenumerable的asparallel方法实现,如示例:
1 class program 2 { 3 static void main(string[] args) 4 { 5 ienumerable<int> numbers = enumerable.range(1, 1000); 6 parallelquery parallelquery = 7 from n in numbers.asparallel()//转换为并行 8 where n > 3 9 select n; 10 foreach (var item in parallelquery) 11 { 12 console.writeline(item); 13 } 14 console.readkey(); 15 } 16 }
结果如下:使用enumerable.range生成的集合是顺序的,但是经过并行查询后顺序被打乱。
2.plinq to linq
将执行并行查询的操作切换回执行顺序查询(并不常用),通过paralleienumerable的assequential实现。此时操作只由一个线程执行。
1 class program 2 { 3 static void main(string[] args) 4 { 5 ienumerable<int> numbers = enumerable.range(1, 1000); 6 ienumerable<int> enumerable = numbers.asparallel().assequential().where(c => c > 3); 7 foreach (var item in enumerable) 8 { 9 console.writeline(item); 10 } 11 console.readkey(); 12 } 13 }
3.整合结果集(forall)
通常,一个linq查询的结果数据是让某个线程执行一个foreach来处理,此时只有一个线程遍历查询的所有结果,如果希望以并行方式处理查询结果,通过paralleienumerable的forall方法处理查询,如示例:
1 class program 2 { 3 static void main(string[] args) 4 { 5 ienumerable<int> numbers = enumerable.range(1, 1000); 6 (from n in numbers.asparallel() where n > 3 select n).forall((d) => 7 { 8 d = d + 1000; 9 console.writeline(d);//console在此回损害性能,因为内部回对线程进行同步,此处因演示所以暂且一用 10 }); 11 console.readkey(); 12 } 13 }
执行结果如下:
解析plinq
1.plinq执行模型
如图所示:
2.异常处理
plinq的报错将以aggregateexception形式被重抛,其中innerexceeptions属性包含一个或多个真正异常,示例可看 内的异常处理部分。
3.plinq结果的排序
并行化查询当整理结果时不能保持初始化数据的原始次序。如果要保持序列的原始序列,可以通过在asparallel之后调用asordered来强制实现:
1 ienumerable<int> numbers = enumerable.range(1, 10000); 2 var enumerable = numbers.asparallel().where(c => c > 3);
调用asordered时,因为plinq要保持跟踪每个元素的原始位置,会导致性能损失。
调用asunordered,可以在后续的查询中低效asordered产生的副作用,允许查询从调用asunordered时起更高效的执行。
4.plinq存在的局限与限制
1.若要使plinq发挥作用,必须具有一定数量的计算密集型工作可分配给工作者线程。大多数的linq to objects查询执行速度很快,不仅没有必要并行化,而且划分、整理和协调额外线程的开销实际上会降低执行速度。而且查询若调用了非线程安全的方法,plinq的结果有可能不正确。
2.plinq能够并行化的内容还有些限制,以下查询运算符防止查询被并行化,除非源元素位于他们的元素索引位置:take、takewhile、skip和skipwhileselect、selectmany和elementat的索引版本。
3.以下查询运算符是并行化的,但所使用的复杂划分策略有时可能比顺序处理的速度还要低:join、groupby、groupjonin、distinct、union、intersect和except。
5.plinq的结果
和普通linq查询一样,plinq查询也是延迟求值的。意味着执行只在开始使用时触发。但是列举结果集时和普通顺序查询有区别:
顺序查询:完全由使用者从输入序列中“拉取”每个元素。
并行查询:通常使用独立线程来获取序列中的元素,时间上比使用者需要它们时要提前,再通过查询链并行处理元素后将结果保存在一块缓存中,以便使用者按需取用。
注意:过早暂停结果列举,查询处理器也会暂停或结束,目的是不浪费cpu的时间或内存。在调用asparallel之后调用withmergeoptions可以调节plinq的缓冲行为。
6.如何使用plinq
为何优化将linq都并行化是不可取的,因为linq能解决大多数问题,执行速度也很快,因此无法从并行化中收益。
一种更好的方式是找出cpu密集的瓶颈,然后考虑通过linq的形式表达(这类重构,linq往往会使代码量变少,而且增强可读性)。
plinq十分适用于易并行问题。他还可以很好地处理结构化的阻塞任务。
plinq不适于镜像制作,因为将数百万元素整理为一个输出序列将带来瓶颈,相反将元素写入一个数组或托管内存块中,然后使用parallel类或任务并行管理多线程是更好的选择。
parallel类
parallel类是对线程的一个很好的抽象。该类位于system.threading.tasks命名空间中,提供了数据和任务并行性。
pfx通过parallel类中的三个静态方法,提供了一种基本形式的结构化并行机制:
1.parallel.invoke
parallel.invoke:用于并行执行一组委托,示例如下:
1 static void main(string[] args) 2 { 3 parallel.invoke( 4 () => console.writeline($"当前线程id:{thread.currentthread.managedthreadid}"), 5 () => console.writeline($"当前线程id:{thread.currentthread.managedthreadid}") 6 ); 7 console.readkey(); 8 }
执行结果
parallel.invoke方法并行执行一组action委托,然后等待它们完成。
1 public static void invoke(params action[] actions);
示例看起来像是创建和等待两个task对象的一种捷径。但两者存在重要的区别:
如果传入一个包含数据量非常大的委托数组时,parallel.invoke方法仍然能高效工作,这是因为在底层,parallel.invoke方法是将大量元素划分成较小的块,分配给底层的task执行,而不是每个委托创建一个独立task。
2.parallel.for
parallel.for:执行c# for循环的并行化等价循环,示例如下:
1 class program 2 { 3 static void main(string[] args) 4 { 5 //顺序循环 6 { 7 for (int i = 0; i < 10; i++) 8 { 9 test(i); 10 } 11 } 12 console.writeline("并行化for开始"); 13 //顺序执行转换为并行化 14 { 15 parallel.for(0, 10, i => test(i)); 16 } 17 //顺序执行转换为并行化(更简单的方式) 18 { 19 parallel.for(0, 10, test); 20 } 21 console.readkey(); 22 } 23 static void test(int i) 24 { 25 console.writeline($"当前线程id:{thread.currentthread.managedthreadid},输出结果为:{i}"); 26 } 27 }
结果如下:
3.parallel.foreach
parallel.foreach:执行c# foreach循环的并行化等价循环,示例如下:
1 class program 2 { 3 static void main(string[] args) 4 { 5 string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" }; 6 //顺序循环 7 { 8 foreach (string num in data) 9 { 10 test(num); 11 } 12 } 13 console.writeline("并行化foreach开始"); 14 //顺序执行转换为并行化 15 { 16 parallel.foreach(data, num => test(num)); 17 } 18 console.readkey(); 19 } 20 static void test(string str) 21 { 22 console.writeline($"当前线程id:{thread.currentthread.managedthreadid},输出结果为:{str}"); 23 } 24 }
执行结果:
注意:以上三个方法都会引发阻塞直到所有工作完成为止。和plinq一样,在出现未处理的异常之后,余下的工作者在它们当前的迭代之后停止,而一场将被抛回给调用者,并封装在一个aggregateexception中。
4.索引&跳出(parallelloopstate)
有时迭代索引很有用处,但是切忌不可像顺序循环的用法使用共享变量(循环内i++)的方式使用,因为共享变量值在并行上下文中是线程不安全的。
同样的,因为并行for或foreach中的循环体是一个委托,所以无法使用break语句提前退出循环,必须调用parallelloopstate对象上的break或stop方法。
以foreach为例,foreach重载的其中之一如下,它包含acton的其中有三个参数(tsourec=子元素,parallelloopstate=并行循环状态,long=索引):
1 public static parallelloopresult foreach<tsource>(ienumerable<tsource> source, action<tsource, parallelloopstate, long> body)
所以,想要得到索引和提前跳出的正确方式如示例:
1 class program 2 { 3 static void main(string[] args) 4 { 5 string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" }; 6 parallel.foreach(data, (num, state, i) => 7 { 8 console.writeline($"当前索引为:{i},状态为:{state}"); 9 test(num); 10 if (num == "six") 11 state.break(); 12 }); 13 console.readkey(); 14 } 15 static void test(string str) 16 { 17 console.writeline($"当前线程id:{thread.currentthread.managedthreadid},输出结果为:{str}"); 18 } 19 }
结果如下:
for的版本如下:
1 class program 2 { 3 static void main(string[] args) 4 { 5 string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" }; 6 parallel.for(0, data.length, (i, state) => 7 { 8 console.writeline($"当前索引为:{i},状态为:{state}"); 9 test(data[i]); 10 if (data[i] == "six") 11 state.break(); 12 }); 13 console.readkey(); 14 } 15 static void test(string str) 16 { 17 console.writeline($"当前线程id:{thread.currentthread.managedthreadid},输出结果为:{str}"); 18 } 19 }
任务并行
对于任务并行的内容,请戳 任务(task) 和 。
并发集合概述
.net 4.0在system.collections.concurrent命名空间中提供了一组新的集合。所有这些集合都完全是线程安全的:
这些集合不仅是为使用带锁的普通集合提供了快捷方式,而且可以在一般的多线程中使用并发集合,但需要注意:
1.并发集合针对并行编程进行了调整。只有在高度并发的应用场景中,传统集合的性能才能胜过它们。
2.线程安全的集合不能确保使用它的代码也是安全的。
3.如果枚举一个并发集合的同时,另一个线程要修改它,不会抛出任何异常,相反,得到旧内容与新内容的混合。
4.不存在任何list<t>的并发版本。
5.它们的内存利用率没有非并发的stack和queue类高效,但对于并发访问的效果更好。
1.结构概述
这些并发集合与传统集合的区别是:它们公开了特殊方法来执行原子测试和行动操作,而这些方法都是通过iproducerconsumercollection<t>接口提供的。
iproducerconsumercollection<t>接口代表一个线程安全的生产者/消费者集合,这三个类继承并实现了iproducerconsumercollection<t>接口:
concurrentstack<t>、concurrentqueue<t>、concurrentbag<t>。
它们实现的tryadd和trytake方法用于测试一个添加/删除操作能否执行,如果可以,则执行添加/删除操作。测试与行动不需要对传统集合上锁。
concurrentbag<t>用于保存对象的无需集合,适用于调用take或trytake时不关心获取那个元素的额情况。
相对于并发队列或堆栈,在多线程同时调用一个concurrentbag的add时,不存在竞争,但队列或堆栈并行调用add会引起一些竞争,所以concurrentbag上调用take方法非常高效。
blockingcollection<t>类似阻塞集合,适用于等待新元素的出现,可以把它看作一个容器,使用一个阻塞集合封装所有实现iproducerconsumercollection<t>的集合,并且允许从封装的集合中去除元素,若没有元素,操作会阻塞
2.基础方法
常用的一些方法,整理自 :
concurrentqueue:完全无锁,但面临资源竞争失败时可能会陷入自旋并重试操作。
enqueue:在队尾插入元素
trydequeue:尝试删除队头元素,并通过out参数返回
trypeek:尝试将对头元素通过out参数返回,但不删除该元素。
concurrentstack:完全无锁,但面临资源竞争失败时可能会陷入自旋并重试操作。
push:向栈顶插入元素
trypop:从栈顶弹出元素,并且通过out 参数返回
trypeek:返回栈顶元素,但不弹出。
concurrentbag:一个无序的集合,程序可以向其中插入元素,或删除元素。在同一个线程中向集合插入,删除元素的效率很高。
add:向集合中插入元素
trytake:从集合中取出元素并删除
trypeek:从集合中取出元素,但不删除该元素。
blockingcollection:一个支持界限和阻塞的容器
add :向容器中插入元素
trytake:从容器中取出元素并删除
trypeek:从容器中取出元素,但不删除。
completeadding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。
iscompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。
concurrentdictionary:对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。
addorupdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。
getoradd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值。
tryadd:尝试在容器中添加新的键和值。trygetvalue:尝试根据指定的键获得值。
tryremove:尝试删除指定的键。
tryupdate:有条件的更新当前键所对应的值。
getenumerator:返回一个能够遍历整个容器的枚举器。
结语
根据concurrentbag编写线程安全的生产者消费者请戳:这里 。
说实在的写这篇文章挺烦的,主要涉及的知识点太多讲的太细篇幅会很长况且我自己有些也还没用过,所以是概述性文章,对pfx有个基本的认识,当需要具体深入使用某些知识时再查询相关文档。
关于 并发编程(concurrent programming)更新到这里基本已经完结,谢谢大家的支持。
因个人的兴趣,所以准备沉淀下来专攻 数据结构和算法,然后研究 人工智能(microsoft的人工智能平台windows ml不会涉及,选择研究google的第二代人工智能学习系统tensorflow )。
接下来会对linux、python进行基础的学习并更新文章。
但是最核心的还是数据结构&算法使用那种编程语言并不重要,。
感兴趣的朋友可以关注。
参考文献
clr via c#(第4版) jeffrey richter
c#高级编程(第10版) c# 6 & .net core 1.0 christian nagel
果壳中的c# c#5.0权威指南 joseph albahari
...