欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

Redis内部数据结构详解之跳跃表(skiplist)

程序员文章站 2022-05-22 18:25:41
...

本文所引用的源码全部来自Redis2.8.2版本。 Redis中skiplist数据结构与API相关文件是:redis.h与t_zset.c。 http://blog.csdn.net/acceptedxukai/article/details/8923174 这是我之前写的关于skiplist最传统的实现,功能远不如Redis中跳表的强大,但是代码简

本文所引用的源码全部来自Redis2.8.2版本。

Redis中skiplist数据结构与API相关文件是:redis.h与t_zset.c。

http://blog.csdn.net/acceptedxukai/article/details/8923174 这是我之前写的关于skiplist最传统的实现,功能远不如Redis中跳表的强大,但是代码简短,比较容易理解。

一、跳跃表简介

跳跃表是一种随机化数据结构,基于并联的链表,其效率可以比拟平衡二叉树,查找、删除、插入等操作都可以在对数期望时间内完成,对比平衡树,跳跃表的实现要简单直观很多。

以下是一个跳跃表的例图(来自*):
Redis内部数据结构详解之跳跃表(skiplist)

从图中可以看出跳跃表主要有以下几个部分构成:

1、 表头head:负责维护跳跃表的节点指针

2、 节点node:实际保存元素值,每个节点有一层或多层

3、 层level:保存着指向该层下一个节点的指针

4、 表尾tail:全部由null组成

跳跃表的遍历总是从高层开始,然后随着元素值范围的缩小,慢慢降低到低层。

二、Redis中跳跃表的数据结构

Redis作者为了适合自己功能的需要,对原来的跳跃表进行了一下修改:

1、 允许重复的score值:多个不同的元素(member)的score值可以相同

2、 进行元素对比的时候,不仅要检查score值,还需要检查member:当score值相等时,需要比较member域进行比较。

3、 结构保存一个tail指针:跳跃表的表尾指针

4、 每个节点都有一个高度为1层的前驱指针,用于从底层表尾向表头方向遍历

跳跃表数据结构如下(redis.h):

typedef struct zskiplistNode {
robj *obj; //节点数据
double score;
struct zskiplistNode*backward; //前驱
struct zskiplistLevel {
struct zskiplistNode*forward;//后继
unsigned int span;//该层跨越的节点数量
} level[];
} zskiplistNode;

typedef struct zskiplist {
struct zskiplistNode*header, *tail;
unsigned long length;//节点的数目
int level;//目前表的最大层数
} zskiplist; 数据结构中可能span这个参数最不好理解了,下面简单解释一下:

参照上面跳跃表的例图,head节点的span所有level的值都将是1;node 1在level 0 span值为1,因为跨越1个元素都将走到下一个节点2,在level 1 span值为2,因为需要跨越2个元素(node 2,node 3)才能到达下一个节点3。

关于span的解释可以详看:
http://*.com/questions/10458572/what-does-the-skiplistnode-variable-span-mean-in-redis-h

三、简单列出跳跃表的API,他们的作用以及算法复杂度:

约定O(N)表示对于元素个数的表达,O(L)表示对于跳表层数的表达

函数名
作用
复杂度

zslCreateNode
新建并返回一个跳表节点
O(1)

zslCreate
新建并初始化一个跳跃表
O(L)

zslFreeNode
释放给定的节点
O(1)

zslFree
释放给定的跳跃表
O(N)

zslRandomLevel
得到新节点的层数(抛硬币法的改进)
O(1)

zslInsert
将给定的score与member新建节点并添加到跳表中
O(logN)

zslDeleteNode
删除给定的跳表节点
O(L)

zslDelete
删除给定的score与member在跳表中匹配的节点
O(logN)

zslIsInRange
检查跳表中的元素score值是否在给定的范围内
O(1)

zslFirstInRange
查找第一个符合给定范围的节点
O(logN)

zslLastInRange
查找最后一个符合给定范围的节点
O(logN)

zslDeleteRangeByScore
删除score值在给定范围内的节点
O(logN)+O(M)

zslDeleteRangeByRank
删除排名在给定范围内的节点
O(logN)+O(M)

zslGetRank
返回给定score与member在集合中的排名
O(logN)

zslGetElementByRank
根据给定的rank来查找元素
O(logN)

四、上述API源码的简单解析

4.1 zslCreateNode

//建立一个skiplist节点,需要传入所在的level,score,以及保存的数值obj
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}zmalloc是Redis在系统函数malloc上自己封装的函数,主要为了方便对内存使用情况的计算。

4.2 zslCreate

//创建skiplist,header不存储任何数据
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//ZSKIPLIST_MAXLEVEL = 32
for (j = 0; j zsl->header->level[j].forward = NULL;//后继
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;//前驱
zsl->tail = NULL;//尾指针
return zsl;
}

4.3 zslRandomLevel

int zslRandomLevel(void) {//为新的skiplist节点生成该节点level数目
int level = 1;
while ((random()&0xFFFF) level += 1;
return (level }

4.4 zslInsert

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

redisAssert(!isnan(score));
x = zsl->header;//header不存储数据
//从高向低
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
//rank[i]用来记录第i层达到插入位置的所跨越的节点总数,也就是该层最接近(小于)给定score的排名
//rank[i]初始化为上一层所跨越的节点总数
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
//后继节点不为空,并且后继节点的score比给定的score小
while (x->level[i].forward &&
(x->level[i].forward->score //score相同,但节点的obj比给定的obj小
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) rank[i] += x->level[i].span;//记录跨越了多少个节点
x = x->level[i].forward;//继续向右走
}
update[i] = x;//保存访问的节点,并且将当前x移动到下一层
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();//计算新的level
if (level > zsl->level) {//新的level > zsl->level,需要进行升级
for (i = zsl->level; i rank[i] = 0;
update[i] = zsl->header;//需要更新的节点就是header
update[i]->level[i].span = zsl->length;
//在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length,
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);//建立新节点
//开始插入节点
for (i = 0; i //新节点的后继就是插入位置节点的后继
x->level[i].forward = update[i]->level[i].forward;
//插入位置节点的后继就是新节点
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
/**
rank[i]: 在第i层,update[i]->score的排名
rank[0] - rank[i]: update[0]->score与update[i]->score之间间隔了几个数,即span数目
对于update[i]->level[i].span值的更新由于在update[i]与update[i]->level[i]->forward之间又添加了x,
update[i]->level[i].span = 从update[i]到x的span数目,
由于update[0]后面肯定是新添加的x,所以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1;
x->level[i].span = 从x到update[i]->forword的span数目,
原来的update[i]->level[i].span = 从update[i]到update[i]->level[i]->forward的span数目
所以x->level[i].span = 原来的update[i]->level[i].span - (rank[0] - rank[i]);

另外需要注意当level > zsl->level时,update[i] = zsl->header的span处理
*/
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
//如果新节点的level小于原来skiplist的level,那么在上层没有insert新节点的span需要加1
for (i = level; i level; i++) {
update[i]->level[i].span++;
}

x->backward = (update[0] == zsl->header) ? NULL : update[0];//前驱指针
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

4.5 zslDeleteNode

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i level; i++) {
if (update[i]->level[i].forward == x) {//update[i]->level[i]的后继等于要删除节点x
//原来的update[i]->level[i].span == 1,所以新的span很好计算,就应该直接等于x->level[i].span
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {//处理前驱节点
x->level[0].forward->backward = x->backward;
} else {//否则,更新tail
zsl->tail = x->backward;
}
//收缩level
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

4.6 zslDelete

/* Delete an element with matching score/object from the skiplist. */
//根据score, obj来删除节点
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
// 遍历所有层,记录删除节点后需要被修改的节点到 update 数组
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score (x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
// 因为多个不同的 member 可能有相同的 score
// 所以要确保 x 的 member 和 score 都匹配时,才进行删除
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
} else {
return 0; /* not found */
}
return 0; /* not found */
}

4.8 zslFirstInRange

/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳跃表中第一个符合给定范围的元素
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;

/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,&range))
x = x->level[i].forward;
}

/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
redisAssert(x != NULL);

/* Check if score if (!zslValueLteMax(x->score,&range)) return NULL;
return x;
}

4.9 zslLastInRange

/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳跃表中最后一个符合给定范围的元素
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;

/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level[i].forward &&
zslValueLteMax(x->level[i].forward->score,&range))
x = x->level[i].forward;
}

/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);

/* Check if score >= min. */
if (!zslValueGteMin(x->score,&range)) return NULL;
return x;
}

4.10 zslDeleteRangeByScore

/* Delete all the elements with score between min and max from the skiplist.
* Min and max are inclusive, so a score >= min || score * Note that this function takes the reference to the hash table view of the
* sorted set, in order to remove the elements from the hash table too. */
//删除给定范围内的 score 的元素。
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (range.minex ?//是否是闭区间
x->level[i].forward->score x->level[i].forward->score x = x->level[i].forward;
update[i] = x;
}

/* Current node is the last with score x = x->level[0].forward;

/* Delete nodes while in range. */
while (x && (range.maxex ? x->score score zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);//删除节点x
dictDelete(dict,x->obj);//在字典中也删除
zslFreeNode(x);
removed++;
x = next;
}
return removed;//删除的节点个数
}

4.11 zslDeleteRangeByRank

/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
//删除给定排序范围内的所有节点[start,end]
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long traversed = 0, removed = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) traversed += x->level[i].span;//排名
x = x->level[i].forward;
}
update[i] = x;
}

traversed++;
x = x->level[0].forward;
while (x && traversed zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);//删除节点x
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}

4.12 zslGetRank

//得到score在skiplist中的排名,如果元素不在skiplist中,返回0
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score (x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) rank += x->level[i].span;//排名,加上该层跨越的节点数目
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {// 找到目标元素
return rank;
}
}
return 0;
}

4.13 zslGetElementByRank

//根据给定的 rank 查找元素
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) {
traversed += x->level[i].span;//排名
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}

上述13个API函数中可能内部调用的一些函数没有列出,相关代码请查看Redis 2.8.2源码。

五、小结

跳跃表(Skiplist)是一种随机化数据结构,它在查找、插入、删除等操作的期望时间复杂度都能达到对数级,并且编码相对简单许多,跳跃表目前是Redis中用于存储有序集合的底层数据结构,另外可以存储有序集的数据结构是字典,Redis中还有一种底层数据结构intset可以用来存储有序整数集。
Redis作者通过对原有的跳跃表进行修改,包括span的设计、score值可以重复、添加tail与backward指针等,从而实现了排序功能,从尾至头反向遍历的功能等。

最后感谢黄健宏(huangz1990)的Redis设计与实现及其他对Redis2.6源码的相关注释对我在研究Redis2.8源码方面的帮助。