欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

程序员文章站 2022-04-08 15:56:00
最近在学WebSocket,服务端需要监听多个WebSocket客户端发送的消息。 开始的解决方法是每个WebSocket客户端都添加一个线程进行监听,代码如下: /// /// 监听端口 创建WebSocket /// ///


最近在学websocket,服务端需要监听多个websocket客户端发送的消息。

开始的解决方法是每个websocket客户端都添加一个线程进行监听,代码如下:

/// <summary>
/// 监听端口 创建websocket
/// </summary>
/// <param name="httplistener"></param>
private void createwebsocket(httplistener httplistener)
{
    if (!httplistener.islistening)
        throw new exception("httplistener未启动");
    httplistenercontext listenercontext =  httplistener.getcontextasync().result;

    if (!listenercontext.request.iswebsocketrequest)
    {
        createwebsocket(httplistener);
        return;
    }

    websocketcontext websocket = null;
    try
    {
        websocket = new websocketcontext(listenercontext, subprotocol);
    }
    catch (exception ex)
    {
        log.error(ex);
        createwebsocket(httplistener);
        return;
    }

    log.info($"成功创建websocket:{websocket.id}");

    int workerthreads = 0, completionportthreads = 0;
    threadpool.getavailablethreads(out workerthreads, out completionportthreads);
    if (workerthreads <= reservedthreadscount + 1 || completionportthreads <= reservedthreadscount + 1)
    {
        /**
         * 可用线程小于预留线程数量
         * 通知客户端关闭连接
         * */
        websocket.closeasync(websocketclosestatus.internalservererror, "可用线程不足,无法连接").wait();
    }
    else
    {
        if (onreceivemessage != null)
            websocket.onreceivemessage += onreceivemessage;
        websocket.onclosewebsocket += websocket_onclosewebsocket;
        websocketcontexts.add(websocket);

        // 在线程中监听客户端发送的消息
        threadpool.queueuserworkitem(new waitcallback(p =>
        {
            (p as websocketcontext).receivemessageasync().wait();
        }), websocket);

    }

    createwebsocket(httplistener);
}

 

但是可用线程数量是有限的,先连接的客户端一直递归接收消息,导致线程无限占用,后连接上的客户端就没有线程用于监听接受消息了。
接受消息方法如下:

/// <summary>
/// 递归 同步接收消息
/// </summary>
/// <returns></returns>
public void receivemessage()
{
    websocket websocket = httplistenerwebsocketcontext.websocket;

    if (websocket.state != websocketstate.open)
        throw new exception("http未握手成功,不能接受消息!");
    
    var bytebuffer = websocket.createserverbuffer(receivebuffersize);
    websocketreceiveresult receiveresult = null;
    try
    {
        receiveresult = websocket.receiveasync(bytebuffer, cancellationtoken).result;
    }
    catch (websocketexception ex)
    {
        if (ex.innerexception is httplistenerexception)
        {
            log.error(ex);
            closeasync(websocketclosestatus.protocolerror, "客户端断开连接" + ex.message).wait(timespan.fromseconds(20));
            return;
        }
        else
        {
            log.error(ex);
            closeasync(websocketclosestatus.protocolerror, "websocket 连接异常" + ex.message).wait(timespan.fromseconds(20));
            return;
        }
    }
    catch (exception ex)
    {
        log.error(ex);
        closeasync(websocketclosestatus.protocolerror, "客户端断开连接" + ex.message).wait(timespan.fromseconds(20));
        return;
    }
    if (receiveresult.closestatus.hasvalue)
    {
        log.info("接受到关闭消息!");
        closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription).wait(timespan.fromseconds(20));
        return;
    }

    byte[] bytes = new byte[receiveresult.count];
    array.copy(bytebuffer.array, bytes, bytes.length);

    string message = encoding.getstring(bytes);
    log.info($"{id}接收到消息:{message}");

    if (onreceivemessage != null)
        onreceivemessage.invoke(this, message);

    if (!cancellationtoken.iscancellationrequested)
        receivemessage();
}

这是不能接受的。

后来在task中看到,在创建task时可以设置taskcreationoptions参数

该枚举有个字段longrunning 

longrunning 2

指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。 它会向 taskscheduler 提示,过度订阅可能是合理的。 可以通过过度订阅创建比可用硬件线程数更多的线程。 它还将提示任务计划程序:该任务需要附加线程,以使任务不阻塞本地线程池队列中其他线程或工作项的向前推动。

