欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用Interlocked在多线程下进行原子操作,无锁无阻塞的实现线程运行状态判断

程序员文章站 2022-07-05 12:10:47
巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。 昨晚耐着性子看完了clr via c#的第29章<<基元线程同步构造>>,尽管这本书不是第一次看了,但是之前看的都是一带而过,没有深入理解,甚至可以说是不理解,实习了之后发现自己的知识原来这么表面,很多的 ......

巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。

昨晚耐着性子看完了clr via c#的第29章<<基元线程同步构造>>,尽管这本书不是第一次看了,但是之前看的都是一带而过,没有深入理解,甚至可以说是不理解,实习了之后发现自己的知识原来这么表面,很多的实现都不能做出来,这很大程度上打击了我,而且,春招也快来了,更需要打扎实基础。引起我注意的是jeffrey在第29章说的:使用Interlocked,代码很短,绝不阻塞任何线程,二期使用线程池线程来实现自动伸缩。下载了源码,然后分析了下书中的示例,code如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace vlr_via_cs
{
    internal static class AsyncCoordinatorDemo
    {
        public static void Go()
        {
            const Int32 timeout = 50000;   // Change to desired timeout
            MultiWebRequests act = new MultiWebRequests(timeout);
            Console.WriteLine("All operations initiated (Timeout={0}). Hit <Enter> to cancel.",
               (timeout == Timeout.Infinite) ? "Infinite" : (timeout.ToString() + "ms"));
            Console.ReadLine();
            act.Cancel();

            Console.WriteLine();
            Console.WriteLine("Hit enter to terminate.");
            Console.ReadLine();
        }

        private sealed class MultiWebRequests
        {
            // This helper class coordinates all the asynchronous operations
            private AsyncCoordinator m_ac = new AsyncCoordinator();

            // Set of Web servers we want to query & their responses (Exception or Int32)
            private Dictionary<String, Object> m_servers = new Dictionary<String, Object> {
                { "http://cjjjs.com/", null },
                { "http://cnblogs.com/", null },
                { "http://www.jobbole.com/", null }
            };

            public MultiWebRequests(Int32 timeout = Timeout.Infinite)
            {
                // Asynchronously initiate all the requests all at once
                var httpClient = new HttpClient();
                foreach (var server in m_servers.Keys)
                {
                    m_ac.AboutToBegin(1); //确保先做三次加法, 若是有Sleep,在调用完这个函数后,执行
                    httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
                }

                // Tell AsyncCoordinator that all operations have been initiated and to call
                // AllDone when all operations complete, Cancel is called, or the timeout occurs
                m_ac.AllBegun(AllDone, timeout);
            }

            private void ComputeResult(String server, Task<Byte[]> task)
            {
                Object result;
                if (task.Exception != null)
                {
                    result = task.Exception.InnerException;
                }
                else
                {
                    // Process I/O completion here on thread pool thread(s)
                    // Put your own compute-intensive algorithm here...
                    result = task.Result.Length;   // This example just returns the length
                }

                // Save result (exception/sum) and indicate that 1 operation completed
                m_servers[server] = result;
                m_ac.JustEnded();
            }

            // Calling this method indicates that the results don't matter anymore
            public void Cancel() { m_ac.Cancel(); }

            // This method is called after all Web servers respond, 
            // Cancel is called, or the timeout occurs
            private void AllDone(CoordinationStatus status)
            {
                switch (status)
                {
                    case CoordinationStatus.Cancel:
                        Console.WriteLine("Operation canceled.");
                        break;

                    case CoordinationStatus.Timeout:
                        Console.WriteLine("Operation timed-out.");
                        break;

                    case CoordinationStatus.AllDone:
                        Console.WriteLine("Operation completed; results below:");
                        foreach (var server in m_servers)
                        {
                            Console.Write("{0} ", server.Key);
                            Object result = server.Value;
                            if (result is Exception)
                            {
                                Console.WriteLine("failed due to {0}.", result.GetType().Name);
                            }
                            else
                            {
                                Console.WriteLine("returned {0:N0} bytes.", result);
                            }
                        }
                        break;
                }
            }
        }

        private enum CoordinationStatus
        {
            AllDone,
            Timeout,
            Cancel
        };

        private sealed class AsyncCoordinator
        {
            private Int32 m_opCount = 1;        // Decremented when AllBegun calls JustEnded
            private Int32 m_statusReported = 0; // 0=false, 1=true
            private Action<CoordinationStatus> m_callback;
            private Timer m_timer;

            // This method MUST be called BEFORE initiating an operation
            public void AboutToBegin(Int32 opsToAdd = 1)
            {
                Interlocked.Add(ref m_opCount, opsToAdd);
            }

            // This method MUST be called AFTER an operations result has been processed
            public void JustEnded()
            {
                if (Interlocked.Decrement(ref m_opCount) == 0)
                    ReportStatus(CoordinationStatus.AllDone);
            }

            // This method MUST be called AFTER initiating ALL operations
            public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite)
            {
                m_callback = callback;
                if (timeout != Timeout.Infinite)
                {
                    // 在指定的时间点(dueTime) 调用回调函数,随后在指定的时间间隔(period)调用回调函数
                    m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
                }
                JustEnded();
            }

            // 处理过时的线程
            private void TimeExpired(Object o) {
                ReportStatus(CoordinationStatus.Timeout);
            }

            public void Cancel()
            {
                if (m_callback == null)
                    throw new InvalidOperationException("Cancel cannot be called before AllBegun");
                ReportStatus(CoordinationStatus.Cancel);
            }

            private void ReportStatus(CoordinationStatus status)
            {
                if (m_timer != null)
                {  // If timer is still in play, kill it
                    Timer timer = Interlocked.Exchange(ref m_timer, null);
                    if (timer != null) timer.Dispose();
                }

                // If status has never been reported, report it; else ignore it
                if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
                    m_callback(status);
            }
        }
    }


    class Program
    {
        static void Main(string[] args)
        {
            AsyncCoordinatorDemo.Go();

            Console.Read();
        }
    }
}

的确是无锁的操作,Interlocked方法是用户模式下的原子操作,针对的是CPU,不是线程内存,而且它是自旋等待的,耗费的是CPU资源。分析了下AsyncCoordinator类,主要就是利用Interlocked的Add方法,实时计数线程的数量,随后待一个线程运行的最后又调用Interlocked的Decrement方法自减。如果你留心的话,你会发现,目前绝大多数的并发判断中都用到了Interlocked的这些方法,尤其是interlocked的anything模式下的compareexchange方法,在这里提一嘴,除了compareexchange和exchange方法的返回值是返回ref类型原先的值之外,其余的方法都是返回改变之后的值。最后我们可以通过AllBegun方法来判断是不是所有的线程都执行完了,随后将状态变量m_statusReported设置为1,防止在进行状态判断。

这个类很好,之前写并发的时候,老是烦恼怎么判断并发是否已经完事了,又不想用到阻塞,这个类很好,当然应用到具体项目中可能还需要改,但是基本的模型还是这个,不变的。

有点感慨:好东西需要我们自己去发掘,之前查生产者消费者模型的时候,java代码一大堆,愣是没有看到几个C#,就算有也是简易,尽管可以把java的改变为C#的,但有点感慨C#的技术栈和资源少