redis源码分析之跳跃表
程序员文章站
2024-02-28 13:30:22
...
前言
跳跃表数据结构可以 与平衡树和红黑树查询效率。 正常时间复杂度是O(logn), 最差时间复杂度是O(n)
skiplist原理介绍
这样所有新增加的指针连成了一个新的链表,但它包含的节点个数只有原来的一半(上图中是9, 45, 99)。现在当我们想查找数据的时候,可以先沿着这个新链表进行查找。当碰到比待查数据大的节点时,再回到原来的链表中进行查找。比如,我们想查找55,查找的路径是沿着下图中标红的指针所指向的方向进行的:
-
55首先和9比较,再和45比较,比它们都大,继续向后比较。
-
但55和99比较的时候,比99要小,因此回到下面的链表(原链表),与67比较。
-
55比67要小,沿下面的指针继续向前和45比较。55比45大,说明待查数据55在原链表中不存在,而且它的插入位置应该在45和67之间。
正文
1, 数据结构介绍
redis源码中的跳跃表数据结构分为 链表(zskiplist)和节点(zskiplistNode)
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode
{
sds ele; /*成员object对象*/
double score; /*分数字段依赖此值对skiplist进行排序*/
struct zskiplistNode * backward; /*插入层中指向上一个元素level数组*/
struct zskiplistLevel
{
struct zskiplistNode *forward; /*每层中指向下一个元素指针*/
unsigned long span; /*距离下一个元素之间元素数量, 即forward指向的元素*/
} level[];
} zskiplistNode;
typedef struct zskiplist
{
struct zskiplistNode *header, *tail;/*跳跃表头节点和尾节点*/
unsigned long length; /*跳跃表中元素个数*/
int level; /*跳跃表当前最大层数*/
} zskiplist;
typedef struct zset
{
dict *dict;
zskiplist *zsl;
} zset;
2, 跳跃表的创建流程
创建初始化跳跃表的数据结构
/* Create a new skiplist. */
zskiplist *zslCreate(void)
{
int j;
zskiplist *·zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
/*初始化创建一个头节点, 初始化节点信息*/
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; ++j)
{
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
//---------
/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele)
{
zskiplistNode *zn =
zmalloc(sizeof(*zn) + level * sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
3, 插入操作
插入的操作:
- 先找到层(level)
- 该层是否存在不存在, 就创建该层, 把数据放到该层节点上面
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele)
{
// updata[]数组记录每一层位于插入节点的前一个节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// rank[]记录每一层位于插入节点的前一个节点的排名
//在查找某个节点的过程中,将沿途访问过的所有层的跨度累计起来,得到的结果就是目标节点在跳跃表中的排位
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
//表头节点
x = zsl->header;
/*从头节点开始搜索, 一层一层向下搜索, 直到直到最后一层, update数组中保存着每层应该插入的位置*/
for (i = zsl->level - 1; i >= 0; i--)
{
// 注意观察 '==' 节点 [node] ---> 区分层 [level] ---> 是否是下一个节点
rank[i] = i == (zsl->level - 1) ? 0 : rank[i + 1];
while (x->level[i].forward && (x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) < 0)))
{
//记录跨越了多少个节点
rank[i] += x->level[i].span;
//查找下一个节点
x = x->level[i].forward;
}
// 存储当前层上位于插入节点的前一个节点,找下一层的插入节点
update[i] = x; // _ptr
}
/* 随机一个层数, 如果随机的层数是新的层数, 则需要给update数组中新的层数赋值*/
level = zslRandomLevel();// [00-64]
// 如果level大于当前存储的最大level值
// 设定rank数组中大于原level层以上的值为0--为什么设置为0
// 同时设定update数组大于原level层以上的数据
if (level > zsl->level)
{
//是从增加的层数开始增加的
for (i = zsl->level; i < level; i++)
{
//因为这一层没有节点,所以重置rank[i]为0
rank[i] = 0;
//因为这一层还没有节点,所以节点的前一个节点都是头节点
update[i] = zsl->header;
//在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length---
//因为整个层只有一个头结点----->言外之意头结点的span都是链表长度
update[i]->level[i].span = zsl->length;
}
// 更新level值(max层数)
zsl->level = level;
}
/*创建新的节点插入到update数组对应的层*/
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++)
{
// next
x->level[i].forward = update[i]->level[i].forward;// _ptr
//
update[i]->level[i].forward = x;
/*
header update[i] x update[i]->forward
|-----------|-----------|-----------|-----------|-----------|-----------|
|<---update[i].span---->|
|<-------rank[i]------->|
|<-------------------rank[0]------------------->|
更新update数组中span值和新插入元素span值, rank[0]存储的是x元素距离头部的距离, rank[i]存储的是update[i]距离头部的距离, 上面给出了示意图
*/
//x->level[i].span = 从x到update[i]->forword的span数目,
//原来的update[i]->level[i].span = 从update[i]到update[i]->level[i]->forward的span数目
//所以x->level[i].span = 原来的update[i]->level[i].span - (rank[0] - rank[i]);
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//对于update[i]->level[i].span值的更新由于在update[i]与update[i]->level[i]->forward之间又添加了x,
//update[i]->level[i].span = 从update[i]到x的span数目,
//由于update[0]后面肯定是新添加的x,所以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1;
//提示: update[i]和x[i]之间肯定没有节点了
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* level可能小zsl->level, 无变动的元素span依次增加1*/
for (i = level; i < zsl->level; i++)
{
update[i]->level[i].span++;
}
/*上一个元素level数组, 重新赋值*/
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
{
x->level[0].forward->backward = x;
}
else
{
/*下一个元素为空,则表示x为尾部元素*/
zsl->tail = x;
}
// 跳跃表长度+1
zsl->length++;
return x;
}
4, 删除节点操作
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
这个配置的意思是说,在如下两个条件之一满足的时候,ziplist会转成zset(具体的触发条件参见t_zset.c中的zaddGenericCommand相关代码):
- 当sorted set中的元素个数,即(数据, score)对的数目超过128的时候,也就是ziplist数据项超过256的时候。
- 当sorted set中插入的任意一个数据的长度超过了64的时候。