有序集合
11.1 有序集合接口
三种查询类型
有序集合是一组已排序的(分数,名称)对的集合。真正的Redis有许多有序集合命令,但它们本质上只是以下三种查询类型的接口:
- 1. 点查询:通过名称查找单个(分数,名称)对。
- 2. 范围查询:
- • 根据(分数,名称)定位到最近的(分数,名称)对。
- • 从某个(分数,名称)对开始按排序顺序迭代。
- 3. 排名查询:
- • 查找排序顺序中的第n个(分数,名称)对。
- • 查找给定(分数,名称)对在排序顺序中的位置。
哈希表只能进行点查询。数据库具备点查询和范围查询能力,因为它们基于树结构。排名查询是Redis独有的特性,需要增强型的树数据结构,后续会详细解释。
两种更新类型
- 1. 插入新的(分数,名称)对。
- 2. 查询已有的(分数,名称)对,然后进行更新或删除。
真实Redis中的有序集合命令
- 1. 真实Redis中的点查询命令:
- •
Zadd key score name
:添加一个新的(分数,名称)对,或者通过名称更新已有的(分数,名称)对。 - •
Zscore key name
:通过名称查找(分数,名称)对,并返回分数。 - •
Zrem key name
:通过名称查找(分数,名称)对,然后删除它。
- •
- 2. 真实Redis中的范围查询命令:
- •
Zscan
:遍历整个有序集合。 - •
ZrangeByScore
:仅根据分数进行范围查询,不涉及名称。 - •
ZrangeByLex
:假设分数相同,仅根据名称进行范围查询。 - •
Zrange
:ZrangeByScore
或ZrangeByLex
的另一种语法形式。 - •
Zrev*
:按降序进行范围查询。 - •
ZremRangeByScore
、ZremRangeByLex
:进行范围查询,然后删除查询结果。
- •
- 3. 真实Redis中的排名查询命令:
- •
Zrank
:通过名称查找(分数,名称)对,并返回其排名(在排序顺序中的位置)。 - • 范围查询中的偏移量参数是基于通过排名查找(分数,名称)对。
- •
简化的有序集合接口
虽然只有三种查询类型,但真实Redis中有众多有序集合命令,且没有一个命令能充分利用有序集合的全部功能。(分数,名称)对是按(分数,名称)排序的,但ZrangeByScore
仅使用分数,ZrangeByLex
仅使用名称,而Zscan
两者都不使用。
让我们用一个更简单、更通用的接口来替代它们:
ZQUERY key score name offset limit
它使用元组比较返回大于或等于(分数,名称)的(分数,名称)对。
时间复杂度
用户接口的复杂度并不值得模仿。值得借鉴的是真实Redis中最优的时间复杂度。
操作 | 平均时间复杂度 | 最坏时间复杂度 | 数据结构 |
添加或删除一个(分数,名称)对 | O(log N) | O(N) | 哈希表和树的插入操作 |
通过名称进行点查询 | O(1) | O(N) | 哈希表查找 |
根据(分数,名称)定位 | O(log N) | O(log N) | 树查找 |
按排序顺序迭代 | O(1) | O(log N) | 树迭代 |
查找(分数,名称)对的排名 | O(log N) | O(log N) | 增强型树 |
根据排名定位 | O(log N) | O(log N) | 增强型树 |
哈希表对于无序索引是最优的,没有比O(1)更快的了。基于比较的排序每个元素的时间复杂度为O(log N),这可以通过平衡树实现。
新的内容是排名查询,它用于偏移。在数据库中,对范围查询按d进行偏移至少需要O(d)的时间,因为它需要逐个遍历到下一个第d个元素。这就是为什么在SQL中使用偏移进行分页不是一个好主意。Redis增强了数据结构,以便能够在O(log N)的时间内找到第d个元素;对于有序集合来说,按一个很大的数字进行偏移不是问题。
11.2 有序集合数据类型
有序集合API
有序集合是一组已排序的(分数,名称)对的集合,有两种索引方式。
// 伪代码
struct SortedSet {
std::set<std::pair<double, std::string>> by_score;
std::unordered_map<std::string, double> by_name;
};
对于侵入式数据结构,数据只有一份副本:
struct ZSet {
AVLNode *root = NULL; // 按(分数,名称)索引
HMap hmap; // 按名称索引
};
struct ZNode {
// 数据结构节点
AVLNode tree;
HNode hmap;
// 数据
double score = 0;
size_t len = 0;
char name[0]; // 柔性数组
};
我们将实现以下点查询和更新操作:
bool zset_insert(ZSet *zset, const char *name, size_t len, double score);
ZNode *zset_lookup(ZSet *zset, const char *name, size_t len);
void zset_delete(ZSet *zset, ZNode *node);
柔性数组
在C语言中,结构体末尾的零长度数组可以利用任何额外的空间。
struct ZNode {
// ...
size_t len = 0;
char name[0]; // 柔性数组
};
这用于将字符串嵌入到节点中,以减少内存分配。
static ZNode *znode_new(const char *name, size_t len, double score) {
ZNode *node = (ZNode *)malloc(sizeof(ZNode) + len); // 结构体 + 数组
avl_init(&node->tree);
node->hmap.next = NULL;
node->hmap.hcode = str_hash((uint8_t *)name, len);
node->score = score;
node->len = len;
memcpy(&node->name[0], name, len);
return node;
}
C++不支持柔性数组,所以不能使用new
来创建这个结构体。因此,我们有一个使用malloc()
的分配函数。我们还应该添加配对的释放函数,以便分配方法的细节不会泄露。
static void znode_del(ZNode *node) {
free(node);
}
多种值类型
有两种值类型:字符串和有序集合。让我们将它们添加到Entry
中。
enum {
T_INIT = 0,
T_STR = 1, // 字符串
T_ZSET = 2, // 有序集合
};
struct Entry {
struct HNode node; // 哈希表节点
std::string key;
// 值
uint32_t type = 0;
// 以下两者之一
std::string str;
ZSet zset;
};
旁注:带标签的联合(Tagged union)
在Entry
中同时保留两个值是浪费空间的,我们可以使用带标签的联合。
struct Entry {
// ...
uint32_t type = 0;
union {
std::string str;
ZSet zset;
};
};
但问题是std::string
有构造函数和析构函数,而C++不知道如何构造或析构一个联合。所以我们必须手动调用构造函数和析构函数。
struct Entry {
// ...
explicit Entry(uint32_t type) : type(type) {
if (type == T_STR) {
new (&str) std::string; // 调用构造函数
} else if (type == T_ZSET) {
new (&zset) ZSet;
}
}
~Entry() {
if (type == T_STR) {
str.~basic_string(); // 调用析构函数
} else if (type == T_ZSET) {
zset_clear(&zset);
}
}
};
这些语法有点复杂。我更喜欢使用普通函数的C风格代码。
旁注:运行时多态
带标签联合的一种替代方案是带有运行时类型信息的类层次结构。
// 没有值的基类
struct Entry {
// 公共字段...
virtual ~Entry() {} // 必须存在
};
// 带有值的子类
struct EntryKV : Entry {
std::string str;
};
struct EntryZSet : Entry {
ZSet zset;
virtual ~EntryZSet() {
zset_clear(&zset);
}
};
在添加新键时,Entry
会被创建为其某个子类的实例。但在查找时,必须有一种方法来判断它是哪个子类。
Entry *ent = lookup(); // 基类
EntryZSet *ent_zset = dynamic_cast<EntryZSet *>(ent); // 运行时类型信息
if (ent_zset) {
// 某个特定的子类
}
dynamic_cast()
能够确定它是哪个子类,前提是存在虚方法,因为类型信息与虚函数表相关联。虚析构函数满足这一条件。当使用运行时多态时,析构函数必须是虚函数,因为它们可能会从指向基类的指针调用。
运行时多态可以在普通C语言中通过标签或函数指针实现。在实际项目中,你不需要练习这些C++特性。
11.3 有序集合的查找、插入和删除
按名称查找
按名称查找只是哈希表的查找操作。
ZNode *zset_lookup(ZSet *zset, const char *name, size_t len) {
if (!zset->tree) {
return NULL;
}
HKey key;
key.node.hcode = str_hash((uint8_t *)name, len);
key.name = name;
key.len = len;
HNode *found = hm_lookup(&zset->hmap, &key.node, &hcmp);
return found ? container_of(found, ZNode, hmap) : NULL;
}
HKey
是用于哈希表键比较函数的辅助结构体。
struct HKey {
HNode node;
const char *name = NULL;
size_t len = 0;
};
static bool hcmp(HNode *node, HNode *key) {
ZNode *znode = container_of(node, ZNode, hmap);
HKey *hkey = container_of(key, HKey, node);
if (znode->len != hkey->len) {
return false;
}
return 0 == memcmp(znode->name, hkey->name, znode->len);
}
AVL树插入
树插入操作与上一章的search_and_insert()
相同。
static void tree_insert(ZSet *zset, ZNode *node) {
AVLNode *parent = NULL; // 在这个节点下插入
AVLNode **from = &zset->root; // 指向下一个节点的传入指针
while (*from) { // 树搜索
parent = *from;
from = zless(&node->tree, parent) ? &parent->left : &parent->right;
}
*from = &node->tree; // 附加新节点
node->tree.parent = parent;
zset->root = avl_fix(&node->tree);
}
zless()
是元组比较函数。
// (lhs.score, lhs.name) < (rhs.score, rhs.name)
static bool zless(AVLNode *lhs, AVLNode *rhs) {
ZNode *zl = container_of(lhs, ZNode, tree);
ZNode *zr = container_of(rhs, ZNode, tree);
if (zl->score != zr->score) {
return zl->score < zl->score;
}
int rv = memcmp(zl->name, zr->name, min(zl->len, zr->len));
return (rv != 0) ? (rv < 0) : (zl->len < zr->len);
}
插入或更新
Zadd
命令必须处理(分数,名称)对已经存在的情况。
bool zset_insert(ZSet *zset, const char *name, size_t len, double score) {
if (ZNode *node = zset_lookup(zset, name, len)) {
zset_update(zset, node, score);
return false;
}
ZNode *node = znode_new(name, len, score);
hm_insert(&zset->hmap, &node->hmap);
tree_insert(zset, node);
return true;
}
如果分数发生变化,分离并重新插入AVL树节点将修复顺序。
static void zset_update(ZSet *zset, ZNode *node, double score) {
// 分离树节点
zset->root = avl_del(&node->tree);
avl_init(&node->tree);
// 重新插入树节点
node->score = score;
tree_insert(zset, node);
}
删除一个(分数,名称)对
我们没有搜索并删除的函数,因为删除操作可以通过节点引用完成。将搜索和删除分开在不同的函数中更灵活,因为有多种搜索方式。
void zset_delete(ZSet *zset, ZNode *node) {
// 从哈希表中删除
HKey key;
key.node.hcode = node->hmap.hcode;
key.name = node->name;
key.len = node->len;
HNode *found = hm_delete(&zset->hmap, &key.node, &hcmp);
assert(found);
// 从树中删除
zset->root = avl_del(&node->tree);
// 释放节点内存
znode_del(node);
}
11.4 有序集合范围查询
范围查询命令:ZQUERY key score name offset limit
。
- 1. 定位到第一个满足pair >= (score, name)的(分数,名称)对。
- 2. 移动到第n个后继/前驱节点(偏移量)。
- 3. 迭代并输出。
相关函数声明:
ZNode *zset_seekge(ZSet *zset, double score, const char *name, size_t len);
ZNode *znode_offset(ZNode *node, int64_t offset);
定位操作就是树搜索
ZNode *zset_seekge(ZSet *zset, double score, const char *name, size_t len) {
AVLNode *found = NULL;
for (AVLNode *node = zset->root; node; ) {
if (zless(node, score, name, len)) {
node = node->right; // node < key
} else {
found = node; // 候选节点
node = node->left;
}
}
return found ? container_of(found, ZNode, tree) : NULL;
}
偏移和迭代
迭代操作就是偏移量为±1的情况,也就是遍历AVL树。
// 偏移到后继或前驱节点。
AVLNode *avl_offset(AVLNode *node, int64_t offset);
ZNode *znode_offset(ZNode *node, int64_t offset) {
AVLNode *tnode = node ? avl_offset(&node->tree, offset) : NULL;
return tnode ? container_of(tnode, ZNode, tree) : NULL;
}
avl_offset()
函数将在后面实现。我们现在可以编写ZQUERY
函数了。
static void do_zquery(std::vector<std::string> &cmd, Buffer &out) {
// 解析参数并查找键值对
// ...
// 1. 定位到键
ZNode *znode = zset_seekge(zset, score, name.data(), name.size());
// 2. 偏移
znode = znode_offset(znode, offset);
// 3. 迭代并输出
size_t ctx = out_begin_arr(out);
int64_t n = 0;
while (znode && n < limit) {
out_str(out, znode->name, znode->len);
out_dbl(out, znode->score);
znode = znode_offset(znode, +1);
n += 2;
}
out_end_arr(out, ctx, (uint32_t)n);
}
在迭代结束之前,(分数,名称)对的数量是未知的,所以序列化数组的大小是一个临时值,稍后会进行修正。
static size_t out_begin_arr(Buffer &out) {
out.push_back(TAG_ARR);
buf_append_u32(out, 0); // 由out_end_arr()填充
return out.size() - 4; // `ctx`参数
}
static void out_end_arr(Buffer &out, size_t ctx, uint32_t n) {
assert(out[ctx - 1] == TAG_ARR);
memcpy(&out[ctx], &n, 4);
}
练习:添加一个反向版本,按降序进行定位和迭代操作。
11.5 有序集合排名查询
常规树遍历
对节点进行偏移的一种简单方法是每次一步地遍历树。
AVLNode *avl_offset(AVLNode *node, int64_t offset) {
for (; offset > 0 && node; offset--) {
node = successor(node);
}
for (; offset < 0 && node; offset++) {
node = predecessor(node);
}
return node;
}
在最坏情况下,找到后继节点的时间复杂度为O(log N),但平均情况下为O(1)。所以逐个对节点进行d步偏移,平均时间复杂度为O(d)。
static AVLNode *successor(AVLNode *node) {
// 找到右子树中最左边的节点
if (node->right) {
for (node = node->right; node->left; node = node->left) {}
return node;
}
// 找到这样的祖先节点:我是其左子树中最右边的节点
while (AVLNode *parent = node->parent) {
if (node == parent->left) {
return parent;
}
node = parent;
}
return NULL;
}
static AVLNode *predecessor(AVLNode *node); // 对称操作
子树大小增强
我们将增强树结构以实现排名查询:
- 1. 查找排序顺序中的第n个(分数,名称)对。
- 2. 查找给定(分数,名称)对在排序顺序中的排名。
要对节点进行d步偏移,先找到当前的排名r,然后找到排名为r + d的节点。无论d是多少,这两个步骤在最坏情况下的时间复杂度都是O(log N)。
这是通过在树节点中增加子树大小信息来实现的。
struct AVLNode {
AVLNode *parent = NULL;
AVLNode *left = NULL;
AVLNode *right = NULL;
uint32_t height = 0;
uint32_t cnt = 0; // 增强信息:子树大小
};
inline uint32_t avl_cnt(AVLNode *node) { return node ? node->cnt : 0; }
这个子树大小信息会随着子树高度一起传播。
static void avl_update(AVLNode *node) {
node->height = 1 + max(avl_height(node->left), avl_height(node->right));
node->cnt = 1 + avl_cnt(node->left) + avl_cnt(node->right); // 新增内容
}
基于排名的树遍历
父节点和子节点(例如B和D)之间的排名有如下关系:
D (排名=r+s+1) B (排名=r)
┌───┴───┐ ┌───┴───┐
(rank=r) B E A D (排名=r+s+1)
┌─┴─┐ ┌─┴─┐
A C (大小=s) (大小=s) C E
观察1:父节点和子节点之间的排名差是它们之间的子树大小加1。所以,通过沿着路径遍历,可以计算出到任意节点的排名差。
观察2:子树内的最大排名差加1就是子树的大小。所以,我们可以知道目标排名是否在某个子树内。
这意味着对节点进行d步偏移,只需向上或向下遍历,直到排名差为d。
AVLNode *avl_offset(AVLNode *node, int64_t offset) {
int64_t pos = 0; // 与起始节点的排名差
while (offset != pos) {
if (pos < offset && pos + avl_cnt(node->right) >= offset) {
// 目标节点在右子树内
node = node->right;
pos += avl_cnt(node->left) + 1;
} else if (pos > offset && pos - avl_cnt(node->left) <= offset) {
// 目标节点在左子树内
node = node->left;
pos -= avl_cnt(node->right) + 1;
} else {
// 移动到父节点
AVLNode *parent = node->parent;
if (!parent) {
return NULL;
}
if (parent->right == node) {
pos -= avl_cnt(node->left) + 1;
} else {
pos += avl_cnt(node->right) + 1;
}
node = parent;
}
}
return node;
}
该算法通过最低公共祖先找到通向目标节点的路径。它最多向上遍历一次,最多向下遍历一次。所以,在最坏情况下,时间复杂度为O(log N)。
绝对排名
avl_offset()
使用排名差进行偏移,我们也可以使用排名差来找到节点的绝对排名,这实现了ZRank
命令。
// 返回节点在排序顺序中的排名
int64_t avl_rank(AVLNode *node);
这留给读者作为练习。你还可以实现ZCount
命令,它可以在不遍历的情况下计算范围内(分数,名称)对的数量。
顺序统计树
排名差的概念适用于任何类似树的数据结构,包括真实Redis中使用的跳表。在教科书中,这被称为顺序统计树。
如果应用于B+树,数据库也可以在O(log N)的时间内进行偏移或计数操作。不幸的是,数据库在数据结构方面非常保守。
一个应用是文本编辑器中使用的Rope数据结构。它是一种存储小字符串片段的树。它的使用方式类似于字节数组,但它可以在O(log N)的时间内进行索引和在任意位置插入操作。
11.6 我们学到了什么
- 1. 使用侵入式数据结构的多索引数据。
- 2. 顺序统计树:一种快速偏移树节点的方法。
源代码: