欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

linux通过c++实现线程池类

程序员文章站 2022-04-06 13:00:42
线程池的实现 [TOC] 前言 初学C++,想封装点常用的C++类,已经写好了mutex,cond,thread的类,想用起来写点东西,于是就决定写线程池了,这里拙笔记录下学习笔记. 本文主要内容包括: 、`使用原因 适用场景 线程池的实现 任务调度逻辑 样例测试`. 线程池的概念 线程池是指在一个 ......

目录

线程池的实现


前言

初学C++,想封装点常用的C++类,已经写好了mutex,cond,thread的类,想用起来写点东西,于是就决定写线程池了,这里拙笔记录下学习笔记.
本文主要内容包括: 线程池的概念使用原因适用场景线程池的实现任务调度逻辑样例测试.

线程池的概念

线程池是指在一个多线程程序中创建一个线程集合,在执行新的任务的时候不是新建一个线程,而是使用线程池中已创建好的线程,一旦任务执行完毕,线程就会休眠等待新的任务分配下来,线程池中的线程数量取决于机器给进程所能分配的内存大小,以及应用程序的需求.

使用原因及适用场合

1.在服务器端编程中,最原始的方法我们使用顺序化的结构,一个服务器只能处理一个客户,如果同时2个客户端链接上来了,服务器只能先处理了先到达的那个个,这样第二个客户端只能等了,影响客户的响应时间.它只适用于客户量少的短连接.这时候有方案2.

2.在多线程服务器端编程中,一个服务器如果要处理多条链接的客户端,当链接很少的时候我们可以每来一条链接创建一个线程。但当并发量很大的时候呢,不停地的增加线程,在某个时间计算机资源可能耗尽.于是有了方案3

3.为了弥补方案2中每个请求创建线程的缺陷,我们使用固定大小线程池,全部IO交给IO复用线程解决(本文不涉及),而任务计算交给线程池.如果任务彼此独立,IO压力不大,那么这种方案非常适合.

当然服务器模型远不止这3种,还有很多方案,本文不涉.

线程池的实现原理

线程池类主要维系两个队列:任务队列,线程队列

linux通过c++实现线程池类

线程池通过take方法从线程队列提取任务,到一个线程中去执行; 有任务就提取执行,无任务则阻塞线程休眠.

任务队列可以单独写个任务类出来,也可以写个任务类基类,预留虚任务函数接口,继承下来泛化.
当然最便利的方法就是直接用函数地址来做任务咯.

    typedef void (*Task)(void);

线程队列 线程队列通过我自己写的线程类实现.

#include <pthread.h>

class Thread{
public:
typedef void (*threadFun_t)(void *arg);

    explicit Thread(const threadFun_t &threadRoutine, void *arg);
    ~Thread();
    void start();
    void join();
    static void *threadGuide(void *arg);
    pthread_t getThreadId() const{
        return m_threadId;
    }
private:
    pthread_t m_threadId;
    bool m_isRuning;
    threadFun_t m_threadRoutine;
    void *m_threadArg;
};

Thread::Thread(const threadFun_t &threadRoutine, void *arg)
    :m_isRuning(false),
     m_threadId(0),
     m_threadRoutine(threadRoutine),
     m_threadArg(arg){
}

Thread::~Thread(){
    if(m_isRuning){//如果线程正在执行,则分离此线程.
        CHECK(!pthread_detach(m_threadId));
    }
}

void *Thread::threadGuide(void *arg){
    Thread *p = static_cast<Thread *>(arg);
    p->m_threadRoutine(p->m_threadArg);
    return NULL;
}

void Thread::join(){
    VERIFY(m_isRuning);
    CHECK(!pthread_join(m_threadId, NULL));
    m_isRuning = false;
}

void Thread::start(){

    pthread_attr_t attr;

    CHECK(!pthread_attr_init(&attr));

    //CHECK(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));    //set thread separation state property

    CHECK(!pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED));    //Set thread inheritance

    CHECK(!pthread_attr_setschedpolicy(&attr, SCHED_OTHER));                //set thread scheduling policy

    CHECK(!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM));             //Set thread scope

    CHECK(!pthread_create(&m_threadId, &attr, threadGuide, this));

    m_isRuning = true;

}{
        MutexLockGuard lock(m_mutex);
        m_isRuning = false;
    }
}

构造传入带一个空类型参数指针(为什么要带这个空类型指针后面会提)函数指针,通过start()方法创建线程,然后执行threadGuide()方法调用构造时候传入的函数指针执行咱们想运行的函数实现Thread类.

