Redis事件处理及源码剖析
Redis服务器是一个事件驱动程序。它和libevent网络库事件驱动一样,都是依托操作系统I/O多路复用技术支撑起来的,为什么它不直接用libevent呢?redis的作者Salvatore给出的答案是:
代码要足够简洁,能够满足当前的需求即可,尽量不要引入外部的依赖。
事件处理器的定义
Redis服务器要处理两种事件,一种是文件事件,另一种是时间事件。
文件事件:服务器通过套接字与客户端链接,文件事件就是服务器对套接字的操作的抽象。服务器就是要监听并处理这些事件来完成一系列网络通信操作。文件事件的结构体是:
/* File event structure
*
* 文件事件结构
*/
typedef struct aeFileEvent {
// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件处理器
aeFileProc *rfileProc;
// 写事件处理器
aeFileProc *wfileProc;
// 多路复用库的私有数据
void *clientData;
} aeFileEvent;
为了存储已经到达(就绪)的文件事件,Redis还定义了一个结构aeFiredEvent来存储已经就绪的文件事件:
/* A fired event
*
* 已就绪事件
*/
// 需要知道文件事件的文件描述符还有事件类型才能对于锁定该事件
typedef struct aeFiredEvent {
// 已就绪文件描述符
int fd;
// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;
} aeFiredEvent;
时间事件:服务器中的一些操作需要在给定的时间点执行,时间事件就是对这类定时操作的抽象。时间事件的结构是:
/* Time event structure
*
* 时间事件结构
*/
typedef struct aeTimeEvent {
// 时间事件的唯一标识符
long long id; /* time event identifier. */
// 事件的到达时间,是毫秒和秒,注意与timeval结构体里的秒和微秒区别
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 事件处理函数
aeTimeProc *timeProc;
// 事件释放函数
aeEventFinalizerProc *finalizerProc;
// 多路复用库的私有数据
void *clientData;
// 指向下个时间事件结构,形成链表
struct aeTimeEvent *next;
} aeTimeEvent;
服务器怎么区分和调度时间事件和文件事件呢? redis中设计了一个结构体aeEventLoop,来统一处理文件事件和时间事件。它的结构体如下:
/* State of an event based program
*
* 事件处理器的状态
*/
typedef struct aeEventLoop {
// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追踪的最大描述符
// 文件描述符监听集合的大小
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件,是一个数组,几号描述符就存在几号数组元素
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件,也是一个数组,大小和events一样,通过aeApipoll处理
// 这里的fired是依次存的就绪的文件事件,访问是依次访问(因为都要处理)
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
// 用来保存轮询事件的状态的
// 表示具体的底层多路复用所使用的数据结构,比如对于select来说,
// 该结构中保存了读写描述符数组;
// 对于epoll来说,该结构中保存了epoll描述符,以及epoll_event结构数组;
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
redis是通过封装常见的I/O多路复用函数库来实现多路复用功能的,有select, epoll, evport, 和 kqueue, 对应于redis源码文件的ae_select.c, ae_epoll.c, ae_evport.c, ae_kqueue.c文件。这些封装的多路复用函数库有一个统一的接口,是通过aeApiState这个结构体实现的,每个文件的结构体的定义不相同,是为了适应不同的I/O多路复用函数,比如常见的封装epoll函数的aeApiState结构体为:
/*
* 事件状态
*/
typedef struct aeApiState {
// epoll_event 实例描述符
int epfd;
// 事件槽
struct epoll_event *events;
} aeApiState;
封装epoll函数的结构体为:
typedef struct aeApiState {
fd_set rfds, wfds;
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds;
} aeApiState;
如果你对epoll和select函数很熟悉,那你当然会觉得这没毛病。
对上述结构体,我画了一个图整理一下,希望能够反映他们之间的关系。
EventLoop的初始化
调用aeCreateEventLoop函数初始化了事件处理器状态。它的原型是:
aeEventLoop *aeCreateEventLoop(int setsize);
这里要注意的是,创建一个eventLoop时,就会初始化两个结构数组,一个结构数组是aeFileEvent,另一个是aeFiredEvent。
aeFileEvent结构体数组在创建时以最大文件描述符setSize为大小,以文件描述符作为数组索引,这样能够以O(1)直接访问到相应的文件描述符。
aeFiredEvent结构体数组在创建时的大小和aeFileEvent一样大(都是setSize), 但是是通过aeApiPoll函数将发生变化的文件描述符添加到aeFiredEvent数组中,这里不是通过下标添加,而是依次添加,依次取出,因为就绪的文件描述符都要进行处理,没有先后。
时间事件是通过链表存储的,每次添加时间时间都是前向插入到头部。只能依次访问,复杂度是O(N)。
//初始化aeEventLoop结构体中的events和fired
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
事件处理函数:aePeocessEvents
这个函数处理所有已经到达的时间事件,以及所有已经就绪的文件事件。处理流程是:
1. 先获取到达时间离当前时间最近的时间事件,计算还有多少时间到达,这个时间是存储在tvp结构中,它是一个timeval型结构。如果已经到达,将tvp设为0,这个tvp作为处理文件事件的阻塞时间。
2. 调用aeApiPoll函数。这个函数的在指定的时间tvp内,阻塞并等待被aeCreateFileEvent设置为监听状态的套接字产生文件事件,返回已就绪事件个数,就绪事件是保存在fired数组里。
3. 通过aeApiPoll返回的就绪文件事件数,依次处理fired数组中的就绪事件
4. 3处理完之后,转去处理时间事件。
这里要注意的是,aeEventLoop结构体中是定义的when_sec和when_ms, 分别是秒和毫秒。而timeval结构体中的是秒和微秒。aeApiPoll函数接受的是一个timeVal结构,需要进行毫秒和微秒的转换。
//aeApiPoll的原型,不同的多路复用函数实现不同
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// timeval结构体,tv_usec是微秒
struct timeval
{
__time_t tv_sec; /* Seconds. */
__suseconds_t tv_usec; /* Microseconds. */
};
aeProcessEvents函数的源代码
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
*
* 处理所有已到达的时间事件,以及所有已就绪的文件事件。
*
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* 如果不传入特殊 flags 的话,那么函数睡眠直到文件事件就绪,
* 或者下个时间事件到达(如果有的话)。
*
* If flags is 0, the function does nothing and returns.
* 如果 flags 为 0 ,那么函数不作动作,直接返回。
*
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* 如果 flags 包含 AE_ALL_EVENTS ,所有类型的事件都会被处理。
*
* if flags has AE_FILE_EVENTS set, file events are processed.
* 如果 flags 包含 AE_FILE_EVENTS ,那么处理文件事件。
*
* if flags has AE_TIME_EVENTS set, time events are processed.
* 如果 flags 包含 AE_TIME_EVENTS ,那么处理时间事件。
*
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* 如果 flags 包含 AE_DONT_WAIT ,
* 那么函数在处理完所有不许阻塞的事件之后,即刻返回。
*
* The function returns the number of events processed.
* 函数的返回值为已处理事件的数量
*/
// AE_FILE_EVENTS 1
// AE_TIME_EVENTS 2
// AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
// AE_DONT_WAIT 4
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// flags中包含时间事件,且允许阻塞,也就是允许文件事件阻塞
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
// 如果最近时间的毫秒小于当前时间的毫秒,需要借位
if (shortest->when_ms < now_ms) {
// tv_usec: 是微秒不是毫秒, +1000是因为向tv_sec借位
// 毫秒和微秒的转换需要乘1000
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
// tv_sec退位
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 文件事件可以阻塞直到有事件到达为止
tvp = NULL; /* wait forever */
}
}
// 处理文件事件,阻塞时间由 tvp 决定
// aeApiPoll:在指定的时间tvp内,阻塞并等待被aeCreateFileEvent设置为监听状态的
// 套接字产生文件事件,返回已就绪事件个数,就绪事件保存在fired数组里
numevents = aeApiPoll(eventLoop, tvp);
// numevents位就绪事件个数,
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
// 在events中是根据下标访问的
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// mask;事件类型掩码,可以AE_READABLE 或 AE_WRITABLE,或两者的或
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
// 每次循环将它置为0,为0执行读事件,为1执行写事件
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}