欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C# Parallel用法

程序员文章站 2022-04-01 19:20:18
1、Parallel.Invoke 主要用于任务的并行 这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式: Parallel.Invoke( params Action ......

1、parallel.invoke 主要用于任务的并行
  这个函数的功能和task有些相似,就是并发执行一系列任务,然后等待所有完成。和task比起来,省略了task.waitall这一步,自然也缺少了task的相关管理功能。它有两种形式:
  parallel.invoke( params action[] actions);
  parallel.invoke(action[] actions,taskmanager manager,taskcreationoptions options);

C# Parallel用法
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            var actions = new action[]
            {
                () => actiontest("test 1"),
                () => actiontest("test 2"),
                () => actiontest("test 3"),
                () => actiontest("test 4")
            };

            console.writeline("parallel.invoke 1 test");
            parallel.invoke(actions);

            console.writeline("结束!");
        }

        static void actiontest(object value)
        {
            console.writeline(">>> thread:{0}, value:{1}",
            thread.currentthread.managedthreadid, value);
        }
    }
}
program

2、for方法,主要用于处理针对数组元素的并行操作(数据的并行) 

C# Parallel用法
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            parallel.for(0, nums.length, (i) =>
            {
                console.writeline("针对数组索引{0}对应的那个元素{1}的一些工作代码……threadid={2}", i, nums[i], thread.currentthread.managedthreadid);
            });
            console.readkey();
        }
    }
}
program

3、foreach方法,主要用于处理泛型集合元素的并行操作(数据的并行)

C# Parallel用法
using system;
using system.collections.generic;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            list<int> nums = new list<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            parallel.foreach(nums, (item) =>
            {
                console.writeline("针对集合元素{0}的一些工作代码……threadid={1}", item, thread.currentthread.managedthreadid);
            });
            console.readkey();
        }
    }
}
program

  数据的并行的方式二(asparallel()):

C# Parallel用法
using system;
using system.collections.generic;
using system.linq;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            list<int> nums = new list<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evennumbers = nums.asparallel().select(item => calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            console.writeline(evennumbers.count());
            //foreach (int item in evennumbers)
            //    console.writeline(item);
            console.readkey();
        }

        static int calculate(int number)
        {
            console.writeline("针对集合元素{0}的一些工作代码……threadid={1}", number, thread.currentthread.managedthreadid);
            return number * 2;
        }
    }
}
program

  .asordered() 对结果进行排序:

C# Parallel用法
using system;
using system.collections.generic;
using system.linq;
using system.threading;
using system.threading.tasks;

namespace consoleapp
{

    class program
    {
        static void main(string[] args)
        {
            list<int> nums = new list<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evennumbers = nums.asparallel().asordered().select(item => calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            //console.writeline(evennumbers.count());
            foreach (int item in evennumbers)
                console.writeline(item);
            console.readkey();
        }

        static int calculate(int number)
        {
            console.writeline("针对集合元素{0}的一些工作代码……threadid={1}", number, thread.currentthread.managedthreadid);
            return number * 2;
        }
    }
}
program

  foreach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用partitioner.create实现

C# Parallel用法
using system;
using system.collections.concurrent;
using system.diagnostics;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            for (int j = 1; j < 4; j++)
            {
                concurrentbag<int>  bag = new concurrentbag<int>();
                var watch = stopwatch.startnew();
                watch.start();
                parallel.foreach(partitioner.create(0, 3000000), i =>
                {
                    for (int m = i.item1; m < i.item2; m++)
                    {
                        bag.add(m);
                    }
                });
                console.writeline("并行计算:集合有:{0},总共耗时:{1}", bag.count, watch.elapsedmilliseconds);
                gc.collect();

            }
        }
    }
}
program

  paralleloptions类
  paralleloptions options = new paralleloptions();
  //指定使用的硬件线程数为4
  options.maxdegreeofparallelism = 4;
  有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好paralleloptions提供了maxdegreeofparallelism属性。

C# Parallel用法
using system;
using system.collections.concurrent;
using system.diagnostics;
using system.linq;
using system.threading.tasks;

namespace consoleapp1
{
    public class student
    {
        public int id { get; set; }
        public string name { get; set; }
        public int age { get; set; }
        public datetime createtime { get; set; }
    }

    class program
    {
        static void main(string[] args)
        {
            var dic = loaddata();
            stopwatch watch = new stopwatch();
            watch.start();
            var query2 = (from n in dic.values.asparallel()
                          where n.age > 20 && n.age < 25
                          select n).tolist();
            watch.stop();
            console.writeline("并行计算耗费时间:{0}", watch.elapsedmilliseconds);

            console.read();
        }

        public static concurrentdictionary<int, student> loaddata()
        {
            concurrentdictionary<int, student> dic = new concurrentdictionary<int, student>();
            paralleloptions options = new paralleloptions();
            //指定使用的硬件线程数为4
            options.maxdegreeofparallelism = 4;
            //预加载1500w条记录
            parallel.for(0, 15000000, options, (i) =>
            {
                var single = new student()
                {
                    id = i,
                    name = "hxc" + i,
                    age = i % 151,
                    createtime = datetime.now.addseconds(i)
                };
                dic.tryadd(i, single);
            });

            return dic;
        }
    }
}
program

