06-使用条件变量和互斥锁实现信号量

2020/07/03 cpp_concurrency 共 2158 字,约 7 分钟

C++11 没有提供信号量,但是可以使用条件变量和互斥锁很容易的实现信号量。信号量是用来在多线程中进行资源同步的。信号量内部维护资源的数量,并且提供2个操作——wait和signal,wait的时候获取资源并减少计数器,signal的时候释放资源并增加计数器。只有当计数器的数目>0的情况下去wait才能够获取到资源。

注意:

使用信号量的一个原则是只有获取资源才wait,释放资源才调用signal!!

信号量实现

#include <condition_variable>
#include <thread>

class semaphore
{
    public:
    semaphore(int count) :m_counter(count){}
    ~semaphore(){}

    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_counter--;
        while (m_counter < 0)// 防止假唤醒
        {
            m_cv.wait(lock);
        }
    }

    void signal()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (++m_counter <= 0)   // 在执行++之后<=0,表明有其他线程在等待资源,因此执行唤醒
        {
            m_cv.notify_one();
        }
    }


    int m_counter;
    std::condition_variable m_cv;
    std::mutex m_mutex;
};

测试代码

#include <algorithm>
#include <vector>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <iostream>
#include <chrono>
#include "fxtime.h"

int main()
{
    semaphore s(5);
    int task_count = 1000;
    int i = 0;
    std::vector<int> queue(task_count);
    std::transform(queue.begin(), queue.end(), queue.begin(), [&i](int &n) {i++; return i; });
    int thread_count = 3;
    std::vector<std::thread> threads;
    std::mutex m;


    int sub_index = 0;
    fxtime t;
    for (int i = 0; i < thread_count; ++i)
    {
        auto t = std::thread([&queue, &sub_index,&s,&m, task_count]() {
            do
            {
                s.wait();

                std::unique_lock<std::mutex> lock(m);
                if (sub_index >= task_count)
                {
                    std::cout << "thread return" << std::endl;
                    return;
                }
                std::cout << ",q[" << sub_index << "]=" << queue[sub_index] << ";counter = [" << s.m_counter << "]"<< std::endl;
                sub_index++;
                lock.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100+10));
                s.signal();
            } while (true);
        });

        threads.push_back(std::move(t));
        //s.wait();	// 这里进行wait是一个明显的错误,因为这里并没有获取资源!!!
    }

    std::cout << ";counter = [" << s.m_counter << "]" << std::endl;
    int exit_thread_count = 0;
    for (auto& t : threads)
    {
        t.join();
        exit_thread_count++;
        std::cout << "exit thread count = " << exit_thread_count << std::endl;
    }
    std::cout << "dont wait>time espaced:" << t.escaped_miliseconds() << "ms" << std::endl;
    std::cout << ";counter = [" << s.m_counter << "]" << std::endl;
    return 0;
}

文档信息

Search

    Table of Contents