欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

c++11实现一个半同步半异步线程池

程序员文章站 2022-07-01 23:03:19
在处理大量并发任务的时候,如果按照传统的方式,一个请求一个线程来处理请求任务,大量的线程创建和销毁将消耗过多的资源,还增加了线程上下文切换的开销,而通过线程池技术就可以很好的解决这些问题,线程池技术...

在处理大量并发任务的时候,如果按照传统的方式,一个请求一个线程来处理请求任务,大量的线程创建和销毁将消耗过多的资源,还增加了线程上下文切换的开销,而通过线程池技术就可以很好的解决这些问题,线程池技术通过在系统中预先创建一定数量的线程,当任务请求到来时从线程池中分配一个预先创建的线程去处理任务,线程在完成任务之后还可以重用,不会销毁,而是等待下次任务的到来.

分层

半同步半异步线程池分为三层:

同步服务层: 它处理来自上层的任务请求,上层的请求可能是并发的,这些请求不是马上就会被处理的,而是将这些任务放到一个同步排队层中,等待处理.

同步排队层: 来自上层的任务请求都会加到排队层中等待处理.

异步服务层: 这一层中会有多个线程同时处理排队层中的任务,异步服务层从同步排队层中取出任务并行的处理.

c++11实现一个半同步半异步线程池

线程池实现

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

using namespace std;

/********************************同步队列******************************/

template 
class syncqueue
{
public:
    syncqueue(int maxsize): m_maxsize(maxsize), m_needstop(false) { }

    //添加事件
    void put(const t& x)
    {
        add(x);
    }

    //添加事件
    void put(t && x)
    {
        //调用内部接口,进行完美转发
        add(std::forward(x));
    }

    //从队列中取事件,取所有事件
    void take(std::list &list)
    {
        std::unique_lock locker(m_mutex);
        //当不满足任何一个则等待,但是若m_needstop为true是因为任务要终止了所以不阻塞
        m_notempty.wait(locker, [this]{return (m_needstop || notempty()); });
        if (m_needstop)
        {
            return;
        }

        list = std::move(m_queue);
        m_notfull.notify_one();
    }

    //取一个事件
    void take(t &t)
    {
        std::unique_lock locker(m_mutex);
        m_notempty.wait(locker, [this]{return m_needstop || notempty(); });
        if (m_needstop)
        {
            return;
        }

        t = m_queue.front();
        m_queue.pop_front();
        m_notfull.notify_one();
    }

    //终止同步队列
    void stop()
    {
        {
            //锁作用域就在这对大括号内
            std::lock_guard locker(m_mutex);
            //将终止标志设为true
            m_needstop = true;
        }

        //唤醒所有进程一一终止
        m_notfull.notify_all();
        m_notempty.notify_all();
    }

    //队列为空
    bool empty()
    {
        std::lock_guard locker(m_mutex);
        return m_queue.empty();
    }

    //队列为满
    bool full()
    {
        std::lock_guard locker(m_mutex);
        return m_queue.size() == m_maxsize;
    }

    //队列大小
    size_t size()
    {
        std::lock_guard locker(m_mutex);
        return m_queue.size();
    }

    //队列大小
    int count()
    {
        return m_queue.size();
    }

private:
    //队列不为满
    bool notfull() const
    {
        bool full = (m_queue.size() >= m_maxsize);
        if (full)
        {
            cout << "the queue is full, need wait..." << endl;
        }

        return !full;
    }

    //队列不为空
    bool notempty() const
    {
        bool empty = m_queue.empty();
        if (empty)
        {
            cout << "the queue is empty, need wait..., 异步层的线程id: " << this_thread::get_id() << endl;
        }

        return !empty;
    }

    //向队列中添加事件,若不为满且终止标志为false则添加事件
    template 
    void add(f && x)
    {
        std::unique_lock locker(m_mutex);
        //当不满足任何一个则等待,但是若m_needstop为true是因为任务要终止了所以不阻塞
        m_notfull.wait(locker, [this]{return m_needstop || notfull(); });
        if (m_needstop)
        {
            return;
        }

        m_queue.push_back(std::forward(x));
        m_notempty.notify_one();
    }

private:
    //缓冲区
    std::list m_queue;
    //互斥量
    std::mutex m_mutex;
    //队列不为空的条件变量
    std::condition_variable m_notempty;
    //队列不为满的条件变量
    std::condition_variable m_notfull;
    //任务队列最大长度
    int m_maxsize;
    //终止的标识,当为true时代表同步队列要终止
    bool m_needstop;
};



/**************************线程池********************************/

//传递给同步队列的最大个数
const int maxtaskcount = 100;
class threadpool
{
public:
    using task = std::function;
    //构造函数,默认参数hardware_concurrency()获取cpu核心数量
    threadpool(int numthreads = std::thread::hardware_concurrency()):m_queue(maxtaskcount)
    {
        cout << "numthreads: " << numthreads << endl;
        start(numthreads);
    }

