Barrier 屏障使用
程序员文章站
2022-07-12 19:50:30
...
1、 System.Threading.Barrier 是同步基元,可以使多个线程(称为“参与者”)分阶段同时处理算法。 达到代码中的屏障点之前,每个参与者将继续执行。 屏障表示工作阶段的末尾。 单个参与者到达屏障后将被阻止,直至所有参与者都已达到同一障碍。 所有参与者都已达到屏障后,你可以选择调用阶段后操作。 此阶段后操作可由单线程用于执行操作,而所有其他线程仍被阻止。 执行此操作后,所有参与者将不受阻止。
解释:多线程调用同一个(也可以是不同)方法。在barrier.SignalAndWait();方法之前的逻辑可以并行执行。当遇到此方法时,线程进入等待,等待定义的等待线程全部到达此障碍,再执行后面的阶段操作(单线程)
//创建Barrier对象,阻止线程数为2,并提供后期委托 //在每个阶段结束时调用。 Barrier barrier = new Barrier(2, (bar) => { // Examine results from all threads, determine // whether to continue, create inputs for next phase, etc. if (someCondition) success = true; });
//定义每个线程将执行的工作。 (线程不必都执行相同的方法。) void CrunchNumbers(int partitionNum) { // Up to System.Int64.MaxValue phases are supported. We assume // in this code that the problem will be solved before that. while (success == false) { //开始阶段: //在每个线程上处理数据,也可以选择 //存储结果,例如: results[partitionNum] = ProcessData(data[partitionNum]); //结束阶段: //在所有线程到达后,后阶段委托 //被调用,然后线程被解除阻塞。重载 //接受超时值和/或CancellationToken。 barrier.SignalAndWait(); } } //执行n个任务以并行运行。 为简单起见 //在这个例子中,所有线程都执行相同的方法。 static void Main() { var app = new BarrierDemo(); Thread t1 = new Thread(() => app.CrunchNumbers(0)); Thread t2 = new Thread(() => app.CrunchNumbers(1)); t1.Start(); t2.Start(); }
下面为一个完成的实例:
//#define TRACE using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace BarrierSimple { class Program { static string[] words1 = new string[] { "brown", "jumps", "the", "fox", "quick"}; static string[] words2 = new string[] { "dog", "lazy","the","over"}; static string solution = "the quick brown fox jumps over the lazy dog."; static bool success = false; static Barrier barrier = new Barrier(2, (b) => { StringBuilder sb = new StringBuilder(); for (int i = 0; i < words1.Length; i++) { sb.Append(words1[i]); sb.Append(" "); } for (int i = 0; i < words2.Length; i++) { sb.Append(words2[i]); if(i < words2.Length - 1) sb.Append(" "); } sb.Append("."); #if TRACE System.Diagnostics.Trace.WriteLine(sb.ToString()); #endif Console.CursorLeft = 0; Console.Write("Current phase: {0}", barrier.CurrentPhaseNumber); if (String.CompareOrdinal(solution, sb.ToString()) == 0) { success = true; Console.WriteLine("\r\nThe solution was found in {0} attempts", barrier.CurrentPhaseNumber); } }); static void Main(string[] args) { Thread t1 = new Thread(() => Solve(words1)); Thread t2 = new Thread(() => Solve(words2)); t1.Start(); t2.Start(); // Keep the console window open. Console.ReadLine(); } // Use Knuth-Fisher-Yates shuffle to randomly reorder each array. // For simplicity, we require that both wordArrays be solved in the same phase. // Success of right or left side only is not stored and does not count. static void Solve(string[] wordArray) { while(success == false) { Random random = new Random(); for (int i = wordArray.Length - 1; i > 0; i--) { int swapIndex = random.Next(i + 1); string temp = wordArray[i]; wordArray[i] = wordArray[swapIndex]; wordArray[swapIndex] = temp; } // We need to stop here to examine results // of all thread activity. This is done in the post-phase // delegate that is defined in the Barrier constructor. barrier.SignalAndWait(); } } } }
上一篇: Spark相关术语
推荐阅读