欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis源码剖析之异步事件驱动框架

程序员文章站 2022-03-04 20:00:58
...

想要分析源码,我觉得可以把它当成一个整体,或者一个函数,有输入和对应的输出。我们从输入开始分析流程。先撇开main函数,在gdb上打印下线程的堆栈是个好想法,让我们看看它都在做什么。
redis源码剖析之异步事件驱动框架

可以看得到它其中一条线程阻塞在epoll_wait(我的系统是ubuntu,因此选择了epoll)。在这里检测套接字事件,等待client请求。还有个线程池,创建了3条线程等待处理任务。

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

redis源码剖析之异步事件驱动框架

接下来我们看下epoll的对应实现,位于src/ae_epoll.c。首先是构造函数,根据配置申请epoll_wait时传入的存放事件的buff。接下来创建epoll,那么为什么是传入1024这个值呢?引用自man手册的描述是,这个值从前是用来提示内核用户会注册多少个描述符进入epoll的。但是自从kernel 2.6.8后就没有意义了,内核会自动申请描述事件的内部数据结构空间。

epoll_create()  creates  an epoll(7) instance. 
Since Linux 2.6.8, the size argument is ignored, but must be greater than zero;
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;
 
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
 
    if (!state) return -1;
     
    // 根据配置申请用以epoll_wait时存放事件的buff
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
     
    // 创建epoll
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

接下来是注册和注销事件的两个接口,我们看注册时只有可读可写事件,而EPOLLERR与EPOLLHUP是默认会一直监测的事件。至于为何没有考虑使用边缘触发,暂且还未清楚缘由;边缘触发有助于减少事件通知次数,但是也会加大编程难度。注销事件时,需要判断是部分注销还是全部注销,再选择对应的操作类型。可能细心的你注意到为什么这里可以直接用fd偏移获取到注册的信息,不担心越界吗?这个问题我们文章最后来回答。

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE 
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
 
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
 
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);
 
    // 不一定删除全部事件类型, 因此需要判断修改还是全部删除
    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

最后是等待事件的接口。检测到事件后需要保存一下,但是这里记录事件的数据结构struct epoll_event是平台相关的。因此将发生的事件转换为统一的事件记录数据结构aeFiredEvent,即放到放入eventLoop->fired数组中记录。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
 
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp  (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
 
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
 
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

最后我们来回答先前的问题,为什么譬如aeApiAddEvent中,可以直接使用fd作为数组索引查找注册信息。那是因为初始化的时候调用adjustOpenFilesLimit函数设置了最大描述符。这种不使用map之类保存注册信息,而是直接使用描述符偏移进行数组索引的设计,足见redis对运行速度的追求颇具心思。

相关标签: redis