TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

skip-list in redis

在计算机科学中,跳跃列表是一种数据结构。它使得包含n个元素的有序序列的查找和插入操作的平均时间复杂度都是O(log n),优于数组的O(n)复杂度。

快速的查询效果是通过维护一个多层次的链表实现的,且与前一层(下面一层)链表元素的数量相比,每一层链表中的元素的数量更少(见右下角示意图)。一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。跳过的元素的方法可以是随机性选择[2]或确定性选择[3],其中前者更为常见。

2020-11-06_skiplist_gif.gif

跳表就是有序链表加上多层索引的实现,实现上比红黑树要简单,比红黑树要的一点是可以快速找到一个有序的区间,但是总体上的平衡性是比不上红黑树的,下面主要看看redis中sorted set里面是怎么使用跳表的。

2020-11-07-redis_skiplist_pic.png

第一次费时间画如此精细的图,画完很满意,看图就很好理解了。

struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele; //对象指针
double score; //分数
struct zskiplistNode *backward; //指向前驱结点,前指针不指向表头
struct zskiplistLevel {
struct zskiplistNode *forward; //指向后继结点
unsigned long span; //对应层级的步长,用来计算排位的
} level[];
} zskiplistNode;

typedef struct zskiplist {
struct zskiplistNode *header, *tail; //链表的头和尾
unsigned long length; //原始链表的长度
int level; //最大的层级(不算表头节点)
} zskiplist;

创建

zslCreate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
//ZSKIPLIST_MAXLEVEL=64
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}

创建zskiplist结构体,保存全局信息,一开始创建的时候会顺带创建一个表头节点,包含64层

zslCreateNode

1
2
3
4
5
6
7
8
9
/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}

创建zskiplistNode结构,保存节点信息

插入

zslInsert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
//返回1-maxlevel的随机数
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

这里需要注意的是update和rank这两个值,update保存每一层的x,即要插入的节点的每一层的左边的节点。后续farwork指针修改用到。然后rank是保存每一层x对应到头结点的位置,而rank[0]是第一层中要插入节点的左边的节点,rank[0]-rank[i]就是插入位置左边的节点到i层对应节点的距离,i的span减去这个值就是插入节点的spn.i的新的span就是rank[0]-rank[i]向右再挪动1位。

然后这里的层级是随机数,随机生成一个1-max level的层级。就是0-max-1的数组索引

zsetAdd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160

/* Add a new element or update the score of an existing element in a sorted
* set, regardless of its encoding.
*
* The set of flags change the command behavior. They are passed with an integer
* pointer since the function will clear the flags and populate them with
* other flags to indicate different conditions.
*
* The input flags are the following:
*
* ZADD_INCR: Increment the current element score by 'score' instead of updating
* the current element score. If the element does not exist, we
* assume 0 as previous score.
* ZADD_NX: Perform the operation only if the element does not exist.
* ZADD_XX: Perform the operation only if the element already exist.
*
* When ZADD_INCR is used, the new score of the element is stored in
* '*newscore' if 'newscore' is not NULL.
*
* The returned flags are the following:
*
* ZADD_NAN: The resulting score is not a number.
* ZADD_ADDED: The element was added (not present before the call).
* ZADD_UPDATED: The element score was updated.
* ZADD_NOP: No operation was performed because of NX or XX.
*
* Return value:
*
* The function returns 1 on success, and sets the appropriate flags
* ADDED or UPDATED to signal what happened during the operation (note that
* none could be set if we re-added an element using the same score it used
* to have, or in the case a zero increment is used).
*
* The function returns 0 on erorr, currently only when the increment
* produces a NAN condition, or when the 'score' value is NAN since the
* start.
*
* The commad as a side effect of adding a new element may convert the sorted
* set internal encoding from ziplist to hashtable+skiplist.
*
* Memory managemnet of 'ele':
*
* The function does not take ownership of the 'ele' SDS string, but copies
* it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (*flags & ZADD_INCR) != 0;
int nx = (*flags & ZADD_NX) != 0;
int xx = (*flags & ZADD_XX) != 0;
*flags = 0; /* We'll return our response flags. */
double curscore;

/* NaN as input is an error regardless of all the other parameters. */
if (isnan(score)) {
*flags = ZADD_NAN;
return 0;
}

/* Update the sorted set according to its encoding. */
//压缩列表,暂时不看
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;

if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
//跳表
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;

//先从hash找,这里相当于是hash+skip list的结合,hash在其他post另外整理
de = dictFind(zs->dict,ele);
//ele原来就存在
if (de != NULL) {
/* NX? Return, same element already exists. */
//nx是只新增,不更新 所以就ret
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
curscore = *(double*)dictGetVal(de);

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* Remove and re-insert when score changes. */
//更新分数
if (score != curscore) {
//更新跳表的分值
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score. */
//更新hash的分值
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
//插入的是新的元素 xx是只更新不新增,所以这里过滤下
} else if (!xx) {
ele = sdsdup(ele);
//插入
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}

更新

zslUpdateScore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

/* Update the score of an elmenent inside the sorted set skiplist.
* Note that the element must exist and must match 'score'.
* This function does not update the score in the hash table side, the
* caller should take care of it.
*
* Note that this function attempts to just update the node, in case after
* the score update, the node would be exactly at the same position.
* Otherwise the skiplist is modified by removing and re-adding a new
* element, which is more costly.
*
* The function returns the updated element skiplist node pointer. */
zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

/* We need to seek to element to update to start: this is useful anyway,
* we'll have to update or remove it. */
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < curscore ||
(x->level[i].forward->score == curscore &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
//存放左边的位置,删除用到
update[i] = x;
}

/* Jump to our element: note that this function assumes that the
* element with the matching score exists. */
//x=current的位置
x = x->level[0].forward;
serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);

/* If the node, after the score update, would be still exactly
* at the same position, we can just update the score without
* actually removing and re-inserting the element in the skiplist. */
//case1 x->backward == NULL && x->level[0].forward == NULL
//只有一个节点直接更新

//case2 x->backward == NULL && x->level[0].forward->score > newscore
//原来是level0的第一个节点(不算表头节点),并且右边的分数大于新的分数

//case3 x->backward->score < newscore && x->level[0].forward == NULL
//左边的分数小于新的分数并且右边为空

//case4 x->backward->score < newscore && x->level[0].forward->score > newscore
//左边的分数小于新的分数并且右边的分数大于新的分数
if ((x->backward == NULL || x->backward->score < newscore) &&
(x->level[0].forward == NULL || x->level[0].forward->score > newscore))
{
x->score = newscore;
return x;
}

/* No way to reuse the old node: we need to remove and insert a new
* one at a different place. */
zslDeleteNode(zsl, x, update);
zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
/* We reused the old node x->ele SDS string, free the node now
* since zslInsert created a new one. */
x->ele = NULL;
zslFreeNode(x);
return newnode;
}

先判断如果新的分值更新后如果跳表规则不变,就直接更新在原来的位置。否则,先删除,再插入。

删除

zslDeleteNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
//左边元素到x的步长更新为到x的右边元素的步长
update[i]->level[i].span += x->level[i].span - 1;
//更新后继指针
update[i]->level[i].forward = x->level[i].forward;
//else的情况没有想到
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
//更新前驱指针
x->level[0].forward->backward = x->backward;
} else {
//更新尾指针
zsl->tail = x->backward;
}
//从最高层遍历获取新的层级
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

zslDelete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
----------- ending -----------