多线程,队列,先进先出、信号量...
程序员文章站
2022-04-14 16:39:57
某些场景并发量太高,需要采用队列辅助,特此备注:多线程队列,先进先出 某些情况也会用到阻塞当前线程,等待服务器返回或者耗时的处理,这种情况,可采用信号量辅助 1 ManualResetEvent allDone = new ManualResetEvent(false);//初始化,开启阻塞 2 a ......
某些场景并发量太高,需要采用队列辅助,特此备注:多线程队列,先进先出
某些情况也会用到阻塞当前线程,等待服务器返回或者耗时的处理,这种情况,可采用信号量辅助
1 manualresetevent alldone = new manualresetevent(false);//初始化,开启阻塞 2 alldone.reset();//信号量重置,状态为阻塞 3 alldone.waitone(5*1000);//阻塞当前线程,最大等待5秒 4 alldone.set();//释放信号量,将阻塞的线程继续向下
口水话注解:调用了set方法将事件设为true后,不会去调用reset方法,这将导致事件一直处于true,其它等待的多个线程都会得到执行,直到你手动调用reset方法。
相当于你把门打开后,需要手动去关(非自动门)。
1 autoresetevent reciveresetevent = new system.threading.autoresetevent(true);//初始化,未开启阻塞 2 reciveresetevent.waitone(30000);//阻塞 3 reciveresetevent.set();//释放后,自动开启阻塞
口水话注解:调用了set方法将事件设为true之后,其中一个等待线程得到执行后,它会自动调用reset方法,将事件信号设为false,以阻塞其它的线程。
相当于放一个线程进来,门自动就关了(自动门)。
1 /* 2 例子: 3 4 //初始化后台处理消息线程 5 asyncqueuedataprocessor<string> asyncqueuedetector = new asyncqueuedataprocessor<string>(); 6 asyncqueuedetector.processdata += asyncqueue_processdata;//事件里处理有数据时 7 asyncqueuedetector.start(); 8 9 服务启动后,使用 asyncqueuedetector.appenddata 追加数据到队列 10 */ 11 12 13 /// <summary> 14 /// 异步处理队列数据 15 /// </summary> 16 /// <typeparam name="t"></typeparam> 17 public class asyncqueuedataprocessor<t> : idisposable 18 { 19 #region 成员变量 20 21 /// <summary> 22 /// 待处理的数据队列 23 /// </summary> 24 concurrentqueue<t> dataqueue; 25 26 /// <summary> 27 /// 数据处理定时器 28 /// </summary> 29 timer processtimer; 30 31 #endregion 32 33 #region 构造函数 34 35 /// <summary> 36 /// 初始化 37 /// </summary> 38 /// <param name="intervalmillsecond"></param> 39 public asyncqueuedataprocessor(int intervalmillsecond = 120) 40 { 41 dataqueue = new concurrentqueue<t>(); 42 processtimer = new timer(intervalmillsecond); 43 processtimer.elapsed += processtimer_elapsed; 44 processtimer.autoreset = false; 45 } 46 47 #endregion 48 49 #region 自定义事件 50 51 /// <summary> 52 /// 数据抛出时触发 53 /// </summary> 54 public event eventhandler<t> processdata; 55 56 #endregion 57 58 #region 公共方法 59 60 /// <summary> 61 /// 释放所有资源 62 /// </summary> 63 public void dispose() 64 { 65 processtimer.dispose(); 66 cleardata(); 67 } 68 69 /// <summary> 70 /// 开始处理数据 71 /// </summary> 72 public void start() 73 { 74 processtimer.start(); 75 } 76 77 /// <summary> 78 /// 停止处理数据 79 /// </summary> 80 public void stop() 81 { 82 processtimer.stop(); 83 } 84 85 /// <summary> 86 /// 追加数据到队列 87 /// </summary> 88 /// <param name="data"></param> 89 public void appenddata(t data) 90 { 91 dataqueue.enqueue(data); 92 } 93 94 /// <summary> 95 /// 清空队列数据 96 /// </summary> 97 public void cleardata() 98 { 99 do 100 { 101 } while (dataqueue.trydequeue(out t t)); 102 } 103 #endregion 104 105 #region 事件 106 107 //处理数据时执行 108 private void processtimer_elapsed(object sender, elapsedeventargs e) 109 { 110 t t; 111 if (dataqueue.trydequeue(out t)) 112 { 113 if (processdata != null) 114 { 115 try 116 { 117 processdata(this, t); 118 } 119 catch (exception exp) 120 { 121 debug.writeline($"队列里面发生了未处理的异常-{exp.message}\r\n{exp.stacktrace}"); 122 } 123 } 124 } 125 126 processtimer.start(); 127 } 128 129 #endregion 130 }