基于生存时间(TTL)的缓存过期机制

13.1 生存时间

缓存过期方法

在许多应用程序中,数据库访问并不是均匀的;很大一部分请求会反复访问一小部分数据。这一小部分热数据可以从缓存中获取,以提高延迟和吞吐量。Redis主要被用作缓存。
源代码在文章末尾
热数据会随着时间而变化。为了维护一个包含热数据的小型缓存,你需要不断地移除数据,以便为新数据腾出空间。Redis提供了两种机制:

  1. 1. 设置最大内存限制,当达到限制时随机淘汰键。
  2. 2. 为键设置过期时间(TTL),使用定时器删除过期的键。

我们将实现TTL(生存时间)机制,这需要扩展定时器代码。

定时器快速回顾

正如上一章所解释的,我们需要一种数据结构,它能够反复找到并移除最近的时间戳,也就是说,按排序顺序生成时间戳。

当超时值固定时,这种数据结构可以是链表。由于TTL是任意的,所以我们需要一种更通用的排序数据结构,比如AVL树,但还有一种更好的树状数据结构,叫做堆。不要把它和与之无关的栈/堆(内存分配)术语混淆。

13.2 堆数据结构

二叉搜索树与堆

到目前为止提到的所有二叉树都是二叉搜索树(BST)。它们用于从左到右对数据进行排序。对于二叉搜索树中的每个节点,其左子树中的数据在它之前排序,右子树中的数据在它之后排序。

   8

┌───┴─┐
4 ┆ 9
┌─┴─┐ ┆ ┆
2 ┆ 6 ┆ ┆
┌┴┬┴┬┴┬┴┬┴┐
│2│4│6│8│9│
└─┴─┴─┴─┴─┘

堆也是一种二叉树,但数据的排序方式不同:对于堆中的每个节点,它的左子节点和右子节点的数据都在它之后排序。

例如:有两个包含相同数据(1, 2, 3, 4, 5, 6, 9)的有效堆。

  1                   1

┌───┴───┐ ┌───┴───┐
4 2 2 5
┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
5 6 3 9 4 3 6 9

两个子树的范围可以重叠。所以,与二叉搜索树不同,不能简单地通过遍历树来生成一个完全排序的序列。

最小堆和最大堆

虽然堆并不表示已排序的序列,但它确实会跟踪最小值;根节点包含最小值。这被称为最小堆。顺序相反的堆被称为最大堆。

我们将在TTL定时器中使用最小堆。查找最小值(最近的定时器)的时间复杂度为O(1)。移除或更新一个节点的时间复杂度为O(log N),后面会解释。

AVL树可以完成堆能做的所有事情,通过找到最左边的节点,它可以在O(log N)的时间内找到最小值,这个节点也可以被缓存,以便在O(1)的时间内访问。但为什么使用堆而不是二叉搜索树呢?因为堆更简单,占用的空间更少,并且性能更好。

数组编码的二叉树

二叉树是一些通过指针链接的动态分配节点。然而,一些二叉树可以使用数组在不使用指针的情况下表示:

                    ┌─┐
  1                 │1│

┌───┴───┐ ├─┼─┐ ┌─┐┌─┬─┐┌─┬─┬─┬─┐
4 2 ────► │4│2│ ────► │1││4│2││5│6│3│9│ more...
┌─┴─┐ ┌─┴─┐ ├─┼─┼─┬─┐ └─┘└─┴─┘└─┴─┴─┴─┘
5 6 3 9 │5│6│3│9│ index 0 1 2 3 4 5 6
... more ... └─┴─┴─┴─┘ level 0 1 1 2 2 2 2
... more ...

在这种表示中,节点按层平铺到数组中。这要求每一层都完全填满;除了最后一层,不允许有空的子树。

每一层的节点数量恰好是2的幂次序列:1, 2, 4, 8,等等。子节点的索引可以根据父节点的索引计算得出。

static size_t heap_left(size_t i)   { return i * 2 + 1; }
static size_t heap_right(size_t i)  { return i * 2 + 2; }

例如,节点4的索引是1,它的两个子节点的索引分别是3和4。

我们也可以根据子节点的索引计算出父节点的索引。

static size_t heap_parent(size_t i) { return (i + 1) / 2 - 1; }

数组编码的堆

堆数据结构是用数组编码的。它是一种具有两个不变性的二叉树:

  1. 1. 一个节点的值小于它的两个子节点的值。
  2. 2. 除了最后一层,每一层都完全填满。

堆是解决最小/最大值问题的首选数据结构。用数组编码意味着无需动态分配,所以插入和删除操作更快。

将堆中的值更新为更小的值

更改一个节点的值可能会破坏第一个不变性,即一个节点的值小于它的子节点的值。通过移动节点可以恢复这个不变性。假设一个节点的值被更改得小于它的父节点的值,这可以通过将这个“错误”的节点与父节点交换来修复。

