事件循环
6.1 简介
将第04章中的回显服务器重写为使用事件循环的形式。
while running:
want_read = [...] # 套接字文件描述符
want_write = [...] # 套接字文件描述符
can_read, can_write = wait_for_readiness(want_read, want_write) # 阻塞!
for fd in can_read:
data = read_nb(fd) # 非阻塞,仅从缓冲区中读取数据
handle_data(fd, data) # 无IO操作的应用逻辑
for fd in can_write:
data = pending_data(fd) # 由应用程序生成的数据
n = write_nb(fd, data) # 非阻塞,仅将数据追加到缓冲区
data_written(fd, n) # n <= len(data),受可用空间限制
应用程序代码与事件循环代码
一些库可以对事件循环进行抽象:事件循环代码通过回调与应用程序代码进行交互,而应用程序代码则通过定义良好的API与事件循环进行交互。我们不是在编写库,但事件循环代码和应用程序代码之间仍然存在一个隐含的界限。
完整代码在文章末尾
6.2 每个连接的状态
使用事件循环时,一个应用程序任务可能会跨越多个循环迭代,因此状态必须显式地存储在某个地方。以下是我们每个连接的状态:
struct Conn {
int fd = -1;
// 应用程序的意图,供事件循环使用
bool want_read = false;
bool want_write = false;
bool want_close = false;
// 缓冲的输入和输出
std::vector<uint8_t> incoming; // 待应用程序解析的数据
std::vector<uint8_t> outgoing; // 应用程序生成的响应
};
- •
Conn::want_read
和Conn::want_write
表示就绪API的文件描述符列表。 - •
Conn::want_close
告知事件循环销毁该连接。 - •
Conn::incoming
对来自套接字的数据进行缓冲,以便协议解析器处理。 - •
Conn::outgoing
对生成的响应进行缓冲,这些响应将被写入套接字。
输入缓冲区的必要性
由于现在读取操作是非阻塞的,我们在解析协议时不能只是等待读取 n
个字节;read_full()
函数现在已经不适用了。相反,我们将这样做:
在每次循环迭代中,如果套接字准备好读取:
- 1. 进行一次非阻塞读取。
- 2. 将新数据添加到
Conn::incoming
缓冲区中。 - 3. 尝试解析累积的缓冲区数据。
- 4. 如果数据不够,在该次迭代中不做任何操作。
- 5. 处理已解析的消息。
- 6. 从
Conn::incoming
中移除该消息。
为什么要缓冲输出数据?
由于现在写入操作是非阻塞的,我们不能随意向套接字写入数据;只有当套接字准备好写入时才能写入数据。一个较大的响应可能需要多个循环迭代才能完成。所以响应数据必须存储在一个缓冲区(Conn::outgoing
)中。
6.3 事件循环代码
从文件描述符到连接状态的映射
poll()
函数返回一个文件描述符列表。我们需要将每个文件描述符映射到 Conn
对象。
// 所有客户端连接的映射,以fd为键
std::vector<Conn *> fd2conn;
在Unix系统上,文件描述符被分配为最小的可用非负整数,所以从文件描述符到 Conn
的映射可以是一个以文件描述符为索引的简单数组,并且该数组将被紧密填充。没有什么比这更高效的了。有时简单的数组可以替代像哈希表这样的复杂数据结构。
poll()
系统调用
就绪API接受一个程序想要进行IO操作的文件描述符列表,然后返回一个准备好进行IO操作的文件描述符列表。有两种就绪状态:可读和可写。
can_read, can_write = wait_for_readiness(want_read, want_write)
我们将使用 poll()
函数,它对输入和输出使用相同的文件描述符列表。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd;
short events; // 请求:想要读取、写入,还是两者都要?
short revents; // 返回:可以读取吗?可以写入吗?
};
- •
nfds
参数是fds
数组的大小。 - •
timeout
参数设置为 -1,表示没有超时。 - •
pollfd::events
是POLLIN
、POLLOUT
、POLLERR
的组合:- •
POLLIN
和POLLOUT
对应于want_read
和want_write
文件描述符列表。 - •
POLLERR
表示我们总是希望得到通知的套接字错误。
- •
- •
pollfd::revents
是由poll()
函数返回的。它使用相同的标志集来指示文件描述符是否在can_read
或can_write
列表中。
步骤1:为 poll()
构建文件描述符列表
应用程序代码决定就绪通知的类型。它通过 Conn
中的 want_read
和 want_write
标志与事件循环进行通信,然后根据这些标志构建 fds
参数:
// 所有客户端连接的映射,以fd为键
std::vector<Conn *> fd2conn;
// 事件循环
std::vector<struct pollfd> poll_args;
while (true) {
// 准备poll()的参数
poll_args.clear();
// 将监听套接字放在第一个位置
struct pollfd pfd = {fd, POLLIN, 0};
poll_args.push_back(pfd);
// 其余的是连接套接字
for (Conn *conn : fd2conn) {
if (!conn) {
continue;
}
struct pollfd pfd = {conn->fd, POLLERR, 0};
// 根据应用程序的意图设置poll()标志
if (conn->want_read) {
pfd.events |= POLLIN;
}
if (conn->want_write) {
pfd.events |= POLLOUT;
}
poll_args.push_back(pfd);
}
// 更多...
}
步骤2:调用 poll()
// 事件循环
while (true) {
// 准备poll()的参数
// ...
// 等待就绪
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
if (rv < 0 && errno == EINTR) {
continue; // 不是错误
}
if (rv < 0) {
die("poll");
}
// ...
}
poll()
是整个程序中唯一的阻塞系统调用。通常,当至少有一个文件描述符准备好时它会返回。然而,即使没有任何文件描述符准备好,它也可能偶尔返回并将 errno
设置为 EINTR
。
如果一个进程在阻塞系统调用期间接收到Unix信号,该系统调用会立即返回并带有 EINTR
,以便进程有机会处理该信号。对于非阻塞系统调用,不应该出现 EINTR
。
EINTR
不是错误,系统调用应该被重试。即使你不使用信号,你仍然应该处理 EINTR
,因为可能存在意外的信号源。
步骤3:接受新连接
我们将监听套接字放在文件描述符列表的第0个位置。
// 事件循环
while (true) {
// 准备poll()的参数
poll_args.clear();
// 将监听套接字放在第一个位置
struct pollfd pfd = {fd, POLLIN, 0};
poll_args.push_back(pfd);
// ...
// 等待就绪
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
// ...
// 处理监听套接字
if (poll_args[0].revents) {
if (Conn *conn = handle_accept(fd)) {
// 将其放入映射中
if (fd2conn.size() <= (size_t)conn->fd) {
fd2conn.resize(conn->fd + 1);
}
fd2conn[conn->fd] = conn;
}
}
// ...
} // 事件循环
在就绪通知中,accept()
被视为 read()
,所以它使用 POLLIN
。poll()
返回后,检查第一个文件描述符,看是否可以进行 accept()
操作。
handle_accept()
为新连接创建 Conn
对象。我们稍后会编写这个函数。
步骤4:调用应用程序回调函数
文件描述符列表的其余部分是连接套接字。如果它们准备好进行IO操作,就调用应用程序代码。
while (true) {
// ...
// 等待就绪
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
// ...
// 处理连接套接字
for (size_t i = 1; i < poll_args.size(); ++i) { // 注意:跳过第一个
uint32_t ready = poll_args[i].revents;
Conn *conn = fd2conn[poll_args[i].fd];
if (ready & POLLIN) {
handle_read(conn); // 应用程序逻辑
}
if (ready & POLLOUT) {
handle_write(conn); // 应用程序逻辑
}
}
}
步骤5:终止连接
我们总是对连接套接字进行 POLLERR
的 poll()
操作,所以在出现错误时我们可以销毁连接。应用程序代码也可以设置 Conn::want_close
来请求事件循环销毁连接。
// 处理连接套接字
for (size_t i = 1; i < poll_args.size(); ++i) {
uint32_t ready = poll_args[i].revents;
Conn *conn = fd2conn[poll_args[i].fd];
// 读取和写入...
// 由于套接字错误或应用程序逻辑关闭套接字
if ((ready & POLLERR) || conn->want_close) {
(void)close(conn->fd);
fd2conn[conn->fd] = NULL;
delete conn;
}
}
你可以添加一个回调函数 handle_err()
来让应用程序代码处理错误,但在我们的应用程序中没有什么可做的,所以我们在这里直接关闭套接字。
6.4 具有非阻塞IO的应用程序代码
非阻塞 accept()
在进入事件循环之前,使用 fcntl
将监听套接字设置为非阻塞模式。
static void fd_set_nb(int fd) {
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}
然后事件循环回调应用程序代码来执行 accept()
操作。
static Conn *handle_accept(int fd) {
// 接受连接
struct sockaddr_in client_addr = {};
socklen_t addrlen = sizeof(client_addr);
int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
if (connfd < 0) {
return NULL;
}
// 将新连接的fd设置为非阻塞模式
fd_set_nb(connfd);
// 创建一个`struct Conn`
Conn *conn = new Conn();
conn->fd = connfd;
conn->want_read = true; // 读取第一个请求
return conn;
}
连接套接字也被设置为非阻塞模式,等待进行第一次读取操作。
具有非阻塞读取的协议解析器
查看每个子步骤的注释。
static void handle_read(Conn *conn) {
// 1. 进行一次非阻塞读取。
uint8_t buf[64 * 1024];
ssize_t rv = read(conn->fd, buf, sizeof(buf));
if (rv <= 0) { // 处理IO错误(rv < 0)或EOF(rv == 0)
conn->want_close = true;
return;
}
// 2. 将新数据添加到`Conn::incoming`缓冲区中。
buf_append(conn->incoming, buf, (size_t)rv);
// 3. 尝试解析累积的缓冲区数据。
// 4. 处理已解析的消息。
// 5. 从`Conn::incoming`中移除该消息。
try_one_request(conn)
// ...
}
处理操作被拆分到 try_one_request()
函数中。如果数据不够,它将不做任何操作,直到在未来的循环迭代中有更多数据。
// 如果有足够的数据,处理一个请求
static bool try_one_request(Conn *conn) {
// 3. 尝试解析累积的缓冲区数据。
// 协议:消息头部
if (conn->incoming.size() < 4) {
return false; // 想要读取
}
uint32_t len = 0;
memcpy(&len, conn->incoming.data(), 4);
if (len > k_max_msg) { // 协议错误
conn->want_close = true;
return false; // 想要关闭
}
// 协议:消息体
if (4 + len > conn->incoming.size()) {
return false; // 想要读取
}
const uint8_t *request = &conn->incoming[4];
// 4. 处理已解析的消息。
// ...
// 生成响应(回显)
buf_append(conn->outgoing, (const uint8_t *)&len, 4);
buf_append(conn->outgoing, request, len);
// 5. 从`Conn::incoming`中移除该消息。
buf_consume(conn->incoming, 4 + len);
return true; // 成功
}
我们使用 std::vector
作为缓冲区类型,它只是一个动态数组。
// 追加到末尾
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
buf.insert(buf.end(), data, data + len);
}
// 从开头移除
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
buf.erase(buf.begin(), buf.begin() + n);
}
非阻塞写入
没有应用程序逻辑,只是写入一些数据并从缓冲区中移除它。write()
函数可能返回较少的字节数,这是可以的,因为事件循环会再次调用它。
static void handle_write(Conn *conn) {
assert(conn->outgoing.size() > 0);
ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
if (rv < 0) {
conn->want_close = true; // 错误处理
return;
}
// 从`outgoing`中移除已写入的数据
buf_consume(conn->outgoing, (size_t)rv);
// ...
}
请求和响应之间的状态转换
在请求-响应协议中,程序要么在读取请求,要么在写入响应。在 handle_read()
和 handle_write()
的末尾,我们需要在这两种状态之间进行切换。
static void handle_read(Conn *conn) {
// ...
// 更新就绪意图
if (conn->outgoing.size() > 0) { // 有响应
conn->want_read = false;
conn->want_write = true;
} // 否则:想要读取
}
static void handle_write(Conn *conn) {
// ...
if (conn->outgoing.size() == 0) { // 所有数据已写入
conn->want_read = true;
conn->want_write = false;
} // 否则:想要写入
}
这并非普遍适用。例如,一些代理和消息传递协议不是请求-响应式的,并且可以同时进行读写操作。
总结
协议与第04章中的相同。所以你可以重用测试客户端。这个服务器是最基本的,有点像生产级别的东西,但它仍然只是一个简单示例,想要了解更高级的内容,请进入下一章。
源代码:
- •
06_client.cpp
- •
06_server.cpp