欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis底层数据结构之跳跃表

程序员文章站 2024-02-28 22:27:28
...

Redis底层数据结构之跳跃表

1.引言

跳跃表由 William Pugh 在1990年发明,是一种有序的数据结构,类似于平衡树、红黑树这样的数据结构,能够维护一个有序的列表,方便查找。跳跃表支持平均为O(logN)的时间复杂度查找,这与平衡树大体相当,那么为什么Redis的作者选用跳跃表来实现有序集合呢?我们从以下几点说明

  • 插入和删除:平衡树的调整操作都需要从引起不平衡的节点开始,向上调整,通过一系列的左右旋转操作来达到平衡,但是跳跃表只需要更改相邻节点的指针即可,这一点在后面的实现说明中会体现到

  • 内存占用:平衡树的每个节点几乎都要占用两个指针,而跳跃表的指针数目取决于全局参数P,指针个数N = (1-p)/4,Redis使用P=1/4,所以平均占用1.33个指针

  • 实现难度:跳跃表的实现难度要比平衡树简单的多,并且查找时间复杂度大体相当,均为O(logN)

1.普通跳跃表性质

首先给出跳跃表的一个直观表示:

Redis底层数据结构之跳跃表

  • 跳跃表由很多层构成,每一层都是一个有序链表

  • 最底层包含了所有的元素,如果一个元素出现的level i中那么在level i之下的链表中也会出现

  • 每个节点包含两个指针,一个指向同一层链表的下一个元素

普通跳跃表实现代码(加注解):

    #include<stdio.h>
    #include<stdlib.h>
    #define MAX_LEVEL 10
    typedef struct skipNode* Node;
    struct skipNode{
        int key;
        int value;
        Node forward[1];
    };
    typedef struct skiplist{
        Node header; 
        int level;
    }skiplist;

    Node createNode(int level,int key,int value){
        //使用malloc分配level个连续空间,包括存放节点的key和value的skipNode空间 
        skipNode * node = (Node) malloc(sizeof(skipNode) + level * sizeof(Node));
        node->key = key;
        node->value = value;
        return node;
    }

    skiplist * createSkiplist(){
        skiplist *list = (skiplist*)malloc(sizeof(skiplist));
        list->level = 0;
        list->header = createNode(MAX_LEVEL,0,0);
        /*
            levelN      forward[N] = NULL
            levelN-1        forward[N-1] = NULL
            levelN-2        forward[N-2] = NULL
                             . 
            level0      forward[0] = NULL
            header------>
        */ 
        for(int i = 0; i < MAX_LEVEL; i++){
            list->header->forward[i] = NULL;
        }
        return list;
    }

    int randomLevel(){  
        int k=1;  
        while (rand()%2)    
            k++;    
        k=(k<MAX_LEVEL)?k:MAX_LEVEL;  
        return k;    
    }  

    /*
      插入元素 
     */
    bool insertElement(skiplist* list , int key , int value){
        //使用一个数组保存比key大的节点位置
        skipNode * update[MAX_LEVEL];
        //查找原始key要插入的位置,从高层level乡下查找
        int level = list->level;
        skipNode* head = list->header;
        skipNode* next;
        for(int i = level - 1; i >= 0; i--){
            while((next = head->forward[i])&& (next->key < key)){
                head = next;
            }
            update[i] = head;
        }
        // key值已经存在
        if(next && next->key == key){
            // 覆盖掉原来的value 
            head->value = value;
            return false;
        }
         //随机生成新结点的层数  
        int nlevel = randomLevel();
        if(nlevel > list->level){
            nlevel = ++list->level;  
            update[level] = list->header;  
        }
        // 申请新节点的空间
        Node newNode = createNode(nlevel,key,value);
        // 逐层调整指针指向
        for(int i = 0 ; i < nlevel ; i++){
            newNode->forward[i] = update[i]->forward[i];
            update[i]->forward[i] = newNode;
        } 
        return true;
    }

    /*
      搜索指定key的value
    */  
    skipNode* search(skiplist* list,int key)  
    {  
        skipNode *p,*q=NULL;  
        p = list->header;  
        //从最高层开始搜  
        int k = list->level;  
        for(int i = k-1; i >= 0; i--){  
            while((q = p->forward[i]) && (q->key <= key)) {  
                if(q->key == key){  
                    return q;  
                }  
                p=q;  
            }  
        }  
        return NULL;  
    }

    /**
      删除指定节点的值 
    **/
    bool deleteElement(skiplist* list , int key, int value){
        skipNode* update[MAX_LEVEL];
        skipNode* x = list->header;
        int level = list->level;
        for(int i = level - 1 ; i >= 0 ; i--){
            while(x->forward[i]->key < key){
                x = x->forward[i]; 
            }
            update[i] = x;
        }
        x = x->forward[0];
        if(x->key != key){
            // 节点不存在直接返回 
            return false;
        }else{
            for(int i = 0 ; i < list->level ; i++){
                if(update[i]->forward[i] == x){
                    update[i]->forward[i] = x->forward[i];    
                }
            }
            free(x);
            for(int i = list->level-1 ; i >= 0 ; i--){
                if(list->header->forward[i] == NULL){    
                        list->level--;
                    }
            }
        }
        return true; 
    }

2. Redis中跳跃表的实现

zskiplistNode的定义

typedef struct zskiplistNode {
        robj *obj;
        double score;
        struct zskiplistNode *backward;
        struct zskiplistLevel {
            struct zskiplistNode *forward;
            unsigned int span;
        } level[];
    } zskiplistNode;

zskiplist定义

typedef struct zskiplist {
        struct zskiplistNode *header, *tail;
        unsigned long length;
        int level;
    } zskiplist;

图解:

Redis底层数据结构之跳跃表

性质说明:

  • 多个节点可以包含相同的score,但是每个节点的成员对象必须是唯一的

  • 跳跃表按照score的大小进行从小到大排序,当分数一样是,节点按照成员对象的大小进行排序

具体的实现代码在T_zset.c文件中

首先是创建和初始化操作:

//创建节点
    zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
        zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
        zn->score = score;
        zn->obj = obj;
        return zn;
    }

    zskiplist *zslCreate(void) {
        int j;
        zskiplist *zsl;
        //创建zskiplist
        zsl = zmalloc(sizeof(*zsl));
        zsl->level = 1;
        zsl->length = 0;
        //创建header节点,ZSKIPLIST_MAXLEVEL=32,默认层数最大为32
        zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
        // 初始化
        for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
            zsl->header->level[j].forward = NULL;
            zsl->header->level[j].span = 0;
        }
        zsl->header->backward = NULL;
        zsl->tail = NULL;
        return zsl;
    }

在看插入函数zslInsert:

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
        // 使用Update数组记录要插入的位置
        zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
        // 使用rank数组记录每层的插入位置所跨越的节点个数
        unsigned int rank[ZSKIPLIST_MAXLEVEL];
        int i, level;

        redisAssert(!isnan(score));
        x = zsl->header;
        // 从上往下遍历
        for (i = zsl->level-1; i >= 0; i--) {
            /* 每一层遍历开始初始化当前层跨越的节点数目,也就等于上一层终止查询时候跨越的节点数目*/
            rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
            /* 按照score的大小进行从小到大查找,当分数一样是,节点按照成员对象的大小进行查找
             * 找到第一个不满足条件的节点,即为新节点要插入的位置的前驱,用update记录
             */
            while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
                /** rank 值在初始条件下逐步增加**/
                rank[i] += x->level[i].span;
                x = x->level[i].forward;
            }
            update[i] = x;
        }

        // 随机生成待插入节点的层数
        level = zslRandomLevel();
        // 如果随机生成的层数比当前跳跃表的层数大,则使用header代表高出部分的前驱
        if (level > zsl->level) {
            for (i = zsl->level; i < level; i++) {
                rank[i] = 0;
                update[i] = zsl->header;
                update[i]->level[i].span = zsl->length;
            }
            zsl->level = level;
        }
        // 为新节点申请空间
        x = zslCreateNode(level,score,obj);
        // 更新新插入节点的指针指向
        for (i = 0; i < level; i++) {
            x->level[i].forward = update[i]->level[i].forward;
            update[i]->level[i].forward = x;

            /* 使用原来记录的rank数组更新新节点的跨度距离 */
            x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
            update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }

        /* increment span for untouched levels */
        for (i = level; i < zsl->level; i++) {
            update[i]->level[i].span++;
        }
        // 更新后向指针
        x->backward = (update[0] == zsl->header) ? NULL : update[0];
        if (x->level[0].forward)
            x->level[0].forward->backward = x;
        else
            zsl->tail = x;
        zsl->length++;
        return x;
    }

Redis底层数据结构之跳跃表

删除函数zslDelete:

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    // 从下往上逐层更新指针指向和跨度
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            // 更新跨度,
            update[i]->level[i].span += x->level[i].span - 1;
            // 更新forward指向,
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    // 更新backward指针
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    // 更新level的新高度
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 和插入操作一样,首先获得删除节点的前驱位置,使用update数组记录
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    // 如果找到的节点x是要删除的节点,则执行删除操作,释放删除节点的内存
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    }
    return 0; /* not found */
}

除了基本的删除和插入操作外,Redis跳跃表还提供很多操作,包括获得指定节点在表中的排位zslGetRank函数,获得满足一定条件的排位等操作,从这里可以看出Redis跳跃表中设置跨度字段的好处,能够快速的计算排位,具体的实现函数这里就不一一介绍了,感兴趣的朋友可以参照源码一探究竟。