欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

acl协程中的同步功能

程序员文章站 2024-01-02 10:21:52
...

一、概述

    对于 C/C++ 网络协程库,仅提供网络通信功能是远远不够的,如果想要将协程应用于复杂的应用环境中,一些基础性设施是必须的,比如”协程同步“功能。在 acl 协程中,提供了两种应用场景下的同步机制:单机线程内协程之间的同步以及可以跨线程间的协程同步功能。

    因为 acl 协程的调度器是单线程的(如果想用多核,可以启动多个线程,每个线程独立进行调度),所以如果你的应用场景仅需线程内不同协程间的同步,则只需使用 fiber_lock/fiber_sem 即可,其中 fiber_lock 为协程同步锁,fiber_sem 为协程信号量,其实现原理本质上是协程执行上下文的切换,所以比较容易实现;另外,acl 协程还提供应用场景更加复杂应用范围更大的同步的机制,该同步模块支持“协程之间、协程与线程之间、线程之间”的同步,其实现原理是原子操作+IO事件的组合。

 

二、单一线程内协程间的同步

    首先,介绍一下单一线程内协程间的同步接口:fiber_mutex,fiber_rwlock,fiber_sem。其中,fiber_mutex 类似于线程互斥量 pthread_mutex_t,其提供基本的互斥功能,fiber_rwlock 提供协程读写锁功能,fiber_sem 则提供了协程信号量的功能。下面给出一个简单的协程互斥的例子:

 

#include <acl-lib/acl_cpp/lib_acl.hpp>
#include <acl-lib/fiber/libfiber.hpp>

class myfiber : public acl::fiber
{
public:
        myfiber(acl::fiber_mutex& lock) : lock_(lock) {}

protected:
        void run(void)
        {
                for (int i = 0; i < 5; i++) {
                        lock_.lock();
                        printf("locked by fiber-%u and sleep\r\n",
                                acl::fiber::self());
                        sleep(1);
                        printf("fiber-%u wakeup\r\n", acl::fiber::self());
                        lock_.unlock();
                }

                delete this;
        }

private:
        acl::fiber_mutex& lock_;
        ~myfiber(void) {}
};

int main(void)
{
        acl::fiber_mutex lock;

        // 创建并启动第一个协程
        acl::fiber* fb1 = new myfiber(lock);
        fb1->start();

        // 创建并启动第二个协程
        acl::fiber* fb2 = new myfiber(lock);
        fb2->start();

        // 启动协程调度器
        acl::fiber::schedule();

        return 0;
}

 

    该例子非常简单明了的讲述了如何创建 acl 协程,以及如何使用 acl 协程锁进行同步的过程。

 

    该示例代码:https://github.com/acl-dev/demo/blob/master/file_lock.cpp

 

三、协程与线程之间的同步互斥

    acl 协程在最初设计协程同步互斥时,通过上下文切换便很容易实现了,如上面所示,但随着应用场景复杂度的提升,上面的同步方式已经完全不能胜任,在新项目中有一个非常麻烦的处理同步的需求:希望互斥锁不仅可以用在同一线程的协程之间,而且希望还可以用在线程之间,不同线程内的协程之间的同步互斥。因为 acl 的协程调度器是单线程模式,虽然可以同时启动多个线程,每个线程内部创建大量协程,但各个线程之间的资源共享仅能通过系统的线程锁来保证,这看起来似乎没有问题,但却一个场景下完全失效了:

     1、分别创建了 A、B 两类线程,每个线程是一个独立的协程调度过程(即每个线程内可创建大量的协程),同时还创建了 C 线程池(纯线程模式);

     2、A 线程中的某个协程 a1 创建了一个共享对象 o1,协程 a2 创建了一个共享对象 o2;B 线程中的某个协程 b1 创建一个共享对象 o3;

     3、a1 协程对 o1 加锁保护,a2 协程对 o2 加锁保护,b1 协程对 o3 加锁保护;

     4、在某一时刻,b1 协程因想要对 o2 加锁保护而处于等待状态,但此时恰巧 a1 协程想要对 o3 加锁,但此时 o3 已被 b1 协程加锁保护。

 

    针对上述过程,如果用纯线程锁进行加锁保护则肯定会出现死锁问题,因此在 acl 协程里设计了一个可在不同线程的协程间共享的事件互斥模块:fiber_event(C++类)或 acl_fiber_event(C语言)。在此暂且不讲该功能实现(实现有点复杂且有一定技巧性),只讲如何使用,下面是 fiber_event 提供的功能接口:

 

