基于folly的AtomicIntrusiveLinkedList无锁队列进行简单封装的多生产多消费模型
程序员文章站
2022-03-18 16:42:16
1.基于folly的AtomicIntrusiveLinkedList略微修改的无锁队列代码: 2.基于上面无锁队列的封装 3.测试用代码: 4. 基于AtomicIntrusiveLinkedList插入操作可以一次插入一个节点,而移出操作则会一次移出多个节点,如果每个消费队列都使用一个Atomi ......
1.基于folly的AtomicIntrusiveLinkedList略微修改的无锁队列代码:
#ifndef FOLLY_REVISE_H #define FOLLY_REVISE_H namespace folly { /** * A very simple atomic single-linked list primitive * */ template <typename T> struct node { T data; node* next; node(const T& data) : data(data), next(nullptr) { } node(T&& data): data(std::move(data)), next(nullptr) { } }; template <class T> class AtomicForwardList { public: AtomicForwardList() { } AtomicForwardList(const AtomicForwardList&) = delete; AtomicForwardList& operator=(const AtomicForwardList&) = delete; AtomicForwardList(AtomicForwardList&& other) noexcept : head_(other.head_.load()) { other.head_ = nullptr; } AtomicForwardList& operator=(AtomicForwardList&& other) noexcept { AtomicForwardList tmp(std::move(other)); swap(*this, tmp); return *this; } /** * Note: list must be empty on destruction. */ ~AtomicForwardList() { assert(empty()); } bool empty() const { return head_.load() == nullptr; } /** * Atomically insert t at the head of the list. * @return True if the inserted element is the only one in the list * after the call. */ bool insertHead(T* t) { assert(t->next == nullptr); auto oldHead = head_.load(std::memory_order_relaxed); do { t->next = oldHead; /* oldHead is updated by the call below. NOTE: we don't use next(t) instead of oldHead directly due to compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899), MSVC (bug 819819); source: http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */ } while (!head_.compare_exchange_weak(oldHead, t, std::memory_order_release, std::memory_order_relaxed)); return oldHead == nullptr; } /** * Replaces the head with nullptr, * and calls func() on the removed elements in the order from tail to head * Returns false if the list was empty. */ template <typename F> bool sweepOnce(F&& func) { if (auto head = head_.exchange(nullptr)) // why is memory_order_seq_cst { auto rhead = reverse(head); unlinkAll(rhead, std::forward<F>(func)); return true; } return false; } /** * Repeatedly replaces the head with nullptr * and calls func() on the removed elements in the order from tail to head. * Stops when the list is empty. */ template <typename F> void sweep(F&& func) { while (sweepOnce(std::forward<F>(func))) { } } /** * Similar to sweepOnce() but calls func() on elements in LIFO order * * func() is called for all elements in the list at the moment * reverseSweepOnce() is called. */ template <typename F> bool reverseSweepOnce(F&& func) { // We don't loop like sweep() does because the overall order of callbacks // would be strand-wise LIFO which is meanless to callers. if (auto head = head_.exchange(nullptr)) { unlinkAll(head, std::forward<F>(func)); return true; } return false; } /** * Replaces the head with nullptr, * and get the member list pointed by head in input order */ T* getInputList() { if (auto head = head_.exchange(nullptr, std::memory_order_acquire)) // why is memory_order_seq_cst { auto rhead = reverse(head); return rhead; } return nullptr; } /** * Replaces the head with nullptr * and get the member list pointed by head in reversed input order */ T* getList() { return head_.exchange(nullptr); } private: std::atomic<T*> head_{ nullptr }; /* Reverses a linked list, returning the pointer to the new head (old tail) */ static T* reverse(T* head) { T* rhead = nullptr; while (head != nullptr) { auto t = head; head = t->next; t->next = rhead; rhead = t; } return rhead; } /* Unlinks all elements in the linked list fragment pointed to by 'head', * calling func() on every element */ template <typename F> void unlinkAll(T* head, F&& func) { while (head != nullptr) { auto t = head; head = t->next; t->next = nullptr; func(t); } } }; } #endif // FOLLY_REVISE_H
2.基于上面无锁队列的封装
#ifndef COMPOSITE_ATOMIC_LIST_H #define COMPOSITE_ATOMIC_LIST_H /** * Compose a multiple-producers and multiple-consumers atomic list * through given consumer number AtomicForwardList */ #include <vector> #include <cassert> #include "folly_revise.h" namespace folly { template <class T> class CompositeAtomicList { public: using size_type = typename std::vector<AtomicForwardList<T>>::size_type; public: CompositeAtomicList(size_type producerNum, size_type consumerNum) : m_producerNum(producerNum), m_consumerNum(consumerNum) { // it is meanless if there is no producer or consumer assert(producerNum > 0); assert(consumerNum > 0); // the number of composite list is equal to consumer number m_compositeList.resize(consumerNum); // initialize the first insertion index of the producers m_producerIdxs.resize(producerNum); for (std::vector<size_type>::size_type si = 0; si != m_producerIdxs.size(); ++si) { m_producerIdxs[si] = si % consumerNum; } } CompositeAtomicList(const CompositeAtomicList&) = delete; CompositeAtomicList& operator=(const CompositeAtomicList&) = delete; //CompositeAtomicList(CompositeAtomicList&& other) noexcept // : m_producerNum(other.m_producerNum), m_consumerNum(other.m_consumerNum), // m_producerIdxs(std::move(other.m_producerIdxs), // m_compositeList(std::move(other.m_compositeList) //{ //} CompositeAtomicList(CompositeAtomicList&& other) noexcept = default; CompositeAtomicList& operator=(CompositeAtomicList&& other) noexcept = default; ~CompositeAtomicList() = default; // producer num size_type getProducerNum() const { return m_producerNum; } // consumer num size_type getConsumerNum() const { return m_consumerNum; } bool empty() const { // if there is one consumer list is not empty, the CompositeList is not empty for (const auto& item : m_compositeList) { if (!item.empty()) { return false; } } return true; } // insert node for producer number producer_num bool insertHead(size_type producer_num, T* t) { auto ret = m_compositeList[m_producerIdxs[producer_num]].insertHead(t); m_producerIdxs[producer_num] = (++m_producerIdxs[producer_num]) % m_consumerNum; return ret; } /** * A consumer function * consume nodes for consumer number consumer_num through * invoking function func for every list node in consumer_num. * You should invoke all consumer function for a particular consumer_num * within just one thread to evenly distribute the tasks. * * Recommend calling std::this_thread::yield() when this function returns false */ template <typename F> bool sweepOnce(size_type consumer_num, F&& func) { return m_compositeList[consumer_num].sweepOnce(std::forward<F>(func)); } /** * A consumer function * repeat consume nodes for consumer number consumer_num through * invoking function func for every list node in consumer_sum. * You should invoke all consumer function for a particular consumer_num * within just one thread to evenly distribute the tasks. * * Recommend calling std::this_thread::yield() after calling this function */ template <typename F> void sweep(size_type consumer_num, F&& func) { m_compositeList[consumer_num].sweep(std::forward<F>(func)); } /** * A consumer function * consume nodes for all consumer numbers once through * invoking function func for every list node in consumer_num. * You could invoke this function after all task handler threads terminated * to ensure all nodes have been consumed */ template <typename F> void sweepAll(F&& func) { for (size_type si = 0; si != m_consumerNum; ++si) { sweepOnce(si, std::forward<F>(func)); } } /** * A consumer function * Similar to sweepOnce() but calls func() on elements in LIFO order * * func() is called for all elements in the list at the moment * reverseSweepOnce() is called. * * Recommend calling std::this_thread::yield() when this function returns false */ template <typename F> bool reverseSweepOnce(size_type consumer_num, F&& func) { return m_compositeList[consumer_num].reverseSweepOnce(std::forward<F>(func)); } /** * A consumer function * get all the nodes from consumer list consumer_num in input order * @ return a list of node * * Recommend calling std::this_thread::yield() when this function returns nullptr */ T* getInputList(size_type consumer_num) { return m_compositeList[consumer_num].getInputList(); } /** * A consumer function * get all the nodes from consumer list consumer_num in reversed input order * @ return a list of node * * Recommend calling std::this_thread::yield() when this function returns nullptr */ T* getList() { return m_compositeList[consumer_num].getList(); } private: // the producer and consumer count size_type m_producerNum; size_type m_consumerNum; // the next inserted list for producers std::vector<size_type> m_producerIdxs; // the composite atomic lists std::vector<AtomicForwardList<T>> m_compositeList; }; } #endif // COMPOSITE_ATOMIC_LIST_H
3.测试用代码:
#include <memory> #include <cassert> #include <iostream> #include <vector> #include <thread> #include <future> #include <random> #include <cmath> #include "folly_revise.h" #include "composite_atomic_list.h" using namespace folly; struct student_name { student_name(int age = 0) : age(age), next(nullptr) { } int age; student_name* next; }; using ATOMIC_STUDENT_LIST = CompositeAtomicList<student_name>; constexpr int PRODUCE_THREAD_NUM = 10; // producing thread number constexpr int CONSUME_THREAD_NUM = 5; // consuming thread number ATOMIC_STUDENT_LIST g_students(PRODUCE_THREAD_NUM, CONSUME_THREAD_NUM); std::atomic<int> g_inserts; // insert num (successful) std::atomic<int> g_drops; // drop num (successful) std::atomic<int> g_printNum; // as same as g_drops std::atomic<long> g_ageInSum; // age sum when producing student_name std::atomic<long> g_ageOutSum; // age sum when consuming student_name std::atomic<bool> goOn(true); constexpr int ONE_THREAD_PRODUCE_NUM = 2000000; // when testing, no more than this number, you know 20,000,00 * 100 * 10 ~= MAX_INT if thread num <= 10 inline void printOne(student_name* t) { g_printNum.fetch_add(1, std::memory_order_relaxed); g_ageOutSum.fetch_add(t->age, std::memory_order_relaxed); g_drops.fetch_add(1, std::memory_order_relaxed); delete t; } void insert_students(int idNo) { std::default_random_engine dre(time(nullptr)); std::uniform_int_distribution<int> ageDi(1, 99); for (int i = 0; i < ONE_THREAD_PRODUCE_NUM; ++i) { int newAge = ageDi(dre); g_ageInSum.fetch_add(newAge, std::memory_order_relaxed); g_students.insertHead(idNo, new student_name(newAge)); // use memory_order_relaxed avoiding affect folly memory order g_inserts.fetch_add(1, std::memory_order_relaxed); } } void drop_students(int idNo) { while (goOn.load(std::memory_order_relaxed)) { //auto st = g_students.getInputList(); //while (st) //{ // auto next = st->next; // printOne(st); // // use memory_order_relaxed avoiding affect folly memory order // g_drops.fetch_add(1, std::memory_order_relaxed); // st = next; //} g_students.sweep(idNo, printOne); std::this_thread::yield(); } } int main() { std::vector<std::future<void>> insert_threads; for (int i = 0; i != PRODUCE_THREAD_NUM; ++i) { insert_threads.push_back(std::async(std::launch::async, insert_students, i)); } std::vector<std::future<void>> drop_threads; for (int i = 0; i != CONSUME_THREAD_NUM; ++i) { drop_threads.push_back(std::async(std::launch::async, drop_students, i)); } for (auto& item : insert_threads) { item.get(); } goOn.store(std::memory_order_relaxed); for (auto& item : drop_threads) { item.get(); } g_students.sweepAll(printOne); std::cout << "insert count1: " << g_inserts.load() << std::endl; std::cout << "drop count1: " << g_drops.load() << std::endl; std::cout << "print num1: " << g_printNum.load() << std::endl; std::cout << "age in1: " << g_ageInSum.load() << std::endl; std::cout << "age out1: " << g_ageOutSum.load() << std::endl; std::cout << std::endl; }
4. 基于AtomicIntrusiveLinkedList插入操作可以一次插入一个节点,而移出操作则会一次移出多个节点,如果每个消费队列都使用一个AtomicInstructiveLinkedList来存储,只要生产均匀分布到各个消费队列中,应该可以实现比较好的效果。不过,由于生产均匀分布分布到各个消费队列中并不那么容易实现,通过使用随机化之类的方式,可以防止人为导致的不均匀。不过,都不能从根本上解决问题,所以,上述方法只有在比较容易实现生产均匀分布到各个消费队列时,适合采用。