欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

读懂才会用 : 带你见识 Redis 的 zset

程序员文章站 2022-04-24 15:35:52
...

快餐车

本文从代码角度分析Redis 的 zset 结构,希望通过本文掌握如下内容:

  1. Redis 中 zset 不是单一结构完成,是跳表和哈希表共同完成
  2. 跳表的实现原理,跳表升维全靠随机
  3. 跳表中查找、插入、删除的三个口诀
  4. 使用场景(简单延时队列、排行榜、简单限流)

如果您还不能了然于胸,请继续阅读本文。

场景案例

假设我们有某个班级所有学生的语文成绩,想统计、查询区间范围、查询单个学生成绩、满足高性能读取这些需求,Redis 的 zset 结构无疑是最好的选择。Redis 提供了丰富的 API。示例:

ZADD yuwen 90 s01 89 s03 99 s02 74 s04 97 s05

以 yuwen 为 key 分别存储了 s01 到 s06 共计 6 名学生的分数,我们可以查询任一学生的成绩

ZSCORE yuwen s03

可以按照排序返回指定区间内的所有元素

ZRANGE yuwen 1 2 withscores

可以访问指定分数区间内的所有元素

ZRANGEBYSCORE yuwen 90 100 withscores

可以统计指定区间内的个数

ZCOUNT yuwen 80 90

实现

zset 结构中,既支持按单个元素查询,又支持范围查询,是如何实现的呢?我们深入代码分析,在 Redis 的 t_zset.c 的注释中,提到:

/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view").

翻译过来是 Redis 中有两种数据结构来支持 zset 的功能,一个是 hash table ,一个是 skip list。先来看一下 zset 在代码中的定义:

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

dict 是一个hash table ,各种编程语言中都有实现。可以保证 O(1) 的时间复杂度,不做过多解释。我们继续看 zskiplist 的定义:

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

zskiplist 是 Redis 对 skiplist 做了变种,skiplist 就是我们常说的跳表。

跳表

跳表由 William Pugh 于1990年发表的论文 Skip lists: a probabilistic alternative to balanced trees 中被首次提出,查找时间复杂度为平均 O(logN),最差 O(N),在大部分情况下效率可与平衡树相媲美,但实现比平衡树简单的多,跳表是一种典型的以空间换时间的数据结构。

跳表具有以下几个特点:

  • 由许多层结构组成。
  • 每一层都是一个有序的链表。
  • 最底层 (Level 1) 的链表包含所有元素。
  • 如果一个元素出现在 Level i 的链表中,则它在 Level i 之下的链表也都会出现。
  • 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

跳表的查找会从顶层链表的头部元素开始,然后遍历该链表,直到找到元素大于或等于目标元素的节点,如果当前元素正好等于目标,那么就直接返回它。如果当前元素小于目标元素,那么就垂直下降到下一层继续搜索,如果当前元素大于目标或到达链表尾部,则移动到前一个节点的位置,然后垂直下降到下一层。正因为 Skiplist 的搜索过程会不断地从一层跳跃到下一层的,所以被称为跳跃表。

还是举个例子,假设链接包含1-10,共10个元素。我们要找到第9个,需要从 header 遍历,共 9 次才能找到
读懂才会用 : 带你见识 Redis 的 zset
一次只能比较一个数,最坏的情况下时间复杂度是O(n),如果我们一次可以比较2个元素就好了:
读懂才会用 : 带你见识 Redis 的 zset
一次查找2个的话,我们只找了5次就找到了。所以就有了类似下面的结构,在链表上增加一层减少了元素个数的“链表”:
读懂才会用 : 带你见识 Redis 的 zset
如果增加两层“链表”,只查找3次就可以找到:
读懂才会用 : 带你见识 Redis 的 zset
即便是我们找元素8,也只需要遍历 1->4->7->8,共4次查询。

跳表就是这样的一种数据结构,结点是跳过一部分的,从而加快了查询的速度。之前讲HashMap 中我们提到,java 8 中当哈希冲突个数大于 7 个的时候,转换为红黑树。跳表跟红黑树两者的算法复杂度差不多,为什么Redis要使用跳表而不使用红黑树呢?跳表相对于红黑树,代码简单。如果我们要查询一个区间里面的值,用平衡树实现可能会麻烦。删除一段区间时,如果是平衡树,就会相当困难,毕竟涉及到树的平衡问题,而跳表则没有这种烦恼。

整个查询过程,可以简化理解为 if (下一个是否大于结果) 下一个 else 下一层

加强版跳表

