Redis源码学习(一)跳跃表数据结构
昨天就开始了Redis源码的学习,一口气看了很多,其中数据结构相对较复杂的且感兴趣的那就是跳跃表了,源码都是C。
跳跃表由一个跳跃表控制头zskiplist跟节点zskiplistNode组成,其中zskiplistNode中含有level[]数组。
typedef struct zskiplistNode {
// 成员对象
robj *obj;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 表头节点和表尾节点
struct zskiplistNode *header, *tail;
// 表中节点的数量
unsigned long length;
// 表中层数最大的节点的层数
int level;
} zskiplist;
网上挖来的图,看图结合结构体,整体很好理解。
下面从数据结构内置的函数分析开始
创建跳跃表 zskiplist *zslCreate(void);
先给zskiplist控制头分配空间,再赋初值,level=1;length=0;(从这里可以看出header不算在length内),再初始化表头节点。
zskiplistNode *zslCreateNode(int level, double score, robj *obj);函数声明可以看出,Node节点需要初始化的数据,然后返回该节点,实现很简单,无非申请空间,再赋初值(但其中的level[]仅仅为其申请level*sizeof(struct zskiplistLevel)长的空间)。
初始化表头节点后,为其level[]空间的forward跟span赋初值。再将表头节点的向后指针跟表尾指针赋值为null。
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 分配空间
zsl = zmalloc(sizeof(*zsl));
// 设置高度和起始层数
zsl->level = 1;
zsl->length = 0;
// 初始化表头节点
// T = O(1)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
// 设置表尾
zsl->tail = NULL;
return zsl;
}
释放给定跳跃表 void zslFree(zskiplist *zsl);
先得到指向第一个节点的指针,即可以把表头节点释放(避免内存泄漏);
同理之后每次释放当前节点的前提先握有后一节点的指针;
当所有zslListNode释放后就可释放控制头zslList;
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
// 释放表头
zfree(zsl->header);
// 释放表中所有节点
// T = O(N)
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
// 释放跳跃表结构
zfree(zsl);
}
新节点的添加zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
这部分逻辑比较复杂,于是手动手工过程跟了一遍,假设上图情况,插入score值为2.5的节点
此时已经找到新节点要插入的位置update对应层次往后指向的位置即为要插入的位置;
再往后level = zslRandomLevel(); level值是随机的,我们取level值大于zsl->level情况分析,假设level 为7
则rank[5],rank[6]赋初值为0,update[5],update[6] = zsl->header;
zsl->header->level[5].span,zsl->header->level[6].span = zsl->length;这里赋初值并不正确,在后面会调整,只是做零时变量。最后更新zsl->level为level,即zsl->level为所有节点中level最大值。
剩下,我们需要建立对应的连接,即利用update数组,剩下无非是多次链表插入的操作,最后更正沿途span值。
最后利用update[]设置下后退指针。
下面是完整代码
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
// 在各个层查找节点的插入位置
// T_wrost = O(N^2), T_avg = O(N log N)
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
// 如果 i 不是 zsl->level-1 层
// 那么 i 层的起始 rank 值为 i+1 层的 rank 值
// 各个层的 rank 值一层层累积
// 最终 rank[0] 的值加一就是新节点的前置节点的排位
// rank[0] 会在后面成为计算 span 值和 rank 值的基础
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 沿着前进指针遍历跳跃表
// T_wrost = O(N^2), T_avg = O(N log N)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
// 比对分值
(x->level[i].forward->score == score &&
// 比对成员, T = O(N)
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 记录沿途跨越了多少个节点
rank[i] += x->level[i].span;
// 移动至下一指针
x = x->level[i].forward;
}
// 记录将要和新节点相连接的节点
update[i] = x;
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not.
*
* zslInsert() 的调用者会确保同分值且同成员的元素不会出现,
* 所以这里不需要进一步进行检查,可以直接创建新元素。
*/
// 获取一个随机值作为新节点的层数
// T = O(N)
level = zslRandomLevel();
// 如果新节点的层数比表中其他节点的层数都要大
// 那么初始化表头节点中未使用的层,并将它们记录到 update 数组中
// 将来也指向新节点
if (level > zsl->level) {
// 初始化未使用层
// T = O(1)
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
// 更新表中节点最大层数
zsl->level = level;
}
// 创建新节点
x = zslCreateNode(level,score,obj);
// 将前面记录的指针指向新节点,并做相应的设置
// T = O(1)
for (i = 0; i < level; i++) {
// 设置新节点的 forward 指针
x->level[i].forward = update[i]->level[i].forward;
// 将沿途记录的各个节点的 forward 指针指向新节点
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
// 计算新节点跨越的节点数量
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// 更新新节点插入之后,沿途节点的 span 值
// 其中的 +1 计算的是新节点
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
// 未接触的节点的 span 值也需要增一,这些节点直接从表头指向新节点
// T = O(1)
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 设置新节点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 跳跃表的节点计数增一
zsl->length++;
return x;
}
整体逻辑初看复杂,仔细跟了手工过程后,发现关键是维护update数组。找到对应位置,剩下无非是类似链表插入节点操作跟一些细节的值的维护。随机level??
如果跳跃表层数分布如下,那么很容易理解其查找平均时间复杂度为O(logN)(基本上类似于二分查找,通过空间换时间解决链表缺点,本质上又有点像平衡树)
那为何随机的层数(1至32之间的整数)也能保证logN的复杂度?关键在他的随机算法。。
看下具体的int zslRandomLevel(void);
/* Returns a random level for the new skiplist node we are going to create.
*
* 返回一个随机值,用作新跳跃表节点的层数。
*
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned.
*
* 返回值介乎 1 和 ZSKIPLIST_MAXLEVEL 之间(包含 ZSKIPLIST_MAXLEVEL),
* 根据随机算法所使用的幂次定律,越大的值生成的几率越小。
*
* T = O(N)
*/
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
总结
简单分析了一下跳跃表的结构跟实现,和一些基本的api,跳跃表是Redis有序集合的底层实现之一,从插入函数也可以看出,对使用者有要求:多个节点可以包含相同的分值,但每个节点的成员对象必须唯一。跳跃表的节点按照分值排序,分值相同,按照对象大小排序。
int compareStringObjectsForLexRange(robj *a, robj *b) {
if (a == b) return 0; /* This makes sure that we handle inf,inf and
-inf,-inf ASAP. One special case less. */
if (a == shared.minstring || b == shared.maxstring) return -1;
if (a == shared.maxstring || b == shared.minstring) return 1;
return compareStringObjects(a,b);
}
上一篇: SpringBoot之Starter组件手写和启动原理解析
下一篇: php中变量赋值的方式