有序集合

11.1 有序集合接口

三种查询类型

有序集合是一组已排序的(分数,名称)对的集合。真正的Redis有许多有序集合命令,但它们本质上只是以下三种查询类型的接口:

  1. 1. 点查询:通过名称查找单个(分数,名称)对。
  2. 2. 范围查询
    • • 根据(分数,名称)定位到最近的(分数,名称)对。
    • • 从某个(分数,名称)对开始按排序顺序迭代。
  3. 3. 排名查询
    • • 查找排序顺序中的第n个(分数,名称)对。
    • • 查找给定(分数,名称)对在排序顺序中的位置。

哈希表只能进行点查询。数据库具备点查询和范围查询能力,因为它们基于树结构。排名查询是Redis独有的特性,需要增强型的树数据结构,后续会详细解释。

两种更新类型

  1. 1. 插入新的(分数,名称)对
  2. 2. 查询已有的(分数,名称)对,然后进行更新或删除

真实Redis中的有序集合命令

  1. 1. 真实Redis中的点查询命令
    • • Zadd key score name:添加一个新的(分数,名称)对,或者通过名称更新已有的(分数,名称)对。
    • • Zscore key name:通过名称查找(分数,名称)对,并返回分数。
    • • Zrem key name:通过名称查找(分数,名称)对,然后删除它。
  2. 2. 真实Redis中的范围查询命令
    • • Zscan:遍历整个有序集合。
    • • ZrangeByScore:仅根据分数进行范围查询,不涉及名称。
    • • ZrangeByLex:假设分数相同,仅根据名称进行范围查询。
    • • ZrangeZrangeByScoreZrangeByLex的另一种语法形式。
    • • Zrev*:按降序进行范围查询。
    • • ZremRangeByScoreZremRangeByLex:进行范围查询,然后删除查询结果。
  3. 3. 真实Redis中的排名查询命令
    • • Zrank:通过名称查找(分数,名称)对,并返回其排名(在排序顺序中的位置)。
    • • 范围查询中的偏移量参数是基于通过排名查找(分数,名称)对。

简化的有序集合接口

虽然只有三种查询类型,但真实Redis中有众多有序集合命令,且没有一个命令能充分利用有序集合的全部功能。(分数,名称)对是按(分数,名称)排序的,但ZrangeByScore仅使用分数,ZrangeByLex仅使用名称,而Zscan两者都不使用。

让我们用一个更简单、更通用的接口来替代它们:

ZQUERY key score name offset limit

它使用元组比较返回大于或等于(分数,名称)的(分数,名称)对。

时间复杂度

用户接口的复杂度并不值得模仿。值得借鉴的是真实Redis中最优的时间复杂度。

操作 平均时间复杂度 最坏时间复杂度 数据结构
添加或删除一个(分数,名称)对 O(log N) O(N) 哈希表和树的插入操作
通过名称进行点查询 O(1) O(N) 哈希表查找
根据(分数,名称)定位 O(log N) O(log N) 树查找
按排序顺序迭代 O(1) O(log N) 树迭代
查找(分数,名称)对的排名 O(log N) O(log N) 增强型树
根据排名定位 O(log N) O(log N) 增强型树

哈希表对于无序索引是最优的,没有比O(1)更快的了。基于比较的排序每个元素的时间复杂度为O(log N),这可以通过平衡树实现。

新的内容是排名查询,它用于偏移。在数据库中,对范围查询按d进行偏移至少需要O(d)的时间,因为它需要逐个遍历到下一个第d个元素。这就是为什么在SQL中使用偏移进行分页不是一个好主意。Redis增强了数据结构,以便能够在O(log N)的时间内找到第d个元素;对于有序集合来说,按一个很大的数字进行偏移不是问题。

11.2 有序集合数据类型

有序集合API

有序集合是一组已排序的(分数,名称)对的集合,有两种索引方式。

// 伪代码
struct SortedSet {
    std::set<std::pair<doublestd::string>> by_score;
    std::unordered_map<std::stringdouble>  by_name;
};

对于侵入式数据结构,数据只有一份副本:

struct ZSet {
    AVLNode *root = NULL;   // 按(分数,名称)索引
    HMap hmap;              // 按名称索引
};

struct ZNode {
    // 数据结构节点
    AVLNode tree;
    HNode   hmap;
    // 数据
    double  score = 0;
    size_t  len = 0;
    char    name[0];        // 柔性数组
};

我们将实现以下点查询和更新操作:

bool   zset_insert(ZSet *zset, const char *name, size_t len, double score);
ZNode *zset_lookup(ZSet *zset, const char *name, size_t len);
void   zset_delete(ZSet *zset, ZNode *node);

柔性数组

在C语言中,结构体末尾的零长度数组可以利用任何额外的空间。

struct ZNode {
    // ...
    size_t  len = 0;
    char    name[0];        // 柔性数组
};

这用于将字符串嵌入到节点中,以减少内存分配。

static ZNode *znode_new(const char *name, size_t len, double score) {
    ZNode *node = (ZNode *)malloc(sizeof(ZNode) + len); // 结构体 + 数组
    avl_init(&node->tree);
    node->hmap.next = NULL;
    node->hmap.hcode = str_hash((uint8_t *)name, len);
    node->score = score;
    node->len = len;
    memcpy(&node->name[0], name, len);
    return node;
}

C++不支持柔性数组,所以不能使用new来创建这个结构体。因此,我们有一个使用malloc()的分配函数。我们还应该添加配对的释放函数,以便分配方法的细节不会泄露。

static void znode_del(ZNode *node) {
    free(node);
}

多种值类型

有两种值类型:字符串和有序集合。让我们将它们添加到Entry中。

enum {
    T_INIT  = 0,
    T_STR   = 1,    // 字符串
    T_ZSET  = 2,    // 有序集合
};

struct Entry {
    struct HNode node;  // 哈希表节点
    std::string key;
    // 值
    uint32_t type = 0;
    // 以下两者之一
    std::string str;
    ZSet zset;
};

旁注:带标签的联合(Tagged union)

Entry中同时保留两个值是浪费空间的,我们可以使用带标签的联合。

struct Entry {
    // ...
    uint32_t type = 0;
    union {
        std::string str;
        ZSet zset;
    };
};

但问题是std::string有构造函数和析构函数,而C++不知道如何构造或析构一个联合。所以我们必须手动调用构造函数和析构函数。

struct Entry {
    // ...
    explicit Entry(uint32_t type) : type(type) {
        if (type == T_STR) {
            new (&str) std::string;     // 调用构造函数
        } else if (type == T_ZSET) {
            new (&zset) ZSet;
        }
    }
    ~Entry() {
        if (type == T_STR) {
            str.~basic_string();        // 调用析构函数
        } else if (type == T_ZSET) {
            zset_clear(&zset);
        }
    }
};

这些语法有点复杂。我更喜欢使用普通函数的C风格代码。

旁注:运行时多态

带标签联合的一种替代方案是带有运行时类型信息的类层次结构。

// 没有值的基类
struct Entry {
    // 公共字段...
    virtual ~Entry() {}     // 必须存在
};
// 带有值的子类
struct EntryKV : Entry {
    std::string str;
};
struct EntryZSet : Entry {
    ZSet zset;
    virtual ~EntryZSet() {
        zset_clear(&zset);
    }
};

在添加新键时,Entry会被创建为其某个子类的实例。但在查找时,必须有一种方法来判断它是哪个子类。

Entry *ent = lookup();  // 基类
EntryZSet *ent_zset = dynamic_cast<EntryZSet *>(ent);   // 运行时类型信息
if (ent_zset) {
    // 某个特定的子类
}

dynamic_cast()能够确定它是哪个子类,前提是存在虚方法,因为类型信息与虚函数表相关联。虚析构函数满足这一条件。当使用运行时多态时,析构函数必须是虚函数,因为它们可能会从指向基类的指针调用。

运行时多态可以在普通C语言中通过标签或函数指针实现。在实际项目中,你不需要练习这些C++特性。

11.3 有序集合的查找、插入和删除

按名称查找

按名称查找只是哈希表的查找操作。

ZNode *zset_lookup(ZSet *zset, const char *name, size_t len) {
    if (!zset->tree) {
        return NULL;
    }
    HKey key;
    key.node.hcode = str_hash((uint8_t *)name, len);
    key.name = name;
    key.len = len;
    HNode *found = hm_lookup(&zset->hmap, &key.node, &hcmp);
    return found ? container_of(found, ZNode, hmap) : NULL;
}

HKey是用于哈希表键比较函数的辅助结构体。

struct HKey {
    HNode node;
    const char *name = NULL;
    size_t len = 0;
};

static bool hcmp(HNode *node, HNode *key) {
    ZNode *znode = container_of(node, ZNode, hmap);
    HKey *hkey = container_of(key, HKey, node);
    if (znode->len != hkey->len) {
        return false;
    }
    return 0 == memcmp(znode->name, hkey->name, znode->len);
}

AVL树插入

树插入操作与上一章的search_and_insert()相同。

static void tree_insert(ZSet *zset, ZNode *node) {
    AVLNode *parent = NULL;         // 在这个节点下插入
    AVLNode **from = &zset->root;   // 指向下一个节点的传入指针
    while (*from) {                 // 树搜索
        parent = *from;
        from = zless(&node->tree, parent) ? &parent->left : &parent->right;
    }
    *from = &node->tree;            // 附加新节点
    node->tree.parent = parent;
    zset->root = avl_fix(&node->tree);
}

