欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis源码分析之跳跃表

程序员文章站 2024-02-28 13:30:22
...

前言

跳跃表数据结构可以 与平衡树和红黑树查询效率。 正常时间复杂度是O(logn), 最差时间复杂度是O(n)

skiplist原理介绍

redis源码分析之跳跃表

这样所有新增加的指针连成了一个新的链表,但它包含的节点个数只有原来的一半(上图中是9, 45, 99)。现在当我们想查找数据的时候,可以先沿着这个新链表进行查找。当碰到比待查数据大的节点时,再回到原来的链表中进行查找。比如,我们想查找55,查找的路径是沿着下图中标红的指针所指向的方向进行的:

redis源码分析之跳跃表

  1. 55首先和9比较,再和45比较,比它们都大,继续向后比较。

  2. 但55和99比较的时候,比99要小,因此回到下面的链表(原链表),与67比较。

  3. 55比67要小,沿下面的指针继续向前和45比较。55比45大,说明待查数据55在原链表中不存在,而且它的插入位置应该在45和67之间。

正文

1, 数据结构介绍

redis源码中的跳跃表数据结构分为 链表(zskiplist)和节点(zskiplistNode)

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode 
{
    sds						ele;			/*成员object对象*/
    double					score;			/*分数字段依赖此值对skiplist进行排序*/
    struct zskiplistNode *	backward;		/*插入层中指向上一个元素level数组*/
    struct zskiplistLevel 
	{
        struct zskiplistNode *forward;	/*每层中指向下一个元素指针*/
        unsigned long span;				/*距离下一个元素之间元素数量, 即forward指向的元素*/
    } level[];
} zskiplistNode;

typedef struct zskiplist 
{
    struct zskiplistNode *header, *tail;/*跳跃表头节点和尾节点*/
    unsigned long length;				/*跳跃表中元素个数*/
    int level;							/*跳跃表当前最大层数*/
} zskiplist;

typedef struct zset 
{
    dict *dict;
    zskiplist *zsl;
} zset;

2, 跳跃表的创建流程

创建初始化跳跃表的数据结构

/* Create a new skiplist. */
zskiplist *zslCreate(void) 
{
    int			j;
    zskiplist *·zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
	/*初始化创建一个头节点, 初始化节点信息*/
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; ++j)
	{
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}


//---------
/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) 
{
    zskiplistNode *zn =
        zmalloc(sizeof(*zn) + level * sizeof(struct zskiplistLevel));
    zn->score	=	score;
    zn->ele		=	ele;
    return	zn;
}

3, 插入操作

插入的操作:

  1. 先找到层(level)
  2. 该层是否存在不存在, 就创建该层, 把数据放到该层节点上面
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) 
{
    // updata[]数组记录每一层位于插入节点的前一个节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // rank[]记录每一层位于插入节点的前一个节点的排名
    //在查找某个节点的过程中,将沿途访问过的所有层的跨度累计起来,得到的结果就是目标节点在跳跃表中的排位
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    //表头节点
    x = zsl->header;
	/*从头节点开始搜索, 一层一层向下搜索, 直到直到最后一层, update数组中保存着每层应该插入的位置*/
    for (i = zsl->level - 1; i >= 0; i--) 
	{
		// 注意观察 '=='  节点 [node] ---> 区分层 [level] ---> 是否是下一个节点
        rank[i] = i == (zsl->level - 1) ? 0 : rank[i + 1];  
        while (x->level[i].forward && (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&  sdscmp(x->level[i].forward->ele,   ele) < 0)))
        {
			//记录跨越了多少个节点  
            rank[i] += x->level[i].span;
			//查找下一个节点
            x = x->level[i].forward;
        }
        // 存储当前层上位于插入节点的前一个节点,找下一层的插入节点
        update[i] = x; // _ptr
    }
    
	/* 随机一个层数, 如果随机的层数是新的层数, 则需要给update数组中新的层数赋值*/
    level = zslRandomLevel();// [00-64]
    // 如果level大于当前存储的最大level值
    // 设定rank数组中大于原level层以上的值为0--为什么设置为0
    // 同时设定update数组大于原level层以上的数据
    if (level > zsl->level) 
	{
		//是从增加的层数开始增加的
        for (i = zsl->level; i < level; i++) 
		{
            //因为这一层没有节点,所以重置rank[i]为0
            rank[i] = 0;
            //因为这一层还没有节点,所以节点的前一个节点都是头节点
            update[i] = zsl->header;
            //在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length---
			//因为整个层只有一个头结点----->言外之意头结点的span都是链表长度
            update[i]->level[i].span = zsl->length;
        }
        // 更新level值(max层数)
        zsl->level = level;
    }
	/*创建新的节点插入到update数组对应的层*/
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) 
	{
		// next
        x->level[i].forward = update[i]->level[i].forward;// _ptr
        // 
		update[i]->level[i].forward = x;
		/*
		header                update[i]     x    update[i]->forward
		|-----------|-----------|-----------|-----------|-----------|-----------|
		|<---update[i].span---->|
		|<-------rank[i]------->|
		|<-------------------rank[0]------------------->|

		更新update数组中span值和新插入元素span值, rank[0]存储的是x元素距离头部的距离, rank[i]存储的是update[i]距离头部的距离, 上面给出了示意图
		*/
		//x->level[i].span = 从x到update[i]->forword的span数目, 
		//原来的update[i]->level[i].span = 从update[i]到update[i]->level[i]->forward的span数目 
		//所以x->level[i].span = 原来的update[i]->level[i].span - (rank[0] - rank[i]); 
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
		//对于update[i]->level[i].span值的更新由于在update[i]与update[i]->level[i]->forward之间又添加了x, 
		//update[i]->level[i].span = 从update[i]到x的span数目, 
		//由于update[0]后面肯定是新添加的x,所以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1; 

		//提示: update[i]和x[i]之间肯定没有节点了
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
	/* level可能小zsl->level, 无变动的元素span依次增加1*/
    for (i = level; i < zsl->level; i++) 
	{
        update[i]->level[i].span++;
    }
	/*上一个元素level数组, 重新赋值*/
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
	if (x->level[0].forward)
	{
		x->level[0].forward->backward = x;
	}
	else
	{ 
		/*下一个元素为空,则表示x为尾部元素*/
		zsl->tail = x;
	}
	// 跳跃表长度+1
    zsl->length++;
    return x;
}

4, 删除节点操作


/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

这个配置的意思是说,在如下两个条件之一满足的时候,ziplist会转成zset(具体的触发条件参见t_zset.c中的zaddGenericCommand相关代码):

  1. 当sorted set中的元素个数,即(数据, score)对的数目超过128的时候,也就是ziplist数据项超过256的时候。
  2. 当sorted set中插入的任意一个数据的长度超过了64的时候。

结语

个人博客链接