Redis 中的对 skiplist 做了些改造:

  • 增加了后驱指针(*backward
  • 同时记录value 和 score,且 score 可以重复
  • 第一层维护了双向链表

zset 结构整个类图如下:

读懂才会用 : 带你见识 Redis 的 zset
zskiplist 中保存的 zskiplistNode 节点定义:

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward; //  指向上一个节点
    struct zskiplistLevel {
        struct zskiplistNode *forward;  // 指向下一个节点
        unsigned long span; // 节点之前的跨度
    } level[];  // 该节点的各层信息
} zskiplistNode;

zskiplistNode 中定义了 zskiplistLevel 的数组,用来保存该 node 在每一层的指针。查询跟我们模拟的例子类似,不在详细描述。重点看一下插入操作:

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        // 计算 span 信息,表示从该节点到下一个节点,需要跳跃多少次
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    // 随机函数
    level = zslRandomLevel();
    // 如果需要上升层次记录好位置
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

插入的时候,首先要进行查询,然后从最底层开始,插入待插入的元素。然后看看从下而上,是否需要逐层插入。可是到底要不要插入上一层呢?我们要想每层的跳跃都非常高效,那就越是平衡越好(第一层1级跳,第二层2级跳,第3层4级跳,第4层8级跳)。但是用算法实现起来,确实非常地复杂的,并且要严格地按照2地指数次幂,我们还要对原有地结构进行调整。所以跳表的思路是抛硬币,听天由命,产生一个随机数。Redis 中 25%概率再向上扩展。这样子,每一个元素能够有X层的概率为0.25^(X-1)次方。在 Redis 中level初始化时就定义好了,为 32 层。那么,第32层有多少个元素的概率大家可以算一下。

整个插入过程,可以简化理解为:先插入最底层 if (随机概率) 扩展上一层

随机函数:

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

skiplist 的插入如下图所示:

读懂才会用 : 带你见识 Redis 的 zset
对于 zskiplist 的删除操作,可以分为3个步骤:

  • 根据member(obj)和score找到节点的位置(代码里变量x即为该节点,update记录每层x的上一个节点)
  • 调动zslDeleteNode把x节点从skiplist逻辑上删除
  • 释放x节点内存
/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

整个删除过程,可以简化理解为:先找到,断关联,删内存

在 zset 的创建中(zaddGenericCommand 方法)隐藏这一个逻辑分支:

if (server.zset_max_ziplist_entries == 0 ||
    server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
    zobj = createZsetObject();
} else {
    zobj = createZsetZiplistObject();
}

Redis 初始化的时候,只判断存储的元素长度是否大于64个字节(server.zset_max_ziplist_entries默认128)。大于64个字节选择zkiplist,否则ziplist。当执行增删改查的方法,根据是ziplist 还是 zkiplist 选择不同的实现。关于zkiplist 本文不详细叙述。只需要记住zset 在两种情况下使用 ziplist :

  1. 保存的元素个数不足 128 个
  2. 单个元素的大小超过 64 bytes

执行增删改查的方法,根据是ziplist 还是 zkiplist 选择不同的实现。zkiplist 本文不详细叙述。

结论

至此,我们介绍了Redis 中 zset 最复杂的跳表部分,结合代码和理解,请思考这4个命令背后都是依赖于什么数据结构的支撑。

ZSCORE yuwen s03 基于哈希表 O(1)复杂度

ZRANGE yuwen 1 2 withscores 基于skiplist和span查找

ZRANGEBYSCORE yuwen 90 100 withscores 基于skiplist和score查找

ZREVRANGE yuwen 1 2 withscores 基于skiplist和score 和 * backward 查找

使用场景

1. 延时队列

zset 会按 score 进行排序,如果 score 代表想要执行时间的时间戳。在某个时间将它插入zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序。

起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,可以达到延时执行的目的。

2. 排行榜

经常浏览技术社区的话,应该对 “1小时最热门” 这类榜单不陌生。如何实现呢?如果记录在数据库中,不太容易对实时统计数据做区分。我们以当前小时的时间戳作为 zset 的 key,把贴子ID作为 member ,点击数评论数等作为 score,当 score 发生变化时更新 score。利用 ZREVRANGE 或者 ZRANGE 查到对应数量的记录。

3. 限流

滑动窗口是限流常见的一直策略。如果我们把一个用户的 ID 作为key 来定义一个 zset ,member 或者 score 都为访问时的时间戳。我们只需统计某个 key 下在指定时间戳区间内的个数,就能得到这个用户滑动窗口内访问频次,与最大通过次数比较,来决定是否允许通过。

以上三种场景的示例代码,在下一篇给出。也欢迎大家思考是否还有其他应用场景。

关注我

如果您在微信阅读,请您点击链接 关注我 ,如果您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误
读懂才会用 : 带你见识 Redis 的 zset