Redis源码剖析——ziplist的实现
有序集合对象
ziplist为Redis中的压缩列表,是列表键和哈希键的底层实现之一,用于存储长度短的字符串和小整数。ziplist采用一段连续的内存来存储节点
ziplist的表示
因为ziplist的数据结构的长度是变化的所有没有特定的结构体,ziplist在内存中的布局如下
entry也是不定长的,没有特定的结构体,entry在内存中的布局如下
previous_entry_length的长度可以为1或者5字节,具体解释如下
1. 前一节点长度小于254字节,previous_entry_length长度为1字节,长度就保存在previous_entry_length中
2. 前一节点长度大于等于254字节,previous_entry_length长度为5,第一字节为0xFE,后面四个字节保存实际长度
encoding的长度可以为1、2、5字节,具体解释如下
表示编码类型的一些宏定义如下
// ziplist 末端标识符,以及 5 字节长长度标识符
#define ZIP_END 255
#define ZIP_BIGLEN 254
// 字符串编码和整数编码的掩码
#define ZIP_STR_MASK 0xc0
#define ZIP_INT_MASK 0x30
// 字符串编码类型
#define ZIP_STR_06B (0 << 6)
#define ZIP_STR_14B (1 << 6)
#define ZIP_STR_32B (2 << 6)
// 整数编码类型
#define ZIP_INT_16B (0xc0 | 0<<4)
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)
#define ZIP_INT_24B (0xc0 | 3<<4)
#define ZIP_INT_8B 0xfe
为了复用一些代码,比如提取previous_entry_length,encoding求压缩列表大小等信息,一些宏定义如下
// 用于取出 bytes 属性的现有值,或者为 bytes 属性赋予新值
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
// 定位到 ziplist 的 offset 属性,该属性记录了到达表尾节点的偏移量
// 用于取出 offset 属性的现有值,或者为 offset 属性赋予新值
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
// 定位到 ziplist 的 length 属性,该属性记录了 ziplist 包含的节点数量
// 用于取出 length 属性的现有值,或者为 length 属性赋予新值
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
// 返回 ziplist 表头的大小
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
// 返回指向 ziplist 第一个节点(的起始位置)的指针
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
// 返回指向 ziplist 最后一个节点(的起始位置)的指针
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
// 返回指向 ziplist 末端 ZIP_END (的起始位置)的指针
#define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
还有一个zlentry结构体,前几天我一直把它理解为就是ziplist里的entry节点,所以一直看不懂代码到底怎么回事,现在才恍然大悟……
// zlentry 不是ziplist中的节点,而是保存信息的结构--
typedef struct zlentry {
// prevrawlen :前置节点的长度
// prevrawlensize :编码 prevrawlen 所需的字节大小
unsigned int prevrawlensize, prevrawlen;
// len :当前节点值的长度
// lensize :编码 len 所需的字节大小
unsigned int lensize, len;
// 当前节点 header 的大小
// 等于 prevrawlensize + lensize
unsigned int headersize;
// 当前节点值所使用的编码类型
unsigned char encoding;
// 指向当前节点的指针
unsigned char *p;
} zlentry;
此结构体中prevrawlensize和prevrawlen解析了entry节点中的previous_entry_length
lensize表示entry节点中的encoding的长度
len表示entry节点中字符串或者整数的长度
headersize(表示节点头部的大小) = prevrawlensize + lensize
encoding存储entry的编码
p指向entry节点起点
给定节点p,将节点信息保存在结构体中的函数如下
static zlentry zipEntry(unsigned char *p) {
zlentry e;
// e.prevrawlensize 保存着编码前一个节点的长度所需的字节数
// e.prevrawlen 保存着前一个节点的长度
ZIP_DECODE_PREVLEN(p, e.prevrawlensize, e.prevrawlen);
// p + e.prevrawlensize 将指针移动到列表节点本身
// e.encoding 保存着节点值的编码类型
// e.lensize 保存着编码节点值长度所需的字节数
// e.len 保存着节点值的长度
ZIP_DECODE_LENGTH(p + e.prevrawlensize, e.encoding, e.lensize, e.len);
// 计算头结点的字节数
e.headersize = e.prevrawlensize + e.lensize;
// 记录指针
e.p = p;
return e;
}
// 解码节点,取出前置节点的长度和编码该长度需要的字节数
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do { \
\
/* 先计算被编码长度值的字节数 */ \
ZIP_DECODE_PREVLENSIZE(ptr, prevlensize); \
\
/* 再根据编码字节数来取出长度值 */ \
if ((prevlensize) == 1) { \
(prevlen) = (ptr)[0]; \
} else if ((prevlensize) == 5) { \
assert(sizeof((prevlensize)) == 4); \
memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \
memrev32ifbe(&prevlen); \
} \
} while(0);
// 解码ptr所指节点的编码类型,encoding域的长度,以及字符串或整数的长度
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do { \
\
/* 取出值的编码类型 */ \
ZIP_ENTRY_ENCODING((ptr), (encoding)); \
\
// 字符串编码
// 由解码过程可以看出,存储长度采用的大端编码,高位在内存低地址
if ((encoding) < ZIP_STR_MASK) { \
if ((encoding) == ZIP_STR_06B) { \
(lensize) = 1; \
(len) = (ptr)[0] & 0x3f; \
} else if ((encoding) == ZIP_STR_14B) { \
(lensize) = 2; \
(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1]; \
} else if (encoding == ZIP_STR_32B) { \
(lensize) = 5; \
(len) = ((ptr)[1] << 24) | \
((ptr)[2] << 16) | \
((ptr)[3] << 8) | \
((ptr)[4]); \
} else { \
assert(NULL); \
} \
\
/* 整数编码 */ \
} else { \
(lensize) = 1; \
(len) = zipIntSize(encoding); \
} \
} while(0);
基本函数
函数 | 作用 | 复杂度 |
---|---|---|
ziplistNew | 新建一个压缩列表 | O(1) |
ziplistPush | 将字符串压入ziplist | O(N^2) |
ziplistIndex | 根据给定索引返回节点 | O(N) |
ziplistInsert | 将节点插入给定位置 | O(N^2) |
ziplistDelete | 删除给定节点 | O(N^2) |
ziplistNew
unsigned char *ziplistNew(void) {
// 表头加末端大小
unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
// 为表头和表末端分配空间
unsigned char *zl = zmalloc(bytes);
// 初始化表属性
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
// 设置表末端
zl[bytes-1] = ZIP_END;
return zl;
}
ziplistPush
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
// 根据 where 参数的值,决定将值推入到表头还是表尾
unsigned char *p;
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
// 返回添加新值后的 ziplist
// T = O(N^2)
return __ziplistInsert(zl,p,s,slen);
}
核心的函数__ziplistInsert如下
// 节点插入p之前
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
// 记录当前 ziplist 的长度
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789;
zlentry entry, tail;
if (p[0] != ZIP_END) {
// 如果 p[0] 不指向列表末端,说明列表非空,并且 p 正指向列表的其中一个节点
// 那么取出 p 所指向节点的信息,并将它保存到 entry 结构中
// 然后用 prevlen 变量记录前置节点的长度
// (当插入新节点之后 p 所指向的节点就成了新节点的前置节点)
entry = zipEntry(p);
prevlen = entry.prevrawlen;
} else {
// 如果 p 指向表尾末端,那么程序需要检查列表是否为:
// 1)如果 ptail 也指向 ZIP_END ,那么列表为空;
// 2)如果列表不为空,那么 ptail 将指向列表的最后一个节点。
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
// 表尾节点为新节点的前置节点
// 取出表尾节点的长度
prevlen = zipRawEntryLength(ptail);
}
}
// 尝试看能否将输入字符串转换为整数,如果成功的话:
// 1)value 将保存转换后的整数值
// 2)encoding 则保存适用于 value 的编码方式
// 无论使用什么编码, reqlen 都保存节点值的长度
if (zipTryEncoding(s,slen,&value,&encoding)) {
reqlen = zipIntSize(encoding);
} else {
reqlen = slen;
}
// 计算编码前置节点的长度所需的大小
reqlen += zipPrevEncodeLength(NULL,prevlen);
// 计算编码当前节点encoding值所需的大小
reqlen += zipEncodeLength(NULL,encoding,slen);
// 只要新节点不是被添加到列表末端,
// 那么程序就需要检查看 p 所指向的节点(的 header)能否编码新节点的长度。
// nextdiff 保存了新旧编码之间的字节大小差,如果这个值大于 0
// 那么说明需要对 p 所指向的节点(的 header )进行扩展
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
// 因为重分配空间可能会改变 zl 的地址
// 所以在分配之前,需要记录 zl 到 p 的偏移量,然后在分配之后依靠偏移量还原 p
offset = p-zl;
// curlen 是 ziplist 原来的长度
// reqlen 是整个新节点的长度
// nextdiff 是新节点的后继节点扩展 header 的长度(要么 0 字节,要么 4 个字节)
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;
if (p[0] != ZIP_END) {
// 新元素之后还有节点,因为新元素的加入,需要对这些原有节点进行调整
// 移动现有元素,为新元素的插入空间腾出位置
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
// 将新节点的长度编码至后置节点
// p+reqlen 定位到后置节点
// reqlen 是新节点的长度
zipPrevEncodeLength(p+reqlen,reqlen);
// 更新到达表尾的偏移量,将新节点的长度也算上
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
// 如果新节点的后面有多于一个节点
// 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
// 这样才能让表尾偏移量正确对齐表尾节点
tail = zipEntry(p+reqlen);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
// 新元素是新的表尾节点
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
// 当 nextdiff != 0 时,新节点的后继节点的(header 部分)长度已经被改变,
// 所以需要连锁更新后续的节点
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
// 一切搞定,将前置节点的长度写入新节点的 header
p += zipPrevEncodeLength(p,prevlen);
// 将节点值的长度写入新节点的 header
p += zipEncodeLength(p,encoding,slen);
// 写入节点值
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
// 更新列表的节点数量计数器
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}
__ziplistInsert的步骤如下: 1.
求新节点需要的编码长度,以及当前p所指节点的previous_entry_length能否装的下新节点长度,若装不下,则p所指节点头部需要扩展,要多分配内存 2.
重分配内存 3.
移动节点,中间预留出容纳新节点的空间 4.
更新之前p所指节点(即插入节点的后续节点)的头部 5.
头部扩展后可能会导致连锁更新 6.
最后写入节点,更新节点数量
连锁更新
插入新节点后若新节点比较长,previous_entry_length需要5字节编码,而新节点后的节点X原来是1字节编码的,头部会扩展为5字节,整个X节点的长度恰好变大为需要5字节编码,而X的后续节点Y之前编码X的长度只需要1字节,装不下又会导致Y的头部扩展………往复下去
因为前置节点长度大于等于254,previous_entry_length才会采用5字节编码,所以当有连续的250-253字节长度的节点时才会导致连锁更新
,概率不会很大,并不会因为连锁更新导致非常频繁的分配拷贝释放内存
插入也会导致连锁缩减,但是我们忽略这种情况。因为缩减并不是必须的,并且一个节点也只多占用了4字节内存。没有必要为了缩减几字节的内存而进行内存重分配
连锁更新代码如下:
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;
while (p[0] != ZIP_END) {
// 计算编码当前节点的长度所需的字节数
// 将 p 所指向的节点的信息保存到 cur 结构中
cur = zipEntry(p);
// 当前节点的长度
rawlen = cur.headersize + cur.len;
rawlensize = zipPrevEncodeLength(NULL,rawlen);
// 如果已经没有后续空间需要更新了,跳出
if (p[rawlen] == ZIP_END) break;
// 取出后续节点的信息,保存到 next 结构中
next = zipEntry(p+rawlen);
// 后续节点编码当前节点的空间已经足够,无须再进行任何处理,跳出
// 可以证明,只要遇到一个空间足够的节点,
// 那么这个节点之后的所有节点的空间都是足够的
if (next.prevrawlen == rawlen) break;
if (next.prevrawlensize < rawlensize) {
// 执行到这里,表示 next 空间的大小不足以编码 cur 的长度
// 所以程序需要对 next 节点的(header 部分)空间进行扩展
// 记录 p 的偏移量
offset = p-zl;
// 计算需要增加的节点数量
extra = rawlensize-next.prevrawlensize;
// 扩展 zl 的大小
zl = ziplistResize(zl,curlen+extra);
// 还原指针 p
p = zl+offset;
// 记录下一节点的偏移量
np = p+rawlen;
noffset = np-zl;
// 当 next 节点不是表尾节点时,更新列表到表尾节点的偏移量
if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}
// 向后移动 cur 节点之后的数据,为 cur 的新 header 腾出空间
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
// 将新的前一节点长度值编码进新的 next 节点的 header
zipPrevEncodeLength(np,rawlen);
// 移动指针,继续处理下个节点
p += rawlen;
curlen += extra;
} else {
if (next.prevrawlensize > rawlensize) {
// 执行到这里,说明 next 节点编码前置节点的 header 空间有 5 字节
// 而编码 rawlen 只需要 1 字节
// 但是程序不会对 next 进行缩小,
// 所以这里只将 rawlen 写入 5 字节的 header 中就算了。
zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
} else {
// 运行到这里,
// 说明 cur 节点的长度正好可以编码到 next 节点的 header 中
zipPrevEncodeLength(p+rawlen,rawlen);
}
// 到这里此节点previous_entry_length长度不需要扩展
// 连锁更新结束
break;
}
}
return zl;
}
连锁更新的思想就是对节点依次进行检测,如果当前节点X的previous_entry_length足以容纳下前一个节点的长度则更新终止,如果不足则扩展X,然后继续向后进行
ziplistIndex
这个函数简单,因为每个节点存储了前一个节点的长度
// 根据索引返回节点
unsigned char *ziplistIndex(unsigned char *zl, int index) {
unsigned char *p;
zlentry entry;
// 处理负数索引
if (index < 0) {
// 将索引转换为正数
index = (-index)-1;
// 定位到表尾节点
p = ZIPLIST_ENTRY_TAIL(zl);
// 如果列表不为空,那么。。。
if (p[0] != ZIP_END) {
// 从表尾向表头遍历
entry = zipEntry(p);
while (entry.prevrawlen > 0 && index--) {
// 前移指针
p -= entry.prevrawlen;
entry = zipEntry(p);
}
}
// 处理正数索引
} else {
// 定位到表头节点
p = ZIPLIST_ENTRY_HEAD(zl);
while (p[0] != ZIP_END && index--) {
// 后移指针
p += zipRawEntryLength(p);
}
}
// 返回结果
return (p[0] == ZIP_END || index > 0) ? NULL : p;
}
ziplistInsert
之前介绍过核心函数__ziplistInsert,转调用此函数即可
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
return __ziplistInsert(zl,p,s,slen);
}
ziplistDelete
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {
// 因为 __ziplistDelete 时会对 zl 进行内存重分配
// 而内存充分配可能会改变 zl 的内存地址
// 所以这里需要记录到达 *p 的偏移量
size_t offset = *p-zl;
zl = __ziplistDelete(zl,*p,1);
*p = zl+offset;
return zl;
}
核心实现函数为__ziplistDelete
下面介绍此函数,之前介绍了插入,删除操作就很容易理解了
如果删除节点后的previous_entry_length不足以容纳它现在的前置节点了,则会导致连锁更新
static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
unsigned int i, totlen, deleted = 0;
size_t offset;
int nextdiff = 0;
zlentry first, tail;
// 计算被删除节点总共占用的内存字节数
// 以及被删除节点的总个数
first = zipEntry(p);
for (i = 0; p[0] != ZIP_END && i < num; i++) {
p += zipRawEntryLength(p);
deleted++;
}
// totlen 是所有被删除节点总共占用的内存字节数
totlen = p-first.p;
if (totlen > 0) {
if (p[0] != ZIP_END) {
// 执行这里,表示被删除节点之后仍然有节点存在
// 因为位于被删除范围之后的第一个节点的 header 部分的大小
// 可能容纳不了新的前置节点,所以需要计算新旧前置节点之间的字节数差
nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
// 如果有需要的话,将指针 p 后退 nextdiff 字节,为新 header 空出空间
p -= nextdiff;
// 将 first 的前置节点的长度编码至 p 中
zipPrevEncodeLength(p,first.prevrawlen);
// 更新到达表尾的偏移量
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
// 如果被删除节点之后,有多于一个节点
// 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
// 这样才能让表尾偏移量正确对齐表尾节点
tail = zipEntry(p);
if (p[tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
// 从表尾向表头移动数据,覆盖被删除节点的数据
memmove(first.p,p,
intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
} else {
// 执行这里,表示被删除节点之后已经没有其他节点了
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe((first.p-zl)-first.prevrawlen);
}
// 缩小并更新 ziplist 的长度
offset = first.p-zl;
zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
ZIPLIST_INCR_LENGTH(zl,-deleted);
p = zl+offset;
// 如果 p 所指向的节点的大小已经变更,那么进行级联更新
// 检查 p 之后的所有节点是否符合 ziplist 的编码要求
if (nextdiff != 0)
zl = __ziplistCascadeUpdate(zl,p);
}
return zl;
}
小节
ziplist采用连续内存块存储节点,节约了内存,释放内存只需一次即可,用于存储长度小的字符串和小整数。但是每次的插入和删除都将导致内存重分配,可能导致连锁更新(概率不大)
上一篇: php原生分页
推荐阅读
-
Android开发实现高仿优酷的客户端图片左右滑动切换功能实例【附源码下载】
-
Java 源码剖析如何实现动态代理
-
jQuery插件FusionCharts实现的2D柱状图效果示例【附demo源码下载】
-
Android 实现IOS 滚轮选择控件的实例(源码下载)
-
ubuntu 系统上为php加上redis 扩展的实现方法
-
CSS3实现的闪烁跳跃进度条示例(附源码)
-
使用DevExpress的PdfViewer实现PDF打开、预览、另存为、打印(附源码下载)
-
基于DevExpress的SpreadsheetControl实现对Excel的打开、预览、保存、另存为、打印(附源码下载)
-
php+ajax实现带进度条的大数据排队导出思路以及源码
-
Java Redis分布式锁的正确实现方式详解