Redis源码解析 - eventloop(redis调度的核心)
相信大家在很多关于Redis的文章里面都看过一句话:redis是单进程的,所以它不用考虑那么多多进程编程带来的麻烦。这话其实不完全正确。如果你读过Redis源码,你肯定知道,Redis存在一个主进程,同时还会有多个子进程。只是它的子进程通常是用于处理某个临时性的任务(比如RDB持久化过程、AOF的rewrite、主备之间的全同步等等,都是比较耗时的任务),一旦任务处理完就销毁了。
今天,我们要讨论的问题是,在这个主进程里面,它的调度核心 - eventloop。
写过C语言的都清楚,任何一个C语言程序的入口都是main函数。main函数执行结束,这个程序就会退出。
对于一些服务型的程序,它需要长期驻留并不断给使用者提供服务响应,比如Redis-server,还比如一些嵌入式的设备软件如路由器、防火墙、基站等等等等。这一类的程序一旦启动起来后,就不能立刻退出。
那么,怎样才能让main函数不退出呢?最简单的,就是在里面写一个死循环,不就不用退出了吗?
int main(void)
{
// something
// server loop
while(true)
{
// wait something
// do something
// wait next
}
exit();
}
这个方法是肯定可以保证程序不退出的,但是它会对性能造成较大影响,出现所谓的“忙等待”(死循环一直占用CPU)。
要解决这个问题,我们得先分析服务要响应的“输入”到底有哪些?
我理解的“输入”有两类:
1、系统内部产生的,主要是定时器触发的;比如,Redis的key支持超时,那么系统内部肯定会在一定的周期去检查哪些key超时了。这就是定时器触发的一个输入事件。
2、系统外部产生的。这主要是一些IO事件。比如,用户在redis-cli输入一个命令,它最终是会通过socket传给server的,对于server来说,这是一个socket的IO事件。又比如,rdb的持久化过程,就是对磁盘文件的读写IO事件。在linux中,这些IO事件最终都可以转换为对一个FD(文件描述符)的读写事件。
Redis的main函数里面,主要就是在循环处理这两类“输入”。这个大循环,我们叫做eventLoop。
下面我们看看eventLoop如何处理的?
在redis-server的main函数中,最后会调用aeMain,这就是我们的主循环,必须放在main最后。代码走到这里就会一直在这个循环里面执行,除非退出。
aeMain里面就是一个while循环了,它的执行条件eventLoop->stop正常情况下都是false,所以会一直循环。
它里面调用了aeProcessEvents,这个函数就是最核心的循环处理入口了。下面我们重点分析这个函数。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// ae的事件有两种类型:fd的IO事件、定时器事件,如果都不存在,则不需要执行下面代码。
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 下面这一大段代码巴拉巴拉,其实都是在干一件事情,就是找到最近一个超时的定时器事件。
// 并且获取距离当前的这个时间间隔值。拿来做什么用呢?
// 为了避免“忙等待”,我们在检查FD的IO读写状态时(select或者epoll),都会采用阻塞的方式,如果没有可读可写的FD,就一直阻塞着等待。但是,我还有定时器事件要处理啊,如果一直没有IO事件,那我定时器事件不是一直没法处理么?
// 所以,我们会给select或者epoll传入一个阻塞的超时时间,超过这个时间,都给我返回。
// 下面获取的这个值,就是用于设置阻塞超时时间的。
// 这样做,既可以避免非阻塞式的忙等待,又可以保证定时器事件能够按时得到处理。
// 其实这种处理方式非常普遍,以C为开发语言的很多服务型软件都是这样玩的。
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// 找到最近超时的定时器事件
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
// 计算超时时间
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
// 已经有定时器超时了。采用非阻塞(tvp设置为0),立即返回。
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 没有定时器事件? 那就一直阻塞吧。
tvp = NULL; /* wait forever */
}
}
// AE_DONT_WAIT表示强制不允许阻塞。这在TLS的场景中有用。
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
// select之前提供一个回调
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
// 这里面就是调select或者epoll。这就是网上老说的IO多路复用,很多把这个点作为redis高性能的一个重要原因来提。但是,IO多路复用不是很普遍吗? 现在还有读socket不是这样多路复用的吗?
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
// select之后提供一个回调
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 回调各个event的处理函数
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
// 可读事件的回调处理
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
// 可写事件的回调处理
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
// 处理定时器事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
请着重看我在里面添加的注释。
它的整个调度逻辑其实是非常简单的。eventloop的事件分为FD读写事件和定时器事件,下面我们来看一个简单的FD读写事件注册的例子。比如就以socket的可读事件为例,它的接口是:
这是conn模块的接口,我们知道redis的socket都被conn给封装了起来。这个函数会调用aeCreateFileEvent注册fd的AE_READABLE事件,并且给它注册一个回调函数ae_handler。
同一个系统中的fd都是内核分配的,全局的,所以fd是唯一的。redis为了查询效率,使用了数组events[]来存这些事件。这个数组在server_init就分配好了。这是空间换时间。
我们再来看看定时器是如何处理的?
我一开始以为redis会注册很多个定时器,最后发现我土了,它真正意义上的定时器只有一个,就是serverCron。也就是说,在server_init时,调用aeCreateTimeEvent注册了一个全局大定时器serverCron。
其它所有的定时任务,均由这个serverCron来驱动。所以,我们会看到redis有一个主频的概念(server.hz),它表示的其实就是1秒钟这个serverCron会被调用多少次。默认server.hz = 10,也就是1秒钟调用10次,也就是100ms调用一次。
由于redis几乎所有需要定时处理的任务都是在serverCron里面驱动的,所以理论上,你可以通过调整这个主频,来改善性能。
我们看一个例子,看看它如何驱动其它定时任务的。比如,redis会每个5s打印一次client的状态,这其实就是5s的小定时器。
它通过run_with_period(5000)来判断是否超时。
(_ms_ <= 1000/server.hz),这个很简单了,如果你的超时周期小于主频的周期,那肯定每次调用都超时。
!(server.cronloops%((_ms_)/(1000/server.hz))):
server.cronloops是一个累积计数,serverCron每次被调用都会加1。
((_ms_)/(1000/server.hz))的结果,表示需要调用多少次serverCron才能满足我的一个超时周期。
两者取余,只要余数为0,那么肯定就是超时周期了。
所以,redis的所有定时任务都是基于serverCron来驱动的,而eventloop直接驱动的只有serverCron。
这就是Redis主进程的调度核心eventloop的实现逻辑。当然还有一些细节,可以自己看代码消化。
上一篇: php解析base64数据生成图片的方法
下一篇: PHP脚本编写在网站开发中的作用解析