2.2、互斥保护共享数据(一)

组织和编排代码以保护共享数据

在实际开发中,仅仅使用互斥量还不够,我们需要合理组织代码,确保共享数据和保护它的互斥量总是一起使用。最佳实践是将共享数据和互斥量封装在同一个类中,并提供安全的访问接口。

反面教材:不要像这样暴露保护的数据

class SharedData {
public:
    std::mutex mutex; // 公开的互斥量
    int data;         // 公开的数据
};
// 使用时需要手动加锁和解锁,容易出错

正确示例:封装数据和互斥量,提供安全接口

class ThreadSafeCounter {
public:
    void increment() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++counter_;
    }
    
    int get_value() {
        std::lock_guard<std::mutex> lock(mutex_);
        return counter_;
    }
    
private:
    std::mutex mutex_;
    int counter_ = 0;
};

// 使用示例
ThreadSafeCounter counter;
counter.increment(); // 安全增加
int value = counter.get_value(); // 安全获取

通过这种封装方式,我们确保了任何对共享数据的访问都会自动受到保护,大大降低了出错的可能性。

发现接口固有的条件竞争

即使我们为每个接口添加了互斥量保护,仍然可能存在所谓的接口间条件竞争。这类问题出现在调用者使用多个线程安全的接口,但整体行为却不是线程安全的情况。

考虑一个线程安全的栈实现:

template<typename T>
class ThreadSafeStack {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        data_.push(value);
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return data_.empty();
    }
    
    pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (data_.empty()) {
            throw std::runtime_error("Stack is empty");
        }
        T value = data_.top();
        data_.pop();
        return value;
    }
    
private:
    mutable std::mutex mutex_;
    std::stack<T> data_;
};

虽然每个方法都是线程安全的,但考虑以下场景:

ThreadSafeStack<int> stack;
// 线程A
if (!stack.empty()) { // 检查通过,栈不为空
    // 此时线程B执行了pop操作
    int value = stack.pop(); // 可能抛出异常,因为栈现在为空
}

这就是接口间的条件竞争。虽然empty()pop()都是线程安全的,但在它们的调用间隙,栈的状态可能被其他线程改变。

解决方法是提供一个原子的"检查并取出"操作:

template<typename T>
class ThreadSafeStack {
public:
    // 其他方法...
    
    // 安全的检查并取出
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (data_.empty()) {
            return std::nullopt// C++17 特性
        }
        T value = data_.top();
        data_.pop();
        return value;
    }
};

这样,检查和操作被合并为一个原子操作,消除了条件竞争的可能性。

死锁:问题和解决方法

死锁是多线程编程中另一个常见问题。当两个或多个线程各自持有一个资源,并等待对方持有的资源时,就会发生死锁。

考虑这个经典的死锁场景:

std::mutex mutex1, mutex2;

void thread_a() {
    std::lock_guard<std::mutex> lock1(mutex1)// 锁定mutex1
    std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟工作
    std::lock_guard<std::mutex> lock2(mutex2)// 尝试锁定mutex2
    // 如果线程B已经锁定了mutex2,这里会阻塞
}

void thread_b() {
    std::lock_guard<std::mutex> lock2(mutex2)// 锁定mutex2
    std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟工作
    std::lock_guard<std::mutex> lock1(mutex1)// 尝试锁定mutex1
    // 如果线程A已经锁定了mutex1,这里会阻塞
}

如果同时执行这两个线程,它们可能会互相等待对方释放资源,导致死锁。

解决死锁的方法:

  1. 1. 同时锁定多个互斥量:使用std::lock()函数可以原子地锁定多个互斥量。
void safe_operation() {
    std::lock(mutex1, mutex2); // 原子地锁定两个互斥量
    std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);
    // 安全操作
}
  1. 2. 按固定顺序获取锁:始终按照同一顺序请求锁。
void safe_operation() {
    std::lock_guard<std::mutex> lock1(mutex1)// 总是先锁定mutex1
    std::lock_guard<std::mutex> lock2(mutex2)// 再锁定mutex2
    // 安全操作
}
  1. 3. 使用层次锁:为互斥量分配层次号,确保总是按照从低到高的顺序锁定。
class HierarchicalMutex {
    // 自定义实现的层次互斥量
};

HierarchicalMutex mutex_level10(10);
HierarchicalMutex mutex_level5(5);

// 必须先锁定低层次的mutex_level5,才能锁定高层次的mutex_level10
  1. 4. 避免嵌套锁:尽量避免在已持有一个锁的情况下请求另一个锁。
  2. 5. 使用std::scoped_lock (C++17):一个更简单的同时锁定多个互斥量的方法。
void safe_operation() {
    std::scoped_lock lock(mutex1, mutex2)// C++17特性,自动管理多个锁
    // 安全操作
}

实战案例:线程安全的消息队列

让我们把学到的知识应用到一个实际的例子中:实现一个线程安全的消息队列,常用于生产者-消费者模式。

#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class ThreadSafeQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        } // 释放锁后再通知,减少持锁时间
        cond_var_.notify_one();
    }
    
    pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // 等待直到队列非空
        cond_var_.wait(lock, [this]{ return !queue_.empty(); });
        T value = queue_.front();
        queue_.pop();
        return value;
    }
    
    bool try_pop(T& value, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // 带超时的等待
        if (!cond_var_.wait_for(lock, timeout, [this]{ return !queue_.empty(); })) {
            return false// 超时
        }
        value = queue_.front();
        queue_.pop();
        return true;
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
    
private:
    mutable std::mutex mutex_;
    std::queue<T> queue_;
    std::condition_variable cond_var_;
};

使用这个队列进行生产者-消费者通信:

ThreadSafeQueue<int> queue;

// 生产者线程
void producer() {
    for (int i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// 消费者线程
void consumer() {
    for (int i = 0; i < 10; ++i) {
        int value = queue.pop(); // 阻塞直到有数据
        std::cout << "消费者取得: " << value << std::endl;
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons(consumer);
    
    prod.join();
    cons.join();
    
    return 0;
}

这个示例展示了如何使用互斥量和条件变量创建一个线程安全的数据结构,它不仅能保护共享数据,还能实现线程间的高效通信。

总结

多线程编程中的共享数据保护不仅仅是技术问题,更是设计思想的体现。我认为,真正优秀的并发代码应该遵循以下原则:

  • • 最小化共享:尽可能减少线程间共享的数据,优先考虑无共享设计。
  • • 责任明确:每个互斥量应该有明确的负责范围,而不是混合保护多种不相关的资源。
  • • 粒度适中:锁的粒度既不要过大(影响并发性能),也不要过细(增加复杂性和死锁风险)。
  • • 接口完整:提供的线程安全接口应该能原子地完成常见组合操作,避免接口间条件竞争。
    记住,并发编程的复杂性不在于单个互斥量的使用,而在于整体架构和设计。只有系统地思考并发问题,才能写出既安全又高效的多线程代码。
    本文首发于【讳疾忌医-note】公众号,未经授权,不得转载。
    (加入我的知识星球,免费获取账号,解锁所有文章。)
阅读剩余
THE END