欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis 源代码阅读与学习笔记(四)

程序员文章站 2022-07-06 19:19:25
...

接收发缓存区处理

本章关注 redis 5.0 如何处理接收发数据

相关数据结构

/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    // ... 其他无关字段略 ...
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    // ... 其他无关字段略 ...
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    // ... 其他无关字段略 ...
    list *reply;            /* List of reply objects to send to the client. */
    // ... 其他无关字段略 ...
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
字段名 说明
querybuf 接收缓存区
argc 解析接收缓存区中命令时,记录当前命令参数个数
argv 解析接收缓存区中命令时,记录当前命令参数
reply 待发送消息列表
buf 正在发送缓冲区

struct client 结构体中,还有一些字段,用于辅助记录当前处理中的位置信息、统计信息等,就不列上去了

避免分散注意力、或增加理解难度

struct redisServer {
    // ... 其他无关字段略 ...
    list *clients_pending_write; /* There is to write or install handler. */
    // ... 其他无关字段略 ...
};
字段名 说明
clients_pending_write 有数据待发送客户端对象列表

接收缓存区处理过程

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    // ... 其他无关代码略 ...
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    // ... 其他无关代码略 ...
    processInputBufferAndReplicate(c);
}
  • c->querybuf 是 sds 简单动态字符串,就是一个可以动态扩展的 buff

  • sdsMakeRoomFor 函数根据输入的readlen,看情况动态扩展缓存区

  • readfd 中读取网络数据

  • processInputBufferAndReplicate 函数内,处理解析消息等

发送缓冲区处理过程

void beforeSleep(struct aeEventLoop *eventLoop) {
    // ... 其他无关代码略 ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    // ... 其他无关代码略 ...
}
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        // ... 其他无关代码略 ...

        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. AE_BARRIER ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
  • 服务器主循环每次epoll返回会调用beforeSleep
  • handleClientsWithPendingWrites 函数内,会遍历待发送数据的客户端列表clients_pending_write
  • writeToClient 是实际发送过程,代码有点长,没贴上去
    • c->buf中数据发送
    • c->reply中也有数据,继续发送c->reply中的
  • addReply 先丢进c->buf消息合批,因为c->buf大小固定,塞不下,就丢进c->reply

其他问题

sdsMakeRoomFor 函数是可能内存扩展的,但是readQueryFromClient中未处理内存满时sdsMakeRoomFor执行失败的情况;会 crash

官方 github 上,也有很多人遇到该问题,诸如:

目前线上 redis 不能让其内存满负荷,满负荷的话,一定会 crash !!!

相关标签: Redis redis