C++ 條件變量使用詳解
【導讀】:本文主要講解條件變量的詳細使用方法。
condition_variable介紹
在C++11中,我們可以使用條件變量(condition_variable)實現(xiàn)多個線程間的同步操作;當條件不滿足時,相關(guān)線程被一直阻塞,直到某種條件出現(xiàn),這些線程才會被喚醒。
其主要成員函數(shù)如下:

條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:
一個線程因等待"條件變量的條件成立"而掛起; 另外一個線程使"條件成立",給出信號,從而喚醒被等待的線程。
為了防止競爭,條件變量的使用總是和一個互斥鎖結(jié)合在一起;通常情況下這個鎖是std::mutex,并且管理這個鎖 只能是 std::unique_lockstd::mutex RAII模板類。
上面提到的兩個步驟,分別是使用以下兩個方法實現(xiàn):
等待條件成立使用的是condition_variable類成員wait 、wait_for 或 wait_until。
給出信號使用的是condition_variable類成員notify_one或者notify_all函數(shù)。
細節(jié)說明
在條件變量中只能使用std::unique_lock< std::mutex >說明
unique_lock和lock_guard都是管理鎖的輔助類工具,都是RAII風格;它們是在定義時獲得鎖,在析構(gòu)時釋放鎖。它們的主要區(qū)別在于unique_lock鎖機制更加靈活,可以再需要的時候進行l(wèi)ock或者unlock調(diào)用,不非得是析構(gòu)或者構(gòu)造時。它們的區(qū)別可以通過成員函數(shù)就可以一目了然。在這里插入圖片描述
wait/wait_for說明
線程的阻塞是通過成員函數(shù)wait()/wait_for()/wait_until()函數(shù)實現(xiàn)的。這里主要說明前面兩個函數(shù):
wait()成員函數(shù)
函數(shù)聲明如下:
void wait( std::unique_lock<std::mutex>& lock );
//Predicate 謂詞函數(shù),可以普通函數(shù)或者lambda表達式
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
wait 導致當前線程阻塞直至條件變量被通知,或虛假喚醒發(fā)生,可選地循環(huán)直至滿足某謂詞。
wait_for()成員函數(shù)
函數(shù)聲明如下:
template< class Rep, class Period >
std::cv_status wait_for( std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time);
template< class Rep, class Period, class Predicate >
bool wait_for( std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time,
Predicate pred);
wait_for 導致當前線程阻塞直至條件變量被通知,或虛假喚醒發(fā)生,或者超時返回。
返回值說明:
若經(jīng)過 rel_time 所指定的關(guān)聯(lián)時限則為 std::cv_status::timeout ,否則為 std::cv_status::no_timeout 。
若經(jīng)過 rel_time 時限后謂詞 pred 仍求值為 false 則為 false ,否則為 true 。
以上兩個類型的wait函數(shù)都在會阻塞時,自動釋放鎖權(quán)限,即調(diào)用unique_lock的成員函數(shù)unlock(),以便其他線程能有機會獲得鎖。這就是條件變量只能和unique_lock一起使用的原因,否則當前線程一直占有鎖,線程被阻塞。
notify_all/notify_one
notify函數(shù)聲明如下:
void notify_one() noexcept;
若任何線程在 *this 上等待,則調(diào)用 notify_one 會解阻塞(喚醒)等待線程之一。
void notify_all() noexcept;
若任何線程在 *this 上等待,則解阻塞(喚醒)全部等待線程。
虛假喚醒
在正常情況下,wait類型函數(shù)返回時要不是因為被喚醒,要不是因為超時才返回,但是在實際中發(fā)現(xiàn),因此操作系統(tǒng)的原因,wait類型在不滿足條件時,它也會返回,這就導致了虛假喚醒。因此,我們一般都是使用帶有謂詞參數(shù)的wait函數(shù),因為這種(xxx, Predicate pred )類型的函數(shù)等價于:
while (!pred()) //while循環(huán),解決了虛假喚醒的問題
{
wait(lock);
}
原因說明如下:
假設(shè)系統(tǒng)不存在虛假喚醒的時,代碼形式如下:
if (不滿足xxx條件)
{
//沒有虛假喚醒,wait函數(shù)可以一直等待,直到被喚醒或者超時,沒有問題。
//但實際中卻存在虛假喚醒,導致假設(shè)不成立,wait不會繼續(xù)等待,跳出if語句,
//提前執(zhí)行其他代碼,流程異常
wait();
}
//其他代碼
...
正確的使用方式,使用while語句解決:
while (!(xxx條件) )
{
//虛假喚醒發(fā)生,由于while循環(huán),再次檢查條件是否滿足,
//否則繼續(xù)等待,解決虛假喚醒
wait();
}
//其他代碼
....
條件變量使用
在這里,我們使用條件變量,解決生產(chǎn)者-消費者問題,該問題主要描述如下:
生產(chǎn)者-消費者問題,也稱有限緩沖問題,是一個多進程/線程同步問題的經(jīng)典案例。該問題描述了共享固定大小緩沖區(qū)的兩個進程/線程——即所謂的“生產(chǎn)者”和“消費者”,在實際運行時會發(fā)生的問題。
生產(chǎn)者的主要作用是生成一定量的數(shù)據(jù)放到緩沖區(qū)中,然后重復此過程。與此同時,費者也在緩沖區(qū)消耗這些數(shù)據(jù)。該問題的關(guān)鍵就是要保證生產(chǎn)者不會在緩沖區(qū)滿時加入數(shù)據(jù),消費者也不會在緩沖區(qū)中空時消耗數(shù)據(jù)。
要解決該問題,就必須讓生產(chǎn)者在緩沖區(qū)滿時休眠(要么干脆就放棄數(shù)據(jù)),等到下次消費者消耗緩沖區(qū)中的數(shù)據(jù)的時候,生產(chǎn)者才能被喚醒,開始往緩沖區(qū)添加數(shù)據(jù)。
同樣,也可以讓消費者在緩沖區(qū)空時進入休眠,等到生產(chǎn)者往緩沖區(qū)添加數(shù)據(jù)之后,再喚醒消費者。
生產(chǎn)者-消費者代碼如下:
std::mutex g_cvMutex;
std::condition_variable g_cv;
//緩存區(qū)
std::deque<int> g_data_deque;
//緩存區(qū)最大數(shù)目
const int MAX_NUM = 30;
//數(shù)據(jù)
int g_next_index = 0;
//生產(chǎn)者,消費者線程個數(shù)
const int PRODUCER_THREAD_NUM = 3;
const int CONSUMER_THREAD_NUM = 3;
void producer_thread(int thread_id)
{
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
//加鎖
std::unique_lock <std::mutex> lk(g_cvMutex);
//當隊列未滿時,繼續(xù)添加數(shù)據(jù)
g_cv.wait(lk, [](){ return g_data_deque.size() <= MAX_NUM; });
g_next_index++;
g_data_deque.push_back(g_next_index);
std::cout << "producer_thread: " << thread_id << " producer data: " << g_next_index;
std::cout << " queue size: " << g_data_deque.size() << std::endl;
//喚醒其他線程
g_cv.notify_all();
//自動釋放鎖
}
}
void consumer_thread(int thread_id)
{
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(550));
//加鎖
std::unique_lock <std::mutex> lk(g_cvMutex);
//檢測條件是否達成
g_cv.wait( lk, []{ return !g_data_deque.empty(); });
//互斥操作,消息數(shù)據(jù)
int data = g_data_deque.front();
g_data_deque.pop_front();
std::cout << "\tconsumer_thread: " << thread_id << " consumer data: ";
std::cout << data << " deque size: " << g_data_deque.size() << std::endl;
//喚醒其他線程
g_cv.notify_all();
//自動釋放鎖
}
}
int main()
{
std::thread arrRroducerThread[PRODUCER_THREAD_NUM];
std::thread arrConsumerThread[CONSUMER_THREAD_NUM];
for (int i = 0; i < PRODUCER_THREAD_NUM; i++)
{
arrRroducerThread[i] = std::thread(producer_thread, i);
}
for (int i = 0; i < CONSUMER_THREAD_NUM; i++)
{
arrConsumerThread[i] = std::thread(consumer_thread, i);
}
for (int i = 0; i < PRODUCER_THREAD_NUM; i++)
{
arrRroducerThread[i].join();
}
for (int i = 0; i < CONSUMER_THREAD_NUM; i++)
{
arrConsumerThread[i].join();
}
return 0;
}
運行結(jié)果:
- EOF -
點贊和在看就是最大的支持??