这两个队列在线程池中的定义如下:

private:
    std::vector<Thread *> m_threads;
    std::deque<Task> m_tasks;

任务调度逻辑

任务分配逻辑主要靠两个条件变量实现,(条件变量本文不做详述)
1.任务队列是否空.
2.任务队列是否满.
其逻辑如下图所示:
linux通过c++实现线程池类

start()方法是线程池的运行方法.通过它创建线程池.
threadRoutine()就是我们就是线程池中创建的线程,
线程跑起来后,通过 isRunning 控制线程循环是否退出.
stop()方法关闭线程池,回收资源.
循环中判断 : 有任务则执行,无任务则wait 阻塞等待.

void ThreadPool::start(){
    m_isRuning = true;
    m_threads.reserve(m_threadsSize);
    for(size_t i = 0; i < m_threadsSize; i++){
        m_threads.push_back(new Thread(threadRoutine, this));
        m_threads[i]->start();
    }
}

Thread(threadRoutine, this) 这里就是为什么我线程类要带一个无符号类型指针参数的原因,因为静态函数无法调用c++的类成员函数(主要原因是类在编译期间未实例化没有明确的地址.)我们只能通过线程池对象的this指针调用它的成员.

任务调度的源码实现:

ThreadPool::ThreadPool(size_t tasksSize, size_t threadsSize)
    :m_tasksSzie(tasksSize),
     m_threadsSize(threadsSize),
     m_mutex(),
     m_tasksEmpty(m_mutex),
     m_tasksFull(m_mutex),
     m_isRuning(false){
}

ThreadPool::~ThreadPool(){
    if(m_isRuning){
        stop();
    }
}

void ThreadPool::threadRoutine(void *arg){
    ThreadPool *p = static_cast<ThreadPool *>(arg);
    while(p->m_isRuning){
        ThreadPool::Task task(p->take());
        if(task){
            task();
        }
    }
}

ThreadPool::Task ThreadPool::take(){
    MutexLockGuard lock(m_mutex);
    while(m_tasks.empty() && m_isRuning){
        m_tasksEmpty.wait();
    }
    if(!m_tasks.empty()){
        Task task = m_tasks.front();
        m_tasks.pop_front();
        m_tasksFull.notify();
        return task;
    }
    return NULL;
}

void ThreadPool::addTask(Task task){
    if(m_threads.empty()){//如果线程池是空的,直接跑任务.
        task();
    }
    else{
        MutexLockGuard lock(m_mutex);
        while(m_tasksSzie > 0 && m_tasks.size() >= m_tasksSzie){
            m_tasksFull.wait();
        }

        m_tasks.push_back(task);
        m_tasksEmpty.notify();
    }
}

void ThreadPool::start(){
    m_isRuning = true;
    m_threads.reserve(m_threadsSize);
    for(size_t i = 0; i < m_threadsSize; i++){
        m_threads.push_back(new Thread(threadRoutine, this));
        m_threads[i]->start();
    }
}

void ThreadPool::stop(){
    {
        MutexLockGuard lock(m_mutex);
        m_isRuning = false;
        m_tasksEmpty.notifyAll();
    }
    for(int i = m_threadsSize - 1; i >= 0; i--){
        m_threads[i]->join();
        delete(m_threads[i]);
        m_threads.pop_back();
    }
}

程序测试

测试代码:
创建一个有两个线程,拥有5个任务的任务队列,执行8个加数任务.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "MutexLock.hh"
#include "Thread.hh"
#include <unistd.h>
#include "Condition.hh"
#include "ThreadPool.hh"
#include <vector>

//threadPool test

MutexLock CntLock;

int cnt = 0;

void test(void){
    unsigned long i = 0xfffffff;
    //MutexLockGuard loo(CntLock);
    //CntLock.lock();
    while(i--);
    printf("%d\n", ++cnt);
    //CntLock.unlock();
    sleep(1);
}

int main()
{
//ThreadPool Test

    ThreadPool tp(5, 2);
    tp.start();

    sleep(3);
    for(int i = 0; i < 8; i++)
        tp.addTask(test);
        
    getchar();

    return 0;
}

简单test结果:

thread 140068496353024 run task
thread 140068504745728 run task
1
2
thread 140068504745728 run task
thread 140068496353024 run task
3
4
thread 140068496353024 run task
thread 140068504745728 run task
5
6
thread 140068496353024 run task
thread 140068504745728 run task
7
8