欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C# 多线程与高并发处理并且具备暂停、继续、停止功能

程序员文章站 2023-11-13 13:41:58
--近期有一个需要运用多线程的项目,会有并发概率,所以写了一份代码,可能有写地方还不完善,后续有需求在改 1 /// 2 /// 并发对象 3 /// 4 public class MeterAsyncQueue 5 { 6 public MeterAsyn... ......
--近期有一个需要运用多线程的项目,会有并发概率,所以写了一份代码,可能有写地方还不完善,后续有需求在改
1 /// <summary> 2 /// 并发对象 3 /// </summary> 4 public class meterasyncqueue 5 { 6 public meterasyncqueue() 7 { 8 meterinfotask = new meterinfo(); 9 } 10 11 public meterinfo meterinfotask { get; set; } 12 } 13 public class meterinfo 14 { 15 public meterinfo() 16 { 17 18 } 19 public int id { get; set; } 20 21 }
  1     /// <summary>
  2     /// 线程通用类
  3     /// </summary>
  4     public class taskcommand
  5     {
  6         cancellationtokensource tokensource = new cancellationtokensource();
  7         manualresetevent resetevent = new manualresetevent(true);
  8         thread thread = null;
  9         /// <summary>
 10         /// 开始任务
 11         /// </summary>
 12         public void startdata()
 13         {
 14             tokensource = new cancellationtokensource();
 15             resetevent = new manualresetevent(true);
 16 
 17             list<int> ids = new list<int>();
 18             for (int i = 0; i < 10000; i++)
 19             {
 20                 ids.add(i);
 21             }
 22             thread = new thread(new threadstart(() => starttask(ids)));
 23             thread.start();
 24         }
 25         /// <summary>
 26         /// 暂停任务
 27         /// </summary>
 28         public void outdata()
 29         {
 30             //task暂停
 31             resetevent.reset();
 32         }
 33         /// <summary>
 34         /// 继续任务
 35         /// </summary>
 36         public void continuedata()
 37         {
 38             //task继续
 39             resetevent.set();
 40         }
 41         /// <summary>
 42         /// 取消任务
 43         /// </summary>
 44         public void cancel()
 45         {
 46             //释放对象
 47             resetevent.dispose();
 48             foreach (var currenttask in paralleltasks)
 49             {
 50                 if (currenttask != null)
 51                 {
 52                     if (currenttask.status == taskstatus.running) { }
 53                     {
 54                         //终止task线程
 55                         tokensource.cancel();
 56                     }
 57                 }
 58             }
 59             thread.abort();
 60         }
 61         /// <summary>
 62         /// 执行数据
 63         /// </summary>
 64         /// <param name="index"></param>
 65         public void execute(int index)
 66         {
 67             //阻止当前线程
 68             resetevent.waitone();
 69 
 70             console.writeline("当前第" + index + "个线程");
 71 
 72             thread.sleep(1000);
 73 
 74         }
 75         //队列对象
 76         private queue<meterasyncqueue> asyncqueues { get; set; }
 77 
 78         /// <summary>
 79         /// 并发任务数
 80         /// </summary>
 81         private int paralleltaskcount { get; set; }
 82 
 83 
 84         /// <summary>
 85         /// 并行任务集合
 86         /// </summary>
 87         private list<task> paralleltasks { get; set; }
 88         //控制线程并行数量
 89         public void starttask(list<int> ids)
 90         {
 91             isinittask = true;
 92             paralleltasks = new list<task>();
 93             asyncqueues = new queue<meterasyncqueue>();
 94             //获取并发数
 95             paralleltaskcount = 5;
 96 
 97             //初始化异步队列
 98             initasyncqueue(ids);
 99             //开始执行队列任务
100             handlingtask();
101 
102             task.waitall(new task[] { task.whenall(paralleltasks.toarray()) });
103         }
104         /// <summary>
105         /// 初始化异步队列
106         /// </summary>
107         private void initasyncqueue(list<int> ids)
108         {
109             foreach (var item in ids)
110             {
111                 meterinfo info = new meterinfo();
112                 info.id = item;
113                 asyncqueues.enqueue(new meterasyncqueue()
114                 {
115                     meterinfotask = info
116                 });
117             }
118         }
119         /// <summary>
120         /// 是否首次执行任务
121         /// </summary>
122         private bool isinittask { get; set; }
123         //锁
124         private readonly object _objlock = new object();
125 
126         /// <summary>
127         /// 开始执行队列任务
128         /// </summary>
129         private void handlingtask()
130         {
131             lock (_objlock)
132             {
133                 if (asyncqueues.count <= 0)
134                 {
135                     return;
136                 }
137 
138                 var loopcount = getavailabletaskcount();
139                 //并发处理队列
140                 for (int i = 0; i < loopcount; i++)
141                 {
142                     handlingqueue();
143                 }
144                 isinittask = false;
145             }
146         }
147         /// <summary>
148         /// 获取队列锁
149         /// </summary>
150         private readonly object _queuelock = new object();
151 
152         /// <summary>
153         /// 处理队列
154         /// </summary>
155         private void handlingqueue()
156         {
157             cancellationtoken token = tokensource.token;
158             lock (_queuelock)
159             {
160                 if (asyncqueues.count > 0)
161                 {
162                     var asyncqueue = asyncqueues.dequeue();
163 
164                     if (asyncqueue == null) return;
165                     var task = task.factory.startnew(() =>
166                     {
167                         if (token.iscancellationrequested)
168                         {
169                             return;
170                         }
171                         //阻止当前线程
172                         resetevent.waitone();
173                         //执行任务
174                         execute(asyncqueue.meterinfotask.id);
175 
176                     }, token).continuewith(t =>
177                     {
178                         handlingtask();
179                     }, taskcontinuationoptions.onlyonrantocompletion | taskcontinuationoptions.executesynchronously);
180                     paralleltasks.add(task);
181                 }
182             }
183         }
184         /// <summary>
185         /// 获取当前有效并行的任务数
186         /// </summary>
187         /// <returns></returns>
188         [methodimpl(methodimploptions.synchronized)]
189         private int getavailabletaskcount()
190         {
191             if (isinittask)
192                 return paralleltaskcount;
193             return 1;
194         }
195     }