基于阻塞队列的生产者消费者C#并发设计
程序员文章站
2022-03-21 19:40:43
这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code: 调用code: 封装的队列: ......
这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code:
调用code:
static void Main(string[] args) { ProcessQueue<int> processQueue = new ProcessQueue<int>(); processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent; processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent; for (int i = 0; i < 50; i++) { processQueue.Enqueue(i); } Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount()); processQueue.Flush(); Console.Read(); } /// <summary> /// 该方法对入队的每个元素进行处理 /// </summary> /// <param name="value"></param> private static void ProcessQueue_ProcessItemEvent(int value) { Console.WriteLine("输出: {0}", value); } /// <summary> /// 处理异常 /// </summary> /// <param name="obj">队列实例</param> /// <param name="ex">异常对象</param> /// <param name="value">出错的数据</param> private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value) { Console.WriteLine(ex.ToString()); }
封装的队列:
public class ProcessQueue<T> { private BlockingCollection<T> _queue; private CancellationTokenSource _cancellationTokenSource; private CancellationToken _cancellToken; //内部线程池 private List<Thread> _threadCollection; //队列是否正在处理数据 private int _isProcessing; //有线程正在处理数据 private const int Processing = 1; //没有线程处理数据 private const int UnProcessing = 0; //队列是否可用 private volatile bool _enabled = true; //内部处理线程数量 private int _internalThreadCount; // 消费者处理事件 public event Action<T> ProcessItemEvent; //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据 public event Action<dynamic, Exception, T> ProcessExceptionEvent; public ProcessQueue() { _queue = new BlockingCollection<T>(); _cancellationTokenSource = new CancellationTokenSource(); _internalThreadCount = 3; _cancellToken = _cancellationTokenSource.Token; _threadCollection = new List<Thread>(); } public ProcessQueue(int internalThreadCount) : this() { this._internalThreadCount = internalThreadCount; } /// <summary> /// 队列内部元素的数量 /// </summary> public int GetInternalItemCount() { //return _queue.Count; return _threadCollection.Count; } //生产者生产 public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } _queue.Add(items); DataAdded(); } public void Flush() { StopProcess(); while (_queue.Count != 0) { T item = default(T); if (_queue.TryTake(out item)) { try { ProcessItemEvent(item); } catch (Exception ex) { OnProcessException(ex, item); } } } } // 通知消费者消费队列元素 private void DataAdded() { if (_enabled) { if (!IsProcessingItem()) { Console.WriteLine("DataAdded"); ProcessRangeItem(); StartProcess(); } } } //判断是否队列有线程正在处理 private bool IsProcessingItem() { // 替换第一个参数, 如果相等 //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing); return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing); } // 多消费者消费 private void ProcessRangeItem() { for (int i = 0; i < this._internalThreadCount; i++) { ProcessItem(); } } // 开启消费处理 private void ProcessItem() { Thread currentThread = new Thread((state) => { T item = default(T); while (_enabled) { try { try { if (!_queue.TryTake(out item)) { //Console.WriteLine("阻塞队列为0时的item: {0}", item); //Console.WriteLine("ok!!!"); break; } // 处理事件 ProcessItemEvent(item); } catch (OperationCanceledException ex) { DebugHelper.DebugView(ex.ToString()); } } catch (Exception ex) { OnProcessException(ex, item); } } }); _threadCollection.Add(currentThread); } // 开启消费者 private void StartProcess() { //Console.WriteLine("线程的数量: {0}", _threadCollection.Count); foreach (var thread in _threadCollection) { thread.Start(); thread.IsBackground = true; } } // 终止运行 private void StopProcess() { this._enabled = false; foreach (var thread in _threadCollection) { if (thread.IsAlive) { thread.Join(); } } _threadCollection.Clear(); } private void OnProcessException(Exception ex, T item) { var tempException = ProcessExceptionEvent; Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null); if (tempException != null) { ProcessExceptionEvent(this, ex, item); } } }
上一篇: 详解Chai.js断言库API中文文档
下一篇: 荷叶减肥茶你喝过吗?其实还是不错的