欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

c++云风coroutine库解析

程序员文章站 2022-04-24 13:36:29
...

云风coroutine库是一个C语言实现的轻量级协程库,源码简洁易懂,可以说是了(ru)解(keng)协程原理的最好源码资源。
我在之前的文章中,借助腾讯开源的libco,对C/C++的协程实现有了一个简单介绍,参考博客。其实libco和云风coroutine有很多相似的思想,只不过实现的方式不同而已,云风库只是提供了一种实现思路,并没有对hook进行处理,而libco则是工业级的协程库实现。通过两者源码阅读分析,可以比较出不同的实现方式的差异,多多思考源码库的作者为什么要这么设计,自己能不能对其进行改进,这样对自己的提升很有帮助。

设计思路分析

云风库主要利用ucontext簇函数进行协程上下文切换,ucontext簇函数的最大有点就是简单易用,但是切换的性能不如libco设计的汇编逻辑(主要原因是ucontext内部实现上下文切换了很多不需要的寄存器,而libco汇编实现的切换则更加简洁直接),主要包括一下四个函数:

//获取当前的上下文保存到ucp
getcontext(ucontext_t *ucp)
//直接到ucp所指向的上下文中去执行
setcontext(const ucontext_t *ucp)
//创建一个新的上下文
makecontext(ucontext_t *ucp, void (*func)(), int argc, ...) 
//将当前上下文保存到oucp,然后跳转到ucp上下文执行
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp)

//ucontext结构体
typedef struct ucontext{
	//当这个上下文终止后,运行uc_link指向的上下文
	//如果为NULL则终止当前线程
    struct ucontext* uc_link;
    //  为该上下文的阻塞信号集合
    sigset_t uc_sigmask;
    //该上下文使用的栈
    stack_t uc_stack;
    //保持上下文的特定寄存器
    mcontext_t uc_mcontext;
    ...
}ucontext_t

API的设计方面,云风库的API设计的非常简洁,主要如下的设计:

//协程对象的四种状态
#define COROUTINE_DEAD 0
#define COROUTINE_READY 1
#define COROUTINE_RUNNING 2
#define COROUTINE_SUSPEND 3

//新建协程调度器对象
struct schedule * coroutine_open(void);
//关闭协程调度器
void coroutine_close(struct schedule *);

//创建协程对象
int coroutine_new(struct schedule *, coroutine_func, void *ud);
//执行协程对象(启动或继续)
void coroutine_resume(struct schedule *, int id);
//返回协程对象的状态
int coroutine_status(struct schedule *, int id);
//返回正在执行的协程ID
int coroutine_running(struct schedule *);
//yeild当前执行的协程,返回到主协程
void coroutine_yield(struct schedule *);

在协程栈的设计中,云风库也是选择共享栈的实现,即每个协程自己保持栈中的数据,每当resume运行的时候,将自己的数据copy到运行栈上,每当yield的时候,将运行栈的数据(首先要找到栈底和栈顶)保存在自己协程结构体中。这种方法优势在于只需要一开始初始化一大块栈内存(云风库默认是1M),运行时数据放在其上即可,不会考虑到爆栈的问题,相比于每个协程一个自己的栈,栈内存的利用率要高很多。缺点在于,每次协程切换都会有用户态中的copy过程。接下来可以看其如何实现。

源码分析

接下来看其主要的逻辑实现。云风库中主要有两个结构体。一个是调度器,一个是协程。

//协程调度器
struct schedule {
	char stack[STACK_SIZE];	//默认大小1MB,是一个共享栈,所有协程运行时都在其上
	ucontext_t main;		//主协程上下文
	int nco;				//协程调度器中存活协程个数
	int cap;				//协程调度器管理最大容量。最大支持多少协程。当nco >= cap时候要扩容
	int running;			//正在运行的协程ID
	struct coroutine **co;	//一维数组,数组元素是协程指针
};

//协程
struct coroutine {
	coroutine_func func;	//协程执行函数
	void *ud;				//协程参数
	ucontext_t ctx;			//协程上下文
	struct schedule * sch;	//协程对应的调度器
	ptrdiff_t cap;			//协程自己保存栈的最大容量(stack的容量)
	ptrdiff_t size;			//协程自己保存的栈当前大小(stack的大小)
	int status;				//协程状态
	char *stack;			//当前协程自己保存的栈数据,因为是共享栈的设计,
	//即每个协程都在同一个栈空间中切换,所以每个协程在切换出来后要保存自己的栈内容
};

接下来看一下coroutine_resume的源码,这个函数是开启指定协程,可以看到有两种情况,一种是协程第一次执行,状态从COROUTINE_READY -> COROUTINE_RUNNING,另一种是协程之前运行过但是yield了,再次执行,状态从COROUTINE_SUSPEND -> COROUTINE_RUNNING。

//mainfunc是对协程函数的封装,里面运行了用户提供的协程函数,并在结束后删除对应的协程
static void
mainfunc(uint32_t low32, uint32_t hi32) {
	uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);
	struct schedule *S = (struct schedule *)ptr;
	int id = S->running;
	struct coroutine *C = S->co[id];
	C->func(S,C->ud);	//运行对应函数
	_co_delete(C);
	S->co[id] = NULL;
	--S->nco;
	S->running = -1;
}

//指定的协程开始(继续)运行
void 
coroutine_resume(struct schedule * S, int id) {
	//首先确保没有正在运行的协程,并且id满足条件
	assert(S->running == -1);
	assert(id >=0 && id < S->cap);
	struct coroutine *C = S->co[id];
	if (C == NULL)
		return;
	int status = C->status;
	switch(status) {
	case COROUTINE_READY:
		//此时协程是刚刚新建的,还没运行过,切换上下文,getcontext初始化即将要运行协程的上下文,
		getcontext(&C->ctx);
		C->ctx.uc_stack.ss_sp = S->stack;	//设置共享栈(即设置当前协程中的栈为运行栈,运行栈就是共享栈,一开始就分配好的1M空间)
		C->ctx.uc_stack.ss_size = STACK_SIZE;
		C->ctx.uc_link = &S->main;			//uc_link是当前上下文终止后,执行运行主协程的上下文
		S->running = id;
		C->status = COROUTINE_RUNNING;
		//注意这里将S作为参数,传到mainfunc里面,但是先划分成两个uint32_t,然后再在mainfunc中合并
		uintptr_t ptr = (uintptr_t)S;
		makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
		swapcontext(&S->main, &C->ctx);
		break;
	case COROUTINE_SUSPEND:
		//此时协程已经被yield过,memcpy将协程自己的栈中内存copy到运行栈
		//共享栈的缺点就是在yield和resume的时候要自己进行copy,将协程自己保存的栈内容与运行栈之间进行copy
		memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
		S->running = id;
		C->status = COROUTINE_RUNNING;
		swapcontext(&S->main, &C->ctx);
		break;
	default:
		assert(0);
	}
}

接下来看一下coroutine_yield的实现。yield就是将协程调度器中当前运行的协程中断,然后用户态下切换另一个协程运行。至于中断的原因有很多,比如在等IO的时候,或者等待系统调用,或者等待网络数据,云风库中没有对其实现,具体可以去看看libco中hook了哪些函数。
这里的有一个问题,就是在yield的过程中,要将运行栈的数据copy出来,如何找到运行栈中的数据呢?我们知道,在进程地址空间中,栈是从高地址向低地址延伸的,也就是说栈底在高地址,栈顶在低地址,要想copy栈中的数据,只需要找到栈顶和栈底地址,将其中的数据memcpy出来即可。栈底很好找,即为S->stack + STACK_SIZE,栈顶则可以利用一个dummy对象,将S->stack与dummy对象的地址相减,即为栈目前的长度。

//保存当前协程的协程栈,因为coroutine是基于共享栈的,所以协程的栈内容需要单独保存起来。
static void
_save_stack(struct coroutine *C, char *top) {
	//利用dump找到栈顶位置(最低地址)
	char dummy = 0;
	assert(top - &dummy <= STACK_SIZE);
	if (C->cap < top - &dummy) {
		free(C->stack);
		C->cap = top-&dummy;
		C->stack = malloc(C->cap);
	}
	C->size = top - &dummy;
	memcpy(C->stack, &dummy, C->size);
}

//切换出当前正在运行的协程,切换到主协程运行,因为主协程中有while(),并且两个子协程相继切换
void
coroutine_yield(struct schedule * S) {
	int id = S->running;
	assert(id >= 0);
	struct coroutine * C = S->co[id];
	assert((char *)&C > S->stack);
	_save_stack(C,S->stack + STACK_SIZE);//栈是从高地址向低地址发展的,S->stack + STACK_SIZE是栈底(最高地址)
	C->status = COROUTINE_SUSPEND;
	S->running = -1;
	swapcontext(&C->ctx , &S->main);
}

参考:

  1. 云风的BLOG
  2. libco协程概述
相关标签: Server