定时器与超时
12.1 定时器的需求
在Redis服务器中,定时器有许多用途:
- 1. 生存时间(TTL)和缓存过期。
- 2. 网络IO超时。
- 3. 关闭空闲连接。
缓存过期
TTL常用于缓存,原因如下:
- 1. 内存是有限的。TTL是一种缓存淘汰方法。
- 2. 一些惰性系统不会将数据库与缓存进行同步,而是依赖缓存过期来保持数据的新鲜度。
源代码在文章末尾
网络IO超时
网络超时是实际软件中一个主要的缺失部分。我们需要处理远程对等端无法访问或干脆消失的情况。
假设TCP的保活机制正常工作,TCP连接最终会超时并被操作系统关闭。但这可能需要很长时间,以分钟或小时为单位来衡量,而应用程序使用的超时时间是以秒或毫秒为单位来衡量的。所以对于网络应用程序来说,超时是一个需要明确考虑的因素。
空闲超时
在读取请求的过程中,合理的超时值是以秒或毫秒为单位来衡量的,因为通常情况下只是几次TCP重传。与其长时间卡住,不如报告错误。
然而,在等待新请求时,超时值应该更大一些,因为长连接是预期的使用场景。但即便如此,它也不是无限的。
12.2 系统调用中的超时
阻塞式IO
对于常规文件,在Linux系统上是不可能设置超时的。
对于阻塞式套接字,超时是通过setsockopt()
函数以及SO_RCVTIMEO
和SO_SNDTIMEO
选项来设置的。具体细节请查看socket.7
的手册页。
一个阻塞式的read()
或write()
操作在超时后会返回EAGAIN
。
一个阻塞式的connect()
操作在超时后会返回ETIMEDOUT
。
非阻塞式IO
对于非阻塞式系统调用,超时是不相关的。在事件循环中,就绪API是唯一的阻塞式系统调用,所以它是唯一可以设置超时的系统调用。
int poll(struct pollfd *fds, nfds_t nfds, int timeout); // 以毫秒为单位
超时值为-1
表示不设置超时。
睡眠
在阻塞式代码中,你可以通过睡眠来等待未来的某个时间点。但在事件循环中这是不可能的。不过,poll()
函数可以因超时而唤醒,这是一种睡眠的形式。
timerfd
timerfd_create()
函数会创建一个内核定时器,并返回一个称为timerfd
的定时器句柄。到期的定时器可以通过poll()
函数使用该定时器句柄以及用于IO就绪的套接字句柄来进行通知。
// 返回一个定时器句柄
int timerfd_create(int clockid, int flags);
// 设置到期时间
int timerfd_settime(int fd, int flags,
const struct itimerspec *new_value, struct itimerspec *old_value);
timerfd
可以与事件循环一起使用,但这并不是非常有用,因为定时器可以在用户空间中实现,而无需任何系统调用。
12.3 定时器数据结构
等待最近的定时器
poll()
函数只有一个超时值,但我们希望使用多个定时器。一种常见的方法是定期从poll()
函数中唤醒,比如说每隔1秒,然后遍历每个定时器来检查是否到期。这种解决方案的精度有限,并且每次唤醒的时间复杂度为O(N)。
人们发明了许多专门用于定时器的数据结构。但通用的解决方案是基于排序的;让某些事情在特定时间发生意味着让它们按照特定顺序发生,而按时间对事情进行排序是一个排序问题。所以这个问题可以重新表述为:按排序顺序输出一个时间戳列表。
如果定时器是已排序的,poll()
函数只需要最近的定时器的超时值。这就是在事件循环中实现定时器的方式。
全排序数据结构
我们可以使用AVL树对定时器进行排序:
- 1. 添加、更新或删除一个定时器的时间复杂度为O(log N)。
- 2. 查找最近的定时器(最左边的节点)的时间复杂度为O(log N)。
- 3. 可以缓存最小节点,使得查找最近的定时器的时间复杂度为O(1)。
部分排序数据结构
定时器并不需要完全排序;唯一的要求是找到最近的定时器。堆数据结构可以满足这一要求。堆可以在O(1)的时间内找到最小值,并且更新操作的时间复杂度为O(log N)。它是一种基于数组的隐式树,没有子节点或父节点指针,占用的空间更少。它是定时器最常用的选择。
固定超时值和先进先出(FIFO)
套接字超时问题可以用简单得多的数据结构来解决。请注意,所有套接字的超时值是相同的,所以当添加一个定时器时,它会成为最远的定时器,这意味着如果超时值是固定的,那么定时器的顺序就是它们添加的顺序。
这就是一个先进先出(FIFO)队列。新的定时器添加到队列的末尾,最近的定时器从队列的开头移除,这可以用链表在O(1)的时间内完成。
如果你有多个超时值,例如,IO超时和空闲超时,只需使用多个链表即可。
12.4 实现定时器
获取时间
clock_gettime()
是用于获取时间的系统调用。
int clock_gettime(clockid_t clockid, struct timespec *tp);
struct timespec {
time_t tv_sec; /* 秒数 */
long tv_nsec; /* 纳秒数 [0, 999'999'999] */
};
clockid
参数是时间戳的类型。最常见的类型是墙上时间(wall time)和单调时间(monotonic time)。
#define CLOCK_REALTIME 0 // 墙上时间,即Unix时间戳
#define CLOCK_MONOTONIC 1 // 单调时间
墙上时间戳与现实世界中的时间记录有关。最常见的是Unix时间戳。它是自1970年以来的天数乘以24×3600,大致是自1970年以来的秒数。计算机中的墙上时间是可以调整的,所以两个时间戳之间的时间间隔可能不稳定,甚至可能是负数。因此,它不适合用于定时器。
单调时间是不能调整的,它只会向前移动。它的值与任何现实世界的时间记录都没有关系。它只能用于测量时间间隔。
poll()
函数内部使用CLOCK_MONOTONIC
来设置超时,所以我们也将使用它。
static uint64_t get_monotonic_msec() {
struct timespec tv = {0, 0};
clock_gettime(CLOCK_MONOTONIC, &tv);
return uint64_t(tv.tv_sec) * 1000 + tv.tv_nsec / 1000 / 1000;
}
双向链表
我们将使用链表来实现空闲连接的超时处理。每次套接字进行IO操作时,将超时时间设置为从当前时间开始的一个固定偏移量,并且将该套接字移动到链表的末尾。
链表必须是双向链表,以便节点可以从链表中间分离出来。
struct DList {
DList *prev = NULL;
DList *next = NULL;
};
inline void dlist_detach(DList *node) {
DList *prev = node->prev;
DList *next = node->next;
prev->next = next;
next->prev = prev;
}
链表节点嵌入到struct Conn
结构体中。
struct Conn {
// ...
// 定时器
uint64_t last_active_ms = 0;
DList idle_node;
};
链表头是一个虚拟节点,它不是链表的成员。它用于避免处理空链表的特殊情况。
// 全局状态
static struct {
HMap db;
// 所有客户端连接的映射,以文件描述符fd为键
std::vector<Conn *> fd2conn;
// 空闲连接的定时器
DList idle_list; // 链表头
} g_data;
虚拟节点自身相连,形成一个环:
inline void dlist_init(DList *node) {
node->prev = node->next = node;
}
一个空链表就是只有虚拟节点的链表:
inline bool dlist_empty(DList *node) {
return node->next == node;
}
由于有虚拟节点,插入操作不需要处理空链表的情况。
inline void dlist_insert_before(DList *target, DList *rookie) {
DList *prev = target->prev;
prev->next = rookie;
rookie->prev = prev;
rookie->next = target;
target->prev = rookie;
}
创建和更新定时器
新的Conn
结构体通过在虚拟节点之前插入的方式添加到链表的末尾:
// 创建一个`struct Conn`结构体
Conn *conn = new Conn();
conn->fd = connfd;
conn->want_read = true;
conn->last_active_ms = get_monotonic_msec();
dlist_insert_before(&g_data.idle_list, &conn->idle_node);
每次套接字准备好进行IO操作时,将其移动到链表的末尾:
// 处理连接套接字
for (size_t i = 1; i < poll_args.size(); ++i) {
uint32_t ready = poll_args[i].revents;
if (ready == 0) {
continue;
}
Conn *conn = g_data.fd2conn[poll_args[i].fd];
// 通过将conn移动到链表末尾来更新空闲定时器
conn->last_active_ms = get_monotonic_msec();
dlist_detach(&conn->idle_node);
dlist_insert_before(&g_data.idle_list, &conn->idle_node);
// ...
}
从最近的定时器中获取超时值
poll()
函数的超时时间是根据链表的第一个元素计算的:
static int32_t next_timer_ms() {
if (dlist_empty(&g_data.idle_list)) {
return -1; // 没有定时器,不设置超时
}
uint64_t now_ms = get_monotonic_msec();
Conn *conn = container_of(g_data.idle_list.next, Conn, idle_node);
uint64_t next_ms = conn->last_active_ms + k_idle_timeout_ms;
if (next_ms <= now_ms) {
return 0; // 超时了吗?
}
return (int32_t)(next_ms - now_ms);
}
int32_t timeout_ms = next_timer_ms();
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), timeout_ms);
// ...
process_timers();
在每次事件循环迭代时处理定时器
检查最近的定时器,看它们是否已经过期。poll()
函数可能会因为IO操作而唤醒,所以可能没有定时器到期。由于定时器是已排序的,所以在遇到第一个未过期的定时器时停止检查。
static void process_timers() {
uint64_t now_ms = get_monotonic_msec();
while (!dlist_empty(&g_data.idle_list)) {
Conn *conn = container_of(g_data.idle_list.next, Conn, idle_node);
uint64_t next_ms = conn->last_active_ms + k_idle_timeout_ms;
if (next_ms >= now_ms) {
break; // 未过期
}
fprintf(stderr, "removing idle connection: %d\n", conn->fd);
conn_destroy(conn);
}
}
超时的套接字将被销毁。记住要将它们从链表中移除。
static void conn_destroy(Conn *conn) {
(void)close(conn->fd);
g_data.fd2conn[conn->fd] = NULL;
dlist_detach(&conn->idle_node);
delete conn;
}
完成
服务器应该在超时后关闭空闲连接。可以使用socat
进行测试:
socat tcp:127.0.0.1:1234 -
练习:为read()
和write()
操作添加IO超时(使用不同的超时值)。
源代码: