欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis源码剖析——ziplist的实现

程序员文章站 2022-05-19 18:20:58
...

有序集合对象

ziplist为Redis中的压缩列表,是列表键和哈希键的底层实现之一,用于存储长度短的字符串和小整数。ziplist采用一段连续的内存来存储节点

ziplist的表示

因为ziplist的数据结构的长度是变化的所有没有特定的结构体,ziplist在内存中的布局如下
Redis源码剖析——ziplist的实现

entry也是不定长的,没有特定的结构体,entry在内存中的布局如下
Redis源码剖析——ziplist的实现

previous_entry_length的长度可以为1或者5字节,具体解释如下
1. 前一节点长度小于254字节,previous_entry_length长度为1字节,长度就保存在previous_entry_length中
2. 前一节点长度大于等于254字节,previous_entry_length长度为5,第一字节为0xFE,后面四个字节保存实际长度

encoding的长度可以为1、2、5字节,具体解释如下
Redis源码剖析——ziplist的实现

表示编码类型的一些宏定义如下

// ziplist 末端标识符,以及 5 字节长长度标识符
#define ZIP_END 255
#define ZIP_BIGLEN 254

// 字符串编码和整数编码的掩码
#define ZIP_STR_MASK 0xc0
#define ZIP_INT_MASK 0x30

// 字符串编码类型
#define ZIP_STR_06B (0 << 6)
#define ZIP_STR_14B (1 << 6)
#define ZIP_STR_32B (2 << 6)

// 整数编码类型
#define ZIP_INT_16B (0xc0 | 0<<4)
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)
#define ZIP_INT_24B (0xc0 | 3<<4)
#define ZIP_INT_8B 0xfe

为了复用一些代码,比如提取previous_entry_length,encoding求压缩列表大小等信息,一些宏定义如下

// 用于取出 bytes 属性的现有值,或者为 bytes 属性赋予新值
#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))
// 定位到 ziplist 的 offset 属性,该属性记录了到达表尾节点的偏移量
// 用于取出 offset 属性的现有值,或者为 offset 属性赋予新值
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
// 定位到 ziplist 的 length 属性,该属性记录了 ziplist 包含的节点数量
// 用于取出 length 属性的现有值,或者为 length 属性赋予新值
#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
// 返回 ziplist 表头的大小
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))
// 返回指向 ziplist 第一个节点(的起始位置)的指针
#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
// 返回指向 ziplist 最后一个节点(的起始位置)的指针
#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
// 返回指向 ziplist 末端 ZIP_END (的起始位置)的指针
#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)

还有一个zlentry结构体,前几天我一直把它理解为就是ziplist里的entry节点,所以一直看不懂代码到底怎么回事,现在才恍然大悟……

// zlentry 不是ziplist中的节点,而是保存信息的结构--
typedef struct zlentry {

    // prevrawlen :前置节点的长度
    // prevrawlensize :编码 prevrawlen 所需的字节大小
    unsigned int prevrawlensize, prevrawlen;

    // len :当前节点值的长度
    // lensize :编码 len 所需的字节大小
    unsigned int lensize, len;

    // 当前节点 header 的大小
    // 等于 prevrawlensize + lensize
    unsigned int headersize;  

    // 当前节点值所使用的编码类型
    unsigned char encoding;

    // 指向当前节点的指针
    unsigned char *p;

} zlentry;

此结构体中prevrawlensize和prevrawlen解析了entry节点中的previous_entry_length
lensize表示entry节点中的encoding的长度
len表示entry节点中字符串或者整数的长度
headersize(表示节点头部的大小) = prevrawlensize + lensize
encoding存储entry的编码
p指向entry节点起点

给定节点p,将节点信息保存在结构体中的函数如下

static zlentry zipEntry(unsigned char *p) {
    zlentry e;

    // e.prevrawlensize 保存着编码前一个节点的长度所需的字节数
    // e.prevrawlen 保存着前一个节点的长度
    ZIP_DECODE_PREVLEN(p, e.prevrawlensize, e.prevrawlen);

    // p + e.prevrawlensize 将指针移动到列表节点本身
    // e.encoding 保存着节点值的编码类型
    // e.lensize 保存着编码节点值长度所需的字节数
    // e.len 保存着节点值的长度
    ZIP_DECODE_LENGTH(p + e.prevrawlensize, e.encoding, e.lensize, e.len);

    // 计算头结点的字节数
    e.headersize = e.prevrawlensize + e.lensize;

    // 记录指针
    e.p = p;

    return e;
}


// 解码节点,取出前置节点的长度和编码该长度需要的字节数
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {                     \
                                                                               \
    /* 先计算被编码长度值的字节数 */                                           \
    ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);                                  \
                                                                               \
    /* 再根据编码字节数来取出长度值 */                                         \
    if ((prevlensize) == 1) {                                                  \
        (prevlen) = (ptr)[0];                                                  \
    } else if ((prevlensize) == 5) {                                           \
        assert(sizeof((prevlensize)) == 4);                                    \
        memcpy(&(prevlen), ((char*)(ptr)) + 1, 4);                             \
        memrev32ifbe(&prevlen);                                                \
    }                                                                          \
} while(0);


// 解码ptr所指节点的编码类型,encoding域的长度,以及字符串或整数的长度
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {                    \
                                                                               \
    /* 取出值的编码类型 */                                                     \
    ZIP_ENTRY_ENCODING((ptr), (encoding));                                     \
                                                                               \
    // 字符串编码
    // 由解码过程可以看出,存储长度采用的大端编码,高位在内存低地址
    if ((encoding) < ZIP_STR_MASK) {                                           \
        if ((encoding) == ZIP_STR_06B) {                                       \
            (lensize) = 1;                                                     \
            (len) = (ptr)[0] & 0x3f;                                           \
        } else if ((encoding) == ZIP_STR_14B) {                                \
            (lensize) = 2;                                                     \
            (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];                       \
        } else if (encoding == ZIP_STR_32B) {                                  \
            (lensize) = 5;                                                     \
            (len) = ((ptr)[1] << 24) |                                         \
                    ((ptr)[2] << 16) |                                         \
                    ((ptr)[3] <<  8) |                                         \
                    ((ptr)[4]);                                                \
        } else {                                                               \
            assert(NULL);                                                      \
        }                                                                      \
                                                                               \
    /* 整数编码 */                                                             \
    } else {                                                                   \
        (lensize) = 1;                                                         \
        (len) = zipIntSize(encoding);                                          \
    }                                                                          \
} while(0);

基本函数

函数 作用 复杂度
ziplistNew 新建一个压缩列表 O(1)
ziplistPush 将字符串压入ziplist O(N^2)
ziplistIndex 根据给定索引返回节点 O(N)
ziplistInsert 将节点插入给定位置 O(N^2)
ziplistDelete 删除给定节点 O(N^2)

ziplistNew

unsigned char *ziplistNew(void) {

    // 表头加末端大小
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;

    // 为表头和表末端分配空间
    unsigned char *zl = zmalloc(bytes);

    // 初始化表属性
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;

    // 设置表末端
    zl[bytes-1] = ZIP_END;

    return zl;
}

ziplistPush

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {

    // 根据 where 参数的值,决定将值推入到表头还是表尾
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);

    // 返回添加新值后的 ziplist
    // T = O(N^2)
    return __ziplistInsert(zl,p,s,slen);
}

核心的函数__ziplistInsert如下

// 节点插入p之前
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    // 记录当前 ziplist 的长度
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; 
    zlentry entry, tail;

    if (p[0] != ZIP_END) {
        // 如果 p[0] 不指向列表末端,说明列表非空,并且 p 正指向列表的其中一个节点
        // 那么取出 p 所指向节点的信息,并将它保存到 entry 结构中
        // 然后用 prevlen 变量记录前置节点的长度
        // (当插入新节点之后 p 所指向的节点就成了新节点的前置节点)
        entry = zipEntry(p);
        prevlen = entry.prevrawlen;
    } else {
        // 如果 p 指向表尾末端,那么程序需要检查列表是否为:
        // 1)如果 ptail 也指向 ZIP_END ,那么列表为空;
        // 2)如果列表不为空,那么 ptail 将指向列表的最后一个节点。
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            // 表尾节点为新节点的前置节点
            // 取出表尾节点的长度
            prevlen = zipRawEntryLength(ptail);
        }
    }

    // 尝试看能否将输入字符串转换为整数,如果成功的话:
    // 1)value 将保存转换后的整数值
    // 2)encoding 则保存适用于 value 的编码方式
    // 无论使用什么编码, reqlen 都保存节点值的长度
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        reqlen = zipIntSize(encoding);
    } else { 
        reqlen = slen;
    }
    // 计算编码前置节点的长度所需的大小
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    // 计算编码当前节点encoding值所需的大小
    reqlen += zipEncodeLength(NULL,encoding,slen);

    // 只要新节点不是被添加到列表末端,
    // 那么程序就需要检查看 p 所指向的节点(的 header)能否编码新节点的长度。
    // nextdiff 保存了新旧编码之间的字节大小差,如果这个值大于 0 
    // 那么说明需要对 p 所指向的节点(的 header )进行扩展
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    // 因为重分配空间可能会改变 zl 的地址
    // 所以在分配之前,需要记录 zl 到 p 的偏移量,然后在分配之后依靠偏移量还原 p 
    offset = p-zl;
    // curlen 是 ziplist 原来的长度
    // reqlen 是整个新节点的长度
    // nextdiff 是新节点的后继节点扩展 header 的长度(要么 0 字节,要么 4 个字节)
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    if (p[0] != ZIP_END) {
        // 新元素之后还有节点,因为新元素的加入,需要对这些原有节点进行调整

        // 移动现有元素,为新元素的插入空间腾出位置
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        // 将新节点的长度编码至后置节点
        // p+reqlen 定位到后置节点
        // reqlen 是新节点的长度
        zipPrevEncodeLength(p+reqlen,reqlen);

        // 更新到达表尾的偏移量,将新节点的长度也算上
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        // 如果新节点的后面有多于一个节点
        // 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
        // 这样才能让表尾偏移量正确对齐表尾节点
        tail = zipEntry(p+reqlen);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        // 新元素是新的表尾节点
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    // 当 nextdiff != 0 时,新节点的后继节点的(header 部分)长度已经被改变,
    // 所以需要连锁更新后续的节点
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    // 一切搞定,将前置节点的长度写入新节点的 header
    p += zipPrevEncodeLength(p,prevlen);
    // 将节点值的长度写入新节点的 header
    p += zipEncodeLength(p,encoding,slen);
    // 写入节点值
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }

    // 更新列表的节点数量计数器
    ZIPLIST_INCR_LENGTH(zl,1);

    return zl;
}

__ziplistInsert的步骤如下:
1.求新节点需要的编码长度,以及当前p所指节点的previous_entry_length能否装的下新节点长度,若装不下,则p所指节点头部需要扩展,要多分配内存
2. 重分配内存
3. 移动节点,中间预留出容纳新节点的空间
4.更新之前p所指节点(即插入节点的后续节点)的头部
5. 头部扩展后可能会导致连锁更新
6. 最后写入节点,更新节点数量

连锁更新

插入新节点后若新节点比较长,previous_entry_length需要5字节编码,而新节点后的节点X原来是1字节编码的,头部会扩展为5字节,整个X节点的长度恰好变大为需要5字节编码,而X的后续节点Y之前编码X的长度只需要1字节,装不下又会导致Y的头部扩展………往复下去

因为前置节点长度大于等于254,previous_entry_length才会采用5字节编码,所以当有连续的250-253字节长度的节点时才会导致连锁更新,概率不会很大,并不会因为连锁更新导致非常频繁的分配拷贝释放内存

插入也会导致连锁缩减,但是我们忽略这种情况。因为缩减并不是必须的,并且一个节点也只多占用了4字节内存。没有必要为了缩减几字节的内存而进行内存重分配

连锁更新代码如下:

static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;

    while (p[0] != ZIP_END) {
        // 计算编码当前节点的长度所需的字节数
        // 将 p 所指向的节点的信息保存到 cur 结构中 
        cur = zipEntry(p);
        // 当前节点的长度
        rawlen = cur.headersize + cur.len;
        rawlensize = zipPrevEncodeLength(NULL,rawlen);

        // 如果已经没有后续空间需要更新了,跳出
        if (p[rawlen] == ZIP_END) break;

        // 取出后续节点的信息,保存到 next 结构中
        next = zipEntry(p+rawlen);

        // 后续节点编码当前节点的空间已经足够,无须再进行任何处理,跳出
        // 可以证明,只要遇到一个空间足够的节点,
        // 那么这个节点之后的所有节点的空间都是足够的
        if (next.prevrawlen == rawlen) break;

        if (next.prevrawlensize < rawlensize) {

            // 执行到这里,表示 next 空间的大小不足以编码 cur 的长度
            // 所以程序需要对 next 节点的(header 部分)空间进行扩展

            // 记录 p 的偏移量
            offset = p-zl;
            // 计算需要增加的节点数量
            extra = rawlensize-next.prevrawlensize;
            // 扩展 zl 的大小
            zl = ziplistResize(zl,curlen+extra);
            // 还原指针 p
            p = zl+offset;

            // 记录下一节点的偏移量
            np = p+rawlen;
            noffset = np-zl;

            // 当 next 节点不是表尾节点时,更新列表到表尾节点的偏移量
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }

            // 向后移动 cur 节点之后的数据,为 cur 的新 header 腾出空间
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
            // 将新的前一节点长度值编码进新的 next 节点的 header
            zipPrevEncodeLength(np,rawlen);

            // 移动指针,继续处理下个节点
            p += rawlen;
            curlen += extra;
        } else {
            if (next.prevrawlensize > rawlensize) {
                // 执行到这里,说明 next 节点编码前置节点的 header 空间有 5 字节
                // 而编码 rawlen 只需要 1 字节
                // 但是程序不会对 next 进行缩小,
                // 所以这里只将 rawlen 写入 5 字节的 header 中就算了。
                zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
            } else {
                // 运行到这里,
                // 说明 cur 节点的长度正好可以编码到 next 节点的 header 中
                zipPrevEncodeLength(p+rawlen,rawlen);
            }
            // 到这里此节点previous_entry_length长度不需要扩展
            // 连锁更新结束
            break;
        }
    }

    return zl;
}

