事件循环

6.1 简介

将第04章中的回显服务器重写为使用事件循环的形式。

while running:
    want_read = [...]           # 套接字文件描述符
    want_write = [...]          # 套接字文件描述符
    can_read, can_write = wait_for_readiness(want_read, want_write) # 阻塞!
    for fd in can_read:
        data = read_nb(fd)      # 非阻塞,仅从缓冲区中读取数据
        handle_data(fd, data)   # 无IO操作的应用逻辑
    for fd in can_write:
        data = pending_data(fd) # 由应用程序生成的数据
        n = write_nb(fd, data)  # 非阻塞,仅将数据追加到缓冲区
        data_written(fd, n)     # n <= len(data),受可用空间限制

应用程序代码与事件循环代码
一些库可以对事件循环进行抽象:事件循环代码通过回调与应用程序代码进行交互,而应用程序代码则通过定义良好的API与事件循环进行交互。我们不是在编写库,但事件循环代码和应用程序代码之间仍然存在一个隐含的界限。
完整代码在文章末尾

6.2 每个连接的状态

使用事件循环时,一个应用程序任务可能会跨越多个循环迭代,因此状态必须显式地存储在某个地方。以下是我们每个连接的状态:

struct Conn {
    int fd = -1;
    // 应用程序的意图,供事件循环使用
    bool want_read = false;
    bool want_write = false;
    bool want_close = false;
    // 缓冲的输入和输出
    std::vector<uint8_t> incoming;  // 待应用程序解析的数据
    std::vector<uint8_t> outgoing;  // 应用程序生成的响应
};
  • • Conn::want_read 和 Conn::want_write 表示就绪API的文件描述符列表。
  • • Conn::want_close 告知事件循环销毁该连接。
  • • Conn::incoming 对来自套接字的数据进行缓冲,以便协议解析器处理。
  • • Conn::outgoing 对生成的响应进行缓冲,这些响应将被写入套接字。

输入缓冲区的必要性
由于现在读取操作是非阻塞的,我们在解析协议时不能只是等待读取 n 个字节;read_full() 函数现在已经不适用了。相反,我们将这样做:

在每次循环迭代中,如果套接字准备好读取:

  1. 1. 进行一次非阻塞读取。
  2. 2. 将新数据添加到 Conn::incoming 缓冲区中。
  3. 3. 尝试解析累积的缓冲区数据。
  4. 4. 如果数据不够,在该次迭代中不做任何操作。
  5. 5. 处理已解析的消息。
  6. 6. 从 Conn::incoming 中移除该消息。

为什么要缓冲输出数据?
由于现在写入操作是非阻塞的,我们不能随意向套接字写入数据;只有当套接字准备好写入时才能写入数据。一个较大的响应可能需要多个循环迭代才能完成。所以响应数据必须存储在一个缓冲区(Conn::outgoing)中。

6.3 事件循环代码

从文件描述符到连接状态的映射
poll() 函数返回一个文件描述符列表。我们需要将每个文件描述符映射到 Conn 对象。

// 所有客户端连接的映射,以fd为键
std::vector<Conn *> fd2conn;

在Unix系统上,文件描述符被分配为最小的可用非负整数,所以从文件描述符到 Conn 的映射可以是一个以文件描述符为索引的简单数组,并且该数组将被紧密填充。没有什么比这更高效的了。有时简单的数组可以替代像哈希表这样的复杂数据结构。

poll() 系统调用
就绪API接受一个程序想要进行IO操作的文件描述符列表,然后返回一个准备好进行IO操作的文件描述符列表。有两种就绪状态:可读和可写。

can_read, can_write = wait_for_readiness(want_read, want_write)

我们将使用 poll() 函数,它对输入和输出使用相同的文件描述符列表。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
    int   fd;
    short events;   // 请求:想要读取、写入,还是两者都要?
    short revents;  // 返回:可以读取吗?可以写入吗?
};
  • • nfds 参数是 fds 数组的大小。
  • • timeout 参数设置为 -1,表示没有超时。
  • • pollfd::events 是 POLLINPOLLOUTPOLLERR 的组合:
    • • POLLIN 和 POLLOUT 对应于 want_read 和 want_write 文件描述符列表。
    • • POLLERR 表示我们总是希望得到通知的套接字错误。
  • • pollfd::revents 是由 poll() 函数返回的。它使用相同的标志集来指示文件描述符是否在 can_read 或 can_write 列表中。