zless()是元组比较函数。

// (lhs.score, lhs.name) < (rhs.score, rhs.name)
static bool zless(AVLNode *lhs, AVLNode *rhs) {
    ZNode *zl = container_of(lhs, ZNode, tree);
    ZNode *zr = container_of(rhs, ZNode, tree);
    if (zl->score != zr->score) {
        return zl->score < zl->score;
    }
    int rv = memcmp(zl->name, zr->name, min(zl->len, zr->len));
    return (rv != 0) ? (rv < 0) : (zl->len < zr->len);
}

插入或更新

Zadd命令必须处理(分数,名称)对已经存在的情况。

bool zset_insert(ZSet *zset, const char *name, size_t len, double score) {
    if (ZNode *node = zset_lookup(zset, name, len)) {
        zset_update(zset, node, score);
        return false;
    }
    ZNode *node = znode_new(name, len, score);
    hm_insert(&zset->hmap, &node->hmap);
    tree_insert(zset, node);
    return true;
}

如果分数发生变化,分离并重新插入AVL树节点将修复顺序。

static void zset_update(ZSet *zset, ZNode *node, double score) {
    // 分离树节点
    zset->root = avl_del(&node->tree);
    avl_init(&node->tree);
    // 重新插入树节点
    node->score = score;
    tree_insert(zset, node);
}

删除一个(分数,名称)对

我们没有搜索并删除的函数,因为删除操作可以通过节点引用完成。将搜索和删除分开在不同的函数中更灵活,因为有多种搜索方式。

void zset_delete(ZSet *zset, ZNode *node) {
    // 从哈希表中删除
    HKey key;
    key.node.hcode = node->hmap.hcode;
    key.name = node->name;
    key.len = node->len;
    HNode *found = hm_delete(&zset->hmap, &key.node, &hcmp);
    assert(found);
    // 从树中删除
    zset->root = avl_del(&node->tree);
    // 释放节点内存
    znode_del(node);
}

11.4 有序集合范围查询

范围查询命令:ZQUERY key score name offset limit

  1. 1. 定位到第一个满足pair >= (score, name)的(分数,名称)对
  2. 2. 移动到第n个后继/前驱节点(偏移量)
  3. 3. 迭代并输出

相关函数声明:

ZNode *zset_seekge(ZSet *zset, double score, const char *name, size_t len);
ZNode *znode_offset(ZNode *node, int64_t offset);

定位操作就是树搜索

ZNode *zset_seekge(ZSet *zset, double score, const char *name, size_t len) {
    AVLNode *found = NULL;
    for (AVLNode *node = zset->root; node; ) {
        if (zless(node, score, name, len)) {
            node = node->right; // node < key
        } else {
            found = node;       // 候选节点
            node = node->left;
        }
    }
    return found ? container_of(found, ZNode, tree) : NULL;
}

偏移和迭代

迭代操作就是偏移量为±1的情况,也就是遍历AVL树。

// 偏移到后继或前驱节点。
AVLNode *avl_offset(AVLNode *node, int64_t offset);

ZNode *znode_offset(ZNode *node, int64_t offset) {
    AVLNode *tnode = node ? avl_offset(&node->tree, offset) : NULL;
    return tnode ? container_of(tnode, ZNode, tree) : NULL;
}

avl_offset()函数将在后面实现。我们现在可以编写ZQUERY函数了。

static void do_zquery(std::vector<std::string> &cmd, Buffer &out) {
    // 解析参数并查找键值对
    // ...
    // 1. 定位到键
    ZNode *znode = zset_seekge(zset, score, name.data(), name.size());
    // 2. 偏移
    znode = znode_offset(znode, offset);
    // 3. 迭代并输出
    size_t ctx = out_begin_arr(out);
    int64_t n = 0;
    while (znode && n < limit) {
        out_str(out, znode->name, znode->len);
        out_dbl(out, znode->score);
        znode = znode_offset(znode, +1);
        n += 2;
    }
    out_end_arr(out, ctx, (uint32_t)n);
}

在迭代结束之前,(分数,名称)对的数量是未知的,所以序列化数组的大小是一个临时值,稍后会进行修正。

static size_t out_begin_arr(Buffer &out) {
    out.push_back(TAG_ARR);
    buf_append_u32(out, 0);     // 由out_end_arr()填充
    return out.size() - 4;      // `ctx`参数
}
static void out_end_arr(Buffer &out, size_t ctx, uint32_t n) {
    assert(out[ctx - 1] == TAG_ARR);
    memcpy(&out[ctx], &n, 4);
}

练习:添加一个反向版本,按降序进行定位和迭代操作。

11.5 有序集合排名查询

常规树遍历

对节点进行偏移的一种简单方法是每次一步地遍历树。

AVLNode *avl_offset(AVLNode *node, int64_t offset) {
    for (; offset > 0 && node; offset--) {
        node = successor(node);
    }
    for (; offset < 0 && node; offset++) {
        node = predecessor(node);
    }
    return node;
}

在最坏情况下,找到后继节点的时间复杂度为O(log N),但平均情况下为O(1)。所以逐个对节点进行d步偏移,平均时间复杂度为O(d)。

static AVLNode *successor(AVLNode *node) {
    // 找到右子树中最左边的节点
    if (node->right) {
        for (node = node->right; node->left; node = node->left) {}
        return node;
    }
    // 找到这样的祖先节点:我是其左子树中最右边的节点
    while (AVLNode *parent = node->parent) {
        if (node == parent->left) {
            return parent;
        }
        node = parent;
    }
    return NULL;
}
static AVLNode *predecessor(AVLNode *node);     // 对称操作

子树大小增强

我们将增强树结构以实现排名查询:

  1. 1. 查找排序顺序中的第n个(分数,名称)对
  2. 2. 查找给定(分数,名称)对在排序顺序中的排名

要对节点进行d步偏移,先找到当前的排名r,然后找到排名为r + d的节点。无论d是多少,这两个步骤在最坏情况下的时间复杂度都是O(log N)。

这是通过在树节点中增加子树大小信息来实现的。

struct AVLNode {
    AVLNode *parent = NULL;
    AVLNode *left = NULL;
    AVLNode *right = NULL;
    uint32_t height = 0;
    uint32_t cnt = 0;       // 增强信息:子树大小
};

inline uint32_t avl_cnt(AVLNode *node) { return node ? node->cnt : 0; }

这个子树大小信息会随着子树高度一起传播。

static void avl_update(AVLNode *node) {
    node->height = 1 + max(avl_height(node->left), avl_height(node->right));
    node->cnt = 1 + avl_cnt(node->left) + avl_cnt(node->right);     // 新增内容
}

基于排名的树遍历

父节点和子节点(例如B和D)之间的排名有如下关系:

             D (排名=r+s+1)        B (排名=r)
         ┌───┴───┐             ┌───┴───┐
(rank=r) B       E             A       D (排名=r+s+1)
       ┌─┴─┐                         ┌─┴─┐
       A   C (大小=s)       (大小=s) C   E

观察1:父节点和子节点之间的排名差是它们之间的子树大小加1。所以,通过沿着路径遍历,可以计算出到任意节点的排名差。
观察2:子树内的最大排名差加1就是子树的大小。所以,我们可以知道目标排名是否在某个子树内。

这意味着对节点进行d步偏移,只需向上或向下遍历,直到排名差为d。

AVLNode *avl_offset(AVLNode *node, int64_t offset) {
    int64_t pos = 0;    // 与起始节点的排名差
    while (offset != pos) {
        if (pos < offset && pos + avl_cnt(node->right) >= offset) {
            // 目标节点在右子树内
            node = node->right;
            pos += avl_cnt(node->left) + 1;
        } else if (pos > offset && pos - avl_cnt(node->left) <= offset) {
            // 目标节点在左子树内
            node = node->left;
            pos -= avl_cnt(node->right) + 1;
        } else {
            // 移动到父节点
            AVLNode *parent = node->parent;
            if (!parent) {
                return NULL;
            }
            if (parent->right == node) {
                pos -= avl_cnt(node->left) + 1;
            } else {
                pos += avl_cnt(node->right) + 1;
            }
            node = parent;
        }
    }
    return node;
}

该算法通过最低公共祖先找到通向目标节点的路径。它最多向上遍历一次,最多向下遍历一次。所以,在最坏情况下,时间复杂度为O(log N)。

绝对排名

avl_offset()使用排名差进行偏移,我们也可以使用排名差来找到节点的绝对排名,这实现了ZRank命令。

// 返回节点在排序顺序中的排名
int64_t avl_rank(AVLNode *node);

这留给读者作为练习。你还可以实现ZCount命令,它可以在不遍历的情况下计算范围内(分数,名称)对的数量。

顺序统计树

排名差的概念适用于任何类似树的数据结构,包括真实Redis中使用的跳表。在教科书中,这被称为顺序统计树。

如果应用于B+树,数据库也可以在O(log N)的时间内进行偏移或计数操作。不幸的是,数据库在数据结构方面非常保守。

一个应用是文本编辑器中使用的Rope数据结构。它是一种存储小字符串片段的树。它的使用方式类似于字节数组,但它可以在O(log N)的时间内进行索引和在任意位置插入操作。

11.6 我们学到了什么

  1. 1. 使用侵入式数据结构的多索引数据
  2. 2. 顺序统计树:一种快速偏移树节点的方法
    源代码:
阅读剩余
THE END