使用Interlocked在多线程下进行原子操作,无锁无阻塞的实现线程运行状态判断
巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。
昨晚耐着性子看完了clr via c#的第29章<<基元线程同步构造>>,尽管这本书不是第一次看了,但是之前看的都是一带而过,没有深入理解,甚至可以说是不理解,实习了之后发现自己的知识原来这么表面,很多的实现都不能做出来,这很大程度上打击了我,而且,春招也快来了,更需要打扎实基础。引起我注意的是jeffrey在第29章说的:使用Interlocked,代码很短,绝不阻塞任何线程,二期使用线程池线程来实现自动伸缩。下载了源码,然后分析了下书中的示例,code如下:
using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; namespace vlr_via_cs { internal static class AsyncCoordinatorDemo { public static void Go() { const Int32 timeout = 50000; // Change to desired timeout MultiWebRequests act = new MultiWebRequests(timeout); Console.WriteLine("All operations initiated (Timeout={0}). Hit <Enter> to cancel.", (timeout == Timeout.Infinite) ? "Infinite" : (timeout.ToString() + "ms")); Console.ReadLine(); act.Cancel(); Console.WriteLine(); Console.WriteLine("Hit enter to terminate."); Console.ReadLine(); } private sealed class MultiWebRequests { // This helper class coordinates all the asynchronous operations private AsyncCoordinator m_ac = new AsyncCoordinator(); // Set of Web servers we want to query & their responses (Exception or Int32) private Dictionary<String, Object> m_servers = new Dictionary<String, Object> { { "http://cjjjs.com/", null }, { "http://cnblogs.com/", null }, { "http://www.jobbole.com/", null } }; public MultiWebRequests(Int32 timeout = Timeout.Infinite) { // Asynchronously initiate all the requests all at once var httpClient = new HttpClient(); foreach (var server in m_servers.Keys) { m_ac.AboutToBegin(1); //确保先做三次加法, 若是有Sleep,在调用完这个函数后,执行 httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task)); } // Tell AsyncCoordinator that all operations have been initiated and to call // AllDone when all operations complete, Cancel is called, or the timeout occurs m_ac.AllBegun(AllDone, timeout); } private void ComputeResult(String server, Task<Byte[]> task) { Object result; if (task.Exception != null) { result = task.Exception.InnerException; } else { // Process I/O completion here on thread pool thread(s) // Put your own compute-intensive algorithm here... result = task.Result.Length; // This example just returns the length } // Save result (exception/sum) and indicate that 1 operation completed m_servers[server] = result; m_ac.JustEnded(); } // Calling this method indicates that the results don't matter anymore public void Cancel() { m_ac.Cancel(); } // This method is called after all Web servers respond, // Cancel is called, or the timeout occurs private void AllDone(CoordinationStatus status) { switch (status) { case CoordinationStatus.Cancel: Console.WriteLine("Operation canceled."); break; case CoordinationStatus.Timeout: Console.WriteLine("Operation timed-out."); break; case CoordinationStatus.AllDone: Console.WriteLine("Operation completed; results below:"); foreach (var server in m_servers) { Console.Write("{0} ", server.Key); Object result = server.Value; if (result is Exception) { Console.WriteLine("failed due to {0}.", result.GetType().Name); } else { Console.WriteLine("returned {0:N0} bytes.", result); } } break; } } } private enum CoordinationStatus { AllDone, Timeout, Cancel }; private sealed class AsyncCoordinator { private Int32 m_opCount = 1; // Decremented when AllBegun calls JustEnded private Int32 m_statusReported = 0; // 0=false, 1=true private Action<CoordinationStatus> m_callback; private Timer m_timer; // This method MUST be called BEFORE initiating an operation public void AboutToBegin(Int32 opsToAdd = 1) { Interlocked.Add(ref m_opCount, opsToAdd); } // This method MUST be called AFTER an operations result has been processed public void JustEnded() { if (Interlocked.Decrement(ref m_opCount) == 0) ReportStatus(CoordinationStatus.AllDone); } // This method MUST be called AFTER initiating ALL operations public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite) { m_callback = callback; if (timeout != Timeout.Infinite) { // 在指定的时间点(dueTime) 调用回调函数,随后在指定的时间间隔(period)调用回调函数 m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite); } JustEnded(); } // 处理过时的线程 private void TimeExpired(Object o) { ReportStatus(CoordinationStatus.Timeout); } public void Cancel() { if (m_callback == null) throw new InvalidOperationException("Cancel cannot be called before AllBegun"); ReportStatus(CoordinationStatus.Cancel); } private void ReportStatus(CoordinationStatus status) { if (m_timer != null) { // If timer is still in play, kill it Timer timer = Interlocked.Exchange(ref m_timer, null); if (timer != null) timer.Dispose(); } // If status has never been reported, report it; else ignore it if (Interlocked.Exchange(ref m_statusReported, 1) == 0) m_callback(status); } } } class Program { static void Main(string[] args) { AsyncCoordinatorDemo.Go(); Console.Read(); } } }
的确是无锁的操作,Interlocked方法是用户模式下的原子操作,针对的是CPU,不是线程内存,而且它是自旋等待的,耗费的是CPU资源。分析了下AsyncCoordinator类,主要就是利用Interlocked的Add方法,实时计数线程的数量,随后待一个线程运行的最后又调用Interlocked的Decrement方法自减。如果你留心的话,你会发现,目前绝大多数的并发判断中都用到了Interlocked的这些方法,尤其是interlocked的anything模式下的compareexchange方法,在这里提一嘴,除了compareexchange和exchange方法的返回值是返回ref类型原先的值之外,其余的方法都是返回改变之后的值。最后我们可以通过AllBegun方法来判断是不是所有的线程都执行完了,随后将状态变量m_statusReported设置为1,防止在进行状态判断。
这个类很好,之前写并发的时候,老是烦恼怎么判断并发是否已经完事了,又不想用到阻塞,这个类很好,当然应用到具体项目中可能还需要改,但是基本的模型还是这个,不变的。
有点感慨:好东西需要我们自己去发掘,之前查生产者消费者模型的时候,java代码一大堆,愣是没有看到几个C#,就算有也是简易,尽管可以把java的改变为C#的,但有点感慨C#的技术栈和资源少
上一篇: Go基础Slice教程详解