连锁更新的思想就是对节点依次进行检测,如果当前节点X的previous_entry_length足以容纳下前一个节点的长度则更新终止,如果不足则扩展X,然后继续向后进行

ziplistIndex

这个函数简单,因为每个节点存储了前一个节点的长度

// 根据索引返回节点
unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    zlentry entry;
    // 处理负数索引
    if (index < 0) {
        // 将索引转换为正数
        index = (-index)-1;      
        // 定位到表尾节点
        p = ZIPLIST_ENTRY_TAIL(zl);
        // 如果列表不为空,那么。。。
        if (p[0] != ZIP_END) {
            // 从表尾向表头遍历
            entry = zipEntry(p);
            while (entry.prevrawlen > 0 && index--) {
                // 前移指针
                p -= entry.prevrawlen;
                entry = zipEntry(p);
            }
        }
    // 处理正数索引
    } else {
        // 定位到表头节点
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (p[0] != ZIP_END && index--) {
            // 后移指针
            p += zipRawEntryLength(p);
        }
    }

    // 返回结果
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}

ziplistInsert

之前介绍过核心函数__ziplistInsert,转调用此函数即可

unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    return __ziplistInsert(zl,p,s,slen);
}

ziplistDelete

unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {

    // 因为 __ziplistDelete 时会对 zl 进行内存重分配
    // 而内存充分配可能会改变 zl 的内存地址
    // 所以这里需要记录到达 *p 的偏移量
    size_t offset = *p-zl;
    zl = __ziplistDelete(zl,*p,1);

    *p = zl+offset;

    return zl;
}

核心实现函数为__ziplistDelete
下面介绍此函数,之前介绍了插入,删除操作就很容易理解了
如果删除节点后的previous_entry_length不足以容纳它现在的前置节点了,则会导致连锁更新

static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;

    // 计算被删除节点总共占用的内存字节数
    // 以及被删除节点的总个数
    first = zipEntry(p);
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p);
        deleted++;
    }

    // totlen 是所有被删除节点总共占用的内存字节数
    totlen = p-first.p;
    if (totlen > 0) {
        if (p[0] != ZIP_END) {
            // 执行这里,表示被删除节点之后仍然有节点存在

            // 因为位于被删除范围之后的第一个节点的 header 部分的大小
            // 可能容纳不了新的前置节点,所以需要计算新旧前置节点之间的字节数差
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
            // 如果有需要的话,将指针 p 后退 nextdiff 字节,为新 header 空出空间
            p -= nextdiff;
            // 将 first 的前置节点的长度编码至 p 中
            zipPrevEncodeLength(p,first.prevrawlen);

            // 更新到达表尾的偏移量
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);

            // 如果被删除节点之后,有多于一个节点
            // 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
            // 这样才能让表尾偏移量正确对齐表尾节点
            tail = zipEntry(p);
            if (p[tail.headersize+tail.len] != ZIP_END) {
                ZIPLIST_TAIL_OFFSET(zl) =
                   intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
            }

            // 从表尾向表头移动数据,覆盖被删除节点的数据
            memmove(first.p,p,
                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
        } else {
            // 执行这里,表示被删除节点之后已经没有其他节点了
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe((first.p-zl)-first.prevrawlen);
        }

        // 缩小并更新 ziplist 的长度
        offset = first.p-zl;
        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
        ZIPLIST_INCR_LENGTH(zl,-deleted);
        p = zl+offset;

        // 如果 p 所指向的节点的大小已经变更,那么进行级联更新
        // 检查 p 之后的所有节点是否符合 ziplist 的编码要求
        if (nextdiff != 0)
            zl = __ziplistCascadeUpdate(zl,p);
    }

    return zl;
}

小节

ziplist采用连续内存块存储节点,节约了内存,释放内存只需一次即可,用于存储长度小的字符串和小整数。但是每次的插入和删除都将导致内存重分配,可能导致连锁更新(概率不大)

https://www.kancloud.cn/kancloud/redisbook/63856