欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

c# Parallel类的使用

程序员文章站 2022-06-25 14:01:00
parallel类是对线程的抽象,提供数据与任务的并行性。类定义了静态方法for和foreach,使用多个任务来完成多个作业。parallel.for和parallel.foreach方法在每次迭代的...

  parallel类是对线程的抽象,提供数据与任务的并行性。类定义了静态方法for和foreach,使用多个任务来完成多个作业。parallel.for和parallel.foreach方法在每次迭代的时候调用相同的代码,而parallel.invoke()方法允许同时调用不同的方法。parallel.foreach()方法用于数据的并行性,parallel.invoke()方法用于任务的并行性。

1、for()方法

  for()方法用于多次执行一个任务,可以并行运行迭代,但迭代的顺序并没指定。for()方法前两个参数为定义循环的开始和结束,第三个参数为action<int>委托。方法的返回值是parallelloopresult结构,它提供了是否结束的信息。如以下循环方法,不能保证输出顺序: 

static void parallelfor()
{
  parallelloopresult result =
    parallel.for(0, 10, async i =>
      {
        console.writeline("{0}, task: {1}, thread: {2}", i,
          task.currentid, thread.currentthread.managedthreadid);

        await task.delay(10);//异步方法,用于释放线程供其他任务使用。完成后,可能看不到方法的输出,因为主(前台线)程结束,所有的后台线程也将结束
        console.writeline("{0}, task: {1}, thread: {2}", i, task.currentid, thread.currentthread.managedthreadid);
      });
  console.writeline("is completed: {0}", result.iscompleted);
}

  异步功能虽然方便,但是知道后台发生了什么仍然重要,必须留意。

提前停止for()方法

  可以根据条件提前停止for()方法,而不必完成全部的迭代。,传入参数parallelloopstate的对象,调用break()方法或者stop()方法。如调用break()方法,当迭代值大于15的时候中断(当前线程结束,类似于普通for的continue),但其他任务可以同时运行,有其他值的任务也可以运行(如果当前线程是主线程,那么就等同于stop(),结束所有线程)。stop()方法结束的是所有操作(类似于普通for的break)。利用lowestbreakiteration属性可以忽略其他任务的结果:

static void parallelfor()
{
  parallelloopresult result = parallel.for(10, 40, (int i, parallelloopstate pls) =>
     {
       console.writeline("i: {0} task {1}", i, task.currentid);
       thread.sleep(10);
       if (i > 15)
         pls.break();
     });
  console.writeline("is completed: {0}", result.iscompleted);
  if (!result.iscompleted)
    console.writeline("lowest break iteration: {0}", result.lowestbreakiteration);
}

  for()方法可以使用几个线程执行循环。如果要对每个线程进行初始化,就需要使用到for<tlocal>(int, int, func<tlocal>, func<int, parallelloopstate, tlocal, tlocal> , action<tlocal>)方法。

  • 前两个参数是对应的循环起始和终止条件;
  • 第二个参数类型是func<tlocal>,返回一个值,传递给第三个参数。
  • 第三个参数类型是func<int, parallelloopstate, tlocal, tlocal>,是循环体的委托,其内部的第一个参数是循环迭代,内部第二个参数允许停止迭代,内部第三个参数用于接收for()方法的前一个参数的返回值。循环体应当返回与for()循环泛型类型一致的值。
  • 第四个参数是指定的一个委托,用于执行相关后续操作。
static void parallelfor()
{
  parallel.for<string>(0, 20, () =>
   {
     // invoked once for each thread
     console.writeline("init thread {0}, task {1}", thread.currentthread.managedthreadid, task.currentid);
     return string.format("t{0}", thread.currentthread.managedthreadid);
   },
   (i, pls, str1) =>
   {
     // invoked for each member
     console.writeline("body i {0} str1 {1} thread {2} task {3}", i, str1, thread.currentthread.managedthreadid, task.currentid);
     thread.sleep(10);
     return string.format("i {0}", i);
   },
   (str1) =>
   {
     // final action on each thread
     console.writeline("finally {0}", str1);
   });
}

2、使用foreach()方法循环

  foreach()方法遍历实现了ienumerable的集合,其方式类似于foreach语句,但是以异步方式遍历,没有确定的顺序。如果要中断循环,同样可以采用parallelloopstate参数。foreach<tsource>有许多泛型的重载方法。

static void parallelforeach()
{
  string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve" };

  parallelloopresult result = parallel.foreach<string>(data, s =>
       {
         console.writeline(s);
       });
  parallel.foreach<string>(data, (s, pls, l) =>
  {
    console.writeline("{0} {1}", s, l);
  });
}

3、调用多个方法
  如果有多个任务并行,可以使用parallel.invoke()方法,它提供任务的并行性模式:

static void parallelinvoke()
{
  parallel.invoke(foo, bar);
}

static void foo()
{
  console.writeline("foo");
}

static void bar()
{
  console.writeline("bar");
}

4、for()方法的取消

  在for()方法的重载方法中,可以传递一个paralleloptions类型的参数,利用此参数可以传递一个cancellationtoken参数。使用cancellationtokensource对象用于注册cancellationtoken,并允许调用cancel方法用于取消操作。

  一旦取消操作,for()方法就抛出一个operationcanceledexception类型的异常,使用cancellationtoken可以注册取消操作时的信息。调用register方法,传递一个在取消操作时调用的委托。通过取消操作,可以将其他的迭代操作在启动之前取消,但已经启动的迭代操作允许完成。取消操作是以协作方式进行的,以避免在取消迭代操作的中间泄露资源。

static void cancelparallelloop()
{
  var cts = new cancellationtokensource();
  cts.token.throwifcancellationrequested();
  cts.token.register(() => console.writeline("** token cancelled"));
  // 在500ms后取消标记
  cts.cancelafter(500);
  try
  {
    parallelloopresult result = parallel.for(0, 100,
      new paralleloptions()
      {
        cancellationtoken = cts.token
      },
        x =>
        {
          console.writeline("loop {0} started", x);
          int sum = 0;
          for (int i = 0; i < 100; i++)
          {
            thread.sleep(2);
            sum += i;
          }
          console.writeline("loop {0} finished", x);
        });
  }
  catch (operationcanceledexception ex)
  {
    console.writeline(ex.message);
  }
}

5、发现存在的问题

  使用并行循环时,若出现以下两个问题,需要使用partitioner(命名空间 system.collections.concurrent中)解决。

  1. 使用并行循环时,应确保每次迭代的工作量要明显大于同步共享状态的开销。 如果循环把时间都耗在了阻塞式的访问共享的循环变量上,那么并行执行的好处就很容易完全丧失。尽可能让每次循环迭代都只是在局部进行,避免阻塞式访问造成的损耗。见示例1
  2. 并行循环的每一次迭代都会生成一个委托,如果每次生成委托或方法的开销比迭代完成的工作量大,使用并行方案就不适合了(委托会设计两类开销:构造开销和调用开销。大多数调用开销和普通方法的调用差不多。 但委托是一种对象,构造开销可能相当大,最好是只做一次构造,然后把对象缓存起来)。见示例2

  示例1中,求1000000000以内所有自然数开方的和。第一部分采用直接计算的方式,第二部分采用分区计算。第二部分的partitioner 会把需要迭代的区间分拆为多个不同的空间,并存入tuple对象中。

/*   示例1  */public static void partitionertest()
{
  //使用计时器
  system.diagnostics.stopwatch stopwatch = new system.diagnostics.stopwatch();
  const int maxvalue = 1000000000;
  long sum = 0;
  stopwatch.restart();//开始计时
  parallel.for(0, maxvalue, (i) => {
    interlocked.add(ref sum, (long )math.sqrt(i));//interlocked是原子操作,多线程访问时的线程互斥操作
  });
  stopwatch.stop();
  console.writeline($"parallel.for:{stopwatch.elapsed}");//我的机器运行出的时间是:00:01:37.0391204


  var partitioner = system.collections.concurrent.partitioner.create(0, maxvalue);//拆分区间
  sum = 0;
  stopwatch.restart();
  parallel.foreach(partitioner, (rang) => {
    long partialsum = 0;
    //迭代区间的数据
    for(int i=rang.item1;i<rang.item2;i++)
    {
      partialsum += (long)math.sqrt(i);
    }
    interlocked.add(ref sum, partialsum);//原子操作
  });
  stopwatch.stop();
  console.writeline($"parallel.foreach:{stopwatch.elapsed}"); //我的机器运行出的时间是:00:00:02.7111666
}

  partitioner的分区是静态的,只要迭代分区划分完成,每个分区上都会运行一个委托。如果某一段区间的迭代次数提前完成,也不会尝试重新分区并让处理器分担工作。 对于任意ienumerable<t>类型都可以创建不指定区间的分区,但这样就会让每个迭代项目都创建一个委托,而不是对每个区间创建委托。创建自定义的partitioner可以解决这个问题,代码比较复杂。请自行参阅:

  示例2中,采用一个委托方法来计算两个数之间的关系值。前一种是每次运行都重新构造委托,后一种是先构造出委托的方法而后每一次调用。

//声明一个委托
 private delegate int mathop(int x, int y);
 private int add(int x,int y)
 {
   return x + y;
 }

 private int dooperation(mathop op,int x,int y)
 {
   return op(x, y);
 }

 /*
 * 委托会设计两类开销:构造开销和调用开销。大多数调用开销和普通方法的调用差不多。 但委托是一种对象,构造开销可能相当大,最好是只做一次构造,然后把对象缓存起来。
 */
 public void test()
 {
   system.diagnostics.stopwatch stopwatch = new system.diagnostics.stopwatch();
   stopwatch.restart();
   for(int i=0;i<10;i++)
   {
     //每一次遍历循环,都会产生一次构造和调用开销
     dooperation(add, 1, 2);
   }
   stopwatch.stop();
   console.writeline("construction and invocation: {0}", stopwatch.elapsed);//00:00:00.0003812

   stopwatch.restart();
   mathop op = add;//只产生一次构造开销
   for(int i=0;i<10;i++)
   {
     dooperation(op, 1, 2);//每一次遍历都只产生遍历开销
   }
   stopwatch.stop();
   console.writeline("once construction and invocation: {0}", stopwatch.elapsed);//00:00:00.0000011
 }

以上就是c# parallel类的使用的详细内容,更多关于c# parallel类的资料请关注其它相关文章!

相关标签: c# Parallel