/**
 * 可用于协程之间、线程之间以及协程与线程之间,通过事件等待/通知方式进行同步的
 * 的事件混合锁
 */
class fiber_event
{
public:
	/**
	 * 构造方法
	 * @param use_mutex {bool} 在用在多线程之间进行事件同步时,如果启动的
	 *  的线程数较多(成百上千个线程),则此标志应设为 true 以便于内部在
	 *  同步内部对象时使用线程互斥锁进行保护,以避免形成惊群现象,如果启动
	 *  的线程数较多但该标志为 false,则内部使用原子数进行同步保护,很容易
	 *  造成惊群问题;当启动的线程数较(几十个左右),则此参数可以设为 false
	 *  以告之内部使用原子数进行同步保护
	 * @param fatal_on_error {bool} 内部发生错误时是否直接崩溃,以便于开发
	 *  人员进行错误调试
	 */
	fiber_event(bool use_mutex = true, bool fatal_on_error = true);
	~fiber_event(void);

	/**
	 * 等待事件锁
	 * @return {bool} 返回 true 表示加锁成功,否则表示内部出错
	 */
	bool wait(void);

	/**
	 * 尝试等待事件锁
	 * @return {bool} 返回 true 表示加锁成功,否则表示锁正在被占用
	 */
	bool trywait(void);

	/**
	 * 事件锁拥有者释放事件锁并通知等待者
	 * @return {bool} 返回 true 表示通知成功,否则表示内部出错
	 */
	bool notify(void);

public:
	/**
	 * 返回 C 版本的事件对象
	 * @return {ACL_FIBER_EVENT*}
	 */
	ACL_FIBER_EVENT* get_event(void) const
	{
		return event_;
	}

private:
	ACL_FIBER_EVENT* event_;
};

 

 

     接下来给出一个简单的例子:

 

#include <acl-lib/acl_cpp/lib_acl.hpp>
#include <acl-lib/fiber/libfiber.hpp>

class myfiber : public acl::fiber
{
public:
        myfiber(acl::fiber_event& event, unsigned long long& count)
        : event_(event)
        , count_(count)
        {}

protected:
        // @override
        void run(void)
        {
                for (int i = 0; i < 1000; i++) {
                        event_.wait();
                        count_++;
                        if (count_ % 100000 == 0) {
                                printf("thread-%ld, fiber-%u, count=%llu\r\n",
                                        acl::thread::self(), acl::fiber::self(),
                                        count_);
                        }
                        event_.notify();
                }

                delete this;
        }

private:
        acl::fiber_event& event_;
        unsigned long long& count_;

        ~myfiber(void) {}
};

class mythread : public acl::thread
{
public:
        mythread(acl::fiber_event& event, unsigned long long& count)
        : event_(event)
        , count_(count)
        {}

        ~mythread(void) {}

protected:
        // @override
        void* run(void)
        {
                for (int i = 0; i < 1000; i++) {
                        acl::fiber* fb = new myfiber(event_, count_);
                        fb->start();
                }

                acl::fiber::schedule();
                return NULL;
        }

private:
        acl::fiber_event& event_;
        unsigned long long& count_;
};

int main(void)
{
        unsigned long long count = 0;
        acl::fiber_event event;

        std::vector<acl::thread*> threads;
        for (int i = 0; i < 10; i++) {
                acl::thread* thr = new mythread(event, count);
                threads.push_back(thr);
                thr->start();
        }

        for (std::vector<acl::thread*>::iterator it = threads.begin();
                it != threads.end(); ++it) {
                (*it)->wait();
                delete *it;
        }

        printf("at last count=%llu\r\n", count);
        return 0;
}

 

 

    在上面例子中,创建 10 个线程,每个线程是一个独立的协程调度过程;每个线程内部创建 1000 个协程,每个协程内部对全局对象 count 进行加 1 操作,每次对 count 加 1 都需要调用 acl::thread_event 中的 wait/notify 进行同步。

    该示例代码:https://github.com/acl-dev/demo/blob/master/fiber_event.cpp

上一篇:

下一篇: