2.4、互斥保护共享数据(三)

保护共享数据的其他工具

互斥量(mutex)是保护共享数据的通用工具,(加入我的知识星球,免费获取账号,解锁所有文章。)但在某些特定场景下,其他工具可能更合适。让我们看看这些替代工具。

std::call_once 与 std::once_flag

当你需要确保某段代码只执行一次(如单例模式的初始化),使用 std::call_once 结合 std::once_flag 比互斥量更加高效。

#include <iostream>
#include <thread>
#include <mutex>

class SingletonResource {
private:
    SingletonResource() { 
        std::cout << "Resource initialized!" << std::endl;
        // 耗时的初始化操作
    }
    ~SingletonResource() {
        std::cout << "Resource destroyed!" << std::endl;
    }
    static std::once_flag init_flag;
    static SingletonResource* instance;
public:
    static SingletonResource& getInstance() {
        std::call_once(init_flag, []() {
            instance = new SingletonResource();
        });
        return *instance;
    }
};

// 定义静态成员
std::once_flag SingletonResource::init_flag;
SingletonResource* SingletonResource::instance = nullptr;

// 使用示例
void worker() {
    SingletonResource& resource = SingletonResource::getInstance();
    // 使用资源...
}

int main() {
    std::thread t1(worker);
    std::thread t2(worker);
    std::thread t3(worker);

    t1.join();
    t2.join();
    t3.join();

    // 程序结束前释放单例对象的内存(处理内存泄漏)
    if (SingletonResource::instance != nullptr) {
        delete SingletonResource::instance;
        SingletonResource::instance = nullptr;
    }

    return 0;
}

无论有多少线程同时调用 getInstance(),初始化操作都只会执行一次。std::call_once 比显式使用互斥量效率更高,特别是在初始化完成后的情况下。

读写锁(shared_mutex)

当你的数据结构读操作远多于写操作时,使用读写锁可以显著提高并发性能,因为多个读线程可以同时访问数据:

#include <iostream>
#include <thread>
#include <shared_mutex>
#include <map>
#include <string>

class DNSCache {
private:
    std::map<std::string, std::string> entries;
    mutable std::shared_mutex mutex;

public:
    // 多个线程可以同时读取
    std::string lookupDomain(const std::string& domain) const {
        std::shared_lock lock(mutex)// 共享锁,多线程可同时持有
        auto it = entries.find(domain);
        return (it == entries.end()) ? "Not found" : it->second;
    }

    // 写操作需要独占访问
    void updateDomain(const std::string& domain, const std::string& ip) {
        std::unique_lock lock(mutex)// 独占锁,同时只能一个线程持有
        entries[domain] = ip;
    }
};

void reader(const DNSCache& cache, const std::string& domain) {
    std::string ip = cache.lookupDomain(domain);
    std::cout << "Domain: " << domain << " resolves to " << ip << std::endl;
}

void writer(DNSCache& cache, const std::string& domain, const std::string& ip) {
    cache.updateDomain(domain, ip);
    std::cout << "Updated domain: " << domain << " to IP: " << ip << std::endl;
}

这个实现允许多个线程同时读取缓存,而只有在写入时才会阻塞其他线程。如果你的应用程序是读多写少,这种设计可以极大提高性能。

在初始化过程中保护共享数据

初始化是共享数据保护的特殊场景,因为它通常只需要进行一次,而之后的访问可能不需要同步或需要不同形式的同步。

延迟初始化模式

延迟初始化是一种常见的设计模式,特别是对于昂贵的资源初始化:

// 不安全的延迟初始化(单线程环境)
std::shared_ptr<ExpensiveResource> resource_ptr;

void accessResource() {
    if (!resource_ptr) {
        resource_ptr.reset(new ExpensiveResource()); // 只在首次访问时创建
    }
    resource_ptr->useResource();
}

但在多线程环境中,上述代码可能导致资源被初始化多次,或者更糟糕的是,一个线程可能会在另一个线程完成初始化之前使用未完全初始化的资源。

使用std::call_once保证安全初始化

std::call_once是解决这个问题的优雅方式:

std::shared_ptr<ExpensiveResource> resource_ptr;
std::once_flag resource_flag;

void accessResource() {
    std::call_once(resource_flag, []() {
        resource_ptr.reset(new ExpensiveResource());
    });
    resource_ptr->useResource();
}

这种方式不仅简洁,而且性能优于互斥量方案。特别是初始化完成后,std::call_once的开销几乎可以忽略不计,而传统互斥量方案在每次访问时都会有加锁开销。

深入理解:为什么std::call_once更高效?

std::call_once的高效性来自于其实现方式。初始化完成后,它通常只需要检查一个标志位,而不需要像互斥量那样进行完整的锁操作。在大多数实现中,这是一个原子操作,极大降低了开销。

保护甚少更新的数据结构

对于读多写少的数据结构,传统互斥量的开销过大。下面是一些更适合的解决方案。

读写锁的高级应用

在DNS缓存这类读多写少的场景中,读写锁表现优异。让我们扩展之前的DNS缓存示例,加入一些高级功能:

#include <iostream>
#include <thread>
#include <shared_mutex>
#include <map>
#include <string>
#include <chrono>

class AdvancedDNSCache {
private:
    std::map<std::string, std::pair<std::string, std::chrono::system_clock::time_point>> entries;
    mutable std::shared_mutex mutex;
    const std::chrono::seconds TTL{3600}; // 缓存项有效期1小时

public:
    std::string lookupDomain(const std::string& domain) const {
        std::shared_lock lock(mutex);
        auto it = entries.find(domain);
        if (it == entries.end()) {
            return "Not found";
        }
        
        // 检查缓存是否过期
        auto now = std::chrono::system_clock::now();
        if (now - it->second.second > TTL) {
            return "Expired"// 实际应用中可能会刷新缓存
        }
        
        return it->second.first;
    }

    void updateDomain(const std::string& domain, const std::string& ip) {
        std::unique_lock lock(mutex);
        entries[domain] = {ip, std::chrono::system_clock::now()};
    }

    // 清理过期缓存
    void cleanExpiredEntries() {
        std::unique_lock lock(mutex);
        auto now = std::chrono::system_clock::now();
        auto it = entries.begin();
        while (it != entries.end()) {
            if (now - it->second.second > TTL) {
                it = entries.erase(it);
            } else {
                ++it;
            }
        }
    }
};

这个实现不仅支持并发读取,还添加了缓存过期和清理功能,同时保持高效的锁管理。

双缓冲技术

对于极端读多写少的场景,双缓冲(Double Buffering)技术是一种高级优化策略:

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <vector>
#include <memory>

template<typename T>
class DoubleBuffer {
private:
    std::shared_ptr<T> buffers[2]; // 两个缓冲区
    std::atomic<int> currentBuffer{0}; // 当前前台缓冲区索引
    std::mutex writeMutex; // 写操作互斥量

public:
    DoubleBuffer(const T& initialData) {
        buffers[0] = std::make_shared<T>(initialData);
        buffers[1] = std::make_shared<T>(initialData);
    }

    // 读操作 - 无锁,高效
    std::shared_ptr<T> getReadBuffer() const {
        return buffers[currentBuffer.load()];
    }

    // 写操作 - 修改后台缓冲区,然后切换
    template<typename UpdateFunc>
    void update(UpdateFunc updateFunction) {
        std::lock_guard<std::mutex> lock(writeMutex);
        
        // 获取后台缓冲区索引
        int backBuffer = 1 - currentBuffer.load();
        
        // 确保后台缓冲区有最新数据
        *buffers[backBuffer] = *buffers[currentBuffer];
        
        // 更新后台缓冲区
        updateFunction(*buffers[backBuffer]);
        
        // 切换前后台缓冲区(原子操作)
        currentBuffer.store(backBuffer);
    }
};

// 使用示例
struct ConfigData {
    int param1;
    float param2;
    std::string param3;
    
    ConfigData(int p1 = 0float p2 = 0.0fconst std::string& p3 = "")
        : param1(p1), param2(p2), param3(p3) {}
};

void reader(const DoubleBuffer<ConfigData>& config) {
    auto data = config.getReadBuffer();
    std::cout << "Read: " << data->param1 << ", " 
              << data->param2 << ", " 
              << data->param3 << std::endl;
}

void writer(DoubleBuffer<ConfigData>& config, int p1, float p2, const std::string& p3) {
    config.update([p1, p2, p3](ConfigData& data) {
        data.param1 = p1;
        data.param2 = p2;
        data.param3 = p3;
    });
    std::cout << "Updated config" << std::endl;
}

双缓冲技术的核心思想是:读线程始终访问当前的"前台"缓冲区(无锁访问),而写线程更新"后台"缓冲区,然后通过原子操作切换前后台缓冲区。这种方式使读操作完全无阻塞,写操作也只会阻塞其他写操作。

递归加锁

在某些场景下,同一线程可能需要多次获取同一个锁。例如,在递归函数中或者类的方法互相调用时。

递归锁基本原理

递归锁允许同一线程多次获取锁而不会导致死锁。C++标准库提供了std::recursive_mutex来实现这一功能:

#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex resource_mutex;

void recursiveFunction(int depth) {
    if (depth <= 0return;
    
    // 可以多次获取同一锁
    resource_mutex.lock();
    std::cout << "Locked at depth " << depth << std::endl;
    
    // 递归调用自身
    recursiveFunction(depth - 1);
    
    resource_mutex.unlock();
    std::cout << "Unlocked at depth " << depth << std::endl;
}

int main() {
    std::thread t(recursiveFunction, 5);
    t.join();
    return 0;
}

这个简单示例展示了递归锁的基本用法。在递归函数中,每个嵌套调用都会获取锁,而不会导致死锁。

递归锁的应用:接口调用

递归锁在类方法互相调用时特别有用:

#include <iostream>
#include <thread>
#include <mutex>
#include <map>
#include <string>

class StudentDatabase {
private:
    std::map<int, std::string> students;
    std::recursive_mutex mutex;

public:
    bool addStudent(int id, const std::string& name) {
        std::lock_guard<std::recursive_mutex> lock(mutex);
        
        // 检查学生是否已存在
        if (queryStudent(id) != "Not found") { // 这里调用了另一个使用锁的方法
            std::cout << "Student already exists!" << std::endl;
            return false;
        }
        
        students[id] = name;
        std::cout << "Added student: " << name << std::endl;
        return true;
    }
    
    std::string queryStudent(int id) {
        std::lock_guard<std::recursive_mutex> lock(mutex);
        auto it = students.find(id);
        if (it == students.end()) {
            return "Not found";
        }
        return it->second;
    }
    
    void addScore(int id, int score) {
        std::lock_guard<std::recursive_mutex> lock(mutex);
        // 首先查询学生
        std::string name = queryStudent(id); // 这里嵌套获取了锁
        if (name == "Not found") {
            std::cout << "Cannot add score, student not found" << std::endl;
            return;
        }
        
        std::cout << "Added score " << score << " for student " << name << std::endl;
        // 在实际应用中,这里会更新分数
    }
};

void testDatabase(StudentDatabase& db) {
    db.addStudent(1"Alice");
    db.addStudent(2"Bob");
    db.addScore(195);
    db.addScore(385); // 这个应该失败,因为学生不存在
}

在这个例子中,addStudent和addScore方法都调用了queryStudent方法,而每个方法都获取了同一个互斥量。如果使用普通互斥量,这将导致死锁,但递归互斥量允许此类嵌套调用。

递归锁的工作原理与限制

递归互斥量通过计数机制工作:

  • • 当线程首次锁定互斥量时,计数器设为1
  • • 同一线程再次锁定时,计数器加1
  • • 线程解锁时,计数器减1
  • • 只有当计数器归零时,互斥量才会真正释放,允许其他线程获取

需要注意的是,使用递归锁时必须确保解锁次数等于加锁次数,否则互斥量将无法释放。

另外,尽管递归锁解决了技术上的问题,但从设计角度看,频繁使用递归锁可能意味着类设计不够清晰。一个更好的方案是重构代码,抽取共享功能到无锁的私有方法中。

如何选择合适的保护机制

根据我多年的并发编程经验,选择保护共享数据的机制时应考虑以下因素:

  • • 访问模式:读多写少用读写锁或双缓冲,写多读少用标准互斥量。
  • • 粒度要求:需要细粒度控制时,考虑为不同数据使用不同的互斥量。
  • • 性能需求:对性能要求极高时,可能需要无锁数据结构或自定义同步机制。
  • • 代码复杂性:有时候稍微牺牲性能换取代码可读性和可维护性是值得的。

在我看来,最优的并发设计不是选择最复杂的机制,而是选择恰到好处的机制。有时候,通过重新设计数据流,甚至可以完全避免共享状态,从根本上消除同步需求。

总结

掌握多样化的共享数据保护技术,对编写高性能并发程序至关重要。记住,互斥量只是工具箱中的一种工具,而非唯一工具。在今天的高并发计算环境中,能够根据具体场景灵活选择合适的同步机制,将成为区分普通程序员和卓越程序员的关键因素。

 

阅读剩余
THE END