基于生存时间(TTL)的缓存过期机制
13.1 生存时间
缓存过期方法
在许多应用程序中,数据库访问并不是均匀的;很大一部分请求会反复访问一小部分数据。这一小部分热数据可以从缓存中获取,以提高延迟和吞吐量。Redis主要被用作缓存。
源代码在文章末尾
热数据会随着时间而变化。为了维护一个包含热数据的小型缓存,你需要不断地移除数据,以便为新数据腾出空间。Redis提供了两种机制:
- 1. 设置最大内存限制,当达到限制时随机淘汰键。
- 2. 为键设置过期时间(TTL),使用定时器删除过期的键。
我们将实现TTL(生存时间)机制,这需要扩展定时器代码。
定时器快速回顾
正如上一章所解释的,我们需要一种数据结构,它能够反复找到并移除最近的时间戳,也就是说,按排序顺序生成时间戳。
当超时值固定时,这种数据结构可以是链表。由于TTL是任意的,所以我们需要一种更通用的排序数据结构,比如AVL树,但还有一种更好的树状数据结构,叫做堆。不要把它和与之无关的栈/堆(内存分配)术语混淆。
13.2 堆数据结构
二叉搜索树与堆
到目前为止提到的所有二叉树都是二叉搜索树(BST)。它们用于从左到右对数据进行排序。对于二叉搜索树中的每个节点,其左子树中的数据在它之前排序,右子树中的数据在它之后排序。
8
┌───┴─┐
4 ┆ 9
┌─┴─┐ ┆ ┆
2 ┆ 6 ┆ ┆
┌┴┬┴┬┴┬┴┬┴┐
│2│4│6│8│9│
└─┴─┴─┴─┴─┘
堆也是一种二叉树,但数据的排序方式不同:对于堆中的每个节点,它的左子节点和右子节点的数据都在它之后排序。
例如:有两个包含相同数据(1, 2, 3, 4, 5, 6, 9)的有效堆。
1 1
┌───┴───┐ ┌───┴───┐
4 2 2 5
┌─┴─┐ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐
5 6 3 9 4 3 6 9
两个子树的范围可以重叠。所以,与二叉搜索树不同,不能简单地通过遍历树来生成一个完全排序的序列。
最小堆和最大堆
虽然堆并不表示已排序的序列,但它确实会跟踪最小值;根节点包含最小值。这被称为最小堆。顺序相反的堆被称为最大堆。
我们将在TTL定时器中使用最小堆。查找最小值(最近的定时器)的时间复杂度为O(1)。移除或更新一个节点的时间复杂度为O(log N),后面会解释。
AVL树可以完成堆能做的所有事情,通过找到最左边的节点,它可以在O(log N)的时间内找到最小值,这个节点也可以被缓存,以便在O(1)的时间内访问。但为什么使用堆而不是二叉搜索树呢?因为堆更简单,占用的空间更少,并且性能更好。
数组编码的二叉树
二叉树是一些通过指针链接的动态分配节点。然而,一些二叉树可以使用数组在不使用指针的情况下表示:
┌─┐
1 │1│
┌───┴───┐ ├─┼─┐ ┌─┐┌─┬─┐┌─┬─┬─┬─┐
4 2 ────► │4│2│ ────► │1││4│2││5│6│3│9│ more...
┌─┴─┐ ┌─┴─┐ ├─┼─┼─┬─┐ └─┘└─┴─┘└─┴─┴─┴─┘
5 6 3 9 │5│6│3│9│ index 0 1 2 3 4 5 6
... more ... └─┴─┴─┴─┘ level 0 1 1 2 2 2 2
... more ...
在这种表示中,节点按层平铺到数组中。这要求每一层都完全填满;除了最后一层,不允许有空的子树。
每一层的节点数量恰好是2的幂次序列:1, 2, 4, 8,等等。子节点的索引可以根据父节点的索引计算得出。
static size_t heap_left(size_t i) { return i * 2 + 1; }
static size_t heap_right(size_t i) { return i * 2 + 2; }
例如,节点4的索引是1,它的两个子节点的索引分别是3和4。
我们也可以根据子节点的索引计算出父节点的索引。
static size_t heap_parent(size_t i) { return (i + 1) / 2 - 1; }
数组编码的堆
堆数据结构是用数组编码的。它是一种具有两个不变性的二叉树:
- 1. 一个节点的值小于它的两个子节点的值。
- 2. 除了最后一层,每一层都完全填满。
堆是解决最小/最大值问题的首选数据结构。用数组编码意味着无需动态分配,所以插入和删除操作更快。
将堆中的值更新为更小的值
更改一个节点的值可能会破坏第一个不变性,即一个节点的值小于它的子节点的值。通过移动节点可以恢复这个不变性。假设一个节点的值被更改得小于它的父节点的值,这可以通过将这个“错误”的节点与父节点交换来修复。
5 5 3
┌─┴─┐ ───► ┌─┴─┐ ───► ┌─┴─┐
6 8* 6 3 (bad) 6 5
在那之后,父节点的值也会变得更小,这可能也会破坏不变性,需要与祖父节点再进行一次交换。
struct HeapItem { uint64_t val; };
// 当一个节点的值变得大于它的父节点的值时
static void heap_up(HeapItem *a, size_t pos) {
HeapItem t = a[pos];
while (pos > 0 && a[heap_parent(pos)].val > t.val) {
// 与父节点交换
a[pos] = a[heap_parent(pos)];
pos = heap_parent(pos);
}
a[pos] = t;
}
将堆中的值更新为更大的值
同样,如果一个值被更新为更大的值,可能需要与它的一个子节点交换,以恢复不变性。
2* 5 (bad) 3
┌─┴─┐ ───► ┌─┴─┐ ───► ┌─┴─┐
6 3 6 3 6 5
一个节点可能有两个子节点,我们必须与最小的那个子节点交换。
static void heap_down(HeapItem *a, size_t pos, size_t len) {
HeapItem t = a[pos];
while (true) {
// 在父节点及其子节点中找到最小的那个
size_t l = heap_left(pos);
size_t r = heap_right(pos);
size_t min_pos = pos;
uint64_t min_val = t.val;
if (l < len && a[l].val < min_val) {
min_pos = l;
min_val = a[l].val;
}
if (r < len && a[r].val < min_val) {
min_pos = r;
}
if (min_pos == pos) {
break;
}
// 与子节点交换
a[pos] = a[min_pos];
pos = min_pos;
}
a[pos] = t;
}
更新堆中的值
更新堆中的值会导致它向上或向下移动,这取决于值是增加还是减少。时间复杂度为O(log N),因为堆是高度平衡的树。
void heap_update(HeapItem *a, size_t pos, size_t len) {
if (pos > 0 && a[heap_parent(pos)].val > a[pos].val) {
heap_up(a, pos);
} else {
heap_down(a, pos, len);
}
}
插入堆项
树的最后一层在数组的末尾,所以最后一层允许不完整。例如:
┌─┐
1 │1│
┌───┴───┐ ├─┼─┐ ┌─┐┌─┬─┐┌─┬─┬─┬─┐
4 2 ────► │4│2│ ────► │1││4│2││5│ │ │ │
┌─┴─┐ ┌─┴─┐ ├─┼─┼─┬─┐ └─┘└─┴─┘└─┴─┴─┴─┘
5 nil nil nil │5│ │ │ │
└─┴─┴─┴─┘
新的项会插入到最后一层,如果最后一层已满,树会增加一个新的层,且新层只有一个项,这就是新的最后一层。这不会破坏第二个不变性,即除了最后一层,每一层都完全填满。
新的项会附加到数组中,然后通过heap_update()
函数修复第一个不变性。
std::vector<HeapItem> a;
a.push_back(...);
heap_update(a.data(), a.size() - 1, a.size());
删除堆项
从动态数组中删除一个项有两种方法。一种简单的方法是将它后面的项向前移动一位。这是O(N)的时间复杂度,并且会破坏堆数据结构。
另一种方法是将它与最后一个项交换,然后删除最后一个项。这是O(1)的时间复杂度,并且与堆数据结构兼容。你只需要对交换后的项调用heap_update()
函数。
static void heap_delete(std::vector<HeapItem> &a, size_t pos) {
// 将被删除的项与最后一个项交换
a[pos] = a.back();
a.pop_back();
// 更新交换后的项
if (pos < a.size()) {
heap_update(a.data(), pos, a.size());
}
}
13.3 实现TTL
将数据与堆项关联
一个堆项还包含对键的引用,这样它就可以被定时器删除。
struct HeapItem {
uint64_t val; // 堆值,即过期时间
struct Entry *ref; // 与该值关联的额外数据
};
带有TTL的键也可以通过del
命令删除。为了移除相关的堆项,键也应该通过其在数组中的索引来引用堆项。
struct Entry {
struct HNode node; // 哈希表节点
std::string key;
// 用于TTL
size_t heap_idx = -1; // 堆项在数组中的索引
// ...
};
堆数据结构代码在移动项时必须更新Entry::heap_idx
。但是根据我们使用侵入式数据结构的经验,我们可以通过让HeapItem::ref
指向嵌入的数组索引来使代码更通用。
struct HeapItem {
uint64_t val; // 堆值,即过期时间
size_t *ref; // 指向`Entry::heap_idx`
};
这样,数据结构代码就可以更新嵌入的数组索引,而根本不需要了解struct Entry
。
static void heap_up(HeapItem *a, size_t pos) {
HeapItem t = a[pos];
while (pos > 0 && a[heap_parent(pos)].val > t.val) {
// 与父节点交换
a[pos] = a[heap_parent(pos)];
*a[pos].ref = pos; // 新增!
pos = heap_parent(pos);
}
a[pos] = t;
*a[pos].ref = pos; // 新增!
}
// 注意:heap_down()函数也被修改了。
在堆中添加、更新和移除时间戳
为了简便起见,我使用负的TTL值表示移除TTL。
// 设置或移除TTL
static void entry_set_ttl(Entry *ent, int64_t ttl_ms) {
if (ttl_ms < 0 && ent->heap_idx != (size_t)-1) {
// 设置负的TTL意味着移除TTL
heap_delete(g_data.heap, ent->heap_idx);
ent->heap_idx = -1;
} else if (ttl_ms >= 0) {
// 添加或更新堆数据结构
uint64_t expire_at = get_monotonic_msec() + (uint64_t)ttl_ms;
HeapItem item = {expire_at, &ent->heap_idx};
heap_upsert(g_data.heap, ent->heap_idx, item);
}
}
如果没有设置TTL,Entry::heap_idx
的值为−1。它用于更新或删除堆项。
// 更新或追加
static void heap_upsert(std::vector<HeapItem> &a, size_t pos, HeapItem t) {
if (pos < a.size()) {
a[pos] = t; // 更新已有的项
} else {
pos = a.size();
a.push_back(t); // 或者添加一个新项
}
heap_update(a.data(), pos, a.size());
}
找到最近的定时器
next_timer_ms()
函数返回最近的定时器的超时值。我们可以保留上一章中的链表定时器,并使用较小的时间戳。
static uint32_t next_timer_ms() {
uint64_t now_ms = get_monotonic_msec();
uint64_t next_ms = (uint64_t)-1; // 最大值
// 使用链表的空闲定时器
if (!dlist_empty(&g_data.idle_list)) {
Conn *conn = container_of(g_data.idle_list.next, Conn, idle_node);
next_ms = conn->last_active_ms + k_idle_timeout_ms;
}
// 使用堆的TTL定时器
if (!g_data.heap.empty() && g_data.heap[0].val < next_ms) {
next_ms = g_data.heap[0].val;
}
// 超时值
if (next_ms == (uint64_t)-1) {
return -1; // 没有定时器,不设置超时
}
if (next_ms <= now_ms) {
return 0; // 超时了吗?
}
return (int32_t)(next_ms - now_ms);
}
处理过期
只需在process_timers()
函数中处理基于堆的定时器。
static void process_timers() {
uint64_t now_ms = get_monotonic_msec();
// 使用链表的空闲定时器
// ...
// 使用堆的TTL定时器
const std::vector<HeapItem> &heap = g_data.heap;
while (!heap.empty() && heap[0].val < now_ms) {
Entry *ent = container_of(heap[0].ref, Entry, heap_idx);
hm_delete(&g_data.db, &ent->node, &hnode_same);
entry_del(ent); // 删除键
}
}
在删除键时,也应该从堆中移除TTL。
static void entry_del(Entry *ent) {
// ...
entry_set_ttl(ent, -1); // 从堆数据结构中移除
delete ent;
}
限制每次循环迭代的工作量
基于事件的并发意味着我们不能让事件循环停止太长时间,所以我们应该注意潜在的高延迟任务。如果许多键被设置为同时过期,服务器将在一段时间内忙于删除它们。所以我们为定时器处理设置一个限制。
const size_t k_max_works = 2000;
size_t nworks = 0;
while (!heap.empty() && heap[0].val < now_ms && nworks++ < k_max_works) {
Entry *ent = container_of(heap[0].ref, Entry, heap_idx);
hm_delete(&g_data.db,&ent->node, &hnode_same);
entry_del(ent); // 删除键
}
如果超过了限制,程序将进入下一次事件循环迭代,超时值设为0,这会导致poll()
函数不再等待。poll()
函数仍然会通知任何IO就绪事件,所以程序可以在处理大量过期定时器的同时穿插进行IO处理。
添加TTL命令
EXPIRE
用于设置TTL,TTL
返回TTL值,PERSIST
用于移除TTL。
static void do_expire(std::vector<std::string> &cmd, Buffer &out) {
// 解析参数
int64_t ttl_ms = 0;
if (!str2int(cmd[2], ttl_ms)) {
return out_err(out, ERR_BAD_ARG, "expect int64");
}
// 查找键
LookupKey key;
key.key.swap(cmd[1]);
key.node.hcode = str_hash((uint8_t *)key.key.data(), key.key.size());
HNode *node = hm_lookup(&g_data.db, &key.node, &entry_eq);
// 设置TTL
if (node) {
Entry *ent = container_of(node, Entry, node);
entry_set_ttl(ent, ttl_ms);
}
return out_int(out, node? 1: 0);
}
源代码: