欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员计算机编程知识。
您现在的位置是: 首页  >  IT编程

基于folly的AtomicIntrusiveLinkedList无锁队列进行简单封装的多生产多消费模型

程序员文章站 2022-03-18 16:42:16
1.基于folly的AtomicIntrusiveLinkedList略微修改的无锁队列代码: 2.基于上面无锁队列的封装 3.测试用代码: 4. 基于AtomicIntrusiveLinkedList插入操作可以一次插入一个节点,而移出操作则会一次移出多个节点,如果每个消费队列都使用一个Atomi ......

1.基于folly的AtomicIntrusiveLinkedList略微修改的无锁队列代码:

#ifndef FOLLY_REVISE_H
#define FOLLY_REVISE_H

namespace folly {
    /**
    * A very simple atomic single-linked list primitive
    *
    */

    /**
    * Generic singly-linked node: a payload plus the pointer to the next node.
    * A freshly constructed node is unlinked (next is null).
    */
    template <typename T>
    struct node
    {
        T data;                 // payload stored by value
        node* next = nullptr;   // successor in the chain; null when unlinked

        // Copies the payload into the node.
        node(const T& value) : data(value) { }
        // Moves the payload into the node.
        node(T&& value) : data(std::move(value)) { }
    };


    /**
    * A minimal lock-free intrusive singly-linked list.
    *
    * T must provide a public `T* next` member; the list links nodes through it
    * and never allocates, copies or frees a node. Callers keep ownership of
    * every node they insert and receive it back from the draining functions.
    *
    * insertHead() publishes a node with a compare-exchange loop. Every draining
    * function detaches the complete chain with a single atomic exchange and
    * then walks it without touching the shared head again.
    *
    * The definition uses std::atomic, assert and std::move, so <atomic>,
    * <cassert> and <utility> must be included before it.
    */
    template <class T>
    class AtomicForwardList
    {
    public:
        AtomicForwardList() = default;
        AtomicForwardList(const AtomicForwardList&) = delete;
        AtomicForwardList& operator=(const AtomicForwardList&) = delete;

        /** Takes over the chain held by `other` and leaves `other` empty. */
        AtomicForwardList(AtomicForwardList&& other) noexcept
            : head_(other.head_.exchange(nullptr))
        {
        }

        /**
        * Takes over the chain held by `other` and leaves `other` empty.
        *
        * This list must be empty beforehand: its previous chain is handed to
        * a temporary whose destructor asserts emptiness. Self-assignment
        * leaves the list unchanged.
        */
        AtomicForwardList& operator=(AtomicForwardList&& other) noexcept
        {
            // Copy-and-swap written out by hand. No swap() overload exists for
            // this class, and std::swap would call this operator recursively.
            AtomicForwardList tmp(std::move(other));
            T* const previous = head_.exchange(tmp.head_.load());
            tmp.head_.store(previous);

            return *this;
        }

        /**
        * Note: list must be empty on destruction.
        */
        ~AtomicForwardList()
        {
            assert(empty());
        }

        /** @return True if the list currently holds no node. */
        bool empty() const
        {
            return head_.load() == nullptr;
        }

        /**
        * Atomically insert t at the head of the list.
        * t must be unlinked (t->next == nullptr) when passed in.
        * @return True if the inserted element is the only one in the list
        *        after the call.
        */
        bool insertHead(T* t)
        {
            assert(t->next == nullptr);

            T* expected = head_.load(std::memory_order_relaxed);
            do
            {
                // A failed compare_exchange_weak refreshes `expected` with the
                // current head, so the link is rewritten on every attempt.
                // `expected` is deliberately a separate variable rather than
                // t->next itself because of compiler bugs (GCC prior to 4.8.3
                // (bug 60272), clang (bug 18899), MSVC (bug 819819); source:
                // http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
                t->next = expected;
            } while (!head_.compare_exchange_weak(expected, t,
                std::memory_order_release, std::memory_order_relaxed));

            return expected == nullptr;
        }

        /**
        * Replaces the head with nullptr,
        * and calls func() on the removed elements in insertion order
        * (oldest element first). Each element is unlinked before func() sees it.
        * @return False if the list was empty.
        */
        template <typename F>
        bool sweepOnce(F&& func)
        {
            // Default (seq_cst) ordering; it includes the acquire needed to
            // pair with the release in insertHead().
            T* const detached = head_.exchange(nullptr);
            if (detached == nullptr)
            {
                return false;
            }

            unlinkAll(reverse(detached), func);
            return true;
        }

        /**
        * Repeatedly replaces the head with nullptr
        * and calls func() on the removed elements in insertion order.
        * Stops when the list is empty.
        */
        template <typename F>
        void sweep(F&& func)
        {
            // func is invoked many times, so it is passed on as an lvalue.
            while (sweepOnce(func))
            {
            }
        }

        /**
        * Similar to sweepOnce() but calls func() on elements in LIFO order
        * (newest element first).
        *
        * func() is called for all elements in the list at the moment
        * reverseSweepOnce() is called.
        * @return False if the list was empty.
        */
        template <typename F>
        bool reverseSweepOnce(F&& func)
        {
            // There is no looping variant: the overall order of callbacks
            // would be strand-wise LIFO, which is meaningless to callers.
            T* const detached = head_.exchange(nullptr);
            if (detached == nullptr)
            {
                return false;
            }

            unlinkAll(detached, func);
            return true;
        }

        /**
        * Replaces the head with nullptr and returns the detached chain in
        * insertion order (oldest element first), still linked through `next`.
        * @return The first node of the chain, or nullptr if the list was empty.
        */
        T* getInputList()
        {
            T* const detached = head_.exchange(nullptr, std::memory_order_acquire);
            return detached != nullptr ? reverse(detached) : nullptr;
        }

        /**
        * Replaces the head with nullptr and returns the detached chain in
        * reversed insertion order (newest element first), still linked
        * through `next`.
        * @return The first node of the chain, or nullptr if the list was empty.
        */
        T* getList()
        {
            return head_.exchange(nullptr);
        }

    private:
        std::atomic<T*> head_{ nullptr };

        /* Reverses a linked list in place, returning the pointer to the new
        head (old tail) */
        static T* reverse(T* head)
        {
            T* reversed = nullptr;

            while (head != nullptr)
            {
                T* const current = head;
                head = current->next;
                current->next = reversed;
                reversed = current;
            }

            return reversed;
        }

        /* Unlinks all elements in the linked list fragment pointed to by 'head',
        * calling func() on every element. The successor is read before func()
        * runs, so func() may destroy the element it receives. */
        template <typename F>
        static void unlinkAll(T* head, F&& func)
        {
            while (head != nullptr)
            {
                T* const current = head;
                head = current->next;
                current->next = nullptr;
                func(current);
            }
        }
    };
}


#endif    // FOLLY_REVISE_H

2.基于上面无锁队列的封装

#ifndef COMPOSITE_ATOMIC_LIST_H
#define COMPOSITE_ATOMIC_LIST_H

/**
* Compose a multiple-producers and multiple-consumers atomic list
* through given consumer number AtomicForwardList
*/

#include <vector>
#include <cassert>
#include "folly_revise.h"

namespace folly
{

    template <class T>
    class CompositeAtomicList
    {
    public:
        using size_type = typename std::vector<AtomicForwardList<T>>::size_type;
    public:
        CompositeAtomicList(size_type producerNum, size_type consumerNum)
            : m_producerNum(producerNum), m_consumerNum(consumerNum)
        {
            // it is meanless if there is no producer or consumer
            assert(producerNum > 0);
            assert(consumerNum > 0);

            // the number of composite list is equal to consumer number
            m_compositeList.resize(consumerNum);
            // initialize the first insertion index of the producers
            m_producerIdxs.resize(producerNum);
            for (std::vector<size_type>::size_type si = 0; si != m_producerIdxs.size(); ++si)
            {
                m_producerIdxs[si] = si % consumerNum;
            }
        }

        CompositeAtomicList(const CompositeAtomicList&) = delete;
        CompositeAtomicList& operator=(const CompositeAtomicList&) = delete;

        //CompositeAtomicList(CompositeAtomicList&& other) noexcept
        //    : m_producerNum(other.m_producerNum), m_consumerNum(other.m_consumerNum),
        //    m_producerIdxs(std::move(other.m_producerIdxs),
        //    m_compositeList(std::move(other.m_compositeList)
        //{

        //}
        CompositeAtomicList(CompositeAtomicList&& other) noexcept = default;
        CompositeAtomicList& operator=(CompositeAtomicList&& other) noexcept = default;

        ~CompositeAtomicList() = default;

        // producer num
        size_type getProducerNum() const
        {
            return m_producerNum;
        }

        // consumer num
        size_type getConsumerNum() const
        {
            return m_consumerNum;
        }

        bool empty() const
        {
            // if there is one consumer list is not empty, the CompositeList is not empty
            for (const auto& item : m_compositeList)
            {
                if (!item.empty())
                {
                    return false;
                }
            }

            return true;
        }

        // insert node for producer number producer_num
        bool insertHead(size_type producer_num, T* t)
        {
            auto ret = m_compositeList[m_producerIdxs[producer_num]].insertHead(t);
            m_producerIdxs[producer_num] = (++m_producerIdxs[producer_num]) % m_consumerNum;
            return ret;
        }

        /**
        * A consumer function
        * consume nodes for consumer number consumer_num through
        * invoking function func for every list node in consumer_num.
        * You should invoke all consumer function for a particular consumer_num
        * within just one thread to evenly distribute the tasks.
        *
        * Recommend calling std::this_thread::yield() when this function returns false
        */
        template <typename F>
        bool sweepOnce(size_type consumer_num, F&& func)
        {
            return m_compositeList[consumer_num].sweepOnce(std::forward<F>(func));
        }

        /**
        * A consumer function
        * repeat consume nodes for consumer number consumer_num through
        * invoking function func for every list node in consumer_sum.
        * You should invoke all consumer function for a particular consumer_num
        * within just one thread to evenly distribute the tasks.
        *
        * Recommend calling std::this_thread::yield() after calling this function
        */
        template <typename F>
        void sweep(size_type consumer_num, F&& func)
        {
            m_compositeList[consumer_num].sweep(std::forward<F>(func));
        }

        /**
         * A consumer function
         * consume nodes for all consumer numbers once through
         * invoking function func for every list node in consumer_num.
         * You could invoke this function after all task handler threads terminated
         * to ensure all nodes have been consumed
         */
        template <typename F>
        void sweepAll(F&& func)
        {
            for (size_type si = 0; si != m_consumerNum; ++si)
            {
                sweepOnce(si, std::forward<F>(func));
            }
        }

        /**
        * A consumer function
        * Similar to sweepOnce() but calls func() on elements in LIFO order
        *
        * func() is called for all elements in the list at the moment
        * reverseSweepOnce() is called.
        *
        * Recommend calling std::this_thread::yield() when this function returns false
        */
        template <typename F>
        bool reverseSweepOnce(size_type consumer_num, F&& func)
        {
            return m_compositeList[consumer_num].reverseSweepOnce(std::forward<F>(func));
        }

        /**
        * A consumer function
        * get all the nodes from consumer list consumer_num in input order
        * @ return a list of node
        *
        * Recommend calling std::this_thread::yield() when this function returns nullptr
        */
        T* getInputList(size_type consumer_num)
        {
            return m_compositeList[consumer_num].getInputList();
        }

        /**
        * A consumer function
        * get all the nodes from consumer list consumer_num in reversed input order
        * @ return a list of node
        *
        * Recommend calling std::this_thread::yield() when this function returns nullptr
        */
        T* getList()
        {
            return m_compositeList[consumer_num].getList();
        }

    private:
        // the producer and consumer count
        size_type m_producerNum;
        size_type m_consumerNum;
        // the next inserted list for producers
        std::vector<size_type> m_producerIdxs;
        // the composite atomic lists
        std::vector<AtomicForwardList<T>> m_compositeList;
    };

}


#endif    // COMPOSITE_ATOMIC_LIST_H

3.测试用代码:

#include <memory>
#include <cassert>

#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <random>
#include <cmath>

#include "folly_revise.h"

#include "composite_atomic_list.h"

using namespace folly;

// Intrusive test node: an age payload plus the link pointer the atomic list
// threads nodes through.
struct student_name
{
    int age;
    student_name* next = nullptr;   // null while the node is not in a list

    // age defaults to 0; the node starts unlinked.
    student_name(int age = 0) : age(age) { }
};

using ATOMIC_STUDENT_LIST = CompositeAtomicList<student_name>;

constexpr int PRODUCE_THREAD_NUM = 10;     // producing thread number
constexpr int CONSUME_THREAD_NUM = 5;      // consuming thread number

// The shared queue under test: one cursor per producer, one list per consumer.
ATOMIC_STUDENT_LIST g_students(PRODUCE_THREAD_NUM, CONSUME_THREAD_NUM);

// All counters are initialized explicitly instead of relying on the implicit
// zero-initialization of default-constructed atomics with static storage.
std::atomic<int> g_inserts{0};  // number of nodes inserted by producers
std::atomic<int> g_drops{0};    // number of nodes removed by consumers

std::atomic<int> g_printNum{0}; // number of printOne() calls; expected to equal g_drops

std::atomic<long> g_ageInSum{0};   // age sum when producing student_name
std::atomic<long> g_ageOutSum{0};  // age sum when consuming student_name

// Consumers keep draining while this flag is true.
std::atomic<bool> goOn(true);

// Nodes produced by each producer thread. Do not raise it when testing:
// 2,000,000 nodes * age below 100 * 10 producer threads is about 2e9, close to
// INT_MAX (presumably so the age sums still fit where long is 32 bits wide).
constexpr int ONE_THREAD_PRODUCE_NUM = 2000000;

// Consumer callback: records one consumed node in the global statistics and
// then frees it. The node must not be used after this call.
inline void printOne(student_name* t)
{
    // Relaxed ordering: these are independent statistics counters.
    constexpr std::memory_order relaxed = std::memory_order_relaxed;
    const int consumedAge = t->age;

    g_printNum.fetch_add(1, relaxed);
    g_ageOutSum.fetch_add(consumedAge, relaxed);
    g_drops.fetch_add(1, relaxed);

    delete t;
}

// Producer thread body: creates ONE_THREAD_PRODUCE_NUM nodes with random ages
// in [1, 99] and inserts them into g_students as producer number idNo.
// Ownership of every node passes to whichever consumer sweeps it.
void insert_students(int idNo)
{
    // Seed from std::random_device rather than time(nullptr): all producers
    // start within the same second, so a clock seed gave every thread the
    // very same age sequence.
    std::default_random_engine dre(std::random_device{}());
    std::uniform_int_distribution<int> ageDi(1, 99);

    for (int i = 0; i < ONE_THREAD_PRODUCE_NUM; ++i)
    {
        const int newAge = ageDi(dre);
        g_ageInSum.fetch_add(newAge, std::memory_order_relaxed);

        g_students.insertHead(idNo, new student_name(newAge));
        // use memory_order_relaxed to avoid affecting the list's own memory ordering
        g_inserts.fetch_add(1, std::memory_order_relaxed);
    }
}

// Consumer thread body: drains consumer list idNo of g_students through
// printOne() until goOn becomes false, yielding after every drain.
//
// Note: the list could instead be detached with getInputList() and walked
// by hand, calling printOne() on each node; sweep() is used here.
void drop_students(int idNo)
{
    for (;;)
    {
        if (!goOn.load(std::memory_order_relaxed))
        {
            return;
        }

        g_students.sweep(idNo, printOne);
        std::this_thread::yield();
    }
}

// Test driver: runs PRODUCE_THREAD_NUM producers against CONSUME_THREAD_NUM
// consumers, then prints the insert/drop counters and the age checksums so
// that the "in" and "out" figures can be compared.
int main()
{
    std::vector<std::future<void>> insert_threads;
    insert_threads.reserve(PRODUCE_THREAD_NUM);
    for (int i = 0; i != PRODUCE_THREAD_NUM; ++i)
    {
        insert_threads.push_back(std::async(std::launch::async, insert_students, i));
    }

    std::vector<std::future<void>> drop_threads;
    drop_threads.reserve(CONSUME_THREAD_NUM);
    for (int i = 0; i != CONSUME_THREAD_NUM; ++i)
    {
        drop_threads.push_back(std::async(std::launch::async, drop_students, i));
    }

    // Wait until every producer has finished.
    for (auto& item : insert_threads)
    {
        item.get();
    }

    // Tell the consumers to stop. The value must be passed explicitly:
    // store(std::memory_order_relaxed) would use the memory order itself as the
    // stored value (false only because that enumerator is 0) and does not
    // compile once memory_order is a scoped enumeration (C++20).
    goOn.store(false, std::memory_order_relaxed);

    for (auto& item : drop_threads)
    {
        item.get();
    }

    // Consumers may have exited with nodes still queued; drain the remainder.
    g_students.sweepAll(printOne);

    std::cout << "insert count1: " << g_inserts.load() << '\n';
    std::cout << "drop count1: " << g_drops.load() << '\n';
    std::cout << "print num1: " << g_printNum.load() << '\n';

    std::cout << "age in1: " << g_ageInSum.load() << '\n';
    std::cout << "age out1: " << g_ageOutSum.load() << '\n';

    // Single flush at the end.
    std::cout << std::endl;
}

4. AtomicIntrusiveLinkedList的插入操作一次只插入一个节点,而移出操作则会一次移出多个节点。如果每个消费队列都使用一个AtomicIntrusiveLinkedList来存储,只要生产均匀分布到各个消费队列中,应该可以实现比较好的效果。不过,让生产均匀分布到各个消费队列中并不那么容易实现;使用随机化之类的方式可以防止人为导致的不均匀,但都不能从根本上解决问题。所以,上述方法只适合在比较容易实现生产均匀分布到各个消费队列时采用。