步骤1:为 poll() 构建文件描述符列表
应用程序代码决定就绪通知的类型。它通过 Conn 中的 want_read 和 want_write 标志与事件循环进行通信,然后根据这些标志构建 fds 参数:

// 所有客户端连接的映射,以fd为键
std::vector<Conn *> fd2conn;
// 事件循环
std::vector<struct pollfd> poll_args;
while (true) {
    // 准备poll()的参数
    poll_args.clear();
    // 将监听套接字放在第一个位置
    struct pollfd pfd = {fd, POLLIN, 0};
    poll_args.push_back(pfd);
    // 其余的是连接套接字
    for (Conn *conn : fd2conn) {
        if (!conn) {
            continue;
        }
        struct pollfd pfd = {conn->fd, POLLERR, 0};
        // 根据应用程序的意图设置poll()标志
        if (conn->want_read) {
            pfd.events |= POLLIN;
        }
        if (conn->want_write) {
            pfd.events |= POLLOUT;
        }
        poll_args.push_back(pfd);
    }

    // 更多...
}

步骤2:调用 poll()

// 事件循环
while (true) {
    // 准备poll()的参数
    // ...

    // 等待就绪
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    if (rv < 0 && errno == EINTR) {
        continue;   // 不是错误
    }
    if (rv < 0) {
        die("poll");
    }

    // ...
}

poll() 是整个程序中唯一的阻塞系统调用。通常,当至少有一个文件描述符准备好时它会返回。然而,即使没有任何文件描述符准备好,它也可能偶尔返回并将 errno 设置为 EINTR

如果一个进程在阻塞系统调用期间接收到Unix信号,该系统调用会立即返回并带有 EINTR,以便进程有机会处理该信号。对于非阻塞系统调用,不应该出现 EINTR

EINTR 不是错误,系统调用应该被重试。即使你不使用信号,你仍然应该处理 EINTR,因为可能存在意外的信号源。

步骤3:接受新连接
我们将监听套接字放在文件描述符列表的第0个位置。

// 事件循环
while (true) {
    // 准备poll()的参数
    poll_args.clear();
    // 将监听套接字放在第一个位置
    struct pollfd pfd = {fd, POLLIN, 0};
    poll_args.push_back(pfd);
    // ...

    // 等待就绪
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    // ...

    // 处理监听套接字
    if (poll_args[0].revents) {
        if (Conn *conn = handle_accept(fd)) {
            // 将其放入映射中
            if (fd2conn.size() <= (size_t)conn->fd) {
                fd2conn.resize(conn->fd + 1);
            }
            fd2conn[conn->fd] = conn;
        }
    }
    // ...
}   // 事件循环

在就绪通知中,accept() 被视为 read(),所以它使用 POLLINpoll() 返回后,检查第一个文件描述符,看是否可以进行 accept() 操作。

handle_accept() 为新连接创建 Conn 对象。我们稍后会编写这个函数。

步骤4:调用应用程序回调函数
文件描述符列表的其余部分是连接套接字。如果它们准备好进行IO操作,就调用应用程序代码。

while (true) {
    // ...
    // 等待就绪
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    // ...

    // 处理连接套接字
    for (size_t i = 1; i < poll_args.size(); ++i) { // 注意:跳过第一个
        uint32_t ready = poll_args[i].revents;
        Conn *conn = fd2conn[poll_args[i].fd];
        if (ready & POLLIN) {
            handle_read(conn);  // 应用程序逻辑
        }
        if (ready & POLLOUT) {
            handle_write(conn); // 应用程序逻辑
        }
    }
}

步骤5:终止连接
我们总是对连接套接字进行 POLLERR 的 poll() 操作,所以在出现错误时我们可以销毁连接。应用程序代码也可以设置 Conn::want_close 来请求事件循环销毁连接。

// 处理连接套接字
for (size_t i = 1; i < poll_args.size(); ++i) {
    uint32_t ready = poll_args[i].revents;
    Conn *conn = fd2conn[poll_args[i].fd];
    // 读取和写入...

    // 由于套接字错误或应用程序逻辑关闭套接字
    if ((ready & POLLERR) || conn->want_close) {
        (void)close(conn->fd);
        fd2conn[conn->fd] = NULL;
        delete conn;
    }
}

你可以添加一个回调函数 handle_err() 来让应用程序代码处理错误,但在我们的应用程序中没有什么可做的,所以我们在这里直接关闭套接字。

6.4 具有非阻塞IO的应用程序代码

非阻塞 accept()
在进入事件循环之前,使用 fcntl 将监听套接字设置为非阻塞模式。

static void fd_set_nb(int fd) {
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}

然后事件循环回调应用程序代码来执行 accept() 操作。

static Conn *handle_accept(int fd) {
    // 接受连接
    struct sockaddr_in client_addr = {};
    socklen_t addrlen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
    if (connfd < 0) {
        return NULL;
    }
    // 将新连接的fd设置为非阻塞模式
    fd_set_nb(connfd);
    // 创建一个`struct Conn`
    Conn *conn = new Conn();
    conn->fd = connfd;
    conn->want_read = true// 读取第一个请求
    return conn;
}

连接套接字也被设置为非阻塞模式,等待进行第一次读取操作。

具有非阻塞读取的协议解析器
查看每个子步骤的注释。

static void handle_read(Conn *conn) {
    // 1. 进行一次非阻塞读取。
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv <= 0) {  // 处理IO错误(rv < 0)或EOF(rv == 0)
        conn->want_close = true;
        return;
    }
    // 2. 将新数据添加到`Conn::incoming`缓冲区中。
    buf_append(conn->incoming, buf, (size_t)rv);
    // 3. 尝试解析累积的缓冲区数据。
    // 4. 处理已解析的消息。
    // 5. 从`Conn::incoming`中移除该消息。
    try_one_request(conn)
    // ...
}

处理操作被拆分到 try_one_request() 函数中。如果数据不够,它将不做任何操作,直到在未来的循环迭代中有更多数据。

// 如果有足够的数据,处理一个请求
static bool try_one_request(Conn *conn) {
    // 3. 尝试解析累积的缓冲区数据。
    // 协议:消息头部
    if (conn->incoming.size() < 4) {
        return false;   // 想要读取
    }
    uint32_t len = 0;
    memcpy(&len, conn->incoming.data(), 4);
    if (len > k_max_msg) {  // 协议错误
        conn->want_close = true;
        return false;   // 想要关闭
    }
    // 协议:消息体
    if (4 + len > conn->incoming.size()) {
        return false;   // 想要读取
    }
    const uint8_t *request = &conn->incoming[4];
    // 4. 处理已解析的消息。
    // ...
    // 生成响应(回显)
    buf_append(conn->outgoing, (const uint8_t *)&len, 4);
    buf_append(conn->outgoing, request, len);
    // 5. 从`Conn::incoming`中移除该消息。
    buf_consume(conn->incoming, 4 + len);
    return true;        // 成功
}

我们使用 std::vector 作为缓冲区类型,它只是一个动态数组。

// 追加到末尾
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}
// 从开头移除
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}

非阻塞写入
没有应用程序逻辑,只是写入一些数据并从缓冲区中移除它。write() 函数可能返回较少的字节数,这是可以的,因为事件循环会再次调用它。

static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
    if (rv < 0) {
        conn->want_close = true;    // 错误处理
        return;
    }
    // 从`outgoing`中移除已写入的数据
    buf_consume(conn->outgoing, (size_t)rv);
    // ...
}

请求和响应之间的状态转换
在请求-响应协议中,程序要么在读取请求,要么在写入响应。在 handle_read() 和 handle_write() 的末尾,我们需要在这两种状态之间进行切换。

static void handle_read(Conn *conn) {
    // ...
    // 更新就绪意图
    if (conn->outgoing.size() > 0) {    // 有响应
        conn->want_read = false;
        conn->want_write = true;
    }   // 否则:想要读取
}
static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) {   // 所有数据已写入
        conn->want_read = true;
        conn->want_write = false;
    } // 否则:想要写入
}

这并非普遍适用。例如,一些代理和消息传递协议不是请求-响应式的,并且可以同时进行读写操作。

总结
协议与第04章中的相同。所以你可以重用测试客户端。这个服务器是最基本的,有点像生产级别的东西,但它仍然只是一个简单示例,想要了解更高级的内容,请进入下一章。

源代码

  • • 06_client.cpp
  • • 06_server.cpp
阅读剩余
THE END