C# Parallel用法
1、parallel.invoke 主要用于任务的并行
这个函数的功能和task有些相似,就是并发执行一系列任务,然后等待所有完成。和task比起来,省略了task.waitall这一步,自然也缺少了task的相关管理功能。它有两种形式:
parallel.invoke( params action[] actions);
parallel.invoke(action[] actions,taskmanager manager,taskcreationoptions options);
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { var actions = new action[] { () => actiontest("test 1"), () => actiontest("test 2"), () => actiontest("test 3"), () => actiontest("test 4") }; console.writeline("parallel.invoke 1 test"); parallel.invoke(actions); console.writeline("结束!"); } static void actiontest(object value) { console.writeline(">>> thread:{0}, value:{1}", thread.currentthread.managedthreadid, value); } } }
2、for方法,主要用于处理针对数组元素的并行操作(数据的并行)
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; parallel.for(0, nums.length, (i) => { console.writeline("针对数组索引{0}对应的那个元素{1}的一些工作代码……threadid={2}", i, nums[i], thread.currentthread.managedthreadid); }); console.readkey(); } } }
3、foreach方法,主要用于处理泛型集合元素的并行操作(数据的并行)
using system; using system.collections.generic; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { list<int> nums = new list<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; parallel.foreach(nums, (item) => { console.writeline("针对集合元素{0}的一些工作代码……threadid={1}", item, thread.currentthread.managedthreadid); }); console.readkey(); } } }
数据的并行的方式二(asparallel()):
using system; using system.collections.generic; using system.linq; using system.threading; namespace consoleapp1 { class program { static void main(string[] args) { list<int> nums = new list<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; var evennumbers = nums.asparallel().select(item => calculate(item)); //注意这里是个延迟加载,也就是不用集合的时候 这个calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果; console.writeline(evennumbers.count()); //foreach (int item in evennumbers) // console.writeline(item); console.readkey(); } static int calculate(int number) { console.writeline("针对集合元素{0}的一些工作代码……threadid={1}", number, thread.currentthread.managedthreadid); return number * 2; } } }
.asordered() 对结果进行排序:
using system; using system.collections.generic; using system.linq; using system.threading; using system.threading.tasks; namespace consoleapp { class program { static void main(string[] args) { list<int> nums = new list<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; var evennumbers = nums.asparallel().asordered().select(item => calculate(item)); //注意这里是个延迟加载,也就是不用集合的时候 这个calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果; //console.writeline(evennumbers.count()); foreach (int item in evennumbers) console.writeline(item); console.readkey(); } static int calculate(int number) { console.writeline("针对集合元素{0}的一些工作代码……threadid={1}", number, thread.currentthread.managedthreadid); return number * 2; } } }
foreach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用partitioner.create实现。
using system; using system.collections.concurrent; using system.diagnostics; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { for (int j = 1; j < 4; j++) { concurrentbag<int> bag = new concurrentbag<int>(); var watch = stopwatch.startnew(); watch.start(); parallel.foreach(partitioner.create(0, 3000000), i => { for (int m = i.item1; m < i.item2; m++) { bag.add(m); } }); console.writeline("并行计算:集合有:{0},总共耗时:{1}", bag.count, watch.elapsedmilliseconds); gc.collect(); } } } }
paralleloptions类
paralleloptions options = new paralleloptions();
//指定使用的硬件线程数为4
options.maxdegreeofparallelism = 4;
有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好paralleloptions提供了maxdegreeofparallelism属性。
using system; using system.collections.concurrent; using system.diagnostics; using system.linq; using system.threading.tasks; namespace consoleapp1 { public class student { public int id { get; set; } public string name { get; set; } public int age { get; set; } public datetime createtime { get; set; } } class program { static void main(string[] args) { var dic = loaddata(); stopwatch watch = new stopwatch(); watch.start(); var query2 = (from n in dic.values.asparallel() where n.age > 20 && n.age < 25 select n).tolist(); watch.stop(); console.writeline("并行计算耗费时间:{0}", watch.elapsedmilliseconds); console.read(); } public static concurrentdictionary<int, student> loaddata() { concurrentdictionary<int, student> dic = new concurrentdictionary<int, student>(); paralleloptions options = new paralleloptions(); //指定使用的硬件线程数为4 options.maxdegreeofparallelism = 4; //预加载1500w条记录 parallel.for(0, 15000000, options, (i) => { var single = new student() { id = i, name = "hxc" + i, age = i % 151, createtime = datetime.now.addseconds(i) }; dic.tryadd(i, single); }); return dic; } } }
常见问题的处理
<1> 如何中途退出并行循环?
是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个parallelloopstate,该实例提供了break和stop方法来帮我们实现。
break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。
using system; using system.collections.concurrent; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { concurrentbag<int> bag = new concurrentbag<int>(); parallel.for(0, 20000000, (i, state) => { if (bag.count == 1000) { //state.break(); state.stop(); return; } bag.add(i); }); console.writeline("当前集合有{0}个元素。", bag.count); } } }
取消(cancel)
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { public static void main() { var cts = new cancellationtokensource(); var ct = cts.token; task.factory.startnew(() => fun(ct)); console.readkey(); //thread.sleep(3000); cts.cancel(); console.writeline("任务取消了!"); } static void fun(cancellationtoken token) { parallel.for(0, 100000, new paralleloptions { cancellationtoken = token }, (i) => { console.writeline("针对数组索引{0}的一些工作代码……threadid={1}", i, thread.currentthread.managedthreadid); }); } } }
<2> 并行计算中抛出异常怎么处理?
首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的exception并不能获取到异常,然而为并行诞生的aggregateexcepation就可以获取到一组异常。
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static void main(string[] args) { try { parallel.invoke(run1, run2); } catch (aggregateexception ex) { foreach (var single in ex.innerexceptions) { console.writeline(single.message); } } console.writeline("结束了!"); //console.read(); } static void run1() { thread.sleep(3000); throw new exception("我是任务1抛出的异常"); } static void run2() { thread.sleep(5000); throw new exception("我是任务2抛出的异常"); } } }
注意parallel里面 不建议抛出异常 因为在极端的情况下比如进去的第一批线程先都抛异常了 此时aggregateexcepation就只能捕获到这一批的错误,然后程序就结束了
using system; using system.collections.generic; using system.threading.tasks; namespace consoleapp1 { public class testclass { public static list<int> numberlist = null; private static readonly object locker = new object(); public void test(int number) { throw new exception("1111"); //lock (locker) //{ // if (numberlist == null) // { // console.writeline("执行添加"); // numberlist = new list<int>(); // numberlist.add(1); // //thread.sleep(1000); // } //} //if (number == 5 || number == 7) throw new exception(string.format("number{0}boom!", number)); //console.writeline(number); } } class program { private static readonly object locker = new object(); static void main(string[] args) { list<string> errlist = new list<string>(); try { parallel.for(0, 10, (i) => { try { testclass a = new testclass(); a.test(i); } catch (exception ex) { lock (locker) { errlist.add(ex.message); throw ex; } } }); } catch (aggregateexception ex) { foreach (var single in ex.innerexceptions) { console.writeline(single.message); } } int index = 1; foreach (string err in errlist) { console.writeline("{0}、的错误:{1}", index++, err); } } } }
可以向下面这样来处理一下
不在aggregateexcepation中来处理 而是在parallel里面的try catch来记录错误,或处理错误
using system; using system.collections.generic; using system.threading.tasks; namespace consoleapp1 { public class testclass { public static list<int> numberlist = null; private static readonly object locker = new object(); public void test(int number) { throw new exception("1111"); //lock (locker) //{ // if (numberlist == null) // { // console.writeline("执行添加"); // numberlist = new list<int>(); // numberlist.add(1); // //thread.sleep(1000); // } //} //if (number == 5 || number == 7) throw new exception(string.format("number{0}boom!", number)); //console.writeline(number); } } class program { private static readonly object locker = new object(); static void main(string[] args) { list<string> errlist = new list<string>(); parallel.for(0, 10, (i) => { try { testclass a = new testclass(); a.test(i); } catch (exception ex) { lock (locker) { errlist.add(ex.message); } //console.writeline(ex.message); //注:这里不再将错误抛出..... //throw ex; } }); int index = 1; foreach (string err in errlist) { console.writeline("{0}、的错误:{1}", index++, err); } } } }
上一篇: Linux系统下修改用户密码全攻略
下一篇: 介绍一个制作下拉菜单完全不同的办法