欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis服务启动、事件循环与客户端命令的执行

程序员文章站 2022-05-20 14:05:14
...

Redis 服务启动

直接看代码。新的版本中服务器启动代码在 server.c

int main(int argc, char **argv) {
    struct timeval tv;
    int j;

    // 省略 ...

    initServerConfig();
    moduleInitModulesSystem();

    if (argc >= 2) {
    	
    	// ...

        resetServerSaveParams();
        loadServerConfig(configfile,options);// 根据指定的配置文件,初始化服务器配置
        sdsfree(options);
    }

    initServer();// 初始化服务器数据结构,创建aeEventLoop,并注册端口监听

    if (!server.sentinel_mode) {
    	// ...
        
        moduleLoadFromQueue();
        InitServerLast();
        loadDataFromDisk();
        
        // ...
    } else {
        InitServerLast();
        sentinelIsRunning();
    }

    // ...

    aeSetBeforeSleepProc(server.el,beforeSleep); // 钩子函数:每个event执行的前置函数
    aeSetAfterSleepProc(server.el,afterSleep); // 钩子函数:在每个event执行完成后,执行
    
    aeMain(server.el); // 事件循环处理
    
    aeDeleteEventLoop(server.el); // 退出前清理事件

    serverLog(LL_NOTICE, "Self add done");
    return 0;
}

总的来说,服务器启动主要流程如下:

Redis服务启动、事件循环与客户端命令的执行

简单来说就是:

1.初始化服务器

2.初始化数据库、构建事件循环、监听端口

3.如果需要,从AOF/RDB中恢复数据

4.事件循环,等待客户端的请求

 

事件循环

Redis 处理的客户端请求、自己的时间周期函数,都是基于事件循环的。

首先在服务器启动时,就创建了 aeEventLoop ,函数如下。该在上边的 initServer 函数内部被调用

// 说明:0、1、2分别表示标准输入、标准输出、标准错误输出
aeEventLoop *aeCreateEventLoop(int setsize) {// 最大文件描述符,默认为 10000+128
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++) // fd就是这个数组的下标
        eventLoop->events[i].mask = AE_NONE;// 表示暂时 未注册事件
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

创建后的 aeEventLoop 结构在 redisServer 下,数据结构如下:

Redis服务启动、事件循环与客户端命令的执行

aeEventLoop 里面有两类事件:

aeFileEvent:文件事件

aeTimeEvent:时间事件

最后 aeFiredEvent 用于存储触发的待处理文件事件。

注意:上边 main 函数的最后,先设置钩子函数,再在 aeMain 中循环处理事件。接下来看下这个事件循环处理 aeMain 函数

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL) // 前置函数不空,则执行
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); // 处理所有事件
    }
}

也就是说在收到终止事件循环的通知之前,服务器一直在执行这个循环:

1.执行事件sleep 前置函数

2.处理事件

 

接下来所有的事件处理过程都在 aeProcessEvents 中,这个过程如图:

Redis服务启动、事件循环与客户端命令的执行

简单来说就是在每个循环中:

1.执行事件前置函数

2.找到最早时间事件

3.多路复用获取文件事件

4.标记要触发的事件

5.执行后置函数

6.执行文件事件:先处理读事件,后处理写事件。(非 AE_BARRIER 情况)

7.处理时间事件

来看源代码:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))// 找到最早的时间事件
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);// 多路复用获取事件

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);// 本质上就是释放 module 的锁

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;
            // 一般先执行readable事件再执行writable事件
            // 但是如果设置了AE_BARRIER,那么readable事件之后不会触发writable事件。此时先执行writable事件再执行readable事件

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS) // 总是先处理文件事件,再处理时间事件
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

 

文件事件

在上边的代码循环中:总是先处理文件事件,再处理时间事件。

其中文件事件的获取主要是这行代码:

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);// 多路复用获取事件

找出发生的事件,并返回事件发生的数量。

注意:该函数里面对系统底层进行了调用,注意该实现是可替换的。(且与主机环境有关。比如MAC OS 上是 kqueue)。在源码中对应的文件有:ae_epoll.c,ae_evport.c,ae_kqueue.c,ae_select.c。

这些分别是对 Reactor模式的几种I/O多路复用库函数:select、epoll、evport、kqueue,的封装。根据不同配置或者环境,可切换。

主机环境和设置不同,采用的方式可能不一样。具体的配置在 config.h 中。我的本地是 MAC,选择的是 kqueue ,源码如下:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    if (tvp != NULL) {// 事件的时间到了
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        // 调用系统代码获取事件数量及事件队列
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
    } else {
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL);
    }

    if (retval > 0) {
        int j;

        numevents = retval;
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;// 对获取

            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

对于服务器采用了哪种方式,可以采用 info server 查看下:

Redis服务启动、事件循环与客户端命令的执行

 

时间事件

Redis 时间事件有两种:

  • 定时事件,每隔一段时间会执行一次;

  • 非定时事件,只会在某个时间点执行一次

时间事件的处理相当简单,就是循环里面遍历时间事件链表,发现时间超时就执行对应的proc。很简单,看源码即可:

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {// 系统时间调整到未来,再调整回来,时间事件就会被延迟随机的时间。
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);// 删除事件的处理函数
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);// 执行并返回一个 int 值
            processed++;
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); //不为-1 代表下次该超时事件触发的时间间隔
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

值得说下,timeProc 返回值如果不为 -1,则是距离下次触发的时间间隔。

 

 

客户端连接

客户端让服务器执行命令,要先建立连接。

先回到我们的服务启动过程,有一步初始化服务器 initServer :数据结构与监听

void initServer(void) {

    // 初始化数据库
    // 省略 ...

    // 时间事件循环
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    // 创建注册 TCP 的监听
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    // 创建注册 socket 监听
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");


    // 创建注册 pipe 的监听
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }

    // ...

}

依次创建并监听三个文件事件:tcp 连接、socket连接、pipe。

监听到客户端连接后,创建 client ,并注册其请求的监听 readQueryFromClient

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    // -1 代表无需创建有连接的客户端
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)  
            // 此处为客户端的读注册监听。即处理客户端发送过来的命令
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    // ...
}

注意,这里是 AE_READABLE ,也就是客户端的请求事件。

创建的 client 会放入 redisServer 的 clients 链表中。到此客户端与服务器连接成功,客户端就可以发起请求了。

 

命令执行

Redis 的命令从客户端发送到服务器,然后被服务器执行,最后返回结果给客户端。

命令发送到服务器端,也是通过事件处理机制来分发执行的。上边说到 createClient 时,注册了可读事件的监听  readQueryFromClient 。

当可读性事件发生时,该监听函数对输入缓冲区等进行读取等一系列操作。最后调用 processCommand 处理命令,。而该函数位于 server.c 文件中。到此,就完成了事件分发。

接下来看服务器对命令的处理

该函数有点长,有 193 行,不过都是对当前服务器状态和命令的判断。直接看函数的最后:

int processCommand(client *c) {
    // 模块命令过滤、处理
    moduleCallCommandFilters(c);

    // ...
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    
    // ...

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c); // 缓存事务命令,此时事务还没有提交
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL); // 执行命令
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

此处,如果client 开启了事务,且未提交,则直接放在 queue 中。否则,直接执行命令对应的函数。

整个过程如图:

Redis服务启动、事件循环与客户端命令的执行

执行完,则将结果放入缓冲区,并发回给客户端。客户端得到回复,按照既定的方式解析即可。