多线程编程学习笔记——使用并发集合(一)
接上文 多线程编程学习笔记——async和await(一)
接上文 多线程编程学习笔记——async和await(二)
接上文 多线程编程学习笔记——async和await(三)
编程需要对基本的数据结构和算法有所了解。程序员为并发情况 选择最合适 的数据结构,那就需要知道算法运行时间、空间复杂度等。
对于并行计算,我们需要使用适当的数据结构,这些数据结构需要具备可伸缩性,尽可能地避免死锁,同时还能提供线程安全的访问。.Net Framework 4.0引入了System.Collections.Concurrent命名空间,其中包含了一些适合并发计算的数据结构。
ConcurrentQueue,这个集合使用了原子的比较和交换(CAS)操作,使用SpinWait来保证线程安全。它实现了一个先进先出(FIFO)的集合,就是说元素出队列的顺序与加入队列的顺序是一样的。可以调用enqueue文献向队列中加入元素,使用TryDequeue方法试图取出队列中的第一个元素,使用TryPeek方法则试图得到第一个元素但并不从队列中删除此元素。 ConCurrentStack的实现也没有使用任何锁,只采用了CAS操作。它是一个后进先出(LIFO)的集合,这意味着最近添加的元素会先返回。可以使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法获取元素,以及使用TryPeek方法检查元素。 ConcurrentBag是一个支持重复元素的无序集合。它针对这样以下情况 进行了优化,即多个线程以这样的方式工作:每个线程产生和消费自己的任务,极少与其他线程的任务交互(如果要交互则使用锁)。添加元素使用add方法,检查元素使用TryPeek方法,获取元素使用TryTake方法。 ConcurrentDictionary是一个线程安全的字典集合的实现。对于读操作无需使用锁。但是对于写操作则需要锁。这个并发字典使用多个锁,在字典桶之上实现了一个细粒度的锁模型。使用参数 concurrencyLevel可以在构造函数 中定义锁的数量,这意味着预估的线程数量将并发地更新这个字典。注:由于并发字典使用锁,所以没有必要请避免使用以下操作:Count、IsEmpty、Keys、Values、CopyTo及ToArray。
5. BlockingCollection是对IProducerConsumerCollection泛型接口的实现 的一个高级封装。它有很多先进的功能来实现管道场景,即当你有一些步骤需要使用之前步骤运行的结果时。BlockingCollectione类支持如下功能:分场 、调整内部集合容量、取消集合操作、从多个块集合中获取元素。
并行算法有可能非常复杂,并且或多或少涵盖了这些并行集合。线程安全并不是没有代价的。比起System.Collections和System.Collections.Generic命名空间中的经典列表 、集合和数组来说,并发集合会有更大的开销,因此,应该只在需要从多个任务中并发访问集合的时候才使用并发集合。在中等代码中使用并发集合没有意义,因为它们会增加无谓的开销。下面我们使用最简单的例子来学习这些并行集合。
一、 使用ConcurrentDictionary
本示例展示了一个非常简单的场景,比较在单线程环境中使用通常的字典集合与使用并发字典的性能。
1.程序示例代码,如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace ThreadCollectionDemo { class Program { const string item = "Dict Name"; public static string CurrentItem; static double time1; static void Main(string[] args) { Console.WriteLine(string.Format("----- ConcurrentDictionary 操作----")); var concurrentDict = new ConcurrentDictionary<int, string>(); var dict = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); //普通字典,100万次循环 for (int i = 0; i < 1000000; i++) { lock (dict) { dict[i] = string.Format("{0} {1}", item, i); } } sw.Stop(); Console.WriteLine(string.Format("对普通字典集合(Dictionary) 进行100万次写操作共用时间:{0}----",sw.Elapsed)); time1 = sw.Elapsed.TotalMilliseconds; sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDict[i] = string.Format("{0} {1}", item, i); } sw.Stop(); Console.WriteLine(string.Format("对并行字典集合(ConcurrentDictionary) 进行100万次写操作共用时间:{0}", sw.Elapsed)); Console.WriteLine(string.Format("写操作 普通字典/并行字典 = {0}", time1/1.0/sw.Elapsed.TotalMilliseconds)); Console.WriteLine(); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dict) { CurrentItem = dict[i]; } } sw.Stop(); Console.WriteLine(string.Format("对普通字典集合(Dictionary) 进行100万次读操作共用时间:{0}----", sw.Elapsed)); time1 = sw.Elapsed.TotalMilliseconds; sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem= concurrentDict[i]; } sw.Stop(); Console.WriteLine(string.Format("对并行字典集合(ConcurrentDictionary) 进行100万次读操作共用时间:{0}----", sw.Elapsed)); Console.WriteLine(string.Format("读操作 普通字典/并行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds)); //多线程并行读取数据 sw.Restart(); Task[] process = new Task[4]; for (int i = 0; i < 4; i++) { process[i ] = Task.Run(() => Get(concurrentDict, i)); } Task.WhenAll(process); sw.Stop(); Console.WriteLine(string.Format("多线程对并行字典集合(ConcurrentDictionary) 进行100万次读操作共用时间:{0}", sw.Elapsed)); Console.WriteLine(string.Format("读操作 普通字典/多线程读并行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds)); Console.Read(); } private static void Get(ConcurrentDictionary<int,string> dict,int index) { for (int i = 0; i < 1000000; i += 4) { if (i%4==index) { string s = dict[i]; } } } } }
2.单线程情况下,程序运行结果,如下图。
当程序启动时我们创建了两个集合,其中一个是标准的字典集合,另一个是新的并发字典集合。然后采用锁的机制向标准的字典中添加元素,并测量完成100万次的时间。同时采用同样的场景来测量ConcurrentDictionary的性能 ,最后 比较从两个集合中获取值 的性能。
通过上面的比较,我们发现ConcurrentDictionary写操作比使用锁的通常字典要慢的多,而读操作则要快些。因此对字典需要大量的线程安全的读操作,ConcurrentDictionary是最好的选择。
最后,我们使用4个线程同时读ConcurrentDictionary,则会发现这个字典的性能更好。
在main方法中添加一个四线程的读字典的代码。如下。
//多线程并行读取数据 sw.Restart(); Task[] process = new Task[4]; for (int i = 0; i < 4; i++) { process[i ] = Task.Run(() => Get(concurrentDict, i)); } Task.WhenAll(process); sw.Stop(); Console.WriteLine(string.Format("多线程对并行字典集合(ConcurrentDictionary) 进行100万次读操作共用时间:{0}", sw.Elapsed)); Console.WriteLine(string.Format("读操作 普通字典/多线程读并行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds)); private static void Get(ConcurrentDictionary<int,string> dict,int index) { for (int i = 0; i < 1000000; i += 4) { if (i%4==index) { string s = dict[i]; } } }
3.多线程情况下,程序运行结果,如下图。从图中可以看出,普通字典的操作是多线程读并行字典的30多倍。这是我机器上的情况。各人的机器配置不一,请自行测试。