C# 多线程与高并发处理并且具备暂停、继续、停止功能
程序员文章站
2023-11-13 13:41:58
--近期有一个需要运用多线程的项目,会有并发概率,所以写了一份代码,可能有写地方还不完善,后续有需求在改 1 /// 2 /// 并发对象 3 /// 4 public class MeterAsyncQueue 5 { 6 public MeterAsyn... ......
--近期有一个需要运用多线程的项目,会有并发概率,所以写了一份代码,可能有写地方还不完善,后续有需求在改
1 /// <summary> 2 /// 并发对象 3 /// </summary> 4 public class meterasyncqueue 5 { 6 public meterasyncqueue() 7 { 8 meterinfotask = new meterinfo(); 9 } 10 11 public meterinfo meterinfotask { get; set; } 12 } 13 public class meterinfo 14 { 15 public meterinfo() 16 { 17 18 } 19 public int id { get; set; } 20 21 }
1 /// <summary> 2 /// 线程通用类 3 /// </summary> 4 public class taskcommand 5 { 6 cancellationtokensource tokensource = new cancellationtokensource(); 7 manualresetevent resetevent = new manualresetevent(true); 8 thread thread = null; 9 /// <summary> 10 /// 开始任务 11 /// </summary> 12 public void startdata() 13 { 14 tokensource = new cancellationtokensource(); 15 resetevent = new manualresetevent(true); 16 17 list<int> ids = new list<int>(); 18 for (int i = 0; i < 10000; i++) 19 { 20 ids.add(i); 21 } 22 thread = new thread(new threadstart(() => starttask(ids))); 23 thread.start(); 24 } 25 /// <summary> 26 /// 暂停任务 27 /// </summary> 28 public void outdata() 29 { 30 //task暂停 31 resetevent.reset(); 32 } 33 /// <summary> 34 /// 继续任务 35 /// </summary> 36 public void continuedata() 37 { 38 //task继续 39 resetevent.set(); 40 } 41 /// <summary> 42 /// 取消任务 43 /// </summary> 44 public void cancel() 45 { 46 //释放对象 47 resetevent.dispose(); 48 foreach (var currenttask in paralleltasks) 49 { 50 if (currenttask != null) 51 { 52 if (currenttask.status == taskstatus.running) { } 53 { 54 //终止task线程 55 tokensource.cancel(); 56 } 57 } 58 } 59 thread.abort(); 60 } 61 /// <summary> 62 /// 执行数据 63 /// </summary> 64 /// <param name="index"></param> 65 public void execute(int index) 66 { 67 //阻止当前线程 68 resetevent.waitone(); 69 70 console.writeline("当前第" + index + "个线程"); 71 72 thread.sleep(1000); 73 74 } 75 //队列对象 76 private queue<meterasyncqueue> asyncqueues { get; set; } 77 78 /// <summary> 79 /// 并发任务数 80 /// </summary> 81 private int paralleltaskcount { get; set; } 82 83 84 /// <summary> 85 /// 并行任务集合 86 /// </summary> 87 private list<task> paralleltasks { get; set; } 88 //控制线程并行数量 89 public void starttask(list<int> ids) 90 { 91 isinittask = true; 92 paralleltasks = new list<task>(); 93 asyncqueues = new queue<meterasyncqueue>(); 94 //获取并发数 95 paralleltaskcount = 5; 96 97 //初始化异步队列 98 initasyncqueue(ids); 99 //开始执行队列任务 100 handlingtask(); 101 102 task.waitall(new task[] { task.whenall(paralleltasks.toarray()) }); 103 } 104 /// <summary> 105 /// 初始化异步队列 106 /// </summary> 107 private void initasyncqueue(list<int> ids) 108 { 109 foreach (var item in ids) 110 { 111 meterinfo info = new meterinfo(); 112 info.id = item; 113 asyncqueues.enqueue(new meterasyncqueue() 114 { 115 meterinfotask = info 116 }); 117 } 118 } 119 /// <summary> 120 /// 是否首次执行任务 121 /// </summary> 122 private bool isinittask { get; set; } 123 //锁 124 private readonly object _objlock = new object(); 125 126 /// <summary> 127 /// 开始执行队列任务 128 /// </summary> 129 private void handlingtask() 130 { 131 lock (_objlock) 132 { 133 if (asyncqueues.count <= 0) 134 { 135 return; 136 } 137 138 var loopcount = getavailabletaskcount(); 139 //并发处理队列 140 for (int i = 0; i < loopcount; i++) 141 { 142 handlingqueue(); 143 } 144 isinittask = false; 145 } 146 } 147 /// <summary> 148 /// 获取队列锁 149 /// </summary> 150 private readonly object _queuelock = new object(); 151 152 /// <summary> 153 /// 处理队列 154 /// </summary> 155 private void handlingqueue() 156 { 157 cancellationtoken token = tokensource.token; 158 lock (_queuelock) 159 { 160 if (asyncqueues.count > 0) 161 { 162 var asyncqueue = asyncqueues.dequeue(); 163 164 if (asyncqueue == null) return; 165 var task = task.factory.startnew(() => 166 { 167 if (token.iscancellationrequested) 168 { 169 return; 170 } 171 //阻止当前线程 172 resetevent.waitone(); 173 //执行任务 174 execute(asyncqueue.meterinfotask.id); 175 176 }, token).continuewith(t => 177 { 178 handlingtask(); 179 }, taskcontinuationoptions.onlyonrantocompletion | taskcontinuationoptions.executesynchronously); 180 paralleltasks.add(task); 181 } 182 } 183 } 184 /// <summary> 185 /// 获取当前有效并行的任务数 186 /// </summary> 187 /// <returns></returns> 188 [methodimpl(methodimploptions.synchronized)] 189 private int getavailabletaskcount() 190 { 191 if (isinittask) 192 return paralleltaskcount; 193 return 1; 194 } 195 }
上一篇: C#获取文件相关信息的方法
下一篇: C#遍历操作系统下所有驱动器的方法