Redis服务启动、事件循环与客户端命令的执行
Redis 服务启动
直接看代码。新版本中,服务器的启动代码位于 server.c 中(以下为节选):
/* Redis server entry point (abridged: "..." marks code the article omits).
 * Startup order: configuration -> module system -> config file -> server
 * state and listeners -> data restore (non-sentinel mode) -> event loop. */
int main(int argc, char **argv) {
struct timeval tv;
int j;
// omitted ...
initServerConfig();// initialize the server configuration (overridden below by the config file, if any)
moduleInitModulesSystem();
if (argc >= 2) {
// ... (configfile and options are presumably built from argv in the omitted code — confirm)
resetServerSaveParams();
loadServerConfig(configfile,options);// load the server configuration from the given config file
sdsfree(options);
}
initServer();// initialize server data structures, create the aeEventLoop and register the listening sockets
if (!server.sentinel_mode) {
// ...
moduleLoadFromQueue();
InitServerLast();
loadDataFromDisk();// restore the dataset (AOF/RDB) before serving clients
// ...
} else {
InitServerLast();
sentinelIsRunning();
}
// ...
aeSetBeforeSleepProc(server.el,beforeSleep); // hook: aeMain calls it before every aeProcessEvents iteration
aeSetAfterSleepProc(server.el,afterSleep); // hook: aeProcessEvents calls it right after the multiplexing poll returns
aeMain(server.el); // run the event loop until eventLoop->stop is set
aeDeleteEventLoop(server.el); // release the event loop before exiting
// NOTE(review): this log line looks like a local debugging addition — confirm it belongs in the excerpt.
serverLog(LL_NOTICE, "Self add done");
return 0;
}
总的来说,服务器启动的主要流程可以概括为:
1.初始化服务器配置(initServerConfig,并按需加载配置文件)
2.初始化数据库、构建事件循环、监听端口(initServer)
3.非 sentinel 模式下,从 AOF/RDB 中恢复数据(loadDataFromDisk)
4.进入事件循环(aeMain),等待并处理客户端的请求
事件循环
Redis 对客户端请求的处理以及自身的周期性任务(时间事件),都是基于事件循环实现的。
服务器启动时会创建 aeEventLoop,创建函数如下,该函数在上边的 initServer 函数内部被调用:
/* Create an event loop that can track file descriptors 0 .. setsize-1.
 * A file descriptor is used directly as the index into the events array
 * (fds 0, 1 and 2 are stdin, stdout and stderr). setsize is the maximum
 * number of descriptors; per the article it defaults to 10000+128.
 * Returns the new loop, or NULL when an allocation or the multiplexing
 * backend initialisation (aeApiCreate) fails. The events/fired arrays and
 * the loop itself are released on failure. */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *el = zmalloc(sizeof(*el));

    if (el == NULL)
        return NULL;

    /* Per-fd registration table, plus the buffer aeApiPoll fills with
     * ready descriptors. */
    el->events = zmalloc(sizeof(aeFileEvent) * setsize);
    el->fired = zmalloc(sizeof(aeFiredEvent) * setsize);

    if (el->events != NULL && el->fired != NULL) {
        el->setsize = setsize;
        el->lastTime = time(NULL);
        el->timeEventHead = NULL;
        el->timeEventNextId = 0;
        el->stop = 0;
        el->maxfd = -1;              /* no file event registered yet */
        el->beforesleep = NULL;
        el->aftersleep = NULL;

        /* The backend is created only after setsize is stored; it presumably
         * sizes its own buffers from it. */
        if (aeApiCreate(el) != -1) {
            /* AE_NONE marks a slot with no registered event. */
            for (int fd = 0; fd < setsize; fd++)
                el->events[fd].mask = AE_NONE;
            return el;
        }
    }

    zfree(el->events);
    zfree(el->fired);
    zfree(el);
    return NULL;
}
创建后的 aeEventLoop 保存在 redisServer 的 el 字段中(即 server.el),其数据结构如下:
aeEventLoop 里面有两类事件:
aeFileEvent:文件事件
aeTimeEvent:时间事件
此外,aeFiredEvent(fired 数组)用于存储已触发、待处理的文件事件。
注意:上边 main 函数的最后,先设置了钩子函数,再调用 aeMain 循环处理事件。接下来看下事件循环函数 aeMain:
/* Drive the event loop until something sets eventLoop->stop.
 * Each iteration runs the beforesleep hook (when one is installed) and then
 * processes all file and time events. AE_CALL_AFTER_SLEEP asks
 * aeProcessEvents to run the aftersleep hook once the poll returns. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP);
    }
}
也就是说,在收到终止事件循环的通知(eventLoop->stop 被置位)之前,服务器一直在执行这个循环:
1.执行 beforesleep 前置函数
2.处理事件(aeProcessEvents)
接下来所有的事件处理过程都在 aeProcessEvents 中,这个过程如图:
简单来说就是在每个循环中:
1.执行 beforesleep 前置函数(在 aeMain 中调用)
2.找到最近的时间事件,据此计算多路复用的等待超时
3.调用多路复用接口(aeApiPoll)获取就绪的文件事件
4.将就绪的事件记录到 fired 数组中
5.执行 aftersleep 后置函数
6.执行文件事件:默认先处理读事件,后处理写事件(设置了 AE_BARRIER 时顺序相反)
7.处理时间事件
来看源代码:
/* Run one iteration of the event loop. It waits for ready file descriptors
 * (at most until the nearest time event is due), dispatches their handlers,
 * and then processes due time events.
 * flags: AE_FILE_EVENTS / AE_TIME_EVENTS select what to process,
 * AE_DONT_WAIT makes the poll non-blocking, and AE_CALL_AFTER_SLEEP runs the
 * aftersleep hook once the poll returns.
 * Returns the number of file events (counted per fd) plus time events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))// find the time event that fires soonest
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
// a timer that is already due (ms <= 0) means: poll without blocking
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);// poll the multiplexing backend; fills eventLoop->fired
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);// NOTE(review): the article says this releases the module lock; verify against afterSleep() in server.c (it may acquire it instead)
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
// Readable handlers normally run before writable ones.
// With AE_BARRIER set the order is inverted: writable first, then readable.
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
// skip when the same handler was already invoked for this fd as the readable handler
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS) // time events are always processed after the file events above
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
文件事件
在上边的代码循环中:总是先处理文件事件,再处理时间事件。
其中文件事件的获取主要是这行代码:
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);// poll the multiplexing backend; returns the number of ready events
该调用找出已就绪的事件(记录在 eventLoop->fired 中),并返回就绪事件的数量。
注意:aeApiPoll 内部调用了操作系统的多路复用接口,其实现是可替换的,且与主机环境有关(比如 macOS 上使用的是 kqueue)。源码中对应的实现文件有:ae_epoll.c、ae_evport.c、ae_kqueue.c、ae_select.c。
这些文件分别封装了 epoll、evport、kqueue、select 这几种 I/O 多路复用接口,它们是 Redis 实现 Reactor 模式的基础。根据不同的配置或环境,可以切换使用。
主机环境和设置不同,采用的方式可能不一样,具体的配置在 config.h 中。我本地是 macOS,选择的是 kqueue,源码如下:
/* kqueue backend: wait for ready file descriptors and translate them into
 * eventLoop->fired[].
 * When tvp is NULL, the call blocks until at least one event arrives.
 * Otherwise *tvp is the maximum wait, converted from timeval to the timespec
 * that kevent expects.
 * Returns the number of entries written to fired (0 on timeout or error). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    struct timespec timeout;
    struct timespec *timeoutp = NULL;
    int ready;

    if (tvp != NULL) {
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        timeoutp = &timeout;
    }

    /* Ask the kernel queue for at most setsize pending events. */
    ready = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, timeoutp);
    if (ready <= 0)
        return 0;

    for (int k = 0; k < ready; k++) {
        const struct kevent *ev = &state->events[k];
        int mask = 0;

        /* Each kevent entry carries a single filter, so at most one bit is set here. */
        if (ev->filter == EVFILT_READ) mask |= AE_READABLE;
        if (ev->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
        eventLoop->fired[k].fd = ev->ident;
        eventLoop->fired[k].mask = mask;
    }
    return ready;
}
服务器实际采用了哪种多路复用方式,可以通过 INFO server 命令查看(multiplexing_api 字段):
时间事件
Redis 时间事件有两种:
- 定时事件,每隔一段时间会执行一次;
- 非定时事件,只会在某个时间点执行一次。
时间事件的处理相当简单:在循环中遍历时间事件链表,发现到期的事件就执行其 timeProc。看源码即可:
/* Process time events */
/* Walk the time event list once. Events previously marked as deleted are
 * freed, and every event whose deadline has passed is run. Each run event
 * is then rescheduled or marked for deletion according to its callback's
 * return value.
 * Returns the number of time events whose callbacks were run. */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {// clock moved backwards: setting when_sec = 0 makes every time event due now
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);// optional cleanup callback for the removed event
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);// run the callback; its int return value decides rescheduling
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); // not AE_NOMORE: fire again retval ms from now
} else {
te->id = AE_DELETED_EVENT_ID; // AE_NOMORE: mark it; it is unlinked and freed on a later walk of the list
}
}
te = te->next;
}
return processed;
}
值得一提的是:timeProc 的返回值如果不是 AE_NOMORE(即 -1),则表示距下次触发的时间间隔(毫秒),事件会被重新调度,这就是定时事件;如果返回 AE_NOMORE,事件会被标记为删除,只执行这一次,即非定时事件。
客户端连接
客户端让服务器执行命令,要先建立连接。
先回到服务启动过程中的初始化步骤 initServer,它负责初始化数据结构并注册监听:
/* Abridged excerpt of initServer(). After setting up the databases (omitted),
 * it registers the serverCron time event and a readable file event for each
 * listening descriptor on server.el. Any registration failure is fatal. */
void initServer(void) {
// initialize the databases
// omitted ...
// register serverCron as a time event on the main event loop
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1); // NOTE(review): serverPanic presumably never returns, making this unreachable — confirm
}
// register acceptTcpHandler for every listening TCP socket (j is declared in the omitted code)
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// register acceptUnixHandler for the Unix socket listener, only when sofd > 0
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
// register the read end of the module blocked-clients pipe
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
// ...
}
除了注册 serverCron 时间事件外,initServer 还依次为三类文件描述符注册了可读事件:TCP 监听 socket、Unix socket、模块阻塞客户端使用的 pipe。
监听到客户端连接后,服务器会创建 client,并为该连接注册可读事件处理函数 readQueryFromClient:
/* Abridged excerpt of createClient(). Allocates a client and, for a real
 * connection, configures the socket and registers readQueryFromClient as
 * its readable-event handler.
 * fd: connected socket, or -1 for a client with no connection.
 * On a registration failure the fd is closed, the client freed and NULL
 * returned. The rest of the function (including the normal return) is
 * omitted by the article. */
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// fd == -1: no connection, so no socket setup or event registration is done
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
// register the readable handler that receives the commands the client sends
{
close(fd);
zfree(c);
return NULL;
}
}
// ...
}
注意,这里是 AE_READABLE ,也就是客户端的请求事件。
创建的 client 会放入 redisServer 的 clients 链表中。到此客户端与服务器连接成功,客户端就可以发起请求了。
命令执行
Redis 的命令从客户端发送到服务器,然后被服务器执行,最后返回结果给客户端。
命令发送到服务器端,也是通过事件处理机制来分发执行的。上边说到 createClient 时,为连接注册了可读事件的处理函数 readQueryFromClient。
当可读事件发生时,该函数会从连接读取数据到输入缓冲区并进行解析等一系列操作,最后调用 processCommand 处理命令,该函数位于 server.c 文件中。至此,就完成了事件分发。
接下来看服务器对命令的处理
该函数较长,有 193 行,但大部分是对服务器当前状态和命令的检查。这里只保留关键部分,重点看函数的最后:
/* Abridged excerpt of processCommand(): looks up the command named by
 * c->argv[0] and then either queues it (inside MULTI) or executes it.
 * The many state and permission checks between lookup and execution are
 * omitted by the article. */
int processCommand(client *c) {
// run module command filters on the incoming command
moduleCallCommandFilters(c);
// ...
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// ...
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c); // inside a transaction: queue the command; it is not executed until EXEC
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL); // execute the command now
c->woff = server.master_repl_offset;
// presumably serves clients blocked on keys that became ready — confirm
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
此处,如果 client 处于事务中(CLIENT_MULTI),且当前命令不是 EXEC、DISCARD、MULTI、WATCH,则将命令放入事务队列并回复 QUEUED,等到 EXEC 提交时再执行。否则,通过 call 直接执行命令对应的函数。
整个过程如图:
命令执行完后,结果会被写入客户端的输出缓冲区,再发回给客户端。客户端收到回复后,按照既定的协议解析即可。