TaskCreationOptions.LongRunning 运行比可用线程数更多的任务
最近在学websocket,服务端需要监听多个websocket客户端发送的消息。
开始的解决方法是每个websocket客户端都添加一个线程进行监听,代码如下:
/// <summary> /// 监听端口 创建websocket /// </summary> /// <param name="httplistener"></param> private void createwebsocket(httplistener httplistener) { if (!httplistener.islistening) throw new exception("httplistener未启动"); httplistenercontext listenercontext = httplistener.getcontextasync().result; if (!listenercontext.request.iswebsocketrequest) { createwebsocket(httplistener); return; } websocketcontext websocket = null; try { websocket = new websocketcontext(listenercontext, subprotocol); } catch (exception ex) { log.error(ex); createwebsocket(httplistener); return; } log.info($"成功创建websocket:{websocket.id}"); int workerthreads = 0, completionportthreads = 0; threadpool.getavailablethreads(out workerthreads, out completionportthreads); if (workerthreads <= reservedthreadscount + 1 || completionportthreads <= reservedthreadscount + 1) { /** * 可用线程小于预留线程数量 * 通知客户端关闭连接 * */ websocket.closeasync(websocketclosestatus.internalservererror, "可用线程不足,无法连接").wait(); } else { if (onreceivemessage != null) websocket.onreceivemessage += onreceivemessage; websocket.onclosewebsocket += websocket_onclosewebsocket; websocketcontexts.add(websocket); // 在线程中监听客户端发送的消息 threadpool.queueuserworkitem(new waitcallback(p => { (p as websocketcontext).receivemessageasync().wait(); }), websocket); } createwebsocket(httplistener); }
但是可用线程数量是有限的,先连接的客户端一直递归接收消息,导致线程无限占用,后连接上的客户端就没有线程用于监听接受消息了。
接受消息方法如下:
/// <summary> /// 递归 同步接收消息 /// </summary> /// <returns></returns> public void receivemessage() { websocket websocket = httplistenerwebsocketcontext.websocket; if (websocket.state != websocketstate.open) throw new exception("http未握手成功,不能接受消息!"); var bytebuffer = websocket.createserverbuffer(receivebuffersize); websocketreceiveresult receiveresult = null; try { receiveresult = websocket.receiveasync(bytebuffer, cancellationtoken).result; } catch (websocketexception ex) { if (ex.innerexception is httplistenerexception) { log.error(ex); closeasync(websocketclosestatus.protocolerror, "客户端断开连接" + ex.message).wait(timespan.fromseconds(20)); return; } else { log.error(ex); closeasync(websocketclosestatus.protocolerror, "websocket 连接异常" + ex.message).wait(timespan.fromseconds(20)); return; } } catch (exception ex) { log.error(ex); closeasync(websocketclosestatus.protocolerror, "客户端断开连接" + ex.message).wait(timespan.fromseconds(20)); return; } if (receiveresult.closestatus.hasvalue) { log.info("接受到关闭消息!"); closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription).wait(timespan.fromseconds(20)); return; } byte[] bytes = new byte[receiveresult.count]; array.copy(bytebuffer.array, bytes, bytes.length); string message = encoding.getstring(bytes); log.info($"{id}接收到消息:{message}"); if (onreceivemessage != null) onreceivemessage.invoke(this, message); if (!cancellationtoken.iscancellationrequested) receivemessage(); }
这是不能接受的。
后来在task中看到,在创建task时可以设置taskcreationoptions参数
该枚举有个字段longrunning
longrunning | 2 |
指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。 它会向 taskscheduler 提示,过度订阅可能是合理的。 可以通过过度订阅创建比可用硬件线程数更多的线程。 它还将提示任务计划程序:该任务需要附加线程,以使任务不阻塞本地线程池队列中其他线程或工作项的向前推动。 |
经过测试,可同时运行的任务数量的确可以超出可用线程数量。
测试如下:
没有设置 taskcreationoptions.longrunning 代码如下:
/// <summary> /// 测试任务 /// 只运行了9个任务 /// </summary> [testmethod] public void testtask1() { var cts = new cancellationtokensource(); int maxworkerthreads = 0, maxcompletionportthreads = 0; threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads); console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}"); maxworkerthreads = 10; maxcompletionportthreads = 10; console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1} 同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads); threadpool.setmaxthreads(10, 10); threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads); console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}"); int count = 0; while (count++ < 30) { task.factory.startnew(p => { int index = (int)p; int runcount = 0; longrunningtask($"线程{index}", runcount, cts.token); }, count, cts.token, taskcreationoptions.none, taskscheduler.default); } task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20)); // 等待超时,等待任务没有执行 cts.cancel(); } /// <summary> /// 长时运行任务 /// 递归运行 /// </summary> /// <param name="taskname">任务名称</param> /// <param name="runcount">运行次数</param> /// <param name="token">传播有关取消操作的通知</param> private void longrunningtask(string taskname, int runcount, cancellationtoken token) { printtask($"任务【{taskname}】线程id【{environment.currentmanagedthreadid}】第【{++runcount}】次运行").wait(); if (!token.iscancellationrequested) longrunningtask(taskname, runcount, token); } /// <summary> /// 异步打印任务 等待1秒后打印消息 /// </summary> /// <param name="message">消息</param> /// <returns></returns> private task printtask(string message) { return task.factory.startnew(() => { thread.sleep(1000); console.writeline(message); }); }
测试结果
测试用了20秒才完成
主线程创建了一个等待10秒后完成的任务,任务等待超时20秒
说明主程序创建的任务没有执行,而是等待超时了。
设置了 taskcreationoptions.longrunning 代码如下:
/// <summary> /// 测试长时运行任务 /// 30个任务全部都运行了 /// </summary> [testmethod] public void testtasklongrunning() { var cts = new cancellationtokensource(); int maxworkerthreads = 0, maxcompletionportthreads = 0; threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads); maxworkerthreads = 10; maxcompletionportthreads = 10; console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}"); console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1} 同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads); threadpool.setmaxthreads(10, 10); threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads); console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}"); int count = 0; while (count++ < 30) { task.factory.startnew(p => { int index = (int)p; int runcount = 0; longrunningtask($"线程{index}", runcount, cts.token); }, count, cts.token, taskcreationoptions.longrunning, taskscheduler.default); } task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20)); // 等待没有超时,等待任务有执行 cts.cancel(); }
测试结果:
测试用了10秒完成
主线程创建了一个等待10秒后完成的任务,任务等待超时20秒
说明主程序创建的任务立即执行了,程序等待了10秒完成。
使用taskcreationoptions.longrunning 需要注意的是action必须是同步方法同时运行任务书才能超出可以用线程数量,否则不能。
例如:
/// <summary> /// 测试长时运行任务 /// 只运行了前9个任务 /// </summary> [testmethod] public void testtasklongrunning2() { var cts = new cancellationtokensource(); int maxworkerthreads = 0, maxcompletionportthreads = 0; threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads); console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}"); maxworkerthreads = 10; maxcompletionportthreads = 10; console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1} 同时运行30个长时运行线程,每个线程中运行一个异步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads); threadpool.setmaxthreads(10, 10); threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads); console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}"); int count = 0; while (count++ < 30) { task.factory.startnew(async p => { int index = (int)p; int runcount = 0; await longrunningtaskasync($"线程{index}", runcount, cts.token); }, count, cts.token, taskcreationoptions.longrunning, taskscheduler.default); } task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20)); // 等待没有超时,等待任务有执行 cts.cancel(); } /// <summary> /// 异步长时运行任务 /// </summary> /// <param name="taskname">任务名称</param> /// <param name="runcount">运行次数</param> /// <param name="token">传播有关取消操作的通知</param> /// <returns></returns> private async task longrunningtaskasync(string taskname, int runcount, cancellationtoken token) { await printtask($"任务【{taskname}】线程id【{environment.currentmanagedthreadid}】第【{++runcount}】次运行"); if (!token.iscancellationrequested) await longrunningtaskasync(taskname, runcount, token); }
测试结果
测试用了10秒完成
主线程创建了一个等待10秒后完成的任务,任务等待超时20秒
说明主程序创建的任务立即执行了,程序等待了10秒完成。
websocket修改后的监听方法:
/// <summary> /// 监听端口 创建websocket /// </summary> /// <param name="httplistener"></param> private void createwebsocket(httplistener httplistener) { if (!httplistener.islistening) throw new exception("httplistener未启动"); httplistenercontext listenercontext = httplistener.getcontext(); if (!listenercontext.request.iswebsocketrequest) { createwebsocket(httplistener); return; } websocketcontext websocket = null; try { websocket = new websocketcontext(listenercontext, subprotocol); } catch (exception ex) { log.error(ex); createwebsocket(httplistener); return; } log.info($"成功创建websocket:{websocket.id}"); int workerthreads = 0, completionportthreads = 0; threadpool.getavailablethreads(out workerthreads, out completionportthreads); if (onreceivemessage != null) websocket.onreceivemessage += onreceivemessage; websocket.onclosewebsocket += websocket_onclosewebsocket; task.factory.startnew(() => { websocket.receivemessage(); }, cancellationtoken, taskcreationoptions.longrunning, taskscheduler.default); createwebsocket(httplistener); }
修改后的websocket服务可以监听超过可用线程数量的客户端
上一篇: C# Socket网络编程
下一篇: DotNetCore依赖注入实现批量注入