acl协程中的同步功能
一、概述
对于 C/C++ 网络协程库,仅提供网络通信功能是远远不够的,如果想要将协程应用于复杂的应用环境中,一些基础性设施是必须的,比如”协程同步“功能。在 acl 协程中,提供了两种应用场景下的同步机制:单机线程内协程之间的同步以及可以跨线程间的协程同步功能。
因为 acl 协程的调度器是单线程的(如果想用多核,可以启动多个线程,每个线程独立进行调度),所以如果你的应用场景仅需线程内不同协程间的同步,则只需使用 fiber_lock/fiber_sem 即可,其中 fiber_lock 为协程同步锁,fiber_sem 为协程信号量,其实现原理本质上是协程执行上下文的切换,所以比较容易实现;另外,acl 协程还提供应用场景更加复杂应用范围更大的同步的机制,该同步模块支持“协程之间、协程与线程之间、线程之间”的同步,其实现原理是原子操作+IO事件的组合。
二、单一线程内协程间的同步
首先,介绍一下单一线程内协程间的同步接口:fiber_mutex,fiber_rwlock,fiber_sem。其中,fiber_mutex 类似于线程互斥量 pthread_mutex_t,其提供基本的互斥功能,fiber_rwlock 提供协程读写锁功能,fiber_sem 则提供了协程信号量的功能。下面给出一个简单的协程互斥的例子:
#include <acl-lib/acl_cpp/lib_acl.hpp> #include <acl-lib/fiber/libfiber.hpp> class myfiber : public acl::fiber { public: myfiber(acl::fiber_mutex& lock) : lock_(lock) {} protected: void run(void) { for (int i = 0; i < 5; i++) { lock_.lock(); printf("locked by fiber-%u and sleep\r\n", acl::fiber::self()); sleep(1); printf("fiber-%u wakeup\r\n", acl::fiber::self()); lock_.unlock(); } delete this; } private: acl::fiber_mutex& lock_; ~myfiber(void) {} }; int main(void) { acl::fiber_mutex lock; // 创建并启动第一个协程 acl::fiber* fb1 = new myfiber(lock); fb1->start(); // 创建并启动第二个协程 acl::fiber* fb2 = new myfiber(lock); fb2->start(); // 启动协程调度器 acl::fiber::schedule(); return 0; }
该例子非常简单明了的讲述了如何创建 acl 协程,以及如何使用 acl 协程锁进行同步的过程。
该示例代码:https://github.com/acl-dev/demo/blob/master/file_lock.cpp
三、协程与线程之间的同步互斥
acl 协程在最初设计协程同步互斥时,通过上下文切换便很容易实现了,如上面所示,但随着应用场景复杂度的提升,上面的同步方式已经完全不能胜任,在新项目中有一个非常麻烦的处理同步的需求:希望互斥锁不仅可以用在同一线程的协程之间,而且希望还可以用在线程之间,不同线程内的协程之间的同步互斥。因为 acl 的协程调度器是单线程模式,虽然可以同时启动多个线程,每个线程内部创建大量协程,但各个线程之间的资源共享仅能通过系统的线程锁来保证,这看起来似乎没有问题,但却一个场景下完全失效了:
1、分别创建了 A、B 两类线程,每个线程是一个独立的协程调度过程(即每个线程内可创建大量的协程),同时还创建了 C 线程池(纯线程模式);
2、A 线程中的某个协程 a1 创建了一个共享对象 o1,协程 a2 创建了一个共享对象 o2;B 线程中的某个协程 b1 创建一个共享对象 o3;
3、a1 协程对 o1 加锁保护,a2 协程对 o2 加锁保护,b1 协程对 o3 加锁保护;
4、在某一时刻,b1 协程因想要对 o2 加锁保护而处于等待状态,但此时恰巧 a1 协程想要对 o3 加锁,但此时 o3 已被 b1 协程加锁保护。
针对上述过程,如果用纯线程锁进行加锁保护则肯定会出现死锁问题,因此在 acl 协程里设计了一个可在不同线程的协程间共享的事件互斥模块:fiber_event(C++类)或 acl_fiber_event(C语言)。在此暂且不讲该功能实现(实现有点复杂且有一定技巧性),只讲如何使用,下面是 fiber_event 提供的功能接口:
/** * 可用于协程之间、线程之间以及协程与线程之间,通过事件等待/通知方式进行同步的 * 的事件混合锁 */ class fiber_event { public: /** * 构造方法 * @param use_mutex {bool} 在用在多线程之间进行事件同步时,如果启动的 * 的线程数较多(成百上千个线程),则此标志应设为 true 以便于内部在 * 同步内部对象时使用线程互斥锁进行保护,以避免形成惊群现象,如果启动 * 的线程数较多但该标志为 false,则内部使用原子数进行同步保护,很容易 * 造成惊群问题;当启动的线程数较(几十个左右),则此参数可以设为 false * 以告之内部使用原子数进行同步保护 * @param fatal_on_error {bool} 内部发生错误时是否直接崩溃,以便于开发 * 人员进行错误调试 */ fiber_event(bool use_mutex = true, bool fatal_on_error = true); ~fiber_event(void); /** * 等待事件锁 * @return {bool} 返回 true 表示加锁成功,否则表示内部出错 */ bool wait(void); /** * 尝试等待事件锁 * @return {bool} 返回 true 表示加锁成功,否则表示锁正在被占用 */ bool trywait(void); /** * 事件锁拥有者释放事件锁并通知等待者 * @return {bool} 返回 true 表示通知成功,否则表示内部出错 */ bool notify(void); public: /** * 返回 C 版本的事件对象 * @return {ACL_FIBER_EVENT*} */ ACL_FIBER_EVENT* get_event(void) const { return event_; } private: ACL_FIBER_EVENT* event_; };
接下来给出一个简单的例子:
#include <acl-lib/acl_cpp/lib_acl.hpp> #include <acl-lib/fiber/libfiber.hpp> class myfiber : public acl::fiber { public: myfiber(acl::fiber_event& event, unsigned long long& count) : event_(event) , count_(count) {} protected: // @override void run(void) { for (int i = 0; i < 1000; i++) { event_.wait(); count_++; if (count_ % 100000 == 0) { printf("thread-%ld, fiber-%u, count=%llu\r\n", acl::thread::self(), acl::fiber::self(), count_); } event_.notify(); } delete this; } private: acl::fiber_event& event_; unsigned long long& count_; ~myfiber(void) {} }; class mythread : public acl::thread { public: mythread(acl::fiber_event& event, unsigned long long& count) : event_(event) , count_(count) {} ~mythread(void) {} protected: // @override void* run(void) { for (int i = 0; i < 1000; i++) { acl::fiber* fb = new myfiber(event_, count_); fb->start(); } acl::fiber::schedule(); return NULL; } private: acl::fiber_event& event_; unsigned long long& count_; }; int main(void) { unsigned long long count = 0; acl::fiber_event event; std::vector<acl::thread*> threads; for (int i = 0; i < 10; i++) { acl::thread* thr = new mythread(event, count); threads.push_back(thr); thr->start(); } for (std::vector<acl::thread*>::iterator it = threads.begin(); it != threads.end(); ++it) { (*it)->wait(); delete *it; } printf("at last count=%llu\r\n", count); return 0; }
在上面例子中,创建 10 个线程,每个线程是一个独立的协程调度过程;每个线程内部创建 1000 个协程,每个协程内部对全局对象 count 进行加 1 操作,每次对 count 加 1 都需要调用 acl::thread_event 中的 wait/notify 进行同步。
该示例代码:https://github.com/acl-dev/demo/blob/master/fiber_event.cpp