    ~threadpool()
    {
        stop();
    }

    //保证多线程环境下只调用一次stopthreadgroup函数
    void stop()
    {
        std::call_once(m_flag, [this]{ stopthreadgroup(); });
    }

    //添加任务,右值完美转发
    void addtask(task && task)
    {
        m_queue.put(std::forward (task));
    }

    //添加任务
    void addtask(const task && task)
    {
        m_queue.put(task);
    }

private:
    //建立numthreads个数的线程组
    void start(int numthreads)
    {
        m_running  = true;

        for (int i = 0; i < numthreads; i++)
        {
            //多个线程依次的处理
            m_threadgroup.push_back(std::make_shared(&threadpool::runinthread, this));
        }
    }

    //取出任务队列中的全部,依次执行
    void runinthread()
    {
        while (m_running)
        {
            std::list list;
            m_queue.take(list);

            for (auto & task : list)
            {
                if (!m_running)
                {
                    return ;
                }

                //执行任务
                task();
            }
        }
    }

    //终止所有任务的执行
    void stopthreadgroup()
    {       
        //终止同步队列
        m_queue.stop();
        m_running = false;

        for (auto thread : m_threadgroup)
        {
            if (thread)
            {
                thread->join();
            }
        }

        m_threadgroup.clear();
    }

private: 
    //处理任务的线程组
    std::list> m_threadgroup;
    //同步队列
    syncqueue m_queue;
    //运行的标志,flase代表终止
    atomic_bool m_running;
    //保证在函数在多线程环境中只被调用一次
    std::once_flag m_flag;
};

int main()
{
    threadpool pool;

    //pool.start(2);
    std::thread thd1([&pool]
    {
        for (int i = 0; i < 10; i++)
        {
            auto thdid = this_thread::get_id();
            pool.addtask([thdid]
            {
                cout << "1.thread id: " << thdid << endl;
            });
        }
    });
    std::thread thd2([&pool]
    {
        for (int i = 0; i < 10; i++)
        {
            auto thdid = this_thread::get_id();
            pool.addtask([thdid]
            {
                cout << "2.thread id: " << thdid << endl;
            });
        }
    });

    this_thread::sleep_for(std::chrono::seconds(2));
    getchar();
    pool.stop();
    thd1.join();
    thd2.join();
}

c++11实现一个半同步半异步线程池

对象池

对象池对于创建开销较大的对象来说很有意义,为了避免重复创建开销较大的对象,可以通过对象池来优化.

对象池的思路比较简单,实现创建好一批对象,放到一个集合中,每当程序需要新的对象时,就从对象池中获取,程序用完该对象后都会把该对象归还给对象池.这样会避免重复创建对象,提高程序性能.

#include 
#include 
#include 
#include

using namespace std;

//要成为不可复制的类,典型的方法是将类的复制构造函数和赋值运算符设置为private或protected
//为了使objectpool为不可复制的类,我们定义了类noncopyable,只需继承起则可为不可复制的类
class noncopyable
{
protected:
    noncopyable() = default;
    ~noncopyable() = default;
    noncopyable(const noncopyable&) = delete;
    noncopyable& operator =(const noncopyable &) = delete;
};

//对象最大个数
const int maxobjectnum = 10;

template 
class objectpool : noncopyable
{
    template 
    using constructor = function (args...)>;
private:
    //定义multimap类型的私有成员通过constructor类型获得字符串,则通过字符串类型一对多的对应特定的对象.
    multimap> m_object_map;

public:
    //初始化创建对象
    template 
    void init(size_t num, args ...args)
    {
        if (num <= 0 || num > maxobjectnum)
        {
            throw std::logic_error("object num out of range");
        }

        //init时的模板类型不同所得到的constructname字符串不同
        //所以相同的初始化类型对应m_object_map中的first相同,不同类型的则不同
        auto constructname = typeid(constructor).name();
        //cout << "init: " << constructname << endl;
        for (size_t i = 0; i < num; i++)
        {
            //删除器中不直接删除对象,而是回收到对象池中,以供下次使用
            m_object_map.emplace(constructname, 
                shared_ptr(new t(std::forward(args)...), [this, constructname](t *p)
            {
                cout << "dis: " << constructname << endl;
                m_object_map.emplace(std::move(constructname),shared_ptr(p));
            }));
        }
    }

    //从对象池获取一个对象
    template 
    std::shared_ptr get()
    {
        string constructname = typeid(constructor).name();
        cout << constructname << endl;

        //通过get的模板类型得到对应的字符串,通过该字符串找到所有该字符串的对应
        auto range = m_object_map.equal_range(constructname);
        //从该类型对应的对象中获取其中一个
        for (auto it = range.first; it != range.second; it++)
        {
            auto ptr = it -> second;
            m_object_map.erase(it);
            return ptr;
        } 

        return nullptr;
    }
};