欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis源码解析 - eventloop(redis调度的核心)

程序员文章站 2022-05-18 22:11:16
...

相信大家在很多关于Redis的文章里面都看过一句话:redis是单进程的,所以它不用考虑那么多多进程编程带来的麻烦。这话其实不完全正确。如果你读过Redis源码,你肯定知道,Redis存在一个主进程,同时还会有多个子进程。只是它的子进程通常是用于处理某个临时性的任务(比如RDB持久化过程、AOF的rewrite、主备之间的全同步等等,都是比较耗时的任务),一旦任务处理完就销毁了。

今天,我们要讨论的问题是,在这个主进程里面,它的调度核心 - eventloop。

写过C语言的都清楚,任何一个C语言程序的入口都是main函数。main函数执行结束,这个程序就会退出。

对于一些服务型的程序,它需要长期驻留并不断给使用者提供服务响应,比如Redis-server,还比如一些嵌入式的设备软件如路由器、防火墙、基站等等等等。这一类的程序一旦启动起来后,就不能立刻退出。

那么,怎样才能让main函数不退出呢?最简单的,就是在里面写一个死循环,不就不用退出了吗?

int main(void)
{
    // something

    // server loop
    while(true)
    {
    // wait something
    // do something
    // wait next
    }
    exit();
}

这个方法是肯定可以保证程序不退出的,但是它会对性能造成较大影响,出现所谓的“忙等待”(死循环一直占用CPU)。

要解决这个问题,我们得先分析服务要响应的“输入”到底有哪些?

我理解的“输入”有两类:

1、系统内部产生的,主要是定时器触发的;比如,Redis的key支持超时,那么系统内部肯定会在一定的周期去检查哪些key超时了。这就是定时器触发的一个输入事件。

2、系统外部产生的。这主要是一些IO事件。比如,用户在redis-cli输入一个命令,它最终是会通过socket传给server的,对于server来说,这是一个socket的IO事件。又比如,rdb的持久化过程,就是对磁盘文件的读写IO事件。在linux中,这些IO事件最终都可以转换为对一个FD(文件描述符)的读写事件。

Redis的main函数里面,主要就是在循环处理这两类“输入”。这个大循环,我们叫做eventLoop。

下面我们看看eventLoop如何处理的?

在redis-server的main函数中,最后会调用aeMain,这就是我们的主循环,必须放在main最后。代码走到这里就会一直在这个循环里面执行,除非退出。

Redis源码解析 - eventloop(redis调度的核心)

aeMain里面就是一个while循环了,它的执行条件eventLoop->stop正常情况下都是false,所以会一直循环。

Redis源码解析 - eventloop(redis调度的核心)

它里面调用了aeProcessEvents,这个函数就是最核心的循环处理入口了。下面我们重点分析这个函数。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
	
    // ae的事件有两种类型:fd的IO事件、定时器事件,如果都不存在,则不需要执行下面代码。
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 下面这一大段代码巴拉巴拉,其实都是在干一件事情,就是找到最近一个超时的定时器事件。
        // 并且获取距离当前的这个时间间隔值。拿来做什么用呢?
        // 为了避免“忙等待”,我们在检查FD的IO读写状态时(select或者epoll),都会采用阻塞的方式,如果没有可读可写的FD,就一直阻塞着等待。但是,我还有定时器事件要处理啊,如果一直没有IO事件,那我定时器事件不是一直没法处理么?
        // 所以,我们会给select或者epoll传入一个阻塞的超时时间,超过这个时间,都给我返回。
        // 下面获取的这个值,就是用于设置阻塞超时时间的。
        // 这样做,既可以避免非阻塞式的忙等待,又可以保证定时器事件能够按时得到处理。
        // 其实这种处理方式非常普遍,以C为开发语言的很多服务型软件都是这样玩的。

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 找到最近超时的定时器事件
            shortest = aeSearchNearestTimer(eventLoop); 
        if (shortest) { 
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
			// 计算超时时间
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                // 已经有定时器超时了。采用非阻塞(tvp设置为0),立即返回。
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 没有定时器事件? 那就一直阻塞吧。
                tvp = NULL; /* wait forever */
            }
        }
        
        // AE_DONT_WAIT表示强制不允许阻塞。这在TLS的场景中有用。
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

		// select之前提供一个回调
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // 这里面就是调select或者epoll。这就是网上老说的IO多路复用,很多把这个点作为redis高性能的一个重要原因来提。但是,IO多路复用不是很普遍吗? 现在还有读socket不是这样多路复用的吗?
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
		// select之后提供一个回调
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

		// 回调各个event的处理函数
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            // 可读事件的回调处理
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            
            // 可写事件的回调处理
            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
	// 处理定时器事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

请着重看我在里面添加的注释。

它的整个调度逻辑其实是非常简单的。eventloop的事件分为FD读写事件和定时器事件,下面我们来看一个简单的FD读写事件注册的例子。比如就以socket的可读事件为例,它的接口是:

Redis源码解析 - eventloop(redis调度的核心)

这是conn模块的接口,我们知道redis的socket都被conn给封装了起来。这个函数会调用aeCreateFileEvent注册fd的AE_READABLE事件,并且给它注册一个回调函数ae_handler。

Redis源码解析 - eventloop(redis调度的核心)

同一个系统中的fd都是内核分配的,全局的,所以fd是唯一的。redis为了查询效率,使用了数组events[]来存这些事件。这个数组在server_init就分配好了。这是空间换时间。

 

我们再来看看定时器是如何处理的?

我一开始以为redis会注册很多个定时器,最后发现我土了,它真正意义上的定时器只有一个,就是serverCron。也就是说,在server_init时,调用aeCreateTimeEvent注册了一个全局大定时器serverCron。

其它所有的定时任务,均由这个serverCron来驱动。所以,我们会看到redis有一个主频的概念(server.hz),它表示的其实就是1秒钟这个serverCron会被调用多少次。默认server.hz = 10,也就是1秒钟调用10次,也就是100ms调用一次。

由于redis几乎所有需要定时处理的任务都是在serverCron里面驱动的,所以理论上,你可以通过调整这个主频,来改善性能。

我们看一个例子,看看它如何驱动其它定时任务的。比如,redis会每个5s打印一次client的状态,这其实就是5s的小定时器。

Redis源码解析 - eventloop(redis调度的核心)

它通过run_with_period(5000)来判断是否超时。

Redis源码解析 - eventloop(redis调度的核心)

(_ms_ <= 1000/server.hz),这个很简单了,如果你的超时周期小于主频的周期,那肯定每次调用都超时。

!(server.cronloops%((_ms_)/(1000/server.hz))):

server.cronloops是一个累积计数,serverCron每次被调用都会加1。

((_ms_)/(1000/server.hz))的结果,表示需要调用多少次serverCron才能满足我的一个超时周期。

两者取余,只要余数为0,那么肯定就是超时周期了。

 

所以,redis的所有定时任务都是基于serverCron来驱动的,而eventloop直接驱动的只有serverCron。

 

这就是Redis主进程的调度核心eventloop的实现逻辑。当然还有一些细节,可以自己看代码消化。

相关标签: Redis源码