5 5 3
┌─┴─┐ ───► ┌─┴─┐ ───► ┌─┴─┐
6 8* 6 3 (bad) 6 5

在那之后,父节点的值也会变得更小,这可能也会破坏不变性,需要与祖父节点再进行一次交换。

struct HeapItem { uint64_t val; };

// 当一个节点的值变得大于它的父节点的值时
static void heap_up(HeapItem *a, size_t pos) {
    HeapItem t = a[pos];
    while (pos > 0 && a[heap_parent(pos)].val > t.val) {
        // 与父节点交换
        a[pos] = a[heap_parent(pos)];
        pos = heap_parent(pos);
    }
    a[pos] = t;
}

将堆中的值更新为更大的值

同样,如果一个值被更新为更大的值,可能需要与它的一个子节点交换,以恢复不变性。

2* 5 (bad) 3
┌─┴─┐ ───► ┌─┴─┐ ───► ┌─┴─┐
6 3 6 3 6 5

一个节点可能有两个子节点,我们必须与最小的那个子节点交换。

static void heap_down(HeapItem *a, size_t pos, size_t len) {
    HeapItem t = a[pos];
    while (true) {
        // 在父节点及其子节点中找到最小的那个
        size_t l = heap_left(pos);
        size_t r = heap_right(pos);
        size_t min_pos = pos;
        uint64_t min_val = t.val;
        if (l < len && a[l].val < min_val) {
            min_pos = l;
            min_val = a[l].val;
        }
        if (r < len && a[r].val < min_val) {
            min_pos = r;
        }
        if (min_pos == pos) {
            break;
        }
        // 与子节点交换
        a[pos] = a[min_pos];
        pos = min_pos;
    }
    a[pos] = t;
}

更新堆中的值

更新堆中的值会导致它向上或向下移动,这取决于值是增加还是减少。时间复杂度为O(log N),因为堆是高度平衡的树。

void heap_update(HeapItem *a, size_t pos, size_t len) {
    if (pos > 0 && a[heap_parent(pos)].val > a[pos].val) {
        heap_up(a, pos);
    } else {
        heap_down(a, pos, len);
    }
}

插入堆项

树的最后一层在数组的末尾,所以最后一层允许不完整。例如:

                    ┌─┐
  1                 │1│

┌───┴───┐ ├─┼─┐ ┌─┐┌─┬─┐┌─┬─┬─┬─┐
4 2 ────► │4│2│ ────► │1││4│2││5│ │ │ │
┌─┴─┐ ┌─┴─┐ ├─┼─┼─┬─┐ └─┘└─┴─┘└─┴─┴─┴─┘
5 nil nil nil │5│ │ │ │
└─┴─┴─┴─┘

新的项会插入到最后一层,如果最后一层已满,树会增加一个新的层,且新层只有一个项,这就是新的最后一层。这不会破坏第二个不变性,即除了最后一层,每一层都完全填满。

新的项会附加到数组中,然后通过heap_update()函数修复第一个不变性。

std::vector<HeapItem> a;
a.push_back(...);
heap_update(a.data(), a.size() - 1, a.size());

删除堆项

从动态数组中删除一个项有两种方法。一种简单的方法是将它后面的项向前移动一位。这是O(N)的时间复杂度,并且会破坏堆数据结构。

另一种方法是将它与最后一个项交换,然后删除最后一个项。这是O(1)的时间复杂度,并且与堆数据结构兼容。你只需要对交换后的项调用heap_update()函数。

static void heap_delete(std::vector<HeapItem> &a, size_t pos) {
    // 将被删除的项与最后一个项交换
    a[pos] = a.back();
    a.pop_back();
    // 更新交换后的项
    if (pos < a.size()) {
        heap_update(a.data(), pos, a.size());
    }
}

13.3 实现TTL

将数据与堆项关联

一个堆项还包含对键的引用,这样它就可以被定时器删除。

struct HeapItem {
    uint64_t val;       // 堆值,即过期时间
    struct Entry *ref;  // 与该值关联的额外数据
};

带有TTL的键也可以通过del命令删除。为了移除相关的堆项,键也应该通过其在数组中的索引来引用堆项。

struct Entry {
    struct HNode node;      // 哈希表节点
    std::string key;
    // 用于TTL
    size_t heap_idx = -1;   // 堆项在数组中的索引
    // ...
};

堆数据结构代码在移动项时必须更新Entry::heap_idx。但是根据我们使用侵入式数据结构的经验,我们可以通过让HeapItem::ref指向嵌入的数组索引来使代码更通用。

struct HeapItem {
    uint64_t val;       // 堆值,即过期时间
    size_t *ref;        // 指向`Entry::heap_idx`
};

这样,数据结构代码就可以更新嵌入的数组索引,而根本不需要了解struct Entry

static void heap_up(HeapItem *a, size_t pos) {
    HeapItem t = a[pos];
    while (pos > 0 && a[heap_parent(pos)].val > t.val) {
        // 与父节点交换
        a[pos] = a[heap_parent(pos)];
        *a[pos].ref = pos;              // 新增!
        pos = heap_parent(pos);
    }
    a[pos] = t;
    *a[pos].ref = pos;                  // 新增!
}
// 注意:heap_down()函数也被修改了。