经过测试,可同时运行的任务数量的确可以超出可用线程数量。
测试如下:
没有设置 taskcreationoptions.longrunning  代码如下:

        /// <summary>
        /// 测试任务
        /// 只运行了9个任务
        /// </summary>
        [testmethod]
        public void testtask1()
        {
            var cts = new cancellationtokensource();
            int maxworkerthreads = 0, maxcompletionportthreads = 0;
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            maxworkerthreads = 10;
            maxcompletionportthreads = 10;
            console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads);
            threadpool.setmaxthreads(10, 10);
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            
            int count = 0;
            while (count++ < 30)
            {
                task.factory.startnew(p =>
                {
                    int index = (int)p;
                    int runcount = 0;
                    longrunningtask($"线程{index}", runcount, cts.token);
                }, count, cts.token, taskcreationoptions.none, taskscheduler.default);
            }

            task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20)); // 等待超时,等待任务没有执行
            cts.cancel();
        }

        /// <summary>
        /// 长时运行任务
        /// 递归运行
        /// </summary>
        /// <param name="taskname">任务名称</param>
        /// <param name="runcount">运行次数</param>
        /// <param name="token">传播有关取消操作的通知</param>
        private void longrunningtask(string taskname, int runcount, cancellationtoken token)
        {
            printtask($"任务【{taskname}】线程id【{environment.currentmanagedthreadid}】第【{++runcount}】次运行").wait();
            if (!token.iscancellationrequested)
                longrunningtask(taskname, runcount, token);
        }
        /// <summary>
        /// 异步打印任务 等待1秒后打印消息
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns></returns>
        private task printtask(string message)
        {
            return task.factory.startnew(() =>
            {
                thread.sleep(1000);
                console.writeline(message);
            });
        }

 测试结果
TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

测试用了20秒才完成
TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务没有执行,而是等待超时了。

 

设置了 taskcreationoptions.longrunning  代码如下:

        /// <summary>
        /// 测试长时运行任务
        /// 30个任务全部都运行了
        /// </summary>
        [testmethod]
        public void testtasklongrunning()
        {
            var cts = new cancellationtokensource();
            int maxworkerthreads = 0, maxcompletionportthreads = 0;
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            maxworkerthreads = 10;
            maxcompletionportthreads = 10;
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads);
            threadpool.setmaxthreads(10, 10);
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            
            int count = 0;
            while (count++ < 30)
            {

                task.factory.startnew(p =>
                {
                    int index = (int)p;
                    int runcount = 0;
                    longrunningtask($"线程{index}", runcount, cts.token);
                }, count, cts.token, taskcreationoptions.longrunning, taskscheduler.default);
            }

            task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20));    // 等待没有超时,等待任务有执行
            cts.cancel();
        }

测试结果:

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

测试用了10秒完成

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务立即执行了,程序等待了10秒完成。

 使用taskcreationoptions.longrunning  需要注意的是action必须是同步方法同时运行任务书才能超出可以用线程数量,否则不能。

例如:

        /// <summary>
        /// 测试长时运行任务
        /// 只运行了前9个任务
        /// </summary>
        [testmethod]
        public void testtasklongrunning2()
        {
            var cts = new cancellationtokensource();
            int maxworkerthreads = 0, maxcompletionportthreads = 0;
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");

            maxworkerthreads = 10;
            maxcompletionportthreads = 10;
            console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个异步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads);
            threadpool.setmaxthreads(10, 10);
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            
            int count = 0;
            while (count++ < 30)
            {

                task.factory.startnew(async p =>
                {
                    int index = (int)p;
                    int runcount = 0;
                    await longrunningtaskasync($"线程{index}", runcount, cts.token);
                }, count, cts.token, taskcreationoptions.longrunning, taskscheduler.default);
            }

            task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20));    // 等待没有超时,等待任务有执行
            cts.cancel();
        }
        /// <summary>
        /// 异步长时运行任务
        /// </summary>
        /// <param name="taskname">任务名称</param>
        /// <param name="runcount">运行次数</param>
        /// <param name="token">传播有关取消操作的通知</param>
        /// <returns></returns>
        private async task longrunningtaskasync(string taskname, int runcount, cancellationtoken token)
        {
            await printtask($"任务【{taskname}】线程id【{environment.currentmanagedthreadid}】第【{++runcount}】次运行");
            if (!token.iscancellationrequested)
                await longrunningtaskasync(taskname, runcount, token);
        }

 

测试结果

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

测试用了10秒完成

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务立即执行了,程序等待了10秒完成。

websocket修改后的监听方法:

        /// <summary>
        /// 监听端口 创建websocket
        /// </summary>
        /// <param name="httplistener"></param>
        private void createwebsocket(httplistener httplistener)
        {
            if (!httplistener.islistening)
                throw new exception("httplistener未启动");
            httplistenercontext listenercontext = httplistener.getcontext();

            if (!listenercontext.request.iswebsocketrequest)
            {
                createwebsocket(httplistener);
                return;
            }

            websocketcontext websocket = null;
            try
            {
                websocket = new websocketcontext(listenercontext, subprotocol);
            }
            catch (exception ex)
            {
                log.error(ex);
                createwebsocket(httplistener);
                return;
            }

            log.info($"成功创建websocket:{websocket.id}");

            int workerthreads = 0, completionportthreads = 0;
            threadpool.getavailablethreads(out workerthreads, out completionportthreads);
            if (onreceivemessage != null)
                    websocket.onreceivemessage += onreceivemessage;
                websocket.onclosewebsocket += websocket_onclosewebsocket;

            task.factory.startnew(() =>
            {
                websocket.receivemessage();
            }, cancellationtoken, taskcreationoptions.longrunning, taskscheduler.default);

            createwebsocket(httplistener);
        }

 

修改后的websocket服务可以监听超过可用线程数量的客户端

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务