欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ThreadPool类(线程池)

程序员文章站 2022-07-09 12:53:59
CLR线程池并不会在CLR初始化时立即建立线程,而是在应用程序要创建线程来运行任务时,线程池才初始化一个线程。线程池初始化时是没有线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程 ......

clr线程池并不会在clr初始化时立即建立线程,而是在应用程序要创建线程来运行任务时,线程池才初始化一个线程。
线程池初始化时是没有线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。
这样既节省了建立线程所造成的性能损耗,也可以让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销。

通过clr线程池所建立的线程总是默认为后台线程,优先级数为threadpriority.normal。

clr线程池分为工作者线程(workerthreads)i/o线程(completionportthreads)两种:

  • 工作者线程是主要用作管理clr内部对象的运作,通常用于计算密集的任务。
  • i/o(input/output)线程主要用于与外部系统交互信息,如输入输出,cpu仅需在任务开始的时候,将任务的参数传递给设备,然后启动硬件设备即可。等任务完成的时候,cpu收到一个通知,一般来说是一个硬件的中断信号,此时cpu继续后继的处理工作。在处理过程中,cpu是不必完全参与处理过程的,如果正在运行的线程不交出cpu的控制权,那么线程也只能处于等待状态,即使操作系统将当前的cpu调度给其他线程,此时线程所占用的空间还是被占用,而并没有cpu处理这个线程,可能出现线程资源浪费的问题。如果这是一个网络服务程序,每一个网络连接都使用一个线程管理,可能出现大量线程都在等待网络通信,随着网络连接的不断增加,处于等待状态的线程将会很消耗尽所有的内存资源。可以考虑使用线程池解决这个问题。

  线程池的最大值一般默认为1000、2000。当大于此数目的请求时,将保持排队状态,直到线程池里有线程可用。

  使用clr线程池的工作者线程一般有两种方式:

  • 通过threadpool.queueuserworkitem()方法;
  • 通过委托;

  要注意,不论是通过threadpool.queueuserworkitem()还是委托,调用的都是线程池里的线程。

通过以下两个方法可以读取和设置clr线程池中工作者线程与i/o线程的最大线程数。

  1. threadpool.getmax(out in workerthreads,out int completionportthreads);
  2. threadpool.setmax(int workerthreads,int completionportthreads);

  若想测试线程池中有多少线程正在投入使用,可以通过threadpool.getavailablethreads(out in workthreads,out int conoletionportthreads)方法。

方法 说明
getavailablethreads 剩余空闲线程数
getmaxthreads 最多可用线程数,所有大于此数目的请求将保持排队状态,直到线程池线程变为可用
getminthreads 检索线程池在新请求预测中维护的空闲线程数
queueuserworkitem 启动线程池里得一个线程(队列的方式,如线程池暂时没空闲线程,则进入队列排队)
setmaxthreads 设置线程池中的最大线程数
setminthreads 设置线程池最少需要保留的线程数

我们可以使用线程池来解决上面的大部分问题,跟使用单个线程相比,使用线程池有如下优点:

1、缩短应用程序的响应时间。因为在线程池中有线程的线程处于等待分配任务状态(只要没有超过线程池的最大上限),无需创建线程。

2、不必管理和维护生存周期短暂的线程,不用在创建时为其分配资源,在其执行完任务之后释放资源。

3、线程池会根据当前系统特点对池内的线程进行优化处理。

总之使用线程池的作用就是减少创建和销毁线程的系统开销。在.net中有一个线程的类threadpool,它提供了线程池的管理。

threadpool是一个静态类,它没有构造函数,对外提供的函数也全部是静态的。其中有一个queueuserworkitem方法,它有两种重载形式,如下:

public static bool queueuserworkitem(waitcallback callback):将方法排入队列以便执行。此方法在有线程池线程变得可用时执行。

public static bool queueuserworkitem(waitcallback callback,object state):将方法排入队列以便执行,并指定包含该方法所用数据的对象。此方法在有线程池线程变得可用时执行。

queueuserworkitem方法中使用的的waitcallback参数表示一个delegate,它的声明如下:

public delegate void waitcallback(object state)

如果需要传递任务信息可以利用waitcallback中的state参数,类似于parameterizedthreadstart委托。

下面是一个threadpool的例子,代码如下:

ThreadPool类(线程池)
using system;
using system.collections;
using system.componentmodel;
using system.diagnostics;
using system.threading;

namespace consoleapp1
{
    class threadpooldemo
    {
        public threadpooldemo()
        {
        }