在堆中添加、更新和移除时间戳

为了简便起见,我使用负的TTL值表示移除TTL。

// 设置或移除TTL
static void entry_set_ttl(Entry *ent, int64_t ttl_ms) {
    if (ttl_ms < 0 && ent->heap_idx != (size_t)-1) {
        // 设置负的TTL意味着移除TTL
        heap_delete(g_data.heap, ent->heap_idx);
        ent->heap_idx = -1;
    } else if (ttl_ms >= 0) {
        // 添加或更新堆数据结构
        uint64_t expire_at = get_monotonic_msec() + (uint64_t)ttl_ms;
        HeapItem item = {expire_at, &ent->heap_idx};
        heap_upsert(g_data.heap, ent->heap_idx, item);
    }
}

如果没有设置TTL,Entry::heap_idx的值为−1。它用于更新或删除堆项。

// 更新或追加
static void heap_upsert(std::vector<HeapItem> &a, size_t pos, HeapItem t) {
    if (pos < a.size()) {
        a[pos] = t;         // 更新已有的项
    } else {
        pos = a.size();
        a.push_back(t);     // 或者添加一个新项
    }
    heap_update(a.data(), pos, a.size());
}

找到最近的定时器

next_timer_ms()函数返回最近的定时器的超时值。我们可以保留上一章中的链表定时器,并使用较小的时间戳。

static uint32_t next_timer_ms() {
    uint64_t now_ms = get_monotonic_msec();
    uint64_t next_ms = (uint64_t)-1;    // 最大值
    // 使用链表的空闲定时器
    if (!dlist_empty(&g_data.idle_list)) {
        Conn *conn = container_of(g_data.idle_list.next, Conn, idle_node);
        next_ms = conn->last_active_ms + k_idle_timeout_ms;
    }
    // 使用堆的TTL定时器
    if (!g_data.heap.empty() && g_data.heap[0].val < next_ms) {
        next_ms = g_data.heap[0].val;
    }
    // 超时值
    if (next_ms == (uint64_t)-1) {
        return -1;  // 没有定时器,不设置超时
    }
    if (next_ms <= now_ms) {
        return 0;   // 超时了吗?
    }
    return (int32_t)(next_ms - now_ms);
}

处理过期

只需在process_timers()函数中处理基于堆的定时器。

static void process_timers() {
    uint64_t now_ms = get_monotonic_msec();
    // 使用链表的空闲定时器
    // ...
    // 使用堆的TTL定时器
    const std::vector<HeapItem> &heap = g_data.heap;
    while (!heap.empty() && heap[0].val < now_ms) {
        Entry *ent = container_of(heap[0].ref, Entry, heap_idx);
        hm_delete(&g_data.db, &ent->node, &hnode_same);
        entry_del(ent); // 删除键
    }
}

在删除键时,也应该从堆中移除TTL。

static void entry_del(Entry *ent) {
    // ...
    entry_set_ttl(ent, -1); // 从堆数据结构中移除
    delete ent;
}

限制每次循环迭代的工作量

基于事件的并发意味着我们不能让事件循环停止太长时间,所以我们应该注意潜在的高延迟任务。如果许多键被设置为同时过期,服务器将在一段时间内忙于删除它们。所以我们为定时器处理设置一个限制。

    const size_t k_max_works = 2000;
    size_t nworks = 0;
    while (!heap.empty() && heap[0].val < now_ms && nworks++ < k_max_works) {
        Entry *ent = container_of(heap[0].ref, Entry, heap_idx);
        hm_delete(&g_data.db,&ent->node, &hnode_same);
        entry_del(ent); // 删除键
    }

如果超过了限制,程序将进入下一次事件循环迭代,超时值设为0,这会导致poll()函数不再等待。poll()函数仍然会通知任何IO就绪事件,所以程序可以在处理大量过期定时器的同时穿插进行IO处理。

添加TTL命令

EXPIRE用于设置TTL,TTL返回TTL值,PERSIST用于移除TTL。

static void do_expire(std::vector<std::string> &cmd, Buffer &out) {
    // 解析参数
    int64_t ttl_ms = 0;
    if (!str2int(cmd[2], ttl_ms)) {
        return out_err(out, ERR_BAD_ARG, "expect int64");
    }
    // 查找键
    LookupKey key;
    key.key.swap(cmd[1]);
    key.node.hcode = str_hash((uint8_t *)key.key.data(), key.key.size());
    HNode *node = hm_lookup(&g_data.db, &key.node, &entry_eq);
    // 设置TTL
    if (node) {
        Entry *ent = container_of(node, Entry, node);
        entry_set_ttl(ent, ttl_ms);
    }
    return out_int(out, node? 10);
}

源代码:

阅读剩余
THE END