3.互斥与死锁

如果没有实现移动构造和移动赋值,还可以进行赋值的操作的情况下,说明一定实现了拷贝赋值

系统不会默认提供默认移动赋值,拷贝赋值,拷贝构造

互斥量——锁

临界区

临界区是多个线程能够同时访问到的代码,由于多个线程在运行时都会涉及到将共享资源拷贝到寄存器中,那么就会出现不同线程拷贝到同一数据,这就造成了数据竞争问题

锁的简单使用

死循环对于CPU时间片的抢占是很严重的

std::mutex mtx1;
int shared_data = 100;
void use_lock()
{
    while (true)
    {
        mtx1.lock();
        shared_data++;
        std::cout << std::this_thread::get_id() << " : " << shared_data << "\n";
        mtx1.unlock();
        // 死循环对于时间片的抢占是很严重的
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
void test_lock()
{
    std::thread t([](){
       while(true)
       {
        mtx1.lock();
        shared_data--;
        std::cout << std::this_thread::get_id() << " : " << shared_data << "\n";
        mtx1.unlock();
        std::this_thread::sleep_for(std::chrono::seconds(1));
       }
    });
    std::thread t1(use_lock);
    t.join();
    t1.join();
}

lock_guard——锁的RAII

在作用域结束时自动调用其析构函数解锁,这么做的一个好处是简化了一些特殊情况从函数中返回的写法,比如异常或者条件不满足时,函数内部直接return,锁也会自动解开

void test_lock_guard(){
    while(true)
    {
        {
            std::lock_guard<std::mutex> lock(mtx1);
            shared_data--;
            std::cout << std::this_thread::get_id() << " : " << shared_data << "\n";
        }
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

锁的局限性

锁只能够解决函数体内部的数据竞争,但是对于函数返回值并不能保证数据安全

例子

template<typename T>
class threadsafe_stack{
private:
    mutable std::mutex mtx;
    std::stack<T> data;
public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.mtx);
        data = other.data;
    }
    threadsafe_stack& operator=(threadsafe_stack& val) = delete;
    bool empty() const{
        std::lock_guard<std::mutex> lock(mtx);
        return data.empty();
    }
    void push(T newValue)
    {
        std::lock_guard<std::mutex> lock(mtx);
        data.push(std::move(newValue));
    }
    T pop()
    {
        std::lock_guard<std::mutex> lock(mtx);
        auto element = data.top();
        data.pop();
        return element;
    }
};
void test_threadsafe_stack1(){
    threadsafe_stack<int> safe_stack;
    safe_stack.push(1);
    std::thread t1([&safe_stack]() {
        if (!safe_stack.empty()) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            safe_stack.pop();
            }
        });
    std::thread t2([&safe_stack]() {
        if (!safe_stack.empty()) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            safe_stack.pop();
        }
    });
    t1.join();
    t2.join();
}

对于test_threadsafe_stack1(),在运行时会出现段错误,因为判断数据是否为空后,互斥锁就已经解锁了,之后再pop时,由于pop直接加锁进行数据的弹出,所以一个线程会弹出数据,另一个线程也会弹出数据(但是此时已经为空),从而出现段错误

所以需要对于弹出数据的函数pop进行优化:抛出异常

并且由于是直接返回的是原始值element,但是如果element是一个占用内存空间很大的值,那么其他线程在调用的时候有可能会失效(因为涉及到返回值的拷贝,栈溢出),从而导致崩溃,所以可以使用智能指针

解决方法:智能指针和引用 + 抛出异常/返回空指针

    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        //②试图弹出前检查是否为空栈
        if (data.empty()) throw empty_stack();// return nullptr;
        //③改动栈容器前设置返回值
            std::shared_ptr<T> const res(std::make_shared<T>(data.top()));    
            data.pop();
        return res;
    }
    void pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();// value ;
        value = data.top();
        data.pop();
    }

死锁

死锁一般是由于调运顺序不一致导致的,比如两个线程循环调用。当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁。比如线程1对A已经加锁,线程2对B已经加锁,那么他们都希望彼此占有对方的锁,又不释放自己占有的锁导致了死锁。

总结:对方线程都占用自己希望使用的锁,并且不解锁

解决死锁的方式

  1. 要想解决死锁,那就需要避免循环加锁的情况,那么可以通过将加锁和解锁的功能分别封装成一个函数,避免逻辑上加锁的重叠

    也就是加锁和解锁组合成原子操作,各自只管理对应的功能

  2. 或者是同时对多把锁进行加锁,才去执行对应的逻辑

产生死锁的例子

class som_big_object {
public:
    som_big_object(int data) : _data(data) {}
    // 拷贝构造
    som_big_object(const som_big_object& b2) : _data(b2._data) {}
    // 移动构造
    som_big_object(som_big_object&& b2) noexcept : _data(std::move(b2._data)) {}
    // 重载输出运算符
    friend std::ostream& operator<<(std::ostream& os, const som_big_object& big_obj) {
        os << big_obj._data;
        return os;
    }
    // 重载赋值运算符
    som_big_object& operator=(const som_big_object& b2) {
        if (this == &b2) {
            return *this;
        }
        _data = b2._data;
        return *this;
    }
    // 交换数据
    friend void swap(som_big_object& b1, som_big_object& b2) {
        std::swap(b1._data, b2._data);
    }
private:
    int _data;
};

class big_object_mgr {
public:
    big_object_mgr(int data = 0) : _obj(data) {}
    // 删除拷贝构造和拷贝赋值运算符
    big_object_mgr(const big_object_mgr&) = delete;
    big_object_mgr& operator=(const big_object_mgr&) = delete;
    // 移动构造
    big_object_mgr(big_object_mgr&& other) noexcept : _obj(std::move(other._obj)) {}
    // 移动赋值运算符
    big_object_mgr& operator=(big_object_mgr&& other) noexcept {
        if (this != &other) {
            _obj = std::move(other._obj);
        }
        return *this;
    }
    void printinfo() {
        std::cout << "current obj data is " << _obj << std::endl;
    }
    friend void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2);
private:
    std::mutex _mtx;
    som_big_object _obj;
};
void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
    std::cout << "thread [" << std::this_thread::get_id() << "] begin\n";
    if (&objm1 == &objm2) return;

    std::lock_guard<std::mutex> guard1(objm1._mtx);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::lock_guard<std::mutex> guard2(objm2._mtx);
    swap(objm1, objm2);
}

void test_danger_swap() {
    big_object_mgr objm1(5);
    big_object_mgr objm2(100);
    std::thread t1(danger_swap, std::ref(objm1), std::ref(objm2));
    std::thread t2(danger_swap, std::ref(objm2), std::ref(objm1));
    t1.join();
    t2.join();
    objm1.printinfo();
    objm2.printinfo();
}

int main() {
    test_danger_swap();
    return 0;
}

在测试使用危险方式交换,发生死锁是因为第一个线程在给第一个对象加上锁后等待了一秒,之后另一个线程启动后给第二个对象加锁,从而二者都在等待了一秒之后都在等待对方解锁,从而发生死锁

解决该死锁的方式,使用std::lock同时加上多个锁+领养锁机制

void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2){
    std::cout << "thread [" << std::this_thread::get_id() << "] begin\n";
    if (&objm1 == &objm2) return;
    // 使用std::lock来加锁,并且使用领养锁来解锁
    // 领养锁是知道不需要给锁来初始化和加锁,只负责解锁
    std::lock(objm1._mtx,objm2._mtx);
    std::lock_guard<std::mutex> guard1(objm1._mtx,std::adopt_lock);
    std::lock_guard<std::mutex> guard2(objm2._mtx,std::adopt_lock);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    swap(objm1, objm2);
    std::cout << "thread [" << std::this_thread::get_id() << "] end\n";
}
void test_safe_swap() {
    big_object_mgr objm1(5);
    big_object_mgr objm2(100);
    std::thread t1(safe_swap, std::ref(objm1), std::ref(objm2));
    std::thread t2(safe_swap, std::ref(objm2), std::ref(objm1));
    t1.join();
    t2.join();
    objm1.printinfo();
    objm2.printinfo();
}

使用std::scoped_lock

std::scoped_lock(C++17)会给两个锁初始化为lock_guard并且同时上锁

void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2){
    std::cout << "thread[" << std::this_thread::get_id() << "] begin\n";
    std::scoped_lock gurad(objm1._mtx,objm2._mtx);
    swap(objm1,objm2);
    std::cout << "thread[" << std::this_thread::get_id() << "] end\n";
}
void test_safe_swap1() {
    big_object_mgr objm1(5);
    big_object_mgr objm2(100);
    std::thread t1(safe_swap_scope, std::ref(objm1), std::ref(objm2));
    std::thread t2(safe_swap_scope, std::ref(objm2), std::ref(objm1));
    t1.join();
    t2.join();
    objm1.printinfo();
    objm2.printinfo();
}

层级锁

class hierachical_mutex{
public:
    explicit hierachical_mutex(unsigned long value):
     _hierachy_value(value),
     _previous_hierachy_value(0){}

    hierachical_mutex(const hierachical_mutex&) = delete;
    hierachical_mutex& operator=(const hierachical_mutex&) = delete;

    void lock()
    {
        check_for_hierarchy_violation();
        _internal_mutex.lock();
        update_hierarchy_value();
    }

    void unlock()
    {
        if(_this_thread_hierarchy_value != _hierachy_value)
        {
            throw std::logic_error("mutex hierarchy violated");
        }
        _this_thread_hierarchy_value = _previous_hierachy_value;
        _internal_mutex.unlock();
    }

    bool try_lock()
    {
        check_for_hierarchy_violation();
        if(!_internal_mutex.try_lock()) return false;
        update_hierarchy_value();
        return true;
    }

private:
    std::mutex _internal_mutex;
    // 锁对应的层级值
    unsigned long const _hierachy_value;
    // 上一级层级值
    unsigned long  _previous_hierachy_value;
    // 本线程当前持有的线程值
    static thread_local unsigned long _this_thread_hierarchy_value;

    void check_for_hierarchy_violation()
    {
        if(_this_thread_hierarchy_value <= _hierachy_value)
        {
            throw std::logic_error("mutex hierarchy violated");
        }
    }
    void update_hierarchy_value()
    {
        // 当当前的线程的锁的优先级的最大值小于新的锁时
        // 更新
        _previous_hierachy_value = _this_thread_hierarchy_value;
        _this_thread_hierarchy_value = _hierachy_value;
    }
};
thread_local unsigned long hierachical_mutex::_this_thread_hierarchy_value(ULONG_MAX);

void test_hierarchy_lock() {
    hierachical_mutex  hmtx1(1000);
    hierachical_mutex  hmtx2(500);
    std::thread t1([&hmtx1, &hmtx2]() {
        hmtx1.lock();
        hmtx2.lock();
        hmtx2.unlock();
        hmtx1.unlock();
        });
    std::thread t2([&hmtx1, &hmtx2]() {
        hmtx2.lock();
        hmtx1.lock();
        hmtx1.unlock();
        hmtx2.unlock();
        });
    t1.join();
    t2.join();
}

hierarchical_mutex 类是一个专门的互斥锁实现,旨在强制线程之间的严格锁定层次结构。其主要目标是通过确保互斥锁总是以预定义的顺序获取来防止死锁。通过为每个互斥锁分配一个层次值,该类确保线程在持有更高级别的互斥锁时,无法获取较低层次级别的互斥锁,从而在整个应用程序中保持一致的锁定顺序。

设计原则

  1. 层次强制:核心思想是为每个互斥锁分配一个唯一的层次级别。线程必须按照这些级别的降序获取互斥锁。这种排序确保循环依赖(死锁的常见原因)是不可能的。

  2. 线程特定的层次跟踪:每个线程维护其当前的层次级别。当互斥锁被锁定时,线程的层次级别更新为该互斥锁的级别。解锁时,层次级别还原为先前状态。

  3. 异常处理:如果线程试图锁定违反层次的互斥锁(即在持有更高级别互斥锁时试图锁定较低级别的互斥锁),该类会抛出 std::logic_error。这种即时反馈有助于开发人员在开发过程中识别并纠正潜在的死锁场景。

类结构分析

让我们详细剖析 hierarchical_mutex 类,以理解其组件和功能。

1. 类声明和成员

class hierarchical_mutex {
public:
    explicit hierarchical_mutex(unsigned long value);
    hierarchical_mutex(const hierarchical_mutex&) = delete;
    hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;

    void lock();
    void unlock();
    bool try_lock();

private:
    std::mutex _internal_mutex;
    const unsigned long _hierarchy_value;
    unsigned long _previous_hierarchy_value;

    static thread_local unsigned long _this_thread_hierarchy_value;

    void check_for_hierarchy_violation();
    void update_hierarchy_value();
};
  • 内部互斥锁 (_internal_mutex):提供标准锁定机制的底层互斥锁。所有锁定和解锁操作在层次检查后委托给此互斥锁。

  • 层次值

    • _hierarchy_value:分配给此互斥锁实例的唯一层次级别。
    • _previous_hierarchy_value:存储线程在获取此互斥锁之前的层次级别。
    • _this_thread_hierarchy_value(静态线程局部):跟踪线程的当前层次级别。作为 thread_local,每个线程都有自己的此变量实例,避免了竞争条件。

2. 构造函数

explicit hierarchical_mutex(unsigned long value)
    : _hierarchy_value(value), _previous_hierarchy_value(0) {}
  • 参数value 分配互斥锁的层次级别。
  • 初始化
    • _hierarchy_value 被设置为提供的 value
    • _previous_hierarchy_value 初始化为 0,表示没有先前的层次级别。

3. 删除的复制操作

hierarchical_mutex(const hierarchical_mutex&) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;
  • 理由:互斥锁是不可复制的,以防止多个互斥锁实例管理相同的同步机制,这可能导致未定义的行为。

4. 锁定机制

void lock() {
    check_for_hierarchy_violation();
    _internal_mutex.lock();
    update_hierarchy_value();
}
  • 过程
    1. 层次检查:在获取互斥锁之前,check_for_hierarchy_violation() 确保当前线程的层次级别高于互斥锁的级别。如果不是,则抛出异常,阻止锁定。
    2. 获取互斥锁:如果层次检查通过,则锁定内部互斥锁。
    3. 更新层次:在成功锁定后,update_hierarchy_value() 更新线程的层次级别以反映新获取的互斥锁。

5. 解锁机制

void unlock() {
    if (_this_thread_hierarchy_value != _hierarchy_value) {
        throw std::logic_error("mutex hierarchy violated");
    }
    _this_thread_hierarchy_value = _previous_hierarchy_value;
    _internal_mutex.unlock();
}
  • 过程
    1. 层次验证:确保线程的当前层次级别与互斥锁的级别匹配。如果不是,表示锁管理中的逻辑错误,并抛出异常。
    2. 恢复先前的层次:线程的层次级别还原为先前状态,有效地在层次中后退一步。
    3. 释放互斥锁:解锁内部互斥锁。

6. 尝试锁定机制

bool try_lock() {
    check_for_hierarchy_violation();
    if (!_internal_mutex.try_lock()) {
        return false;
    }
    update_hierarchy_value();
    return true;
}
  • 过程
    1. 层次检查:类似于 lock(),首先检查层次违规。
    2. 尝试锁定:尝试获取内部互斥锁,而不阻塞。
    3. 如果成功,更新层次:如果锁定成功,更新线程的层次级别。
    4. 返回状态:如果锁定成功,返回 true,否则返回 false

7. 私有辅助函数

  • check_for_hierarchy_violation():

    void check_for_hierarchy_violation() {
      if (_this_thread_hierarchy_value <= _hierarchy_value) {
          throw std::logic_error("mutex hierarchy violated");
      }
    }
    • 目的:确保被获取的互斥锁具有低于线程当前层次级别的层次级别。这强制互斥锁按照层次降序锁定。
    • 错误处理:如果检测到违规,抛出 std::logic_error
  • update_hierarchy_value():

    void update_hierarchy_value() {
      _previous_hierarchy_value = _this_thread_hierarchy_value;
      _this_thread_hierarchy_value = _hierarchy_value;
    }
    • 目的:在成功锁定互斥锁后,更新线程当前的层次级别为此互斥锁的级别,同时存储先前的级别以备将来恢复。

静态线程局部变量

static thread_local unsigned long _this_thread_hierarchy_value;
  • 功能:独立维护每个线程的当前层次级别。作为 thread_local,它确保每个线程都有自己的单独实例,避免线程之间的干扰。
  • 初始化:通常初始化为最高可能的层次级别(在提供的代码中未显示),确保任何线程第一次锁定的互斥锁设定了层次强制的基准。

异常安全

该类使用异常处理来维护层次的完整性:

  • 在锁定期间:如果检测到层次违规,异常会在尝试锁定互斥锁之前抛出,确保互斥锁的状态保持不变。

  • 在解锁期间:如果检测到层次不一致(例如,解锁的互斥锁不是线程当前持有的互斥锁),抛出异常,提醒开发人员潜在的误用。

防止死锁

死锁通常是由于不同线程之间不一致的互斥锁获取顺序引起的。通过强制执行严格的层次结构:

  1. 一致顺序:所有线程按照预定的层次级别降序获取互斥锁。
  2. 无循环依赖:由于高级别的互斥锁在低级别之前获取,循环等待(死锁的主要原因)是不可能的。
  3. 早期检测:任何偏离层次的尝试都会立即通过异常捕获,允许开发人员在问题导致死锁之前纠正它。

使用示例

考虑一个具有不同层次级别的多个互斥锁的场景:

hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex medium_level_mutex(5000);
hierarchical_mutex low_level_mutex(1000);

void thread_function() {
    high_level_mutex.lock();
    // 执行需要高层次互斥锁的操作。

    medium_level_mutex.lock();
    // 执行需要中层次互斥锁的操作。

    low_level_mutex.lock();
    // 执行需要低层次互斥锁的操作。

    low_level_mutex.unlock();
    medium_level_mutex.unlock();
    high_level_mutex.unlock();
}

在此设置中:

  • 正确顺序:线程必须在 medium_level_mutex 之前锁定 high_level_mutex,在 low_level_mutex 之前锁定 medium_level_mutex
  • 强制执行:尝试逆转顺序(例如,在 high_level_mutex 之前锁定 low_level_mutex)将导致 std::logic_error,防止潜在的死锁。

优势

  1. 防止死锁:通过强制执行严格的锁定顺序,该类有效地消除了死锁的主要来源。
  2. 开发人员反馈:层次违规时立即异常警告开发人员在运行时的潜在同步问题。
  3. 线程安全:利用 thread_local 存储来维护每个线程的层次状态,确保线程安全的操作。

潜在限制

  1. 结构僵化:严格的层次可能对某些需要更灵活的同步机制的应用程序来说过于限制。
  2. 维护开销:开发人员必须仔细分配和管理每个互斥锁的层次级别,在大型代码库中可能变得繁琐。
  3. 异常管理:依赖异常进行控制流可能不符合所有编程范例,如果处理不当,可能会导致问题。

结论

hierarchical_mutex 类是一种经过深思熟虑的实现,旨在通过强制执行层次锁定顺序来增强互斥锁的安全性。通过集成层次检查和维护线程特定状态,它提供了一种防止死锁的强大机制,这是并发编程中的常见挑战。虽然它在互斥锁管理中引入了额外的复杂性,但在复杂的多线程应用程序中,其在可靠性和安全性方面的优势可能是显著的。