05-条件变量——std::condition_variable

2020/07/03 cpp_concurrency 共 6557 字,约 19 分钟

条件变量是另外一种用来同步的方法。而所谓条件变量,就是说在某一条件达成的时候线程可以继续往下执行,否则将一直等待下去。C++提供了2种条件变量,这里只说condition_variable(因为condition_variable_any 我也不知道咋用的~)。虽然条件变量可以用来进行同步,但是其同步功能还是需要mutex来完成,它自己只提供条件唤醒、等待等操作。

假唤醒

所谓假唤醒,就是被唤醒的时候其条件不符合被唤醒的预期。在使用条件变量的时候造成假唤醒的典型代码如下:

std::unique_lock ulock(mutex);
if(!state)
{
    cv.wait(ulock);
}

上面的代码当第一次加锁成功后,由于state为false,此时条件变量将进入等待状态,直到下一次被唤醒。但是,由于系统底层的信号问题,导致他被唤醒的时候,其state并不一定是true(可能被唤醒之前是true但是又被其他线程修改为false了),导致线程被唤醒,代码继续往下执行,但是条件与我们的预期不符,这种情况好一点的可能啥错都不会发生,但是如果是具有任务队列的线程,那么可能就会造成未定义的行为直接导致程序崩溃。

要解决上面的问题,应该使用while循环,也就是如下的代码:

std::unique_lock ulock(mutex);
while(!state)
{
    cv.wait(ulock);
}

因为当线程被唤醒的时候,cv是处于加锁的状态,此时如果state为false,那么它将会再次进入休眠状态并释放锁,等待下次的被唤醒。如果为true则与我们的预期相符,执行后面的代码。其实标准库有一个上面代码的简化版本——接收2个参数的wait成员函数,实例如下:

std::unique_lock ulock(mutex);
cv.wait(ulock,[&state](){return !state;});

实际上,接收2个参数的wait,也是在其内部一直进行while循环等待而已。

条件变量的其他用法可以参考《条件变量和互斥锁实现信号量》。

关于 wait_for 是否使用第三个参数的讨论

当我们要结束线程时可以选择在析构函数中使用条件变量通知线程。一般情况下,线程总是要执行某些任务的,有时候在线程空闲的时候我们选择sleep,有时候则选择使用条件变量进行等待。其实,C++11提供了 wait_for 满足前面两种情况。C++11的wait_for有两个版本,一个是接受2个参数的,其中第二个参数是表示超时时间的;另一个版本的接收3个参数,第二个参数表示超市时间,第三个参数是一个条件,当线程被唤醒或者进入wait的时候,就会循环检查条件以及是否超时,其底层代码如下:

template<typename _Lock, typename _Clock,typename _Duration, typename _Predicate>
bool wait_until(_Lock& __lock,const chrono::time_point<_Clock, _Duration>& __atime,_Predicate __p)
{
    while (!__p())
    {
        if (wait_until(__lock, __atime) == cv_status::timeout)
            return __p();
    }
    return true;
}

如果我们的子线程有任务需要执行,又需要通知它结束的时候尽早退出,该怎么做的?答案是使用第三个参数的wait_for,示例代码如下:

class task
{
public:
    task(const int64_t interval = 60*10):m_interval(interval),m_stop(false)
    {
        m_thread = std::thread([this](){
            while (!this->m_stop)
            {
                timer_scope t;
                std::cout << "will sleep for " << this->m_interval << ", and wait for condition" << std::endl;
                std::unique_lock<std::mutex> ulock(this->m_mutex);
                this->m_cv.wait_for(ulock,std::chrono::seconds(this->m_interval),[this](){
                    return this->m_stop.load();
                });
            }
        });
    }

    ~task()
    {
        stop();
        if (m_thread.joinable())
        {
            m_thread.join();
        }
    }
    void stop()
    {
        m_stop = true;
        std::unique_lock<std::mutex> ulock(this->m_mutex);
        m_cv.notify_one();
    }

private:
    int64_t m_interval;
    std::thread m_thread;
    std::condition_variable m_cv;
    std::mutex m_mutex;
    std::atomic<bool> m_stop;
};

condition_variable源码:

class condition_variable { // class for waiting for conditions
public:
    using native_handle_type = _Cnd_t;

    condition_variable() {
        _Cnd_init_in_situ(_Mycnd());
    }

    ~condition_variable() noexcept {
        _Cnd_destroy_in_situ(_Mycnd());
    }

    condition_variable(const condition_variable&) = delete;
    condition_variable& operator=(const condition_variable&) = delete;

    void notify_one() noexcept { // wake up one waiter
        _Cnd_signal(_Mycnd());
    }

    void notify_all() noexcept { // wake up all waiters
        _Cnd_broadcast(_Mycnd());
    }

    void wait(unique_lock<mutex>& _Lck) { // wait for signal
        // Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
        _Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
    }

    template <class _Predicate>
    void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test predicate
        while (!_Pred()) {
            wait(_Lck);
        }
    }

    template <class _Rep, class _Period>
    cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) {
        // wait for duration
        if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) {
            return cv_status::timeout;
        }

        // TRANSITION, ABI: The standard says that we should use a steady clock,
        // but unfortunately our ABI speaks struct xtime, which is relative to the system clock.
        _CSTD xtime _Tgt;
        const bool _Clamped     = _To_xtime_10_day_clamped(_Tgt, _Rel_time);
        const cv_status _Result = wait_until(_Lck, &_Tgt);
        if (_Clamped) {
            return cv_status::no_timeout;
        }

        return _Result;
    }

    template <class _Rep, class _Period, class _Predicate>
    bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {
        // wait for signal with timeout and check predicate
        return _Wait_until1(_Lck, chrono::steady_clock::now() + _Rel_time, _Pred);
    }

    template <class _Clock, class _Duration>
    cv_status wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) {
        // wait until time point
        for (;;) {
            const auto _Now = _Clock::now();
            if (_Abs_time <= _Now) {
                return cv_status::timeout;
            }

            _CSTD xtime _Tgt;
            (void) _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
            const cv_status _Result = wait_until(_Lck, &_Tgt);
            if (_Result == cv_status::no_timeout) {
                return cv_status::no_timeout;
            }
        }
    }

    template <class _Clock, class _Duration, class _Predicate>
    bool wait_until(
        unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {
        // wait for signal with timeout and check predicate
        return _Wait_until1(_Lck, _Abs_time, _Pred);
    }

    cv_status wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time) {
        // wait for signal with timeout
        if (!_Mtx_current_owns(_Lck.mutex()->_Mymtx())) {
            _Throw_Cpp_error(_OPERATION_NOT_PERMITTED);
        }

        // Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
        const int _Res = _Cnd_timedwait(_Mycnd(), _Lck.mutex()->_Mymtx(), _Abs_time);
        switch (_Res) {
        case _Thrd_success:
            return cv_status::no_timeout;
        case _Thrd_timedout:
            return cv_status::timeout;
        default:
            _Throw_C_error(_Res);
        }
    }

    template <class _Predicate>
    bool wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate _Pred) {
        // wait for signal with timeout and check predicate
        return _Wait_until1(_Lck, _Abs_time, _Pred);
    }

    _NODISCARD native_handle_type native_handle() {
        return _Mycnd();
    }

    void _Register(unique_lock<mutex>& _Lck, int* _Ready) { // register this object for release at thread exit
        _Cnd_register_at_thread_exit(_Mycnd(), _Lck.release()->_Mymtx(), _Ready);
    }

    void _Unregister(mutex& _Mtx) { // unregister this object for release at thread exit
        _Cnd_unregister_at_thread_exit(_Mtx._Mymtx());
    }

private:
    aligned_storage_t<_Cnd_internal_imp_size, _Cnd_internal_imp_alignment> _Cnd_storage;

    _Cnd_t _Mycnd() noexcept { // get pointer to _Cnd_internal_imp_t inside _Cnd_storage
        return reinterpret_cast<_Cnd_t>(&_Cnd_storage);
    }

    template <class _Predicate>
    bool _Wait_until1(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate& _Pred) {
        // wait for signal with timeout and check predicate
        while (!_Pred()) {
            if (wait_until(_Lck, _Abs_time) == cv_status::timeout) {
                return _Pred();
            }
        }

        return true;
    }

    template <class _Clock, class _Duration, class _Predicate>
    bool _Wait_until1(
        unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate& _Pred) {
        while (!_Pred()) {
            const auto _Now = _Clock::now();
            if (_Abs_time <= _Now) {
                return false;
            }

            _CSTD xtime _Tgt;
            const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
            if (wait_until(_Lck, &_Tgt) == cv_status::timeout && !_Clamped) {
                return _Pred();
            }
        }

        return true;
    }
};

文档信息

Search

    Table of Contents