基于C++11实现线程池的工作原理
目录
基于c++11实现线程池的工作原理.
不久前写过一篇线程池,那时候刚用c++写东西不久,很多c++标准库里面的东西没怎么用,今天基于c++11重新实现了一个线程池。
简介
线程池(thread pool)
:一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的组成
1、线程池管理器
创建一定数量的线程,启动线程,调配任务,管理着线程池。
本篇线程池目前只需要启动(start()),停止方法(stop()),及任务添加方法(addtask).
start()创建一定数量的线程池,进行线程循环.
stop()停止所有线程循环,回收所有资源.
addtask()添加任务.
2、工作线程
线程池中线程,在线程池中等待并执行分配的任务.
本篇选用条件变量实现等待与通知机制.
3、任务接口,
添加任务的接口,以供工作线程调度任务的执行。
4、任务队列
用于存放没有处理的任务。提供一种缓冲机制
同时任务队列具有调度功能,高优先级的任务放在任务队列前面;本篇选用priority_queue 与pair的结合用作任务优先队列的结构.
线程池工作的四种情况.
假设我们的线程池大小为3,任务队列目前不做大小限制.
1、主程序当前没有任务要执行,线程池中的任务队列为空闲状态.
此情况下所有工作线程处于空闲的等待状态,任务缓冲队列为空.
2、主程序添加小于等于线程池中线程数量的任务.
此情况基于情形1,所有工作线程已处在等待状态,主线程开始添加三个任务,添加后通知(notif())唤醒线程池中的线程开始取(take())任务执行. 此时的任务缓冲队列还是空。
3、主程序添加任务数量大于当前线程池中线程数量的任务.
此情况发生情形2后面,所有工作线程都在工作中,主线程开始添加第四个任务,添加后发现现在线程池中的线程用完了,于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行.
4、主程序添加任务数量大于当前线程池中线程数量的任务,且任务缓冲队列已满.
此情况发生情形3且设置了任务缓冲队列大小后面,主程序添加第n个任务,添加后发现池子中的线程用完了,任务缓冲队列也满了,于是进入等待状态、等待任务缓冲队列中的任务腾空通知。
但是要注意这种情形会阻塞主线程,本篇暂不限制任务队列大小,必要时再来优化.
实现
等待通知机制通过条件变量实现,logger和currentthread,用于调试,可以无视.
#ifndef _threadpool_hh #define _threadpool_hh #include <vector> #include <utility> #include <queue> #include <thread> #include <functional> #include <mutex> #include "condition.hh" class threadpool{ public: static const int kinitthreadssize = 3; enum taskprioritye { level0, level1, level2, }; typedef std::function<void()> task; typedef std::pair<taskprioritye, task> taskpair; threadpool(); ~threadpool(); void start(); void stop(); void addtask(const task&); void addtask(const taskpair&); private: threadpool(const threadpool&);//禁止复制拷贝. const threadpool& operator=(const threadpool&); struct taskprioritycmp { bool operator()(const threadpool::taskpair p1, const threadpool::taskpair p2) { return p1.first > p2.first; //first的小值优先 } }; void threadloop(); task take(); typedef std::vector<std::thread*> threads; typedef std::priority_queue<taskpair, std::vector<taskpair>, taskprioritycmp> tasks; threads m_threads; tasks m_tasks; std::mutex m_mutex; condition m_cond; bool m_isstarted; }; #endif //cpp #include <assert.h> #include "logger.hh" // debug #include "currentthread.hh" // debug #include "threadpool.hh" threadpool::threadpool() :m_mutex(), m_cond(m_mutex), m_isstarted(false) { } threadpool::~threadpool() { if(m_isstarted) { stop(); } } void threadpool::start() { assert(m_threads.empty()); m_isstarted = true; m_threads.reserve(kinitthreadssize); for (int i = 0; i < kinitthreadssize; ++i) { m_threads.push_back(new std::thread(std::bind(&threadpool::threadloop, this))); } } void threadpool::stop() { log_trace << "threadpool::stop() stop."; { std::unique_lock<std::mutex> lock(m_mutex); m_isstarted = false; m_cond.notifyall(); log_trace << "threadpool::stop() notifyall()."; } for (threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it) { (*it)->join(); delete *it; } m_threads.clear(); } void threadpool::threadloop() { log_trace << "threadpool::threadloop() tid : " << currentthread::tid() << " start."; while(m_isstarted) { task task = take(); if(task) { task(); } } log_trace << "threadpool::threadloop() tid : " << currentthread::tid() << " exit."; } void threadpool::addtask(const task& task) { std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isfull()) {//when m_tasks have maxsize cond2.wait(); } */ taskpair taskpair(level2, task); m_tasks.push(taskpair); m_cond.notify(); } void threadpool::addtask(const taskpair& taskpair) { std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isfull()) {//when m_tasks have maxsize cond2.wait(); } */ m_tasks.push(taskpair); m_cond.notify(); } threadpool::task threadpool::take() { std::unique_lock<std::mutex> lock(m_mutex); //always use a while-loop, due to spurious wakeup while(m_tasks.empty() && m_isstarted) { log_trace << "threadpool::take() tid : " << currentthread::tid() << " wait."; m_cond.wait(lock); } log_trace << "threadpool::take() tid : " << currentthread::tid() << " wakeup."; task task; tasks::size_type size = m_tasks.size(); if(!m_tasks.empty() && m_isstarted) { task = m_tasks.top().second; m_tasks.pop(); assert(size - 1 == m_tasks.size()); /*if (taskqueuesize_ > 0) { cond2.notify(); }*/ } return task; }
测试程序
start() 、stop()
测试线程池基本的创建退出工作,及检测资源是否正常回收.
int main() { { threadpool threadpool; threadpool.start(); getchar(); } getchar(); return 0; }
./test.out 2018-11-25 16:50:36.054805 [trace] [threadpool.cpp:53] [threadloop] threadpool::threadloop() tid : 3680 start. 2018-11-25 16:50:36.054855 [trace] [threadpool.cpp:72] [take] threadpool::take() tid : 3680 wait. 2018-11-25 16:50:36.055633 [trace] [threadpool.cpp:53] [threadloop] threadpool::threadloop() tid : 3679 start. 2018-11-25 16:50:36.055676 [trace] [threadpool.cpp:72] [take] threadpool::take() tid : 3679 wait. 2018-11-25 16:50:36.055641 [trace] [threadpool.cpp:53] [threadloop] threadpool::threadloop() tid : 3681 start. 2018-11-25 16:50:36.055701 [trace] [threadpool.cpp:72] [take] threadpool::take() tid : 3681 wait. 2018-11-25 16:50:36.055736 [trace] [threadpool.cpp:53] [threadloop] threadpool::threadloop() tid : 3682 start. 2018-11-25 16:50:36.055746 [trace] [threadpool.cpp:72] [take] threadpool::take() tid : 3682 wait. 2018-11-25 16:51:01.411792 [trace] [threadpool.cpp:36] [stop] threadpool::stop() stop. 2018-11-25 16:51:01.411863 [trace] [threadpool.cpp:39] [stop] threadpool::stop() notifyall(). 2018-11-25 16:51:01.411877 [trace] [threadpool.cpp:76] [take] threadpool::take() tid : 3680 wakeup. 2018-11-25 16:51:01.411883 [trace] [threadpool.cpp:62] [threadloop] threadpool::threadloop() tid : 3680 exit. 2018-11-25 16:51:01.412062 [trace] [threadpool.cpp:76] [take] threadpool::take() tid : 3682 wakeup. 2018-11-25 16:51:01.412110 [trace] [threadpool.cpp:62] [threadloop] threadpool::threadloop() tid : 3682 exit. 2018-11-25 16:51:01.413052 [trace] [threadpool.cpp:76] [take] threadpool::take() tid : 3679 wakeup. 2018-11-25 16:51:01.413098 [trace] [threadpool.cpp:62] [threadloop] threadpool::threadloop() tid : 3679 exit. 2018-11-25 16:51:01.413112 [trace] [threadpool.cpp:76] [take] threadpool::take() tid : 3681 wakeup. 2018-11-25 16:51:01.413141 [trace] [threadpool.cpp:62] [threadloop] threadpool::threadloop() tid : 3681 exit.
addtask()、prioritytaskqueue
测试添加任务接口,及优先任务队列.
主线程首先添加了5个普通任务、 1s后添加一个高优先级任务,当前3个线程中的最先一个空闲后,会最先执行后面添加的priorityfunc().
std::mutex g_mutex; void priorityfunc() { for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); log_debug << "priorityfunc() [" << i << "at thread [ " << currentthread::tid() << "] output";// << std::endl; } } void testfunc() { // loop to print character after a random period of time for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); log_debug << "testfunc() [" << i << "] at thread [ " << currentthread::tid() << "] output";// << std::endl; } } int main() { threadpool threadpool; threadpool.start(); for(int i = 0; i < 5 ; i++) threadpool.addtask(testfunc); std::this_thread::sleep_for(std::chrono::seconds(1)); threadpool.addtask(threadpool::taskpair(threadpool::level0, priorityfunc)); getchar(); return 0; }
./test.out 2018-11-25 18:24:20.886837 [trace] [threadpool.cpp:56] [threadloop] threadpool::threadloop() tid : 4121 start. 2018-11-25 18:24:20.886893 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4121 wakeup. 2018-11-25 18:24:20.887580 [trace] [threadpool.cpp:56] [threadloop] threadpool::threadloop() tid : 4120 start. 2018-11-25 18:24:20.887606 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4120 wakeup. 2018-11-25 18:24:20.887610 [trace] [threadpool.cpp:56] [threadloop] threadpool::threadloop() tid : 4122 start. 2018-11-25 18:24:20.887620 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4122 wakeup. 2018-11-25 18:24:21.887779 [debug] [main.cpp:104] [testfunc] testfunc() [1] at thread [ 4120] output 2018-11-25 18:24:21.887813 [debug] [main.cpp:104] [testfunc] testfunc() [1] at thread [ 4122] output 2018-11-25 18:24:21.888909 [debug] [main.cpp:104] [testfunc] testfunc() [1] at thread [ 4121] output 2018-11-25 18:24:22.888049 [debug] [main.cpp:104] [testfunc] testfunc() [2] at thread [ 4120] output 2018-11-25 18:24:22.888288 [debug] [main.cpp:104] [testfunc] testfunc() [2] at thread [ 4122] output 2018-11-25 18:24:22.889978 [debug] [main.cpp:104] [testfunc] testfunc() [2] at thread [ 4121] output 2018-11-25 18:24:23.888467 [debug] [main.cpp:104] [testfunc] testfunc() [3] at thread [ 4120] output 2018-11-25 18:24:23.888724 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4120 wakeup. 2018-11-25 18:24:23.888778 [debug] [main.cpp:104] [testfunc] testfunc() [3] at thread [ 4122] output 2018-11-25 18:24:23.888806 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4122 wakeup. 2018-11-25 18:24:23.890413 [debug] [main.cpp:104] [testfunc] testfunc() [3] at thread [ 4121] output 2018-11-25 18:24:23.890437 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4121 wakeup. 2018-11-25 18:24:24.889247 [debug] [main.cpp:92] [priorityfunc] priorityfunc() [1at thread [ 4120] output 2018-11-25 18:24:24.891187 [debug] [main.cpp:104] [testfunc] testfunc() [1] at thread [ 4121] output 2018-11-25 18:24:24.893163 [debug] [main.cpp:104] [testfunc] testfunc() [1] at thread [ 4122] output 2018-11-25 18:24:25.889567 [debug] [main.cpp:92] [priorityfunc] priorityfunc() [2at thread [ 4120] output 2018-11-25 18:24:25.891477 [debug] [main.cpp:104] [testfunc] testfunc() [2] at thread [ 4121] output 2018-11-25 18:24:25.893450 [debug] [main.cpp:104] [testfunc] testfunc() [2] at thread [ 4122] output 2018-11-25 18:24:26.890295 [debug] [main.cpp:92] [priorityfunc] priorityfunc() [3at thread [ 4120] output 2018-11-25 18:24:26.890335 [trace] [threadpool.cpp:99] [take] threadpool::take() tid : 4120 wait. 2018-11-25 18:24:26.892265 [debug] [main.cpp:104] [testfunc] testfunc() [3] at thread [ 4121] output 2018-11-25 18:24:26.892294 [trace] [threadpool.cpp:99] [take] threadpool::take() tid : 4121 wait. 2018-11-25 18:24:26.894274 [debug] [main.cpp:104] [testfunc] testfunc() [3] at thread [ 4122] output 2018-11-25 18:24:26.894299 [trace] [threadpool.cpp:99] [take] threadpool::take() tid : 4122 wait. 2018-11-25 18:24:35.359003 [trace] [threadpool.cpp:37] [stop] threadpool::stop() stop. 2018-11-25 18:24:35.359043 [trace] [threadpool.cpp:42] [stop] threadpool::stop() notifyall(). 2018-11-25 18:24:35.359061 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4120 wakeup. 2018-11-25 18:24:35.359067 [trace] [threadpool.cpp:65] [threadloop] threadpool::threadloop() tid : 4120 exit. 2018-11-25 18:24:35.359080 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4122 wakeup. 2018-11-25 18:24:35.359090 [trace] [threadpool.cpp:65] [threadloop] threadpool::threadloop() tid : 4122 exit. 2018-11-25 18:24:35.359123 [trace] [threadpool.cpp:103] [take] threadpool::take() tid : 4121 wakeup. 2018-11-25 18:24:35.359130 [trace] [threadpool.cpp:65] [threadloop] threadpool::threadloop() tid : 4121 exit.
源码下载
如果有需要,可以访问我的github进行下载
: https://github.com/bethlyrosedaisley/threadpool
上一篇: C++拷贝构造函数 的理解
下一篇: 我与众不同