事件循环(第 2 部分)

6.5 事件循环基础

我们在上一章学到的内容:

  • • 基于回调的编程:对事件做出反应,而不是阻塞线程。
  • • 使用非阻塞IO处理输入:将数据保存在缓冲区中,直到有足够的数据继续处理。
  • • 事件循环和应用程序代码之间的界限。一个最小化的事件循环仍然不简单,要将通用的事件处理与应用程序逻辑分开。
  • • 在循环迭代之间保存应用程序状态。对于请求-响应协议,在两种状态(读取和写入)之间交替。

这是你需要了解的最基本知识。然后还有更多高级主题,以弥简单示例代码与生产环境软件之间的差距。

源代码在文章末尾

6.6 流水线请求

在不改变协议的情况下批量处理请求
一个典型的Redis服务器对于像获取(get)、设置(set)、删除(del)这样的简单操作,每秒可以处理2万到20万个请求。许多其他请求-响应应用程序也在相同的处理能力范围内。如果应用程序逻辑(处理请求)的开销较小,那么这会受到单个线程能够处理的IO事件数量的限制。

服务器受到单线程IO的瓶颈限制,因此希望通过每次读取时批量处理多个请求来增加每个IO操作的工作量。

这可以通过流水线请求来实现。通常情况下,客户端发送1个请求,然后等待1个响应;而使用流水线的客户端会发送n个请求,然后等待n个响应。服务器仍然会按顺序处理每个请求,唯一的区别是它可以在一次读取中获取多个请求,从而有效地减少了IO操作的次数。

客户端(逐个发送)   客户端(流水线方式)
────────────────────────────────────────►
 ╲  ↗ ╲  ↗             ╲ ╲↗ ↗
  ╲╱   ╲╱               ╲╱╲╱
────────────────────────────────────────►
服务器

请求流水线还可以减少客户端的延迟,因为它可以在一个往返时间(RTT)内获取多个响应。这是Redis的一个有记录的使用场景。

流水线请求引发的问题
流水线并不会改变协议或服务器端的处理方式;请求按顺序发送,服务器按顺序处理它们,然后按顺序发送响应。那么它应该可以正常工作吧?事情并非如此简单。

请记住,TCP只是一个有序的字节流;服务器一次简单地从其中读取消息并不能区分差异。但是实现过程中常常会做出额外的假设。例如:

static void handle_read(Conn *conn) {
    // ...
    try_one_request(conn)               // 假设:最多只有1个请求
    if (conn->outgoing.size() > 0) {    // 1. 处理了1个请求。
        conn->want_read = false;
        conn->want_write = true;
    }
}

static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) {   // 2. 写入了1个响应。
        conn->want_read = true;         // 3. 等待更多数据。
        conn->want_write = false;
    }
}

handle_read() 假设输入缓冲区中最多只有1个请求。在它处理完1个请求后,逻辑继续执行 handle_write(),并且在完成后,handle_write() 会让事件循环等待更多数据,即使输入缓冲区中仍然有未处理的数据。所以流水线就卡住了!

将输入视为字节流
为了解决这个问题,需要放弃对输入缓冲区的额外假设,即不断从缓冲区中读取数据,直到没有数据可处理为止。

static void handle_read(Conn *conn) {
    // ...
    // try_one_request(conn);               // 错误
    while (try_one_request(conn)) {}        // 正确
    // ...
}

并且在我们处理完1个请求后,不能直接清空输入缓冲区,因为可能还有更多消息。

static bool try_one_request(Conn *conn) {
    // ...
    // conn->incoming.clear()               // 错误
    buf_consume(conn->incoming, 4 + len);   // 正确
    return true;
}

总之,在读取字节流时,你可以等待n个字节的到来,但不要假设最多只有n个字节已经到来,因为字节流只与顺序有关,而与时间无关。

使用流水线消息测试正确性
浏览器曾尝试使用流水线请求,但失败了,因为许多服务器无法处理它们。

TCP字节流是网络编程的一个主要障碍。如果服务器不能正确解释字节流,就无法处理流水线请求。因此,使用流水线消息测试实现可以发现错误。例如,如果服务器根据 read() 返回的字节数做出决策,那么在处理流水线消息时就会失败。

6.7 调试与测试

字节流处理
协议与第04章中的相同。但还有更多内容需要测试。其中之一是流水线测试:

std::vector<std::string> query_list = {
    "hello1""hello2""hello3",
};
for (const std::string &s : query_list) {
    int32_t err = send_req(fd, s.data(), s.size());
    if (err) { /* ... */ }
}
for (size_t i = 0; i < query_list.size(); ++i) {
    int32_t err = read_res(fd);
    if (err) { /* ... */ }
}

服务器很可能在一次读取中接收到多个消息,这会测试解析器。

跨越多个循环迭代的任务
测试一条大消息需要多个迭代才能处理的情况。所以我们将消息大小限制提高到一个更大的数值:

const size_t k_max_msg = 32 << 20;  // 很可能比内核缓冲区大

然后在流水线中包含一个大请求。

std::vector<std::string> query_list = {
    "hello1""hello2""hello3",
    std::string(k_max_msg, 'z'), // 需要多个事件循环迭代
    "hello5",
};

使用 strace 进行调试
使用 strace 命令来验证我们正在测试预期的内容。

$ strace ./server >/dev/null

strace 会显示所有的系统调用:

创建监听套接字:

socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 3
setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(3, {sa_family=AF_INET, sin_port=htons(1234), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
fcntl(3, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(3, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
listen(34096)                         = 0

进入事件循环,然后唤醒以接受客户端连接:

poll([{fd=3, events=POLLIN}], 1-1)    = 1 ([{fd=3, revents=POLLIN}])
accept(3, {sa_family=AF_INET, sin_port=htons(52184), sin_addr=inet_addr("127.0.0.1")}, [16]) = 4
fcntl(4, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0

唤醒以从客户端读取数据(注意流水线请求):

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4"\6\0\0\0hello1\6\0\0\0hello2\6\0\0\0hello3"65536) = 30

唤醒以一次性写入3个响应(流水线处理正常工作):

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLOUT}])
write(4"\6\0\0\0hello1\6\0\0\0hello2\6\0\0\0hello3"30) = 30

多次唤醒以读取一个大请求:

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4"\0\0\0\2zzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 65536) = 65536
brk(0x559c87e41000)                     = 0x559c87e41000
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 65536) = 65536
... 省略...
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4"zzzz\6\0\0\0hello5"65536)    = 14

多次唤醒以写入大响应:

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLOUT}])
write(4"\0\0\0\2zzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 33554446) = 2621440
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLOUT}])
write(4"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 30933006) = 3175899
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLOUT}])
write(4"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"..., 27757107) = 27757107

客户端关闭连接,然后服务器也关闭:

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4""65536)                      = 0
close(4)                                = 0

6.8 乐观的非阻塞写入

在请求-响应协议中,客户端在发送下一个请求之前会读取响应。服务器可以假设,当它接收到一个请求时,套接字已经准备好写入,因为客户端已经处理了之前写入的数据。所以服务器可以直接写入,而无需等待下一次循环迭代,从而节省一次系统调用。

static void handle_read(Conn *conn) {
    // ...
    if (conn->outgoing.size() > 0) {    // 有响应
        conn->want_read = false;
        conn->want_write = true;
        // 在请求-响应协议中,套接字很可能已准备好写入,
        // 尝试在不等待下一次迭代的情况下写入。
        return handle_write(conn);      // 优化
    }
}

对于使用流水线的客户端,这种假设是不成立的。在服务器写入数据时,客户端可能还没有开始读取。所以服务器端的写入缓冲区可能会已满。为了使这种优化有效,handle_write() 必须检查 EAGAIN,以防套接字尚未准备好。

static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, &conn->outgoing[0], conn->outgoing.size());
    if (rv < 0 && errno == EAGAIN) {
        return// 实际上尚未准备好
    }
    if (rv < 0) {
        conn->want_close = true;
        return// 错误
    }
    // ...
}

使用 strace 验证这种优化。之前,每次写入前都要调用 poll()

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4"\6\0\0\0hello1"65536)        = 10
poll([{fd=3, events=POLLIN}, {fd=4, events=POLLOUT|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLOUT}])
write(4"\6\0\0\0hello1"10)          = 10

之后,在同一循环迭代中进行 read() 和 write()

poll([{fd=3, events=POLLIN}, {fd=4, events=POLLIN|POLLERR}], 2-1) = 1 ([{fd=4, revents=POLLIN}])
read(4"\6\0\0\0hello1"65536)        = 10
write(4"\6\0\0\0hello1"10)          = 10

6.9 更好的缓冲区处理

我们将 std::vector 用作先进先出(FIFO)缓冲区,因为我们从尾部追加数据,从头部移除数据。

// 追加到末尾
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}
// 从开头移除
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}

对于动态数组,向尾部追加数据是高效的,因为它不会每次都重新分配内存;它的容量会以指数方式增长,以分摊重新分配的成本。但是从头部移除数据则效率不高,因为每次都必须移动其余的数据。如果缓冲区包含许多流水线请求,逐个从头部移除数据的时间复杂度为 O(N²)。

所以我们需要一种可以在两端进行操作的缓冲区类型:

      ┌────────────┬────────────┬────────────┐
      │   unused   │    data    │   unused   │
      └────────────┴────────────┴────────────┘
      ⇧            ⇧            ⇧            ⇧
buffer_begin   data_begin   data_end   buffer_end
struct Buffer {
    uint8_t *buffer_begin;
    uint8_t *buffer_end;
    uint8_t *data_begin;
    uint8_t *data_end;
};

从头部移除数据只需要移动指针;无需移动数据。

static void buf_consume(struct Buffer *buf, size_t n) {
    buf->data_begin += n;
}

现在向尾部追加数据有两种选择:

  1. 1. 像动态数组一样重新分配内存。
  2. 2. 或者将数据移动到前面以腾出空间。

这留给读者作为练习。

阅读剩余
THE END