1. 跳跃表
跳跃表是Redis的一大特色了..但是面试也经常被问到..所以得了解了解
1.1. 概述
跳跃表是Redis zset的底层实现之一,zset在member较多时会采用跳跃表作为底层实现,它在添加、删除、查找节点上都拥有与红黑树相当的性能,它其实说白了就是一种特殊的链表,链表的每个节点存了不同的“层”信息,用这种分层存节点的方式在查找节点时能跳过些节点,从而使添加、删除、查找操作都拥有了O(logn)的平均时间复杂度
通俗的讲就是:跳跃表是一种有序的数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。
1.2. 结构
//注意4.x版本的ZSKIPLIST_MAXLEVEL 还是32
#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// 成员对象
sds ele;
//分值
double score;
//指向上一个节点,用于zrevrange命令
// backward变量是特意为zrevrange*系列命令准备的,目的是为了使跳跃表实现反向遍历,普通跳跃表的实现里是非必要的
struct zskiplistNode *backward;
struct zskiplistLevel {
//前进指针,指向下一个节点
struct zskiplistNode *forward;
//到达后一个节点的跨度(两个相邻节点span为1)
unsigned long span;
} level[];//该节点在各层的信息,柔性数组成员
} zskiplistNode;
typedef struct zskiplist {
//表头和表尾节点
struct zskiplistNode *header, *tail;
// 表中节点的数量
unsigned long length;
// 表中层数最大的节点的层数
int level;
} zskiplist;
1.2.1. 在redis的源码中,作者为了满身redis的要求,对skiplist进行了修改
- 允许重复的 score 值:多个不同的 member 的 score 值可以相同,当score相同时根据member(代码里的ele)的字典序来排名。
- 进行对比操作时,不仅要检查 score 值,还要检查 member :当 score 值可以重复时,单靠 score 值无法判断一个元素的身份,所以需要连 member 域都一并检查才行。
- 每个节点都带有一个高度为 1 层的后退指针,用于从表尾方向向表头方向迭代:当执行 ZREVRANGE 或 ZREVRANGEBYSCORE 这类以逆序处理有序集的命令时,就会用到这个属性。
1.3. 创建
时间复杂度 O(1)
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
//分配空间
zsl = zmalloc(sizeof(*zsl));
//设置默认层数
zsl->level = 1;
//设置跳跃表长度
zsl->length = 0;
//64level的头结点,分数为0,没有obj的跳跃表头节点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//跳跃表头节点初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
//头结点每个level的下一个节点都初始化为null,跨度为0
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
//跳跃表头节点的后退指针backward置为NULL
zsl->header->backward = NULL;
//表头指向跳跃表尾节点的指针置为NULL
zsl->tail = NULL;
return zsl;
}
/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
//为指定高度的节点分配空间并赋值,insert操作也要用到
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
/柔性数组成员
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;zskiplist
return zn;
}
zskiplist的头结点不是一个有效的节点,它有ZSKIPLIST_MAXLEVEL层(32层或者64层,看版本),每层的forward指向该层跳跃表的第一个节点,若没有则为null。
这种类型有个车头,车头写了这里有多少人,最高多少层,头在哪,尾在哪,然后里面有一排一排的...
1.4. 插入
时间复杂度O(logn)
先看一个动画插入
看不懂没有关系,还有详细的!
插入节点的流程如下:
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
//创建一个节点,分数为score,对象为ele,插入到zsl表头管理的跳跃表中,并返回新节点的地址
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
//获取跳跃表头结点地址,从头节点开始一层一层遍历
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
//遍历头节点的每个level,从下标最大层减1一直到0
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
//这个while循环是查找的过程,沿着x指针遍历跳跃表,满足以下条件则要继续在当层往前走
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
//记录该层一共跨越了多少节点 加上 上一层遍历所跨越的节点数
rank[i] += x->level[i].span;
//指向下一个节点
x = x->level[i].forward;
}
//while循环跳出时,用update[i]记录第i层所遍历到的最后一个节点,遍历到i=0时,就要在该节点后要插入节点
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
//获得一个随机的层数
level = zslRandomLevel();
//如果大于当前所有节点最大的层数时
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
//将大于等于原来zsl->level层以上的rank[]设置为0
rank[i] = 0;
//将大于等于原来zsl->level层以上update[i]指向头结点
update[i] = zsl->header;
//update[i]已经指向头结点,将第i层的跨度设置为length,length代表跳跃表的节点数量
update[i]->level[i].span = zsl->length;
}
//更新表中的最大成数值
zsl->level = level;
}
//创建一个节点
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
//设置新节点的前进指针为查找时(while循环)每一层最后一个节点的的前进指针
x->level[i].forward = update[i]->level[i].forward;
//再把查找时每层的最后一个节点的前进指针设置为新创建的节点地址
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
//更新插入节点的跨度值
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
//如果插入节点的level小于原来的zsl->level才会执行
for (i = level; i < zsl->level; i++) {
//因为高度没有达到这些层,所以只需将查找时每层最后一个节点的值的跨度加1
update[i]->level[i].span++;
}
//设置插入节点的后退指针,就是查找时最下层的最后一个节点,该节点的地址记录在update[0]中
//如果插入在第二个节点,也就是头结点后的位置就将后退指针设置为NULL
x->backward = (update[0] == zsl->header) ? NULL : update[0];
//x节点不是最尾部的节点
if (x->level[0].forward)
//将x节点后面的节点的后退节点设置成为x地址
x->level[0].forward->backward = x;
else
//更新表头的tail指针,指向最尾部的节点x
zsl->tail = x;
//跳跃表节点计数器加1
zsl->length++;
return x;
}
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
//返回一个随机的层数,不是level的索引是层数
int zslRandomLevel(void) {
int level = 1;
//有1/4的概率加入到上一层中
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
先看个图来理解理解:
假如往下图插入一个score为7的节点,则会按照下图方式所示进行: 找到插入位置(蓝色的线表示查找路径):
根据表头的level知道表里最高的level是4,所以从第一个节点(默认第一个节点下标0,跟其他节点结构一模一样,但是只是其他值不用,只用层数)的第四层开始往下遍历
第一个节点的level4直接指向了第五个节点(图里最后一个节点,score为8), 8 > 7, 所以这层不适合
level4 --> level3
第一个节点的level3指向第三个节点(score等于4), 4 < 7 所以跳到 第三个节点
第三个节点的level3又指向了第五个节点, 8 > 7 ,所以这层不适合
level3 --> level2 (第三个节点上层数移动)
第三个节点的level2还是指向了第五个节点, 8 > 7 , 还是不适合
level2 --> level1
第三个节点的level1指向了第四个节点的level1, 6 < 7, 所以跳到第四个节点(score为6)
第四个节点的level1指向了第五个节点的level1, 8 > 7, 还是不适合!
level1 --> level0
已经挪到了最底层,直接插入到第四个节点后面!
假设层数为3(根据zslRandomLevel函数来获取随机一个值),将节点7插入进skiplist并修改附近节点的相关属性:
插入时,会判断新插入的层数是多少! 比如之前 score等于6的节点的level1 和level2是指向score等于8的节点的!
比如之前的score 等于 4的节点的level2是指向 score等于8的节点的! 新插入的7层数有遮挡! 所以重新排
那么这时候就有两种不同情况了!
如果新插入的层数比以前的都高!
//如果大于当前所有节点最大的层数时 level > zsl->level
如果新插入的值层数比以前的矮
// 这种情况比较简单, update数组里存的就是之前 查找时每层最后一个节点的值的跨度加1
另外,注意,在上面代码里,是分步修改的!一部分在 新插入的level和以前的level 之间的,先指向头节点!再进行修改!
待新节点插入后,再修改查时候的节点跨度
另外,得注意是不是插入到了 头节点后 面的位置或者 尾节点! 这种情况特殊点!
1.5. 删除
1.5.1. 释放一个节点的内存
时间复杂度O(1)
void zslFreeNode(zskiplistNode *node) {
//member的引用计数-1,防止内存泄漏
sdsfree(node->ele);
zfree(node);
}
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
if (s == NULL) return;
s_free((char*)s-sdsHdrSize(s[-1]));
}
1.5.2. 释放整个skiplist的内存
时间复杂度O(n)
/* Free a whole skiplist. */
void zslFree(zskiplist *zsl)
//任何一个节点一定有level[0],所以迭代level[0]来删除所有节点{
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
1.5.3. 从skiplist中删除并释放掉一个节点
时间复杂度O(logn)
主要分为以下3个步骤:
- 根据member(obj)和score找到节点的位置(代码里变量x即为该节点,update记录每层x的上一个节点)
- 调动zslDeleteNode把x节点从skiplist逻辑上删除
- 释放x节点内存
/* Delete an element with matching score/object from the skiplist. */
//从skiplist逻辑上删除一个节点并释放该节点的内存
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;//要删除的节点
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);//obj的引用计数-1并释放节点内存
return 1;
}
return 0; /* not found */
}
/* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank and zslDeleteRangeByLex*/
//从skiplist逻辑上删除一个节点(不释放内存,仅改变节点位置关系)
//x为要删除的节点
//update为每一层x的上一个节点(为了更新x上一个节点的forward和span属性)
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {//当前层有x节点
update[i]->level[i].span += x->level[i].span - 1;//...=x->level[i].span;
update[i]->level[i].forward = x->level[i].forward;//跨过x节点
} else {//当前层没有x节点
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {//是否是tail节点
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)//删除了最高层数的节点
zsl->level--;
zsl->length--;
}
1.5.4. 根据范围删除节点
时间复杂度O(log(n)+m), m是范围内元素的个数
- zslDeleteRangeByScore //删除 满足score范围 的节点
- zslDeleteRangeByRank //删除 满足排名范围 的节点
- zslDeleteRangeByLex //在所有节点的score相同的skiplist中,删除 满足member字典序范围 的节点
1.6. 查找
根据查找范围的类型 zskiplist查找可以分为三类:
- rank范围查找
- score范围查找
- member范围查找
1.6.1. rank范围查找 zrangeGenericCommand
功能:给定一个zero-based排名范围(start,end),从zskiplist中找出满足该范围的所有节点。 zrangeGenericCommand函数考虑了一些与客户端交互的内容,学zskiplist的时候没必要细看,它主要分为以下两步:
- 调用zslGetElementByRank找到排名start+1的节点·······O(logn)
- 从这个节点开始遍历(end-start+1)个节点·······O(m)
/* Finds an element by its rank. The rank argument needs to be 1-based. */
//O(logn)
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) { //从高层向底层累加span直到累加的值等于rank
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
1.6.2. score范围查找 genericZrangebyscoreCommand
功能:给定一个score的范围,从zskiplist中找出满足该score范围的所有节点。
为了方便的表示score范围的开闭区间,redis在server.h里声明了一个表示zset分数区间的类型zrangespec,关于score范围查找的相关函数都会用到它:
/* Struct to hold a inclusive/exclusive range spec by score comparison. */
typedef struct {
double min, max;
//是否是开闭区间,1为开,0位闭
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
判断一个score与zrangespec区间内最小值、最大值的关系:
//给定value是否>(>=)此范围的下界
//gte(greater than or equal to)
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
//给定value是否<(<=)此范围的上界
//lte(less than or equal to)
int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
根据上述两个函数,就可以用O(1)时间复杂度判断一个zskiplist的score是否与zrangespec分数区间有交集:
/* Returns if there is a part of the zset is in range. */
//用O(1)的时间复杂度判断zset(zsl)的分数范围是否与给定分数范围(range)有交集。
//
//range(6,9] zsl{1,2,3,4,5} 或zsl{10,12,13} 都是不在范围内
//
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score,range))//尾节点小于范围下界
return 0;
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score,range))//头节点大于范围上界
return 0;
return 1;//在
}
genericZrangebyscoreCommand函数也考虑了很多与客户端交互的内容,就学习底层跳跃表实现时没必要细看,我们只需要知道底层是如何做到的即可,主要执行如下步骤:
- 调用zslParseRange把客户端传过来的范围min、max转换成zrangespec区间类型 保存在range变量里。
- 调用zslFirstInRange找到zskiplist中满足range条件的最小节点。(假设是正序的范围查找)·······O(logn)
- 从这个节点开始遍历,直到调用zslValueLteMax()找到最后一个小于range条件上界的节点。·······O(m)
核心的代码zslFirstInRange:
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//满足range条件最小的那个 O(logn)
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL;//保证下面的逻辑一定能找到范围内的节点
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
serverAssert(x != NULL);
/* Check if score <= max. */
if (!zslValueLteMax(x->score,range)) return NULL;
return x;
}
1.6.3. member范围查找 genericZrangebylexCommand
功能:在一个所有节点的score都相同的zskiplist中,找到满足member字符串字典序范围的所有节点。
底层用memcmp比较两个字符串的大小。实现的流程与genericZrangebyscoreCommand很像,有兴趣再看。
叶落山城秋 : 跳跃表的特点,另外,跳跃的插入和删除很很多会被问到! 特别是插入!
另外,每种操作的时间复杂度要记住
函数 | 作用 | 时间复杂度 |
---|---|---|
zslCreate | 创建一个新的跳跃表 | O(1) |
zslFree | 释放给定跳跃表,以及表中包含的所有节点 | O(N),N为跳跃表的长度 |
zslInsert | 将包含给定成员和分值的新节点添加到跳跃表中 | 平均O(logN),最坏O(N),N为跳跃表长度 |
zslDelete | 删除跳跃表中包含给定成员和分值的节点 | 平均O(logN),最坏O(N),N为跳跃表长度 |
zslGetRank | 返回包含给定成员和分值的节点在跳跃表中的排位 | 平均O(logN),最坏O(N),N为跳跃表长度 |
zslGetElementByRank | 返回跳跃表在给定排位上的节点 | 平均O(logN),最坏O(N),N为跳跃表长度 |
zslIsInRange | 给定一个分值范围(range),比如0到15,20到28,诸如此类,如果跳跃表中至少有一个节点的分值在这个范围之内,那么返回1,否则返回0 | 通过跳跃表的表头节点和表尾节点,这个检测可以用O(1)复杂度完成 |
zslFirstInRange | 给定一个分值范围,返回跳跃表中第一个符合这个范围的节点 | 平均O(logN),最坏O(N),N为跳跃表长度 |
以上参考: