redis 源代码阅读与学习笔记(四)

本章关注 redis 5.0 如何接收和发送数据


/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list.
 *
 * Abridged excerpt: only the fields involved in receiving queries and
 * queueing replies are shown; everything else is elided. */
typedef struct client {
    // ... other unrelated fields omitted ...
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    // ... other unrelated fields omitted ...
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    // ... other unrelated fields omitted ...
    list *reply;            /* List of reply objects to send to the client. */
    // ... other unrelated fields omitted ...
} client;
字段名 说明
querybuf 接收缓存区
argc 解析接收缓存区中命令时,记录当前命令参数个数
argv 解析接收缓存区中命令时,记录当前命令参数
reply 待发送消息列表
buf 正在发送缓冲区(大小固定;上面的结构体摘录中省略了该字段)

struct client 结构体中,还有一些字段,用于辅助记录当前处理中的位置信息、统计信息等,就不列上去了


struct redisServer {
    // ... 其他无关字段略 ...
    list *clients_pending_write; /* There is to write or install handler. */
    // ... 其他无关字段略 ...
字段名 说明
clients_pending_write 有数据待发送的客户端对象列表


void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    // ... 其他无关代码略 ...
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
    // ... 其他无关代码略 ...
  • c->querybuf 是 sds 简单动态字符串,就是一个可以动态扩展的 buff

  • sdsMakeRoomFor 函数根据输入的readlen,看情况动态扩展缓存区

  • read 函数从 fd 中读取网络数据,写入 c->querybuf 的末尾

  • processInputBufferAndReplicate 函数负责解析并处理收到的消息(该调用在上面的摘录中已省略)


void beforeSleep(struct aeEventLoop *eventLoop) {
    // ... 其他无关代码略 ...
    /* Handle writes with pending output buffers. */
    // ... 其他无关代码略 ...
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        // ... 其他无关代码略 ...

        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. AE_BARRIER ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
                ae_flags |= AE_BARRIER;
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
    return processed;
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
  • 服务器主循环每一轮在进入 epoll 等待之前,都会调用 beforeSleep
  • handleClientsWithPendingWrites 函数内,会遍历待发送数据的客户端列表clients_pending_write
  • writeToClient 是实际发送过程,代码有点长,没贴上去
    • 先发送 c->buf 中的数据
    • 如果 c->reply 中也有数据,再继续发送 c->reply 中的数据
  • addReply 先把消息放进 c->buf 做合批;c->buf 大小固定,放不下时才放进 c->reply


sdsMakeRoomFor 函数可能会扩展内存,但 readQueryFromClient 中未处理内存耗尽时 sdsMakeRoomFor 执行失败的情况,此时进程会 crash

官方 github 上,也有很多人遇到该问题,诸如:

目前线上 redis 不能让其内存满负荷,满负荷的话,一定会 crash !!!