常见问题的处理

  <1> 如何中途退出并行循环?
  是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个parallelloopstate,该实例提供了break和stop方法来帮我们实现。
  break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
  stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

C# Parallel用法
using system;
using system.collections.concurrent;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            concurrentbag<int> bag = new concurrentbag<int>();

            parallel.for(0, 20000000, (i, state) =>
            {
                if (bag.count == 1000)
                {
                    //state.break();
                    state.stop();
                    return;
                }
                bag.add(i);
            });

            console.writeline("当前集合有{0}个元素。", bag.count);

        }
    }
}
program

  取消(cancel)

C# Parallel用法
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        public static void main()
        {

            var cts = new cancellationtokensource();
            var ct = cts.token;
            task.factory.startnew(() => fun(ct));
            console.readkey();
            //thread.sleep(3000);
            cts.cancel();
            console.writeline("任务取消了!");

        }

        static void fun(cancellationtoken token)
        {
            parallel.for(0, 100000,
                        new paralleloptions { cancellationtoken = token },
                        (i) =>
                        {
                            console.writeline("针对数组索引{0}的一些工作代码……threadid={1}", i, thread.currentthread.managedthreadid);
                        });
        }
    }
}
program

  <2> 并行计算中抛出异常怎么处理?
  首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的exception并不能获取到异常,然而为并行诞生的aggregateexcepation就可以获取到一组异常。

C# Parallel用法
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            try
            {
                parallel.invoke(run1, run2);
            }
            catch (aggregateexception ex)
            {
                foreach (var single in ex.innerexceptions)
                {
                    console.writeline(single.message);
                }
            }
            console.writeline("结束了!");
            //console.read();
        }

        static void run1()
        {
            thread.sleep(3000);
            throw new exception("我是任务1抛出的异常");
        }

        static void run2()
        {
            thread.sleep(5000);
            throw new exception("我是任务2抛出的异常");
        }
    }
}
program

  注意parallel里面 不建议抛出异常 因为在极端的情况下比如进去的第一批线程先都抛异常了 此时aggregateexcepation就只能捕获到这一批的错误,然后程序就结束了

C# Parallel用法
using system;
using system.collections.generic;
using system.threading.tasks;

namespace consoleapp1
{
    public class testclass
    {
        public static list<int> numberlist = null;
        private static readonly object locker = new object();
        public void test(int number)
        {
            throw new exception("1111");
            //lock (locker)
            //{
            //    if (numberlist == null)
            //    {
            //        console.writeline("执行添加");
            //        numberlist = new list<int>();
            //        numberlist.add(1);
            //        //thread.sleep(1000);
            //    }
            //}
            //if (number == 5 || number == 7) throw new exception(string.format("number{0}boom!", number));
            //console.writeline(number);
        }
    }

    class program
    {
        private static readonly object locker = new object();
        static void main(string[] args)
        {
            list<string> errlist = new list<string>();
            try
            {
                parallel.for(0, 10, (i) =>
                {
                    try
                    {
                        testclass a = new testclass();
                        a.test(i);
                    }
                    catch (exception ex)
                    {
                        lock (locker)
                        {
                            errlist.add(ex.message);
                            throw ex;
                        }
                    }
                });
            }
            catch (aggregateexception ex)
            {
                foreach (var single in ex.innerexceptions)
                {
                    console.writeline(single.message);
                }
            }
            int index = 1;
            foreach (string err in errlist)
            {
                console.writeline("{0}、的错误:{1}", index++, err);
            }
        }
    }
}
program

  可以向下面这样来处理一下
  不在aggregateexcepation中来处理 而是在parallel里面的try catch来记录错误,或处理错误

C# Parallel用法
using system;
using system.collections.generic;
using system.threading.tasks;

namespace consoleapp1
{
    public class testclass
    {
        public static list<int> numberlist = null;
        private static readonly object locker = new object();
        public void test(int number)
        {
            throw new exception("1111");
            //lock (locker)
            //{
            //    if (numberlist == null)
            //    {
            //        console.writeline("执行添加");
            //        numberlist = new list<int>();
            //        numberlist.add(1);
            //        //thread.sleep(1000);
            //    }
            //}
            //if (number == 5 || number == 7) throw new exception(string.format("number{0}boom!", number));
            //console.writeline(number);
        }
    }

    class program
    {
        private static readonly object locker = new object();
        static void main(string[] args)
        {
            list<string> errlist = new list<string>();
            parallel.for(0, 10, (i) =>
            {
                try
                {
                    testclass a = new testclass();
                    a.test(i);
                }
                catch (exception ex)
                {
                    lock (locker)
                    {
                        errlist.add(ex.message);
                    }
                    //console.writeline(ex.message);
                    //注:这里不再将错误抛出.....
                    //throw ex;
                }
            });

            int index = 1;
            foreach (string err in errlist)
            {
                console.writeline("{0}、的错误:{1}", index++, err);
            }
        }
    }
}
program