欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

redis源代码分析22–协议

程序员文章站 2024-02-14 14:35:28
...

redis默认使用tcp协议的6379端口,其协议是文本行格式的而不是二进制格式的,每一行都以\r\n结尾,非常容易理解。 参考ProtocolSpecification.html就知道,发布到redis的命令有如下几种返回格式(对于不存在的值,会返回-1,此时client library应返回合适的n

redis默认使用tcp协议的6379端口,其协议是文本行格式的而不是二进制格式的,每一行都以”\r\n”结尾,非常容易理解。

参考ProtocolSpecification.html就知道,发布到redis的命令有如下几种返回格式(对于不存在的值,会返回-1,此时client library应返回合适的nil对象(比如C语言的NULL),而不是空字符串):

1)第一个字节是字符“-”,后面跟着一行出错信息(error reply)

比如lpop命令,当操作的对象不是一个链表时,会返回如下出错信息:

“-ERR Operation against a key holding the wrong kind of value\r\n”

2)第一个字节是字符“+”,后面跟着一行表示执行结果的提示信息(line reply)

比如set命令执行成功后,会返回”+OK\r\n”

3)第一个字节是字符“$”,后面先跟一行,仅有一个数字,该数字表示下一行字符的个数(若不存在,则数字为-1)(bulk reply)

比如get命令,成功时返回值的类似于“$7\r\nmyvalue”,不存在时返回的信息为“$-1\r\n”

4)第一个字节是字符“*”,后面先跟一行,仅有一个数字,该数字表示bulk reply的个数(若不存在,则数字为-1)(multi-bulk reply)

比如lrange命令,若要求返回0–2之间的值,则成功时返回值类似于”*3\r\n$6\r\nvalue1\r\n$7\r\nmyvalue\r\n$5\r\nhello\r\n”,不存在时返回的信息类似于”*-1\r\n”。

5)第一个字节是字符“:”,后面跟着一个整数值(integer reply)

比如incr命令,成功时会返回对象+1后的值。

而client发布命令的格式有如下几种,第一个字符串都必须是命令字,不同的参数之间用1个空格来分隔:
1)Inline Command::仅一行

比如 EXISTS命令,client发送的字节流类似于”EXISTS mykey\r\n”。
2)Bulk Command:类似于返回协议的bulk reply,一般有两行,第一行依次为“命令字 参数 一个数字”,该数字表示下一行字符的个数

比如SET命令,client发送的字节流类似于”SET mykey 5\r\nhello\r\n”。

3)multi-bulk Command:跟返回协议的multi-bulk reply类似。

比如上面的SET命令,用multi-bulk协议表示则为“*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nhello\r\n”。

尽管对于某些命令该协议发送的字节流比bulk command形式要多,但它可以支持任何一种命令,支持跟多个二进制安全参数的命令(bulk command仅支持一个),也可以使得client library不修改代码就能支持redis新发布的命令(只要把不支持的命令按multi-bulk形式发布即可)。redis的官方文档中还提到,未来可能仅支持client采用multi-bulk Command格式发布命令。

另外提一下,client library可以连续发布多条命令,而不是等到redis返回前一条命令的执行结果才发布新的命令,这种机制被称作pipelining,支持redis的client library大多支持这种机制,读者可自行参考。

最后来看看redis实现时用来返回信息的相关函数。

redis 会使用addReplySds、addReplyDouble、addReplyLongLong、addReplyUlong、 addReplyBulkLen、addReplyBulk、addReplyBulkCString等来打包不同的返回信息,最终调用addReply 来发送信息。

addReply会将发送信息添加到相应redisClient的reply链表尾部,并使用 sendReplyToClient来发送。sendReplyToClient会遍历reply链表,并依次发送,其间如果可以打包 reply(server.glueoutputbuf为真),则可以使用glueReplyBuffersIfNeeded把reply链表中的值合并到一个缓冲区,然后一次性发送。

static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return;
    if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
        obj = dupStringObject(obj);
        obj->refcount = 0; /* getDecodedObject() will increment the refcount */
    }
    listAddNodeTail(c->reply,getDecodedObject(obj));
}
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }
    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }
        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interfae) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

关于client library的实现,可按照前面介绍的格式自己实现,也可以阅读现有的client library来加深理解。