        public void work()
        {
            threadpool.queueuserworkitem(new waitcallback(countprocess));
            threadpool.queueuserworkitem(new waitcallback(getenvironmentvariables));
        }
        /// <summary>  
        /// 统计当前正在运行的系统进程信息  
        /// </summary>  
        /// <param name="state"></param>  
        private void countprocess(object state)
        {
            process[] processes = process.getprocesses();
            foreach (process p in processes)
            {
                try
                {
                    console.writeline("进程信息:id:{0},processname:{1},starttime:{2}", p.id, p.processname, p.starttime);
                }
                catch (win32exception e)
                {
                    console.writeline("processname:{0}", p.processname);
                }
                finally
                {
                }
            }
            console.writeline("获取进程信息完毕。");
        }
        /// <summary>  
        /// 获取当前机器系统变量设置  
        /// </summary>  
        /// <param name="state"></param>  
        public void getenvironmentvariables(object state)
        {
            idictionary list = system.environment.getenvironmentvariables();
            foreach (dictionaryentry item in list)
            {
                console.writeline("系统变量信息:key={0},value={1}", item.key, item.value);
            }
            console.writeline("获取系统变量信息完毕。");
        }
    }
}
threadpooldemo
ThreadPool类(线程池)
using system;
using system.threading;

namespace consoleapp1
{

    class program
    {
        static void main(string[] args)
        {
            threadpooldemo tpd1 = new threadpooldemo();
            tpd1.work();
            thread.sleep(5000);
            console.writeline("ok");
            console.readline();
        }
    }
}
program

 

利用threadpool调用工作线程和io线程的范例

ThreadPool类(线程池)
using system;
using system.collections;
using system.io;
using system.text;
using system.threading;

namespace consoleapp1
{

    class program
    {
        static void main(string[] args)
        {
            // 设置线程池中处于活动的线程的最大数目
            // 设置线程池中工作者线程数量为1000,i/o线程数量为1000
            threadpool.setmaxthreads(1000, 1000);
            console.writeline("main thread: queue an asynchronous method");
            printmessage("main thread start");

            // 把工作项添加到队列中,此时线程池会用工作者线程去执行回调方法            
            threadpool.queueuserworkitem(asyncmethod);
            asyncwritefile();
            console.read();
        }

        // 方法必须匹配waitcallback委托
        private static void asyncmethod(object state)
        {
            thread.sleep(1000);
            printmessage("asynchoronous method");
            console.writeline("asynchoronous thread has worked ");
        }


        #region 异步读取文件模块
        private static void asyncreadfile()
        {
            byte[] bytedata = new byte[1024];
            filestream stream = new filestream(@"d:\123.txt", filemode.openorcreate, fileaccess.readwrite, fileshare.readwrite, 1024, true);
            //把filestream对象,byte[]对象,长度等有关数据绑定到filedate对象中,以附带属性方式送到回调函数
            hashtable ht = new hashtable();
            ht.add("length", (int)stream.length);
            ht.add("stream", stream);
            ht.add("bytedata", bytedata);

            //启动异步读取,倒数第二个参数是指定回调函数,倒数第一个参数是传入回调函数中的参数
            stream.beginread(bytedata, 0, (int)ht["length"], new asynccallback(completed), ht);
            printmessage("asyncreadfile method");
        }

        //实际参数就是回调函数
        static void completed(iasyncresult result)
        {
            thread.sleep(2000);
            printmessage("asyncreadfile completed method");
            //参数result实际上就是hashtable对象,以filestream.endread完成异步读取
            hashtable ht = (hashtable)result.asyncstate;
            filestream stream = (filestream)ht["stream"];
            int length = stream.endread(result);
            stream.close();
            string str = encoding.utf8.getstring(ht["bytedata"] as byte[]);
            console.writeline(str);
            stream.close();
        }
        #endregion

        #region 异步写入文件模块
        //异步写入模块
        private static void asyncwritefile()
        {
            //文件名 文件创建方式 文件权限 文件进程共享 缓冲区大小为1024 是否启动异步i/o线程为true
            filestream stream = new filestream(@"d:\123.txt", filemode.openorcreate, fileaccess.readwrite, fileshare.readwrite, 1024, true);
            //这里要注意,如果写入的字符串很小,则.net会使用辅助线程写,因为这样比较快
            byte[] bytes = encoding.utf8.getbytes("你在他乡还好吗?");
            //异步写入开始,倒数第二个参数指定回调函数,最后一个参数将自身传到回调函数里,用于结束异步线程
            stream.beginwrite(bytes, 0, (int)bytes.length, new asynccallback(callback), stream);
            printmessage("asyncwritefile method");
        }

        static void callback(iasyncresult result)
        {
            //显示线程池现状
            thread.sleep(2000);
            printmessage("asyncwritefile callback method");
            //通过result.asyncstate再强制转换为filestream就能够获取filestream对象,用于结束异步写入
            filestream stream = (filestream)result.asyncstate;
            stream.endwrite(result);
            stream.flush();
            stream.close();
            asyncreadfile();
        }
        #endregion

        // 打印线程池信息
        private static void printmessage(string data)
        {
            int workthreadnumber;
            int iothreadnumber;

            // 获得线程池中可用的线程,把获得的可用工作者线程数量赋给workthreadnumber变量
            // 获得的可用i/o线程数量给iothreadnumber变量
            threadpool.getavailablethreads(out workthreadnumber, out iothreadnumber);

            console.writeline("{0}\n currentthreadid is {1}\n currentthread is background :{2}\n workerthreadnumber is:{3}\n iothreadnumbers is: {4}\n",
                data,
                thread.currentthread.managedthreadid,
                thread.currentthread.isbackground.tostring(),
                workthreadnumber.tostring(),
                iothreadnumber.tostring());
        }
    }
}
program

 

线程池中放入异步操作

ThreadPool类(线程池)
using system;
using system.threading;

namespace consoleapp1
{

    class program
    {
        private static void asyncoperation(object state)
        {
            console.writeline("operation state: {0}", state ?? "(null)");
            console.writeline("worker thread id: {0}", thread.currentthread.managedthreadid);
            thread.sleep(timespan.fromseconds(2));
        }

        static void main(string[] args)
        {
            const int x = 1;
            const int y = 2;
            const string lambdastate = "lambda state 2";

            threadpool.queueuserworkitem(asyncoperation);
            thread.sleep(timespan.fromseconds(1));

            threadpool.queueuserworkitem(asyncoperation, "async state");
            thread.sleep(timespan.fromseconds(1));

            threadpool.queueuserworkitem(state => {
                console.writeline("operation state: {0}", state);
                console.writeline("worker thread id: {0}", thread.currentthread.managedthreadid);
                thread.sleep(timespan.fromseconds(2));
            }, "lambda state");

            threadpool.queueuserworkitem(_ =>
            {
                console.writeline("operation state: {0}, {1}", x + y, lambdastate);
                console.writeline("worker thread id: {0}", thread.currentthread.managedthreadid);
                thread.sleep(timespan.fromseconds(2));
            }, "lambda state");

            thread.sleep(timespan.fromseconds(2));
        }
    }
}
program

 

线程池同步操作

ThreadPool类(线程池)
using system;
using system.threading;

namespace consoleapp1
{
    class threadpooldemo
    {
        static object lockobj = new object();
        static int count = 0;
        manualresetevent manualevent;
        public threadpooldemo(manualresetevent manualevent)
        {
            this.manualevent = manualevent;
        }
        public void displaynumber(object a)
        {

            lock (lockobj)
            {
                count++;
                console.writeline("当前运算结果:{0},count={1},当前子线程id:{2} 的状态:{3}", a, count, thread.currentthread.managedthreadid, thread.currentthread.threadstate);
            }
            //console.writeline("当前运算结果:{0}", a);
            //console.writeline("当前运算结果:{0},当前子线程id:{1} 的状态:{2}", a,thread.currentthread.managedthreadid, thread.currentthread.threadstate);
            //这里是方法执行时间的模拟,如果注释该行代码,就能看出线程池的功能了
            thread.sleep(2000);
            //console.writeline("当前运算结果:{0},count={1},当前子线程id:{2} 的状态:{3}", a, count, thread.currentthread.managedthreadid, thread.currentthread.threadstate);
            //这里是释放共享锁,让其他线程进入
            manualevent.set();


        }
    }
}
threadpooldemo
ThreadPool类(线程池)
using system;
using system.diagnostics;
using system.threading;

namespace consoleapp1
{

    class program
    {
        //设定任务数量 
        static int count = 10;
        static void main(string[] args)
        {
            //让线程池执行5个任务所以也为每个任务加上这个对象保持同步
            manualresetevent[] events = new manualresetevent[count];
            console.writeline("当前主线程id:{0}", thread.currentthread.managedthreadid);

            stopwatch sw = new stopwatch();
            sw.start();
            nothreadpool(count);
            sw.stop();
            console.writeline("execution time using threads: {0}", sw.elapsedmilliseconds);


            sw.reset();
            sw.start();
            //循环每个任务
            for (int i = 0; i < count; i++)
            {
                //实例化同步工具
                events[i] = new manualresetevent(false);
                //test在这里就是任务类,将同步工具的引用传入能保证共享区内每次只有一个线程进入
                threadpooldemo tst = new threadpooldemo(events[i]);
                //thread.sleep(200);
                //将任务放入线程池中,让线程池中的线程执行该任务                 
                threadpool.queueuserworkitem(tst.displaynumber, i);
            }
            //注意这里,设定waitall是为了阻塞调用线程(主线程),让其余线程先执行完毕,
            //其中每个任务完成后调用其set()方法(收到信号),当所有
            //的任务都收到信号后,执行完毕,将控制权再次交回调用线程(这里的主线程)
            manualresetevent.waitall(events);
            sw.stop();
            console.writeline("execution time using threads: {0}", sw.elapsedmilliseconds);
            //console.writeline("所有任务做完!");
            console.readkey();
        }

        static void nothreadpool(int count)
        {
            for (int i = 0; i < count; i++)
            {
                thread.sleep(2000);
                console.writeline("当前运算结果:{0},count={1},当前子线程id:{2} 的状态:{3}", i, i + 1, thread.currentthread.managedthreadid, thread.currentthread.threadstate);
            }
        }

    }
}
program

 

线程池中的取消操作

ThreadPool类(线程池)
using system;
using system.threading;

namespace consoleapp1
{

    class program
    {
        static void main(string[] args)
        {
            threadpool.setmaxthreads(1000, 1000);
            console.writeline("main thread run");
            printmessage("start");
            run();
            console.readkey();
        }

        private static void run()
        {
            cancellationtokensource cts = new cancellationtokensource();

            // 这里用lambda表达式的方式和使用委托的效果一样的,只是用了lambda后可以少定义一个方法。
            // 这在这里就是让大家明白怎么lambda表达式如何由委托转变的
            ////threadpool.queueuserworkitem(o => count(cts.token, 1000));
            threadpool.queueuserworkitem(callback, cts.token);

            console.writeline("press enter key to cancel the operation\n");
            console.readline();

            // 传达取消请求            
            cts.cancel();
            console.readline();
        }

        private static void callback(object state)
        {
            thread.sleep(1000);
            printmessage("asynchoronous method start");
            cancellationtoken token = (cancellationtoken)state;
            count(token, 1000);
        }

        // 执行的操作,当受到取消请求时停止数数
        private static void count(cancellationtoken token, int countto)
        {
            for (int i = 0; i < countto; i++)
            {
                if (token.iscancellationrequested)
                {
                    console.writeline("count is canceled");
                    break;
                }

                console.writeline(i);
                thread.sleep(300);
            }

            console.writeline("cout has done");
        }

        // 打印线程池信息
        private static void printmessage(string data)
        {
            int workthreadnumber;
            int iothreadnumber;

            // 获得线程池中可用的线程,把获得的可用工作者线程数量赋给workthreadnumber变量
            // 获得的可用i/o线程数量给iothreadnumber变量
            threadpool.getavailablethreads(out workthreadnumber, out iothreadnumber);

            console.writeline("{0}\n currentthreadid is {1}\n currentthread is background :{2}\n workerthreadnumber is:{3}\n iothreadnumbers is: {4}\n",
                data,
                thread.currentthread.managedthreadid,
                thread.currentthread.isbackground.tostring(),
                workthreadnumber.tostring(),
                iothreadnumber.tostring());
        }
    }
}
program

 

thread与threadpool的一个性能比较

ThreadPool类(线程池)
using system;
using system.diagnostics;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            const int numberofoperations = 300;
            var sw = new stopwatch();
            sw.start();
            usethreads(numberofoperations);
            sw.stop();
            console.writeline("execution time using threads: {0}", sw.elapsedmilliseconds);

            sw.reset();
            sw.start();
            usethreadpool(numberofoperations);
            sw.stop();
            console.writeline("execution time using threadpool: {0}", sw.elapsedmilliseconds);
        }

        static void usethreads(int numberofoperations)
        {
            using (var countdown = new countdownevent(numberofoperations))
            {
                console.writeline("scheduling work by creating threads");
                for (int i = 0; i < numberofoperations; i++)
                {
                    var thread = new thread(() => {
                        console.write("{0},", thread.currentthread.managedthreadid);
                        thread.sleep(timespan.fromseconds(0.1));
                        countdown.signal();
                    });
                    thread.start();
                }
                countdown.wait();
                console.writeline();
            }
        }

        static void usethreadpool(int numberofoperations)
        {
            using (var countdown = new countdownevent(numberofoperations))
            {
                console.writeline("starting work on a threadpool");
                for (int i = 0; i < numberofoperations; i++)
                {
                    threadpool.queueuserworkitem(_ => {
                        console.write("{0},", thread.currentthread.managedthreadid);
                        thread.sleep(timespan.fromseconds(0.1));
                        countdown.signal();
                    });
                }
                countdown.wait();
                console.writeline();
            }
        }
    }
}
program