BlockingCollection 和 IProducerConsumerCollection
1、BlockingCollection 为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能。
实现了 IProducerConsumerCollection<T> 接口的有:ConcurrentStack, ConcurrentQueue, and ConcurrentBag。
如果初始化BlockingColloction的时候不指定IProducerConsumerCollection<T>。则默认为ConcurrentQueue (FIFO) 。
2、为了展示BlockingCollection类的主要功能,让我们看看一个多线程场景,我们先用一个普通集合类。假设我们有一个队列,我们希望将工作放入该队列,并让其他线程消耗放入队列的项目,以便对它们执行某些操作。
var queue = new Queue<string>(); Task.Factory.StartNew(() => { while (true) { queue.Enqueue("value"); } });
在这个例子中,生成任务一直向队列中添加一个值value
Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker 1: " + value); } } }); Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { //并发异常 string value = queue.Dequeue(); Console.WriteLine("Worker 2: " + value); } } });
开启两个消费任务来消费队列中的值。我们会发现,高负载运行的时候,其中一个任务检查队列是否为空,然后去拉取数值,这个时候另一个线程可能已经删除了值。显然会抛出异常。
那么,为了支持这种情况,我们需要做些什么呢?一种方法是使用lock关键字序列化对队列的所有调用。如果每个工作项都很重,这可能是一个好的解决方案,但如果我们的工作项很轻或我们有大量的消费者,那么它会破坏性能。好吧,在.NET 4.0中,我们可以使用几种无锁数据结构。实际上,这是一个并发队列。如果我们使用ConcurrentQueue,我们可以编写如下代码:
var queue = new ConcurrentQueue<string>(); Task.Factory.StartNew(() => { while (true) { queue.Enqueue("value" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker 1: " + value); } } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker 2: " + value); } } });
这很不错,但是如果我们可以尝试获取一个项目并且如果没有可用的项目就阻塞队列,那会不会很好?当然会的!这就是我们拥有BlockingCollection的原因。它实现了这种确切的行为,还有一些额外的行为。BlockingCollection在其构造函数中采用IProducerConsumerCollection,或者如果调用其空构造函数,它将默认使用ConcurrentQueue。然后你要做的就是在BlockingCollection上调用“Add”或“Take”,如果队列中没有任何内容,它将阻止。所以上面的代码看起来像这样:
var blockingCollection = new BlockingCollection<string>(); Task.Factory.StartNew(() => { while (true) { blockingCollection.Add("value" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker 1: " + blockingCollection.Take()); } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker 2: " + blockingCollection.Take()); } });
这已经很好了,但是我们还是会有While声明,我们不用这些while可不可以呢?当然可以啦!BlockingCollection还为我们实现了这种行为,并使用了一个名为“GetConsumingEnumerable”的方法。我们所做的只是调用此方法,然后迭代生成的IEnumerable并阻塞,直到找到工作项!现在这很好!所以上面的一个消费者看起来像这样:
Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker 1: " + value); } });
这段代码将永远存在,迭代阻塞集合,并在项目用完时阻塞。一旦新项目开始出现在集合中,它将再次开始枚举它们!很容易!
BlockingCollection的作用就是让一些线程生成数据,并让许多其他线程获取和处理相同数据的非常简单的方法。您可以切换其底层存储机制,以便影响项目在添加项目时的行为,以及数据项来自哪里,何时从中获取数据项,所有数据项都完全从基础数据存储中抽象出来。我希望你找到BlockingCollection类的一些很好的用途,我希望你喜欢这篇文章!
上一篇: 基于数组的有界阻塞队列ArrayBlockingQueue源码分析
下一篇: 扶我到旁边坐一会儿