事件循环(第 2 部分)
6.5 事件循环基础
我们在上一章学到的内容:
- • 基于回调的编程:对事件做出反应,而不是阻塞线程。
- • 使用非阻塞IO处理输入:将数据保存在缓冲区中,直到有足够的数据继续处理。
- • 事件循环和应用程序代码之间的界限。一个最小化的事件循环仍然不简单,要将通用的事件处理与应用程序逻辑分开。
- • 在循环迭代之间保存应用程序状态。对于请求-响应协议,在两种状态(读取和写入)之间交替。
这是你需要了解的最基本知识。然后还有更多高级主题,以弥简单示例代码与生产环境软件之间的差距。
源代码在文章末尾
6.6 流水线请求
在不改变协议的情况下批量处理请求
一个典型的Redis服务器对于像获取(get)、设置(set)、删除(del)这样的简单操作,每秒可以处理2万到20万个请求。许多其他请求-响应应用程序也在相同的处理能力范围内。如果应用程序逻辑(处理请求)的开销较小,那么这会受到单个线程能够处理的IO事件数量的限制。
服务器受到单线程IO的瓶颈限制,因此希望通过每次读取时批量处理多个请求来增加每个IO操作的工作量。
这可以通过流水线请求来实现。通常情况下,客户端发送1个请求,然后等待1个响应;而使用流水线的客户端会发送n个请求,然后等待n个响应。服务器仍然会按顺序处理每个请求,唯一的区别是它可以在一次读取中获取多个请求,从而有效地减少了IO操作的次数。
客户端(逐个发送) 客户端(流水线方式)
────────────────────────────────────────►
╲ ↗ ╲ ↗ ╲ ╲↗ ↗
╲╱ ╲╱ ╲╱╲╱
────────────────────────────────────────►
服务器
请求流水线还可以减少客户端的延迟,因为它可以在一个往返时间(RTT)内获取多个响应。这是Redis的一个有记录的使用场景。
流水线请求引发的问题
流水线并不会改变协议或服务器端的处理方式;请求按顺序发送,服务器按顺序处理它们,然后按顺序发送响应。那么它应该可以正常工作吧?事情并非如此简单。
请记住,TCP只是一个有序的字节流;服务器一次简单地从其中读取消息并不能区分差异。但是实现过程中常常会做出额外的假设。例如:
static void handle_read(Conn *conn) {
// ...
try_one_request(conn) // 假设:最多只有1个请求
if (conn->outgoing.size() > 0) { // 1. 处理了1个请求。
conn->want_read = false;
conn->want_write = true;
}
}
static void handle_write(Conn *conn) {
// ...
if (conn->outgoing.size() == 0) { // 2. 写入了1个响应。
conn->want_read = true; // 3. 等待更多数据。
conn->want_write = false;
}
}
handle_read()
假设输入缓冲区中最多只有1个请求。在它处理完1个请求后,逻辑继续执行 handle_write()
,并且在完成后,handle_write()
会让事件循环等待更多数据,即使输入缓冲区中仍然有未处理的数据。所以流水线就卡住了!
将输入视为字节流
为了解决这个问题,需要放弃对输入缓冲区的额外假设,即不断从缓冲区中读取数据,直到没有数据可处理为止。
static void handle_read(Conn *conn) {
// ...
// try_one_request(conn); // 错误
while (try_one_request(conn)) {} // 正确
// ...
}
并且在我们处理完1个请求后,不能直接清空输入缓冲区,因为可能还有更多消息。
static bool try_one_request(Conn *conn) {
// ...
// conn->incoming.clear() // 错误
buf_consume(conn->incoming, 4 + len); // 正确
return true;
}
总之,在读取字节流时,你可以等待n个字节的到来,但不要假设最多只有n个字节已经到来,因为字节流只与顺序有关,而与时间无关。
使用流水线消息测试正确性
浏览器曾尝试使用流水线请求,但失败了,因为许多服务器无法处理它们。
TCP字节流是网络编程的一个主要障碍。如果服务器不能正确解释字节流,就无法处理流水线请求。因此,使用流水线消息测试实现可以发现错误。例如,如果服务器根据 read()
返回的字节数做出决策,那么在处理流水线消息时就会失败。
6.7 调试与测试
字节流处理
协议与第04章中的相同。但还有更多内容需要测试。其中之一是流水线测试:
std::vector<std::string> query_list = {
"hello1", "hello2", "hello3",
};
for (const std::string &s : query_list) {
int32_t err = send_req(fd, s.data(), s.size());
if (err) { /* ... */ }
}
for (size_t i = 0; i < query_list.size(); ++i) {
int32_t err = read_res(fd);
if (err) { /* ... */ }
}
服务器很可能在一次读取中接收到多个消息,这会测试解析器。
跨越多个循环迭代的任务
测试一条大消息需要多个迭代才能处理的情况。所以我们将消息大小限制提高到一个更大的数值:
const size_t k_max_msg = 32 << 20; // 很可能比内核缓冲区大
然后在流水线中包含一个大请求。
std::vector<std::string> query_list = {
"hello1", "hello2", "hello3",
std::string(k_max_msg, 'z'), // 需要多个事件循环迭代
"hello5",
};
使用 strace
进行调试
使用 strace
命令来验证我们正在测试预期的内容。
$ strace ./server >/dev/null
strace
会显示所有的系统调用:
创建监听套接字:
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 3
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET, sin_port=htons(1234), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
fcntl(3, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(3, F_SETFL, O_RDWR|O_NONBLOCK) = 0
listen(3, 4096) = 0
进入事件循环,然后唤醒以接受客户端连接:
poll([{fd=3, events=POLLIN}], 1, -1) = 1 ([{fd=3, revents=POLLIN}])
accept(3, {sa_family=AF_INET, sin_port=htons(52184), sin_addr=inet_addr("127.0.0.1")}, [16]) = 4
fcntl(4, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
唤醒以从客户端读取数据(注意流水线请求):
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\6\0\0\0hello1\6\0\0\0hello2\6\0\0\0hello3", 65536) = 30
唤醒以一次性写入3个响应(流水线处理正常工作):
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "\6\0\0\0hello1\6\0\0\0hello2\6\0\0\0hello3", 30) = 30
多次唤醒以读取一个大请求:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\0\0\0\2zzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 65536) = 65536
brk(0x559c87e41000) = 0x559c87e41000
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 65536) = 65536
... 省略...
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "zzzz\6\0\0\0hello5", 65536) = 14
多次唤醒以写入大响应:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "\0\0\0\2zzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 33554446) = 2621440
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 30933006) = 3175899
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 27757107) = 27757107
客户端关闭连接,然后服务器也关闭:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "", 65536) = 0
close(4) = 0
6.8 乐观的非阻塞写入
在请求-响应协议中,客户端在发送下一个请求之前会读取响应。服务器可以假设,当它接收到一个请求时,套接字已经准备好写入,因为客户端已经处理了之前写入的数据。所以服务器可以直接写入,而无需等待下一次循环迭代,从而节省一次系统调用。
static void handle_read(Conn *conn) {
// ...
if (conn->outgoing.size() > 0) { // 有响应
conn->want_read = false;
conn->want_write = true;
// 在请求-响应协议中,套接字很可能已准备好写入,
// 尝试在不等待下一次迭代的情况下写入。
return handle_write(conn); // 优化
}
}
对于使用流水线的客户端,这种假设是不成立的。在服务器写入数据时,客户端可能还没有开始读取。所以服务器端的写入缓冲区可能会已满。为了使这种优化有效,handle_write()
必须检查 EAGAIN
,以防套接字尚未准备好。
static void handle_write(Conn *conn) {
assert(conn->outgoing.size() > 0);
ssize_t rv = write(conn->fd, &conn->outgoing[0], conn->outgoing.size());
if (rv < 0 && errno == EAGAIN) {
return; // 实际上尚未准备好
}
if (rv < 0) {
conn->want_close = true;
return; // 错误
}
// ...
}
使用 strace
验证这种优化。之前,每次写入前都要调用 poll()
:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\6\0\0\0hello1", 65536) = 10
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLOUT}])
write(4, "\6\0\0\0hello1", 10) = 10
之后,在同一循环迭代中进行 read()
和 write()
:
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
read(4, "\6\0\0\0hello1", 65536) = 10
write(4, "\6\0\0\0hello1", 10) = 10
6.9 更好的缓冲区处理
我们将 std::vector
用作先进先出(FIFO)缓冲区,因为我们从尾部追加数据,从头部移除数据。
// 追加到末尾
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
buf.insert(buf.end(), data, data + len);
}
// 从开头移除
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
buf.erase(buf.begin(), buf.begin() + n);
}
对于动态数组,向尾部追加数据是高效的,因为它不会每次都重新分配内存;它的容量会以指数方式增长,以分摊重新分配的成本。但是从头部移除数据则效率不高,因为每次都必须移动其余的数据。如果缓冲区包含许多流水线请求,逐个从头部移除数据的时间复杂度为 O(N²)。
所以我们需要一种可以在两端进行操作的缓冲区类型:
┌────────────┬────────────┬────────────┐
│ unused │ data │ unused │
└────────────┴────────────┴────────────┘
⇧ ⇧ ⇧ ⇧
buffer_begin data_begin data_end buffer_end
struct Buffer {
uint8_t *buffer_begin;
uint8_t *buffer_end;
uint8_t *data_begin;
uint8_t *data_end;
};
从头部移除数据只需要移动指针;无需移动数据。
static void buf_consume(struct Buffer *buf, size_t n) {
buf->data_begin += n;
}
现在向尾部追加数据有两种选择:
- 1. 像动态数组一样重新分配内存。
- 2. 或者将数据移动到前面以腾出空间。
这留给读者作为练习。