07-条件变量在生产者-消费者模型中的陷阱

2020/07/03 cpp_concurrency 共 2188 字,约 7 分钟

条件变量在生产者-消费者模型中的使用

在典型的生产者-消费者模型中,生产者获取锁将数据放入队列,消费者进行轮询判断队列是否为空不为空则从队列中获取数据并释放锁。

该模型可以很好的管理好生产者和消费者。但是存在一个问题——消费者线程总是要去轮询,如果一直没有数据那么会导致空轮询,大量浪费CPU资源。

这里想到的一个办法是使用条件变量来解决该问题。当生产者生产数据后进行notify,消费者则wait直到收到notify消息。该方法看似完美的解决了生产者忙轮询的问题。但实际上却引入了新的问题!!——当生产者进行notify的时候,如果没有消费者进行wait,会导致该notify丢失,导致的问题就是队列中永远有一些数据无法被消费,并且如果生产者线程退出后,消费者线程会被阻塞在wait上导致死锁。在《in action》中4.1的代码清单中就是这样写的,点我跳转到示例代码

示例代码如下:

#include <algorithm>
#include <vector>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <iostream>
#include <queue>

// 测试notify的时候没有正在等待的线程
void main()
{
    std::vector<std::thread> threads;
    const int consume_count = 5;
    semaphore s(consume_count);
    std::mutex mutex;
    std::condition_variable condition;

    std::queue<int> queue;

    // 生产者4
    auto product_thread = std::thread([&queue,&mutex,&condition]() {
        int nCounter = 0;
        while (true)
        {
            std::unique_lock<std::mutex> ulock(mutex);
            nCounter++;
            queue.push(nCounter);
            std::cout << "product:" << nCounter << std::endl;
            ulock.unlock();
            std::this_thread::sleep_for(std::chrono::seconds(1));	// 生产者每一秒生产一个
            condition.notify_one();	// 这里的本意是生产出来后就通知消费者,但是这里有一个致命的陷阱!!!
            // 当消费者线程都在处理数据而没有线程处于wait状态的时候,该notify会被遗弃
            // 因此,当消费者由于速度比较慢在消费完毕后再次进入wait,而生产者线程退出后不再进行notify,导致消费者进入死锁状态!!!

            if (nCounter >= 20)	// 快速生产20个后停止
            {
                return;
            }
        }
    });

    threads.push_back(std::move(product_thread));

    // 消费者
    for (int i = 0; i < consume_count; ++i)
    {
        auto t = std::thread([&queue, &mutex, &condition]() {
            while (true)
            {
                std::unique_lock<std::mutex> ulock(mutex);
                condition.wait(ulock);	// 
                if (!queue.empty())
                {
                    std::cout << "consume:" << queue.front() << std::endl;
                    queue.pop();
                }
                ulock.unlock();

                std::this_thread::sleep_for(std::chrono::seconds(10));	// 消费者速度比生产者慢,使当生产者notify的时候,消费者非wait的几率较大
            }
        });
        threads.push_back(std::move(t));
    }


    for (auto& t : threads)
    {
        t.join();
    }
}

解决问题

为了避免死锁,可以使用wait_for代替wait函数。这样即使后面没有其他线程唤醒自己也会在指定的时间之后来消费队列中的数据。 另一个思路是,在生产者线程中增加判断——即使没有数据要生产,如果队列中有数据也定时的去notify。虽然这样一来又和消费者那样也会进行忙轮询,但是要好过多个消费者去忙轮询。 上面的2个方法都不是特别的好,使用wait进行条件等待才是最好的解决办法。

更好的办法是,在进入wait之前,你必须保证你的任务队列是空的,否则你就应该继续执行任务,而不是在队列中有任务的情况下你还需要其他线程来告诉你说,哥们,有任务来了,快醒醒。即使作为一个打工仔,眼前有事没做完,还要等老板告诉你去做眼前的事,那你离被开大概也不远了(玩笑话)~~。在此,只需要在条件变量wait的时候指定一个等待的条件即可解决该问题。

文档信息

Search

    Table of Contents