线程池
14.1 多线程的需求
事件循环假定循环中的所有操作都能快速完成,因此禁止阻塞式IO。然而,仍然存在一些问题:
- 1. 使用阻塞式IO的第三方代码。
- 2. 会使事件循环变慢的CPU密集型代码。
源代码在文章末尾
客户端代码
大多数客户端代码都是阻塞式的,例如标准C库(libc)中的DNS客户端(getaddrinfo
)、HTTP客户端库libcurl
。它们不能在事件循环中使用,这就需要多线程。或者你可以找到一个与事件循环兼容的非阻塞、基于回调的API,比如用于DNS的c-ares
。libcurl
同时有阻塞和非阻塞的API。
即使存在非阻塞API,由于回调的存在,使用起来也困难得多。所以,使用阻塞式客户端仍然是可取的。幸运的是,我们的Redis服务器不需要任何客户端代码。
CPU密集型代码
事件循环避免了阻塞式IO,但如果代码在CPU上花费的时间过长,它也无济于事。我们已经有了一些解决方法,比如渐进式哈希表的重哈希,以及对同时过期定时器的限制。但我们遗漏了一件事:当删除一个有序集合时,析构函数会释放每一对元素。对于大型有序集合,这个O(N)操作是有问题的。
真正的Redis添加了UNLINK
命令作为一种解决方法,它会删除键,但在另一个线程中运行析构函数。这是多线程的一个常见用途:在另一个线程中(异步地)运行一段代码。
生产者-消费者问题
大多数多线程问题都可以重新表述为生产者-消费者问题。例如,要在一个线程中运行一个异步任务,会有一个消费者线程,称为“工作线程”,等待下一个工作(任务),而一个生产者线程有工作要做,并试图找到一个消费者线程并与之通信。如果你能解决生产者-消费者问题,你就能解决大多数多线程问题。
14.2 同步原语
通过共享数据进行通信
生产者通过一个共享数据结构与消费者进行通信,通常是一个队列。
// 伪代码
std::vector<Work> queue; // 生产者和消费者共享
void produce() {
Work work = ...;
queue.push_back(work); // 将工作添加到队列中
}
void try_to_consume() {
if (!queue.empty()) {
Work work = queue.front();
queue.pop_front();
// 对工作进行处理...
} else {
// 无事可做!如何等待队列有数据呢?
}
}
但这只是问题的一半。消费者如何等待队列变为非空呢?如果不了解多线程的相关知识,有人可能只是在循环中检查队列:
void consumer() {
while (true) {
try_to_consume();
}
}
这被称为忙等待,或自旋,这不是一个实用的解决方案,因为它会一直占用100%的CPU。有些人会尝试在循环中添加一个sleep()
函数。这就是不了解多线程时进行多线程编程的方式。
睡眠-信号-唤醒机制
在真正的多线程代码中,消费者在等待工作时应该进入睡眠状态,而生产者在更新队列后可以向消费者发送信号,这样消费者线程就只在需要时才执行。这种睡眠-信号-唤醒机制必须由操作系统提供,因为操作系统的调度器控制着哪个线程在何时运行。
这些机制被称为同步原语。同步原语有很多种,它们都是睡眠-信号-唤醒机制。
常见的同步原语
同步原语有多个层次。底层原语有Linux的futex
和Windows的WaitOnAddress()
。它们由操作系统直接提供,通用性最强,但使用起来很困难。它们通常用于实现高层原语,而不是直接被程序员使用。
高层原语有互斥锁(mutex)、条件变量(condition variable)和信号量(semaphore)。它们是由底层操作系统原语实现的,使用起来更方便,但仍然具有通用性。掌握了它们,你就可以解决任何多线程问题。这就是我们要学习的内容。
在这些原语之上,还有更高层次的东西,比如并发队列、线程池、读写锁、Go语言的通道(channel)等。它们的通用性较差,只适用于特定的用例。即使不了解多线程知识,使用它们也很容易,这给人一种他们可以进行多线程编程的错觉。
CPU原子操作
CPU原子操作使得一个多步骤的内存读写操作看起来像是一个单步骤操作。一个指令集包括对固定大小整数的一组固定的原子操作。常见的多步骤原子操作包括原子交换、原子比较并交换等。
// 伪代码
T atomic_exchange(T *cur_value, T new_value) {
T old_value = *cur_value;
*cur_value = new_value;
return old_value;
}
T atomic_compare_and_exchange(T *cur_value, T expected, T new_value) {
T old_value = *cur_value;
if (*cur_value == expected) {
*cur_value = new_value;
}
return old_value;
}
在多线程的介绍中经常会提到原子值,但它们只适用于一些特定的应用场景,因为它们不能使线程睡眠/唤醒。对于大多数多线程程序来说,原子操作既不是必需的,也不是足够的。
14.3 互斥锁和条件变量
互斥锁
当在线程之间共享数据时,必须有一种方法来防止在生产者更新数据时,消费者使用部分更新的数据。这通常是通过互斥锁(mutex,互斥)来实现的,也称为锁。一次只能有一个线程持有锁,而其他线程必须等待。
一个线程如何等待获取锁呢?有两种类型的锁:自旋锁(spinlock)和常规锁。自旋锁通过使用CPU原子操作进行忙等待来等待,这会浪费CPU周期。
// 伪代码
struct SpinLock {
bool is_locked = false;
void lock() {
while (atomic_swap(&is_locked, true)) {} // 忙等待
}
void unlock() {
atomic_store(&is_locked, false);
}
};
常规锁可以在等待时让线程进入睡眠状态。用户空间程序中使用的锁是混合类型的;它们在进入睡眠状态之前会自旋一段时间。
信号量
信号量类似于锁,但它是一个整数计数器,而不是单个布尔标志:
- 1. “加锁”操作会使计数器减1。
- 2. “解锁”操作会使计数器加1。
- 3. 如果计数器变为负数,“加锁”操作会使线程进入睡眠状态。
- 4. 如果计数器之前为负数,“解锁”操作会唤醒一个睡眠的线程。
struct Semaphore {
int counter;
explicit Semaphore(int init) : counter(init) { }
void increase();
void decrease();
};
信号量不要求“加锁”和“解锁”操作成对出现,并且计数器可以初始化为任何值。常规锁是信号量的一种特殊情况,其中1表示解锁状态,0表示锁定状态。
struct Mutex {
Semaphore sem;
explicit Mutex() : sem(1) {}
void lock() { sem.decrease(); }
void unlock() { sem.increase(); }
};
信号量在教科书中很常见,但在实践中使用起来很棘手。我们不会讨论它们,因为有更简单的选择。
等待某件事情发生
让我们在生产者-消费者问题中使队列访问具有互斥性。
// 伪代码
Mutex mu;
std::vector<Work> queue; // 生产者和消费者共享
void produce() {
Work work = ...;
mu.lock();
queue.push_back(work); // 更新队列
mu.unlock();
}
void try_to_consume() {
mu.lock();
if (!queue.empty()) {
Work work = queue.front();
queue.pop_front();
// 对工作进行处理...
} // 否则:无事可做!如何等待队列有数据呢?
mu.unlock();
}
剩下的问题是在不进行忙等待的情况下等待队列变为非空,这需要一个睡眠-信号-唤醒机制。但是,既然互斥锁也是一种这样的机制,我们能用互斥锁做到这一点吗?这是可能的,但并不明显。我们需要一种新的、更简单的唤醒睡眠线程的方法,我们称之为wait()
和signal()
。
// 一个假设的原语
struct Event {
void wait(); // 让线程进入睡眠状态
void signal(); // 唤醒一个睡眠的线程
};
Mutex mu;
Event ev;
std::vector<Work> queue; // 生产者和消费者共享
void produce() {
Work work = ...;
mu.lock();
queue.push_back(work); // 更新队列
ev.signal(); // 如果有睡眠的线程,唤醒它
mu.unlock();
}
void consume() {
mu.lock();
// 等待队列变为非空
while (queue.empty()) {
mu.unlock(); // 在睡眠前释放锁
ev.wait(); // 通过睡眠等待
mu.lock(); // 重新获取锁
}
// 更新队列
Work work = queue.front();
queue.pop_front();
// 对工作进行处理...
mu.unlock();
}
消费者在睡眠前必须释放锁,以便生产者可以更新队列。但问题是:生产者可能在unlock()
和wait()
之间更新队列并调用signal()
。
步骤 | 生产者 | 消费者 |
1 | lock | |
2 | queue empty? | |
3 | unlock | |
4 | lock | |
5 | update the queue | |
6 | unlock | |
7 | signal (nobody!) | |
8 | wait (not empty!) |
如果signal()
在wait()
之前被调用,消费者将永远处于睡眠状态!
条件变量
前面假设的signal
和wait
原语不起作用,因为消费者不能无条件地睡眠;它只能在队列真正为空时才能睡眠,而这是无法保证的,因为在它睡眠之前锁已经被释放了。
为了使其正常工作,“解锁”和“睡眠”应该是一个单一步骤,以防止在此期间条件发生变化。这被称为条件变量,它有两个操作:
- 1.
wait()
会释放锁,进入睡眠状态,并在唤醒时重新获取锁。 - 2.
signal()
会唤醒一个睡眠的线程(如果有的话)。
struct Cond {
void wait(Mutex &mu);
void signal();
};
Mutex mu;
Cond cond;
std::vector<Work> queue; // 生产者和消费者共享
void produce() {
Work work = ...;
mu.lock();
queue.push_back(work); // 更新队列
cond.signal(); // 如果有睡眠的线程,唤醒它
mu.unlock();
}
void consume() {
mu.lock();
// 等待队列变为非空
while (queue.empty()) {
// 释放锁并以单一步骤进入睡眠状态
cond.wait(mu);
// 锁已重新获取
}
// 更新队列
Work work = queue.front();
queue.pop_front();
// 对工作进行处理...
mu.unlock();
}
现在,“解锁”和“等待”是一个单一步骤,生产者在消费者进入睡眠状态之前无法更新队列,所以当消费者睡眠时,队列保证为空。
条件变量总是与一个锁相关联;锁保护着条件。条件可以是任意的,所以你可以用条件变量解决任何多线程问题。
条件变量的虚假唤醒
一个重要的注意事项是,条件总是在循环中检查:
// 条件总是在循环中检查
while (queue.empty()) {
cond.wait(mu);
}
这不是使用条件变量的正确方式:
// 错误的条件变量使用方式
if (queue.empty()) {
cond.wait(mu);
}
// 这是错误的,因为条件可能已经改变
在生产者-消费者场景中,你可能会认为消费者在被唤醒后可以假设队列不为空。这个假设只对单个消费者成立。如果有多个消费者,被signal()
唤醒的那个消费者可能无法消费队列中的数据;其他消费者可以在被唤醒的消费者之前获取锁并取走数据项,从而使队列再次为空!
步骤 | 生产者 | 消费者A | 消费者B |
1 | signal (wakes A) | sleeping… | whatever… |
2 | unlock | sleeping… | whatever… |
3 | sleeping… | lock | |
4 | sleeping… | consume the queue | |
5 | sleeping… | unlock | |
6 | woken by signal | ||
7 | lock | ||
8 | queue empty? | ||
9 | wait again |
signal()
不会将锁传递给被唤醒的线程;被唤醒的线程与其他消费者竞争锁,所以当被唤醒的线程重新获取锁时,条件可能已经改变,这就需要它再次检查条件。这就是条件变量的虚假唤醒。
与wait()
不同,signal()
不依赖于锁,所以它可以从任何地方调用。
// 两种方式都是正确的!
void produce_1() {
Work work = ...;
mu.lock();
queue.push_back(work);
cond.signal(); // 在释放锁之前发出信号
mu.unlock();
}
void produce_2() {
Work work = ...;
mu.lock();
queue.push_back(work);
mu.unlock();
cond.signal(); // 在释放锁之后发出信号
}
14.4 pthread
API
Linux C中的线程
互斥锁和条件变量是最通用的同步原语。它们包含在所有主要的多线程API中。在Linux上,它们作为pthread
API提供,作为libc
的一部分,使用futex
实现。
C++的线程API在Linux上是对pthread
的一个轻量级封装。使用哪个API并不重要,因为它们是同一组同步原语。
启动一个线程
线程用句柄pthread_t
表示。通过使用pthread_create()
创建一个新线程来初始化句柄。
int pthread_create(
pthread_t *thread, const pthread_attr_t *attr,
void *(*func)(void *arg), void *arg);
额外的选项通过pthread_attr_t
传递。如果不需要,它可以为NULL
。
新线程通过执行指定的函数开始,该函数接受一个void *
类型的参数。要传递更多参数,可以传递一个指向结构体的指针。
所有的pthread
函数都返回一个错误码,而不是使用errno
。
等待线程结束
等待一个线程或进程完成被称为“等待线程结束(joining)”,这是通过pthread_join()
完成的。
int pthread_join(pthread_t thread, void **retval);
“等待线程结束”是“分叉-等待(fork and join)”并发模型的一部分,其中“分叉(fork)”意味着生成新的进程或线程。在服务器端应用程序中,线程池的生命周期通常与服务器的生命周期相同,在这种情况下,不需要“等待线程结束”。
终止一个线程
与进程不同,大多数线程不能被合法地终止,即使有相应的API。一个进程中的线程共享所有资源;如果一个线程分配了一些内存或获取了一些锁,当它被终止时,这些资源不会被释放。
一个线程要么永远不会死亡,要么自然结束。
互斥锁
这些函数的功能不言自明:
int pthread_mutex_init(
pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
pthread_mutexattr_t
是一些额外的选项,对于典型的应用程序来说不需要。除了不可恢复的编程错误外,这些函数都不会失败。
条件变量
条件变量在上一节中已经解释过。
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有线程
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(
pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime);
新的东西是broadcast()
:如果生产者创建了一个可以被多次消费的条件,比如在队列中放入多个数据项,生产者必须多次发出信号,或者通过broadcast()
唤醒所有消费者。
timedwait()
是wait()
的一个变体,可以设置超时。令人惊讶的是,它默认不使用单调时间,这可以通过一个选项来修复:
pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
pthread_cond_init(&cond, &cond_attr);
14.5 编写线程
14.5 编写线程池代码
并发队列
线程池中有固定数量的消费者线程,称为“工作线程”。数量不定的生产者可以通过一个队列向工作线程发布任务。当队列为空时,消费者线程进入睡眠状态,直到被生产者唤醒,这在前面介绍条件变量时已有说明。
struct ThreadPool {
std::vector<pthread_t> threads;
std::deque<Work> queue;
pthread_mutex_t mu;
pthread_cond_t not_empty;
};
“任务”只是一个带有void *
类型参数的函数指针。
struct Work {
void (*f)(void *) = NULL;
void *arg = NULL;
};
生产者(事件循环)
让我们把伪代码produce()
转换为真正的代码。
void thread_pool_queue(ThreadPool *tp, void (*f)(void *), void *arg) {
pthread_mutex_lock(&tp->mu);
tp->queue.push_back(Work {f, arg});
pthread_cond_signal(&tp->not_empty);
pthread_mutex_unlock(&tp->mu);
}
然后我们可以在线程池中进行有序集合的销毁操作。
// 之前的`entry_del()`
static void entry_del_sync(Entry *ent) {
if (ent->type == T_ZSET) {
zset_clear(&ent->zset);
}
delete ent;
}
// 线程池的包装函数
static void entry_del_func(void *arg) {
entry_del_sync((Entry *)arg);
}
// 新的`entry_del()`
static void entry_del(Entry *ent) {
// 将其从任何数据结构中解除关联
entry_set_ttl(ent, -1); // 从堆数据结构中移除
// 对于大型数据结构,在线程池中运行析构函数
size_t set_size = (ent->type == T_ZSET)? hm_size(&ent->zset.hmap) : 0;
const size_t k_large_container_size = 1000;
if (set_size > k_large_container_size) {
thread_pool_queue(&g_data.thread_pool, &entry_del_func, ent);
} else {
entry_del_sync(ent); // 数据量小;避免上下文切换
}
}
删除大型有序集合不会使事件循环冻结,但频繁的上下文切换会降低系统性能,所以我们为有序集合的大小添加了一个阈值。
消费者(工作线程)
线程启动函数将线程池作为参数。
void thread_pool_init(ThreadPool *tp, size_t num_threads) {
pthread_mutex_init(&tp->mu, NULL);
pthread_cond_init(&tp->not_empty, NULL);
tp->threads.resize(num_threads);
for (size_t i = 0; i < num_threads; ++i) {
int rv = pthread_create(&tp->threads[i], NULL, &worker, tp);
assert(rv == 0);
}
}
worker()
是consumer()
的实际代码。
static void *worker(void *arg) {
ThreadPool *tp = (ThreadPool *)arg;
while (true) {
pthread_mutex_lock(&tp->mu);
// 等待条件:队列不为空
while (tp->queue.empty()) {
pthread_cond_wait(&tp->not_empty, &tp->mu);
}
// 条件满足,消费队列
Work w = tp->queue.front();
tp->queue.pop_front();
pthread_mutex_unlock(&tp->mu);
// 执行工作
w.f(w.arg);
}
return NULL;
}
线程池在main()
函数中创建:
int main() {
thread_pool_init(&g_data.thread_pool, 4);
// ...
}
14.6 更多要学习的内容
有界队列
在我们的例子中,消费者(工作线程)会在队列空时阻塞,而生产者(事件循环)从不阻塞。然而,在某些用例中,队列有大小限制,所以当队列满时,生产者应该:
- 1. 阻塞(睡眠)。
- 2. 不阻塞且不做任何事,但需要一个通知机制。
一个有界队列,生产者和消费者都可能在上面阻塞,是多线程程序中常见的构建块。它需要两个条件变量,分别用于生产者和消费者。尝试将其作为练习进行编码:
struct BlockingQueue {
std::deque<Work> queue;
size_t limit; // 最大大小
pthread_mutex_t mu;
pthread_cond_t not_empty; // 消费者阻塞
pthread_cond_t not_full; // 生产者阻塞
void produce(Work work);
Work consume();
};
将结果发送回事件循环
我们的线程池是“即发即弃”的,但有时生产者需要对结果进行一些处理。这可以通过任何同步原语轻松完成。下面是一个使用信号量的例子:
struct Result {
T value;
Semaphore done;
Result() : done(0) {}
};
// 应用逻辑
void caller() {
Result result;
submit_some_async_task(&result);
// 在信号量上阻塞
result.done.decrease();
// 结果已准备好
do_something_with(result.value);
}
// 在另一个线程中
void some_async_task(Result *result) {
// 生成结果...
result->value = ...;
// 解除调用者的阻塞
result->done.increase();
}
但如果调用者在事件循环中,它不能阻塞。所以除非有一种将结果发送回的方法,否则do_something_with()
不能在事件循环中执行。
如果do_something_with()
涉及IO操作,例如根据结果回复客户端,它必须在事件循环中执行。我们可以将结果放入一个队列中,但消费者(事件循环)不能阻塞。所以应该有一种从其他线程唤醒事件循环的方法。
这通常是通过Unix管道来完成的。事件循环使用poll()
监听管道的读端,而其他线程向管道的写端写入数据,从而导致poll()
唤醒,这样事件循环就可以检查结果。结果数据不需要通过管道传输;通知者可以写入1字节的无用数据,事件循环会丢弃管道中的任何数据。
14.7 我们学到了什么
- 1. 同步原语:互斥锁和条件变量。
- 2. 作为多线程构建块的并发队列。
源代码: