folly lock-free queue: trying to add new functions
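1. folly is Facebook's open-source C++ library, and its lock-free list (AtomicIntrusiveLinkedList, used here as a lock-free queue) is elegantly implemented. Inserting a node works the way one would expect from a standard-library queue, but removal does not: a sweep hands every node in the list to whichever thread calls it, so nodes end up unevenly distributed across the consumer threads. I once tried to add functions that remove a single node per call. They turned out to have problems, but there is still a lot to learn from them. In the code below, each function I added is marked with the comment 新增函数 ("added function").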
/*
 * Copyright 2014-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <cassert>
#include <utility>

namespace folly {

/**
 * A very simple atomic single-linked list primitive.
 *
 * Usage:
 *
 * class MyClass {
 *   AtomicIntrusiveLinkedListHook<MyClass> hook_;
 * }
 *
 * AtomicIntrusiveLinkedList<MyClass, &MyClass::hook_> list;
 * list.insert(&a);
 * list.sweep([] (MyClass* c) { doSomething(c); }
 */
template <class T>
struct AtomicIntrusiveLinkedListHook {
  T* next{ nullptr };
};

template <class T, AtomicIntrusiveLinkedListHook<T> T::*HookMember>
class AtomicIntrusiveLinkedList {
 public:
  AtomicIntrusiveLinkedList() {}
  AtomicIntrusiveLinkedList(const AtomicIntrusiveLinkedList&) = delete;
  AtomicIntrusiveLinkedList& operator=(const AtomicIntrusiveLinkedList&) =
      delete;

  AtomicIntrusiveLinkedList(AtomicIntrusiveLinkedList&& other) noexcept {
    auto tmp = other.head_.load();
    other.head_ = head_.load();
    head_ = tmp;
  }

  AtomicIntrusiveLinkedList& operator=(
      AtomicIntrusiveLinkedList&& other) noexcept {
    auto tmp = other.head_.load();
    other.head_ = head_.load();
    head_ = tmp;
    return *this;
  }

  /**
   * Note: list must be empty on destruction.
   */
  ~AtomicIntrusiveLinkedList() {
    assert(empty());
  }

  bool empty() const {
    return head_.load() == nullptr;
  }

  /**
   * Atomically insert t at the head of the list.
   * @return True if the inserted element is the only one in the list
   *         after the call.
   */
  bool insertHead(T* t) {
    assert(next(t) == nullptr);

    auto oldHead = head_.load(std::memory_order_relaxed);
    do {
      next(t) = oldHead;
      /* oldHead is updated by the call below.
         NOTE: we don't use next(t) instead of oldHead directly due to
         compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
         MSVC (bug 819819); source:
         http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
    } while (!head_.compare_exchange_weak(oldHead, t,
                                          std::memory_order_release,
                                          std::memory_order_relaxed));

    return oldHead == nullptr;
  }

  /**
   * Replaces the head with nullptr,
   * and calls func() on the removed elements in the order from tail to head.
   * Returns false if the list was empty.
   */
  template <typename F>
  bool sweepOnce(F&& func) {
    if (auto head = head_.exchange(nullptr)) {
      auto rhead = reverse(head);
      unlinkAll(rhead, std::forward<F>(func));
      return true;
    }
    return false;
  }

  // 新增函数 (added function)
  // if std::memory_order_acquire applies to next(oldHead) (the first one,
  // the argument of compare_exchange_weak)
  // and I don't know if following bugs affect the code
  // GCC prior to 4.8.3 (bug 60272), clang prior to 2014-05-05 (bug 18899)
  // MSVC prior to 2014-03-17 (bug 819819).
  template <typename F>
  bool sweepHead(F&& func) {
    // handle if the list is not empty
    auto oldHead = head_.load(std::memory_order_relaxed);
    while (oldHead != nullptr &&
           !head_.compare_exchange_weak(oldHead, next(oldHead),
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed))
      ;
    // if drop out head successfully
    if (oldHead) {
      next(oldHead) = nullptr;
      unlinkAll(oldHead, std::forward<F>(func));
      return true;
    }
    return false;
  }

  // 新增函数 (added function)
  // if std::memory_order_acquire does not apply to next(oldHead)
  // and I don't know if following bugs affect the code
  // GCC prior to 4.8.3 (bug 60272), clang prior to 2014-05-05 (bug 18899)
  // MSVC prior to 2014-03-17 (bug 819819).
  template <typename F>
  bool dropHead(F&& func) {
    T* oldHead = nullptr;
    // handle if the list is not empty
    while ((oldHead = head_.load(std::memory_order_acquire))) {
      // because insert and drop out will be involving with head_, they
      // will change head_ first, then others
      bool res = head_.compare_exchange_weak(oldHead, next(oldHead),
                                             std::memory_order_relaxed,
                                             std::memory_order_relaxed);
      if (res/* && oldHead != nullptr*/) {
        next(oldHead) = nullptr;
        unlinkAll(oldHead, std::forward<F>(func));
        return true;
      }
    }
    return false;
  }

  /**
   * Repeatedly replaces the head with nullptr,
   * and calls func() on the removed elements in the order from tail to head.
   * Stops when the list is empty.
   */
  template <typename F>
  void sweep(F&& func) {
    while (sweepOnce(func)) {
    }
  }

  /**
   * Similar to sweep() but calls func() on elements in LIFO order.
   *
   * func() is called for all elements in the list at the moment
   * reverseSweep() is called. Unlike sweep() it does not loop to ensure the
   * list is empty at some point after the last invocation. This way callers
   * can reason about the ordering: elements inserted since the last call to
   * reverseSweep() will be provided in LIFO order.
   *
   * Example: if elements are inserted in the order 1-2-3, the callback is
   * invoked 3-2-1. If the callback moves elements onto a stack, popping off
   * the stack will produce the original insertion order 1-2-3.
   */
  template <typename F>
  void reverseSweep(F&& func) {
    // We don't loop like sweep() does because the overall order of callbacks
    // would be strand-wise LIFO which is meaningless to callers.
    auto head = head_.exchange(nullptr);
    unlinkAll(head, std::forward<F>(func));
  }

 private:
  std::atomic<T*> head_{ nullptr };

  static T*& next(T* t) {
    return (t->*HookMember).next;
  }

  /* Reverses a linked list, returning the pointer to the new head
     (old tail) */
  static T* reverse(T* head) {
    T* rhead = nullptr;
    while (head != nullptr) {
      auto t = head;
      head = next(t);
      next(t) = rhead;
      rhead = t;
    }
    return rhead;
  }

  /* Unlinks all elements in the linked list fragment pointed to by `head',
   * calling func() on every element */
  template <typename F>
  void unlinkAll(T* head, F&& func) {
    while (head != nullptr) {
      auto t = head;
      head = next(t);
      next(t) = nullptr;
      func(t);
    }
  }
};

} // namespace folly
Below is the code I used for testing:
#include <memory>
#include <cassert>
#include <ctime>   // time(), used to seed the random engine
#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <random>
#include <cmath>
#include "folly.h"

using namespace folly;

struct student_name {
    //student_name(const std::string& name)
    //    : name(name)
    //{
    //}
    //std::string name;

    student_name(int age = 0)
        : age(age)
    {
    }

    int age;
    AtomicIntrusiveLinkedListHook<student_name> node;
};

AtomicIntrusiveLinkedList<student_name, &student_name::node> g_students;

std::atomic<int> g_inserts;     // insert num (successful)
std::atomic<int> g_drops;       // drop num (successful)
std::atomic<int> g_printNum;    // as same as g_drops
std::atomic<long> g_ageInSum;   // age sum when producing student_name
std::atomic<long> g_ageOutSum;  // age sum when consuming student_name

// constexpr int HANDLE_NUM = 60000;  // using when run the program with sleep_for function in Windows(Operating System)
constexpr int HANDLE_NUM = 20000000;  // when testing, no more than this number, you know 20,000,000 * 100 ~= MAX_INT
constexpr int PRODUCE_THTREAD_NUM = 10;  // producing thread number
constexpr int CONSUME_THREAD_NUM = 9;    // consuming thread number

void printOne(student_name* t) {
    g_printNum.fetch_add(1, std::memory_order_relaxed);
    g_ageOutSum.fetch_add(t->age, std::memory_order_relaxed);
    // clean node
    // delete t;
    // std::cout << t->name << std::endl;
}

void insert_students(int idNo) {
    // const std::string noStr = std::to_string(idNo);
    std::default_random_engine dre(time(nullptr));
    //std::uniform_int_distribution<int> di(0, 5);
    std::uniform_int_distribution<int> ageDi(1, 99);
    while (true) {
        int newAge = ageDi(dre);
        g_ageInSum.fetch_add(newAge, std::memory_order_relaxed);
        g_students.insertHead(new student_name(newAge));
        // use memory_order_relaxed avoiding affect folly memory order
        g_inserts.fetch_add(1, std::memory_order_relaxed);
        // std::this_thread::sleep_for(std::chrono::microseconds(di(dre)));
        // use memory_order_relaxed avoiding affect folly memory order
        if (g_inserts.load(std::memory_order_relaxed) >= HANDLE_NUM) {
            return;
        }
    }
}

void drop_students(int idNo) {
    //const std::string noStr = std::to_string(idNo);
    //std::default_random_engine dre;
    //std::uniform_int_distribution<int> di(0, 4);
    while (true) {
        bool ret = g_students.dropHead(printOne);
        // bool ret = g_students.sweepHead(printOne);
        if (ret) {
            // use memory_order_relaxed avoiding affect folly memory order
            g_drops.fetch_add(1, std::memory_order_relaxed);
            // std::cout << "drop correct " << g_drops.load() << std::endl;
        }
        // std::this_thread::sleep_for(std::chrono::microseconds(di(dre)));
        // use memory_order_relaxed avoiding affect folly memory order
        if (g_drops.load(std::memory_order_relaxed) >= HANDLE_NUM) {
            return;
        }
    }
}

int main() {
    std::vector<std::future<void>> insert_threads;
    for (int i = 0; i != PRODUCE_THTREAD_NUM; ++i) {
        insert_threads.push_back(std::async(insert_students, i));
    }

    std::vector<std::future<void>> drop_threads;
    for (int i = 0; i != CONSUME_THREAD_NUM; ++i) {
        drop_threads.push_back(std::async(drop_students, i));
    }

    // std::this_thread::sleep_for(std::chrono::seconds(2));

    for (auto& item : insert_threads) {
        item.get();
    }
    for (auto& item : drop_threads) {
        item.get();
    }

    std::cout << "insert count1: " << g_inserts.load() << std::endl;
    std::cout << "drop count1: " << g_drops.load() << std::endl;
    std::cout << "print num1: " << g_printNum.load() << std::endl;
    std::cout << "age in1: " << g_ageInSum.load() << std::endl;
    std::cout << "age out1: " << g_ageOutSum.load() << std::endl;
    std::cout << std::endl;

    // drain whatever is still left in the list
    while (true) {
        bool ret = g_students.dropHead(printOne);
        if (ret) {
            // use memory_order_relaxed avoiding affect folly memory order
            g_drops.fetch_add(1, std::memory_order_relaxed);
            // std::cout << "drop correct " << g_drops.load() << std::endl;
        }
        if (g_students.empty()) {
            break;
        }
    }

    std::cout << "insert count2: " << g_inserts.load() << std::endl;
    std::cout << "drop count2: " << g_drops.load() << std::endl;
    std::cout << "print num2: " << g_printNum.load() << std::endl;
    std::cout << "age in2: " << g_ageInSum.load() << std::endl;
    std::cout << "age out2: " << g_ageOutSum.load() << std::endl;
}
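The main points from my tests:
(1) Take the code above and uncomment delete t; in printOne (remove the leading "//"). On Ubuntu the test results were normal.
(2) In main, change insert_threads.push_back(std::async(insert_students, i)); to insert_threads.push_back(std::async(std::launch::async, insert_students, i)); and change drop_threads.push_back(std::async(drop_students, i)); to drop_threads.push_back(std::async(std::launch::async, drop_students, i));, and again uncomment delete t; in printOne. Running this on Ubuntu produces memory access errors. (This is worth keeping in mind when testing multithreaded code.)
(3) If delete t in (2) is commented out again, the memory access errors do not occur.
(4) If PRODUCE_THTREAD_NUM stays at 10 and CONSUME_THREAD_NUM is changed to 1, the memory access errors do not occur either.
(5) If PRODUCE_THTREAD_NUM is 1 and CONSUME_THREAD_NUM stays at 9, the memory access errors do occur.
(6) Changing PRODUCE_THTREAD_NUM and CONSUME_THREAD_NUM has no effect on the result, as long as a value is not changed from 1 to something other than 1, or from something other than 1 to 1.
(7) If HANDLE_NUM is reduced sharply, for example to 20000, no memory access error appeared either, with PRODUCE_THTREAD_NUM at 10 and CONSUME_THREAD_NUM as in the listing.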
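Observations (1) and (2) differ only in the launch policy passed to std::async. Without an explicit policy the implementation is allowed to defer the task, in which case it runs only when get() is called, on the calling thread, so producers and consumers would never overlap. Whether that is what happened in test (1) is an assumption on my part; the test output does not confirm it. The sketch below only shows the difference between the two policies; runningThread is a hypothetical helper, not part of the test program.

```cpp
#include <cassert>
#include <future>
#include <thread>

// Hypothetical helper: returns the id of the thread that actually
// executes a task launched with the given policy.
inline std::thread::id runningThread(std::launch policy) {
    auto fut = std::async(policy, [] { return std::this_thread::get_id(); });
    // With std::launch::deferred the lambda runs inside get(), on this thread.
    // With std::launch::async it runs on a separate thread.
    return fut.get();
}
```

With std::launch::deferred the returned id equals the caller's id; with std::launch::async it is the id of another thread. Passing std::launch::async explicitly, as in (2), is what guarantees real concurrency in the test.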
Notes:
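The test results show that the added functions (both behaved the same in testing) are flawed and cannot be used when several threads remove nodes. After analysis, the cause appears to be a conflict between head_.compare_exchange_weak(oldHead, next(oldHead), std::memory_order_relaxed, std::memory_order_relaxed); (in dropHead) and delete t. The argument next(oldHead) is evaluated before compare_exchange_weak runs, so if another thread has already removed and deleted that node by then, reading it is a memory access error. As I see it, next(oldHead) would not need to be read at all when compare_exchange_weak fails; if compare_exchange_weak evaluated its arguments lazily, in the short-circuit manner of && or ||, the code above would run correctly. But compare_exchange_weak is provided by the standard library and cannot be changed, so anyone who wants to use the functions above has to deal with this conflict.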
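To make the evaluation order concrete, here is a small single-threaded sketch. Node, nextOf, g_nextReads and tryPopOnce are hypothetical names invented for this illustration, and compare_exchange_strong is used instead of compare_exchange_weak only so that the demo cannot fail spuriously. It shows that the next pointer of the stale head is read even when the exchange fails.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for a list node.
struct Node {
    Node* next = nullptr;
};

// Counts how many times a node's next pointer is read.
int g_nextReads = 0;

inline Node*& nextOf(Node* n) {
    ++g_nextReads;
    return n->next;
}

// One pop attempt written the same way as dropHead:
// returns true only if the exchange succeeded.
inline bool tryPopOnce(std::atomic<Node*>& head, Node* staleHead) {
    Node* expected = staleHead;
    // nextOf(expected) is an ordinary function argument, so it is
    // evaluated (and the node is dereferenced) before the exchange runs,
    // regardless of whether the exchange then succeeds or fails.
    return head.compare_exchange_strong(expected, nextOf(expected),
                                        std::memory_order_relaxed,
                                        std::memory_order_relaxed);
}
```

If head currently points at some other node b while the caller still holds a stale pointer to a, tryPopOnce(head, &a) returns false and leaves head unchanged, yet g_nextReads has gone up by one. In the real test that read lands on a node another consumer may already have deleted.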
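2. My reason for writing the functions above was to distribute nodes evenly across threads. I have an idea for another approach. I have no actual code for it, but you can take it as a reference and consider implementing it if it looks workable.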
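The steps for implementing dropHead would be:
(1) As in folly's sweepOnce, take all nodes out at once.
(2) Check whether the head obtained as in sweepOnce is nullptr and whether next(head) is nullptr. If neither head nor next(head) is nullptr, swap head_ with next(head).
(3) Add the chain of nodes that next(head) now points to back into the lock-free queue that head_ points to, then process the node that head points to.
Because step (2) does very little work (two checks), the re-insertion in step (3) should rarely be needed. The overhead of the new function should therefore be small; one could even consider not re-inserting the nodes from (3) at all. Overall, this should even out the amount of work each thread handles. However, a thread may be interrupted in the middle of step (2), so I recommend testing carefully before using this.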
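The steps above can be sketched roughly as follows. This is only a minimal sketch under my own assumptions, not folly code and not tested under load: Node, OneAtATimeList, takeOne and pushChain are hypothetical names, the node type is simplified to a plain struct, and the memory orders are my choice. Between the two exchanges the calling thread owns every node it detached, so it never reads a node that another thread could delete.

```cpp
#include <atomic>
#include <cassert>
#include <utility>

// Hypothetical node type for the sketch.
struct Node {
    explicit Node(int v) : value(v) {}
    int value;
    Node* next = nullptr;
};

class OneAtATimeList {
public:
    void insertHead(Node* n) { pushChain(n, n); }

    // Removes one node and calls func on it; returns false if the list is empty.
    template <typename F>
    bool takeOne(F&& func) {
        // Step (1): detach the whole chain; this thread now owns all of it.
        Node* head = head_.exchange(nullptr, std::memory_order_acquire);
        if (head == nullptr) {
            return false;
        }
        // Step (2): if there are more nodes, swap them back into head_.
        Node* rest = head->next;
        head->next = nullptr;
        if (rest != nullptr) {
            Node* fresh = head_.exchange(rest, std::memory_order_acq_rel);
            // Step (3): fresh holds nodes that arrived between the two
            // exchanges (usually none); push that chain back on top.
            if (fresh != nullptr) {
                Node* tail = fresh;
                while (tail->next != nullptr) {
                    tail = tail->next;
                }
                pushChain(fresh, tail);
            }
        }
        std::forward<F>(func)(head);
        return true;
    }

    bool empty() const {
        return head_.load(std::memory_order_acquire) == nullptr;
    }

private:
    // Pushes the chain first..last in front of the current head.
    void pushChain(Node* first, Node* last) {
        Node* oldHead = head_.load(std::memory_order_relaxed);
        do {
            last->next = oldHead;
        } while (!head_.compare_exchange_weak(oldHead, first,
                                              std::memory_order_release,
                                              std::memory_order_relaxed));
    }

    std::atomic<Node*> head_{nullptr};
};
```

One trade-off of this design is that other consumers see an empty list during the short window between the two exchanges, so takeOne can return false even though nodes are about to reappear. A caller that must drain everything should retry or finish with a final sweep.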
If you have any questions or better ideas, please let me know.