揭秘?zé)o鎖隊(duì)列的幾種實(shí)現(xiàn)及性能對(duì)比!
。
導(dǎo)語(yǔ)?|?本文主要介紹無(wú)鎖隊(duì)列的使用場(chǎng)景,從使用無(wú)鎖隊(duì)列的必要性層層深入,再深入淺出分析了無(wú)鎖隊(duì)列的幾種實(shí)現(xiàn)以及相關(guān)性能的對(duì)比,希望對(duì)有相應(yīng)需求的同學(xué)提供一點(diǎn)經(jīng)驗(yàn)和幫助。
一、無(wú)鎖隊(duì)列用在什么樣的場(chǎng)景?
當(dāng)需要處理的數(shù)據(jù)非常多,比如行情數(shù)據(jù),一秒處理非常多的數(shù)據(jù)的時(shí)候,可以考慮用無(wú)鎖隊(duì)列。但是如果一秒只需要處理幾百或者幾千的數(shù)據(jù),是沒(méi)有必要考慮用無(wú)鎖隊(duì)列的。用互斥鎖就能解決問(wèn)題,數(shù)據(jù)量相對(duì)少的時(shí)候互斥鎖與無(wú)鎖隊(duì)列之間差別并不是很明顯。
二、為什么要用無(wú)鎖隊(duì)列?
有鎖隊(duì)列會(huì)有哪些問(wèn)題?
(一)Cache的損壞,在線程間頻繁切換的時(shí)候會(huì)導(dǎo)致Cache中數(shù)據(jù)的丟失
CPU的運(yùn)行速度比主存快N倍,所以大量的處理器時(shí)間被浪費(fèi)在處理器與主存的數(shù)據(jù)傳輸上,這就是在處理器與主存之間引入Cache的原因。Cache是一種速度更快但容量更小的內(nèi)存,當(dāng)處理器要訪問(wèn)主存中的數(shù)據(jù)時(shí),這些數(shù)據(jù)首先要被拷貝到Cache中,因?yàn)檫@些數(shù)據(jù)在不久的將來(lái)可能又會(huì)被處理器訪問(wèn)。Cache misses對(duì)性能有非常大的影響,因?yàn)樘幚砥髟L問(wèn)Cache中的數(shù)據(jù)將比直接訪問(wèn)主存快得多。
線程被頻繁搶占產(chǎn)生的Cache損壞將導(dǎo)致應(yīng)用程序性能下降。
(二)在同步機(jī)制上爭(zhēng)搶隊(duì)列
CPU會(huì)將大量的時(shí)間浪費(fèi)在保護(hù)隊(duì)列數(shù)據(jù)的互斥鎖,而不是處理隊(duì)列中的數(shù)據(jù)。
然后非阻塞的機(jī)制使用了CAS的特殊操作,使得任務(wù)之間可以不爭(zhēng)搶任何資源,然后在隊(duì)列中預(yù)定的位置上,插入或者提取數(shù)據(jù)。
(三)多線程動(dòng)態(tài)內(nèi)存分配性能下降
多線程同時(shí)分配內(nèi)存時(shí),會(huì)涉及到線程分配同一塊相同地址內(nèi)存的問(wèn)題,這個(gè)時(shí)候會(huì)用鎖來(lái)進(jìn)行同步。顯然頻繁分配內(nèi)存會(huì)導(dǎo)致應(yīng)用程序性能下降。
三、一讀一寫的無(wú)鎖隊(duì)列實(shí)現(xiàn)
yqueue是用來(lái)設(shè)計(jì)隊(duì)列,ypipe用來(lái)設(shè)計(jì)隊(duì)列的寫入時(shí)機(jī)、回滾以及flush。
首先我們來(lái)看yqueue的設(shè)計(jì)
(一)yqueue——無(wú)鎖隊(duì)列
內(nèi)存分配
首先我們需要考慮隊(duì)列的內(nèi)存分配,yqueue中的數(shù)據(jù)結(jié)構(gòu)使用的chunk塊機(jī)制,每次批量分配一批元素,這樣可以減少內(nèi)存的分配和釋放。
template<typename T, int N>// 鏈表結(jié)點(diǎn)稱之為chunk_tstruct chunk_t{T values[N]; //每個(gè)chunk_t可以容納N個(gè)T類型的元素,以后就以一個(gè)chunk_t為單位申請(qǐng)內(nèi)存chunk_t *prev;chunk_t *next;};

93 // Adds an element to the back end of the queue.94 inline void push()95 {96 back_chunk = end_chunk;97 back_pos = end_pos; //99 if (++end_pos != N) //end_pos!=N表明這個(gè)chunk節(jié)點(diǎn)還沒(méi)有滿100 return;101102 chunk_t *sc = spare_chunk.xchg(NULL); // 為什么設(shè)置為NULL?因?yàn)槿绻阎爸等〕鰜?lái)了則沒(méi)有spare chunk了,所以設(shè)置為NULL103 if (sc) // 如果有spare chunk則繼續(xù)復(fù)用它104 {105 end_chunk->next = sc;106 sc->prev = end_chunk;107 }108 else // 沒(méi)有則重新分配109 {110 // static int s_cout = 0;111 // printf("s_cout:%d\n", ++s_cout);112 end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一個(gè)chunk113 alloc_assert(end_chunk->next);114 end_chunk->next->prev = end_chunk;115 }116 end_chunk = end_chunk->next;117 end_pos = 0;118 }
可以看到112行,在要push一個(gè)元素的時(shí)候,首先看最后一個(gè)chunk,也就是back_chunk的back_pos是不是該chunk的最后一個(gè)元素,如果是,則重新分配一個(gè)chunk,將這個(gè)chunk加到chunk鏈表的下一個(gè)節(jié)點(diǎn)。

這個(gè)邏輯相對(duì)來(lái)說(shuō)還是比較簡(jiǎn)單的。唯一需要關(guān)注的,就是
102 chunk_t *sc = spare_chunk.xchg(NULL);這一行,這個(gè)spare_chunk是怎么來(lái)的?
154 // Removes an element from the front end of the queue.155 inline void pop()156 {157 if (++begin_pos == N) // 刪除滿一個(gè)chunk才回收chunk158 {159 chunk_t *o = begin_chunk;160 begin_chunk = begin_chunk->next;161 begin_chunk->prev = NULL;162 begin_pos = 0;163164 // 'o' has been more recently used than spare_chunk,165 // so for cache reasons we'll get rid of the spare and166 // use 'o' as the spare.167 chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,總是保存最新的空閑塊而釋放先前的空閑快168 free(cs);169 }170 }
當(dāng)pop的時(shí)候,如果刪除一個(gè)chunk里面沒(méi)有元素了,這個(gè)時(shí)候會(huì)需要將這個(gè)chunk所開(kāi)辟的空間釋放掉,但是這里使用了一個(gè)技巧即:將這個(gè)chunk先不釋放,先放到spare_chunk里面,等到下次需要開(kāi)辟新的空間的時(shí)候再把這個(gè)spare_chunk拿來(lái)用。
(二)ypipe——yqueue的封裝
yqueue負(fù)責(zé)元素內(nèi)存的分配與釋放,入隊(duì)以及出隊(duì)列;ypipe負(fù)責(zé)yqueue讀寫指針的變化。
ypipe是在yqueue_t的基礎(chǔ)上再構(gòu)建一個(gè)單讀單寫的無(wú)鎖隊(duì)列。
這里有三個(gè)指針:
T* w:指向第一個(gè)未刷新的元素,只被寫線程使用。
T* r:指向第一個(gè)沒(méi)有被預(yù)提取的元素,只被讀線程使用。
T*f:指向下一輪要被刷新的一批元素的第一個(gè)。
ypipe的定義:
37 // Initialises the pipe.38 inline ypipe_t()49 // The destructor doesn't have to be virtual. It is mad virtual50 // just to keep ICC and code checking tools from complaining.51 inline virtual ~ypipe_t()52 {53 }67 // 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒(méi)完成,在沒(méi)完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。68 inline void write(const T &value_, bool incomplete_);92 inline bool unwrite(T *value_);104 // 刷新所有已經(jīng)完成的數(shù)據(jù)到管道,返回false意味著讀線程在休眠,在這種情況下調(diào)用者需要喚醒讀線程。105 // 批量刷新的機(jī)制, 寫入批量后喚醒讀線程;106 // 反悔機(jī)制 unwrite107 inline bool flush();136 // Check whether item is available for reading.137 // 這里面有兩個(gè)點(diǎn),一個(gè)是檢查是否有數(shù)據(jù)可讀,一個(gè)是預(yù)取138 inline bool check_read();163 // Reads an item from the pipe. Returns false if there is no value.164 // available.165 inline bool read(T *value_)178 // Applies the function fn to the first elemenent in the pipe179 // and returns the value returned by the fn.180 // The pipe mustn't be empty or the function crashes.181 inline bool probe(bool (*fn)(T &))189 protected:190 // Allocation-efficient queue to store pipe items.191 // Front of the queue points to the first prefetched item, back of192 // the pipe points to last un-flushed item. Front is used only by193 // reader thread, while back is used only by writer thread.194 yqueue_tqueue ;195196 // Points to the first un-flushed item. This variable is used197 // exclusively by writer thread.198 T *w; //指向第一個(gè)未刷新的元素,只被寫線程使用199200 // Points to the first un-prefetched item. This variable is used201 // exclusively by reader thread.202 T *r; //指向第一個(gè)還沒(méi)預(yù)提取的元素,只被讀線程使用203204 // Points to the first item to be flushed in the future.205 T *f; //指向下一輪要被刷新的一批元素中的第一個(gè)206207 // The single point of contention between writer and reader thread.208 // Points past the last flushed item. If it is NULL,209 // reader is asleep. This pointer should be always accessed using210 // atomic operations.211 atomic_ptr_tc; //讀寫線程共享的指針,指向每一輪刷新的起點(diǎn)(看代碼的時(shí)候會(huì)詳細(xì)說(shuō))。當(dāng)c為空時(shí),表示讀線程睡眠(只會(huì)在讀線程中被設(shè)置為空) 212213 // Disable copying of ypipe object.214 ypipe_t(const ypipe_t &);215 const ypipe_t &operator=(const ypipe_t &);
(三)ypipe設(shè)計(jì)的目的
為了批量讀寫,即用戶可以自主的決定寫了多少數(shù)據(jù)之后開(kāi)啟讀
那因?yàn)橛辛松a(chǎn)者和消費(fèi)者,就會(huì)涉及到同步的問(wèn)題,ypipe這里測(cè)試發(fā)現(xiàn),用鎖和條件變量性能最佳。
我們來(lái)分兩種情況看一下讀寫的具體步驟。

第一種情況:批量寫,第一輪寫





在這個(gè)時(shí)候才能開(kāi)始讀數(shù)據(jù):

第二種方式:條件變量+互斥鎖:


flush函數(shù)
101 // Flush all the completed items into the pipe. Returns false if102 // the reader thread is sleeping. In that case, caller is obliged to103 // wake the reader up before using the pipe again.104 // 刷新所有已經(jīng)完成的數(shù)據(jù)到管道,返回false意味著讀線程在休眠,在這種情況下調(diào)用者需要喚醒讀線程。105 // 批量刷新的機(jī)制, 寫入批量后喚醒讀線程;106 // 反悔機(jī)制 unwrite107 inline bool flush()108 {109 // If there are no un-flushed items, do nothing.110 if (w == f) // 不需要刷新,即是還沒(méi)有新元素加入111 return true;112113 // Try to set 'c' to 'f'.114 // read時(shí)如果沒(méi)有數(shù)據(jù)可以讀取則c的值會(huì)被置為NULL115 if (c.cas(w, f) != w) // 嘗試將c設(shè)置為f,即是準(zhǔn)備更新w的位置116 {117118 // Compare-and-swap was unseccessful because 'c' is NULL.119 // This means that the reader is asleep. Therefore we don't120 // care about thread-safeness and update c in non-atomic121 // manner. We'll return false to let the caller know122 // that reader is sleeping.123 c.set(f); // 更新為新的f位置124 w = f;125 return false; //線程看到flush返回false之后會(huì)發(fā)送一個(gè)消息給讀線程,這需要寫業(yè)務(wù)去做處理126 }127 else // 讀端還有數(shù)據(jù)可讀取128 {129 // Reader is alive. Nothing special to do now. Just move130 // the 'first un-flushed item' pointer to 'f'.131 w = f; // 更新f的位置132 return true;133 }134 }
flush的目的就是將改變w的值,同時(shí)改變c的值,這里有兩種情況:
c的值與w的值相等
說(shuō)明隊(duì)列的w值沒(méi)有更新,不對(duì)隊(duì)列的數(shù)據(jù)進(jìn)行讀取:
else??//?讀端還有數(shù)據(jù)可讀取{//??Reader?is?alive.?Nothing?special?to?do?now.?Just?move// the 'first un-flushed item' pointer to 'f'.w = f; // 更新f的位置return true;}
c的值與w的值不相等
// Try to set 'c' to 'f'.// read時(shí)如果沒(méi)有數(shù)據(jù)可以讀取則c的值會(huì)被置為NULLif (c.cas(w, f) != w) // 嘗試將c設(shè)置為f,即是準(zhǔn)備更新w的位置{// Compare-and-swap was unseccessful because 'c' is NULL.// This means that the reader is asleep. Therefore we don't// care about thread-safeness and update c in non-atomic// manner. We'// that reader is sleeping.c.set(f); // 更新為新的f位置w = f;return false; //線程看到flush返回false之后會(huì)發(fā)送一個(gè)消息給讀線程,這需要寫業(yè)務(wù)去做處理}
這發(fā)生在c在w位置后面,此時(shí)更新c與w的值,并返回false,表示隊(duì)列可讀。
write函數(shù)
write函數(shù)相對(duì)簡(jiǎn)單:
64 // Write an item to the pipe. Don't flush it yet. If incomplete is65 // set to true the item is assumed to be continued by items66 // subsequently written to the pipe. Incomplete items are neverflushed down the stream.67 // 寫入數(shù)據(jù),incomplete參數(shù)表示寫入是否還沒(méi)完成,在沒(méi)完成的時(shí)候不會(huì)修改flush指針,即這部分?jǐn)?shù)據(jù)不會(huì)讓讀線程看到。68 inline void write(const T &value_, bool incomplete_)69 {70 // Place the value to the queue, add new terminator element.71 queue.back() = value_;72 queue.push();7374 // Move the "flush up to here" poiter.75 if (!incomplete_)76 {77 f = &queue.back(); // 記錄要刷新的位置78 // printf("1 f:%p, w:%p\n", f, w);79 }80 else81 {82 // printf("0 f:%p, w:%p\n", f, w);83 }84 }
write只更新f的位置。write并不能決定該隊(duì)列是否能讀,因?yàn)閣rite并不能改變w指針,如果要隊(duì)列能讀,需要w指針改變位置才行。
從write和flush可以看出,在更新w和f的時(shí)候并沒(méi)有互斥的保護(hù),所以該無(wú)鎖隊(duì)列的設(shè)計(jì)并不適合多線程場(chǎng)景。
read函數(shù)
138 inline bool check_read()139 {140 // Was the value prefetched already? If so, return.141 if (&queue.front() != r && r) //判斷是否在前幾次調(diào)用read函數(shù)時(shí)已經(jīng)預(yù)取數(shù)據(jù)了return true;142 return true;143144 // There's no prefetched value, so let us prefetch more values.145 // Prefetching is to simply retrieve the146 // pointer from c in atomic fashion. If there are no147 // items to prefetch, set c to NULL (using compare-and-swap).148 // 兩種情況149 // 1. 如果c值和queue.front(), 返回c值并將c值置為NULL,此時(shí)沒(méi)有數(shù)據(jù)可讀150 // 2. 如果c值和queue.front(), 返回c值,此時(shí)可能有數(shù)據(jù)度的去151 r = c.cas(&queue.front(), NULL); //嘗試預(yù)取數(shù)據(jù)152153 // If there are no elements prefetched, exit.154 // During pipe's lifetime r should never be NULL, however,155 // it can happen during pipe shutdown when items are being deallocated.156 if (&queue.front() == r || !r) //判斷是否成功預(yù)取數(shù)據(jù)157 return false;158159 // There was at least one value prefetched.160 return true;161 }162163 // Reads an item from the pipe. Returns false if there is no value.164 // available.165 inline bool read(T *value_)166 {167 // Try to prefetch a value.168 if (!check_read())169 return false;170171 // There was at least one value prefetched.172 // Return it to the caller.173 *value_ = queue.front();174 queue.pop();175 return true;176 }
這里也是有兩種情況:
r不為空且r不等于&queue.front(),說(shuō)明此時(shí)隊(duì)列中有可讀數(shù)據(jù),直接讀取即可。
r指針指向隊(duì)頭元素(r==&queue.front())或者r為空,說(shuō)明隊(duì)列中并沒(méi)有可讀的數(shù)據(jù),此時(shí)將r指針更新成c的值,這個(gè)過(guò)程我們叫做預(yù)取。預(yù)取的指令就是:r=c;c在flush的時(shí)候會(huì)被設(shè)置為w。而w與&queue.front()之間都是有距離的。這一段距離中間的數(shù)據(jù)就是預(yù)取數(shù)據(jù),所以每次read 都能取出一段數(shù)據(jù)。

當(dāng)&queue.front()==c時(shí),代表數(shù)據(jù)被取完了,這時(shí)把c指向NULL,接著讀線程會(huì)睡眠,這也是給寫線程檢查讀線程是否睡眠的標(biāo)志。
我們可以測(cè)試一下結(jié)果,對(duì)一個(gè)數(shù)據(jù)加200萬(wàn)次,分別用環(huán)形數(shù)組、鏈表、互斥鎖、ypipe隊(duì)列分別是什么樣的性能。

通過(guò)測(cè)試發(fā)現(xiàn)在一讀一寫的情況下,ypipe的優(yōu)勢(shì)是非常大的。
那多讀多寫的場(chǎng)景呢?
四、多讀多寫的無(wú)鎖隊(duì)列實(shí)現(xiàn)
上面我們介紹的是一讀一寫的場(chǎng)景,用ypipe的方式會(huì)性能比較快,但是ypipe不適用于多讀多寫的場(chǎng)景,因?yàn)樵谧x的時(shí)候是沒(méi)有對(duì)r指針加鎖,在寫的時(shí)候也沒(méi)有對(duì)w指針加鎖。
多讀多寫的線程安全隊(duì)列有以下幾種實(shí)現(xiàn)方式:
互斥鎖
互斥鎖+條件變量:BlockQueue
內(nèi)存屏障:SimpleLockFreeQueue
CAS原子操作:ArrayLockFreeQueue(也可以理解成RingBuffer)
其中互斥鎖的性能是幾種方式里面性能最低的,沒(méi)什么講的必要,這里就不對(duì)比這種實(shí)現(xiàn)方式了。
(一)RingBuffer(ArrayLockFreeQueue)
下面我們來(lái)看基于循環(huán)數(shù)組的無(wú)鎖隊(duì)列,也就是RingBuffer如何解決多線程競(jìng)爭(zhēng)的問(wèn)題。
首先看下RingBuffer的數(shù)據(jù)結(jié)構(gòu)如下:
14 template15 class ArrayLockFreeQueue16 {17 public:1819 ArrayLockFreeQueue();20 virtual ~ArrayLockFreeQueue();2122 QUEUE_INT size();2324 bool enqueue(const ELEM_T &a_data);//入隊(duì)列2526 bool dequeue(ELEM_T &a_data);//出隊(duì)列2728 bool try_dequeue(ELEM_T &a_data);2930 private:3132 ELEM_T m_thequeue[Q_SIZE];3334 volatile QUEUE_INT m_count;35 volatile QUEUE_INT m_writeIndex;3637 volatile QUEUE_INT m_readIndex;3839 volatile QUEUE_INT m_maximumReadIndex;4041 inline QUEUE_INT countToIndex(QUEUE_INT a_count);42 };
m_count: //隊(duì)列的元素個(gè)數(shù)
我們先來(lái)看三種不同的下標(biāo):
m_writeIndex: //?新元素入隊(duì)列時(shí)存放位置在數(shù)組中的下標(biāo)。
m_readIndex: //?下一個(gè)出列的元素在數(shù)組中的下標(biāo)。
m_maximumReadIndex: //?這個(gè)值非常關(guān)鍵,表示最后一個(gè)已經(jīng)完成入隊(duì)列操作的元素在數(shù)組中的下標(biāo)。如果它的值跟m_writeIndex不一致,表明有寫請(qǐng)求尚未完成。這意味著,有寫請(qǐng)求成功申請(qǐng)了空間但數(shù)據(jù)還沒(méi)完全寫進(jìn)隊(duì)列。所以如果有線程要讀取,必須要等到寫線程將數(shù)據(jù)完全寫入到隊(duì)列之后。
以上三種不同的下標(biāo)都是必須的,因?yàn)殛?duì)列允許任意數(shù)量的生產(chǎn)者和消費(fèi)者圍繞著它工作。已經(jīng)存在一種基于循環(huán)數(shù)組的無(wú)鎖隊(duì)列,使得唯一的生產(chǎn)者和唯一的消費(fèi)者可以良好的工作。它的實(shí)現(xiàn)相當(dāng)簡(jiǎn)潔非常值得閱讀。該程序使用gcc內(nèi)置的__sync_bool_compare_and_swap,但重新做了宏定義封裝。

隊(duì)列已滿判斷:
(m_writeIndex+1) % Q_SIZE == m_readIndex對(duì)應(yīng)代碼:
countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)隊(duì)列為空判斷:
m_readIndex == m_maximumReadIndex該RingBuffer的重點(diǎn)主要是以下四個(gè)方面的問(wèn)題:
多線程寫入的時(shí)候,m_writeIndex如何更新?
m_maximumReadIndex這個(gè)變量為什么會(huì)需要?它有什么作用?
多線程讀的惡時(shí)候,m_readIndex如何更新?
m_maximumReadIndex在什么時(shí)候改變?
(二)enqueue入隊(duì)列
42 template <typename ELEM_T, QUEUE_INT Q_SIZE>43 bool ArrayLockFreeQueue::enqueue(const ELEM_T &a_data) 44 {45 QUEUE_INT currentWriteIndex; // 獲取寫指針的位置46 QUEUE_INT currentReadIndex;47 // 1. 獲取可寫入的位置48 do49 {50 currentWriteIndex = m_writeIndex;51 currentReadIndex = m_readIndex;52 if(countToIndex(currentWriteIndex + 1) ==53 countToIndex(currentReadIndex))54 {55 return false; // 隊(duì)列已經(jīng)滿了56 }57 // 目的是為了獲取一個(gè)能寫入的位置58 } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));59 // 獲取寫入位置后 currentWriteIndex 是一個(gè)臨時(shí)變量,保存我們寫入的位置60 // We know now that this index is reserved for us. Use it to save the data61 m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把數(shù)據(jù)更新到對(duì)應(yīng)的位置6263 // 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作64 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread65 // inserting in the queue. It might fail if there are more than 1 producer threads because this66 // operation has to be done in the same order as the previous CAS67 while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))68 {69 // this is a good place to yield the thread in case there are more70 // software threads than hardware processors and you have more71 // than 1 producer thread72 // have a look at sched_yield (POSIX.1b)73 sched_yield(); // 當(dāng)線程超過(guò)cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。74 }7576 AtomicAdd(&m_count, 1);7778 return true;7980 }
圖示(非常重要):
以下插圖展示了對(duì)隊(duì)列執(zhí)行操作時(shí)各個(gè)下標(biāo)時(shí)如何變化的。如果一個(gè)位置被標(biāo)記為X,表示這個(gè)位置里面存放了數(shù)據(jù)。空白表示位置是空的。對(duì)于下圖的情況,隊(duì)列中存放了兩個(gè)元素。WriteIndex指示的位置是新元素將會(huì)被插入的位置。ReadIndex指向的位置中的元素將會(huì)在下一次pop操作中被彈出。

當(dāng)生產(chǎn)者準(zhǔn)備將數(shù)據(jù)插入到隊(duì)列中時(shí),它首先通過(guò)增加WriteIndex的值來(lái)申請(qǐng)空間。MaximumReadIndex指向最后一個(gè)存放有效數(shù)據(jù)的位置(也就是實(shí)際的讀的隊(duì)列尾)。

一旦空間的申請(qǐng)完成,生產(chǎn)者就可以將數(shù)據(jù)拷貝到剛剛申請(qǐng)的位置中。完成之后增加MaximumReadIndex使得它與WriteIndex一致。

現(xiàn)在隊(duì)列中有3個(gè)元素,接著又有一個(gè)生產(chǎn)者嘗試向隊(duì)列中插入元素。

在第一個(gè)生產(chǎn)者完成數(shù)據(jù)拷貝之前,又有另外一個(gè)生產(chǎn)者申請(qǐng)了一個(gè)新的空間準(zhǔn)備拷貝元素。現(xiàn)在有兩個(gè)生產(chǎn)者同時(shí)向隊(duì)列插入數(shù)據(jù)。

現(xiàn)在生產(chǎn)者開(kāi)始拷貝數(shù)據(jù),在完成拷貝之后,對(duì)MaximumReadIndex的遞增操作必須嚴(yán)格遵循一個(gè)順序:第一個(gè)生產(chǎn)者線程首先遞增MaximumReadIndex,接著才輪到第二個(gè)生產(chǎn)者。這個(gè)順序必須被嚴(yán)格遵守的原因是,我們必須保證數(shù)據(jù)被完全拷貝到隊(duì)列之后才允許消費(fèi)者線程將其出列。
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)){sched_yield(); // 當(dāng)線程超過(guò)cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。}

第一個(gè)生產(chǎn)者完成了數(shù)據(jù)拷貝,并對(duì)MaximumReadIndex完成了遞增,現(xiàn)在第二個(gè)生產(chǎn)者可以遞增MaximumReadIndex了。

第二個(gè)生產(chǎn)者完成了對(duì)MaximumReadIndex的遞增,現(xiàn)在隊(duì)列中有5個(gè)元素。
(三)dequeue出隊(duì)列
88 template89 bool ArrayLockFreeQueue::dequeue(ELEM_T &a_data) 90 {91 QUEUE_INT currentMaximumReadIndex;92 QUEUE_INT currentReadIndex;9394 do95 {96 // to ensure thread-safety when there is more than 1 producer thread97 // a second index is defined (m_maximumReadIndex)98 currentReadIndex = m_readIndex;99 currentMaximumReadIndex = m_maximumReadIndex;100101 if(countToIndex(currentReadIndex) ==102 countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置103 {104 // the queue is empty or105 // a producer thread has allocate space in the queue but is106 // waiting to commit the data into it107 return false;108 }109 // retrieve the data from the queue110 a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時(shí)位置讀取的111112 // try to perfrom now the CAS operation on the read index. If we succeed113 // a_data already contains what m_readIndex pointed to before we114 // increased it115 if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))116 {117 AtomicSub(&m_count, 1); // 真正讀取到了數(shù)據(jù),元素-1118 return true;119 }120 } while(true);121122 assert(0);123 // Add this return statement to avoid compiler warnings124 return false;125126 }
以下插入展示了元素出列的時(shí)候各種下標(biāo)是如何變化的,隊(duì)列中初始有2個(gè)元素。WriteIndex指示的位置是新元素將會(huì)被插入的位置。ReadIndex指向的位置中的元素將會(huì)在下一次pop操作中被彈出。

消費(fèi)者線程拷貝數(shù)組ReadIndex位置的元素,然后嘗試CAS操作將 ReadIndex加1。如果操作成功消費(fèi)者成功地將數(shù)據(jù)出列。因?yàn)镃AS操作是原子的,所以只有唯一的線程可以在同一時(shí)刻更新ReadIndex的值。
如果操作失敗,讀取新的ReadIndex的值,重復(fù)以上操作(copy數(shù)據(jù),CAS)。

現(xiàn)在又有一個(gè)消費(fèi)者將元素出列,隊(duì)列變成空。

現(xiàn)在有一個(gè)生產(chǎn)者正在向隊(duì)列中添加元素。它已經(jīng)成功的申請(qǐng)了空間,但尚未完成數(shù)據(jù)拷貝。任何其他企圖從隊(duì)列中移除元素的消費(fèi)者都會(huì)發(fā)現(xiàn)隊(duì)列非空(因?yàn)閣riteIndex不等于readIndex)。但它不能讀取readIndex所指向位置中的數(shù)據(jù),因?yàn)閞eadIndex與MaximumReadIndex相等。這個(gè)時(shí)候讀數(shù)據(jù)失敗,需要等到生產(chǎn)者完成數(shù)據(jù)拷貝增加MaximumReadIndex的值才可以讀。
當(dāng)生產(chǎn)者完成數(shù)據(jù)拷貝,隊(duì)列的大小是1,消費(fèi)者線程就可以讀取這個(gè)數(shù)據(jù)了。
(四)yielding處理器的必要性
67 while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))68 {69 // this is a good place to yield the thread in case there are more70 // software threads than hardware processors and you have more71 // than 1 producer thread72 // have a look at sched_yield (POSIX.1b)73 sched_yield(); // 當(dāng)線程超過(guò)cpu核數(shù)的時(shí)候如果不讓出cpu導(dǎo)致一直循環(huán)在此。74 }
在enqueue的第二個(gè)CAS里面有一個(gè)sched_yield()來(lái)主動(dòng)讓出處理器的操作,對(duì)于一個(gè)聲稱無(wú)鎖的算法而言,這個(gè)調(diào)用看起來(lái)有點(diǎn)兒奇怪。多線程環(huán)境下影響性能的其中一個(gè)因素就是Cache損壞。而產(chǎn)生Cache損壞的一種情況就是一個(gè)線程被搶占,操作系統(tǒng)需要保存被搶占線程的上下文,然后被選中作為下一個(gè)調(diào)度線程的上下文載入。此時(shí)Cache中緩存的數(shù)據(jù)都會(huì)失效,因?yàn)樗潜粨屨季€程的數(shù)據(jù)而不是新線程的數(shù)據(jù)。
無(wú)鎖算法和通過(guò)阻塞機(jī)制同步的算法的一個(gè)主要區(qū)別在于無(wú)鎖算法不會(huì)阻塞在線程同步上。那這里的讓出CPU,與阻塞在線程同步上有啥區(qū)別?為什么不直接自旋?
首先說(shuō)下sched_yield的必要性:sched_yield的調(diào)用與有多少個(gè)生產(chǎn)者線程在并發(fā)地往隊(duì)列中存放數(shù)據(jù)有關(guān):每個(gè)生產(chǎn)者線程所執(zhí)行的CAS操作都必須嚴(yán)格遵循FIFO次序,一個(gè)用于申請(qǐng)空間,另一個(gè)用于通知消費(fèi)者數(shù)據(jù)已經(jīng)寫入完成可以被讀取了。如果我們的應(yīng)用程序只有唯一的生產(chǎn)者這個(gè)操作隊(duì)列,sched_yield將永遠(yuǎn)沒(méi)有機(jī)會(huì)被調(diào)用,因?yàn)閑nqueue的第二個(gè)CAS操作永遠(yuǎn)不會(huì)失敗。因為一個(gè)生產(chǎn)者的情況下沒(méi)人能破壞生產(chǎn)者執(zhí)行這兩個(gè)CAS操作的FIFO順序。
而對(duì)于多個(gè)生產(chǎn)者線程往隊(duì)列中存放數(shù)據(jù)的時(shí)候,問(wèn)題就出現(xiàn)了。概括來(lái)說(shuō),一個(gè)生產(chǎn)者通過(guò)第1個(gè)CAS操作申請(qǐng)空間,然后將數(shù)據(jù)寫入到申請(qǐng)到的空間中,然后執(zhí)行第2個(gè)CAS操作通知消費(fèi)者數(shù)據(jù)準(zhǔn)備完畢可供讀取了。這第2個(gè)CAS操作必須遵循FIFO順序,也就是說(shuō),如果A線程第首先執(zhí)行完第一個(gè)CAS操作,那么它也要第1個(gè)執(zhí)行完第2個(gè)CAS操作,如果A線程在執(zhí)行完第一個(gè)CAS操作之后停止,然后B線程執(zhí)行完第1個(gè)CAS操作,那么B線程將無(wú)法完成第2個(gè)CAS操作,因?yàn)樗却鼳先完成第2個(gè)CAS操作。而這就是問(wèn)題產(chǎn)生的根源。讓我們考慮如下場(chǎng)景,3個(gè)消費(fèi)者線程和1個(gè)消費(fèi)者線程:
線程1,2,3按順序調(diào)用第1個(gè)CAS操作申請(qǐng)了空間。那么它們完成第2個(gè) CAS操作的順序也應(yīng)該與這個(gè)順序一致1,2,3。
線程2首先嘗試執(zhí)行第2個(gè)CAS,但它會(huì)失敗,因?yàn)榫€程1還沒(méi)完成它的第2此CAS操作呢。同樣對(duì)于線程3也是一樣的。
線程2和3將會(huì)不斷的調(diào)用它們的第2個(gè)CAS操作,直到線程1完成它的第2個(gè)CAS操作為止。
線程1最終完成了它的第2個(gè)CAS,現(xiàn)在線程3必須等線程2先完成它的第2個(gè)CAS。
線程2也完成了,最終線程3也完成。
在上面的場(chǎng)景中,生產(chǎn)者可能會(huì)在第2個(gè)CAS操作上自旋一段時(shí)間,用于等待先于它執(zhí)行第1個(gè)CAS操作的線程完成它的第2次CAS操作。在一個(gè)物理處理器數(shù)量大于操作隊(duì)列線程數(shù)量的系統(tǒng)上,這不會(huì)有太嚴(yán)重的問(wèn)題:因?yàn)槊總€(gè)線程都可以分配在自己的處理器上執(zhí)行,它們最終都會(huì)很快完成各自的第2次CAS操作。雖然算法導(dǎo)致線程處理忙等狀態(tài),但這正是我們所期望的,因?yàn)檫@使得操作更快的完成。也就是說(shuō)在這種情況下我們是不需要sche_yield()的,它完全可以從代碼中刪除。
但是,在一個(gè)物理處理器數(shù)量少于線程數(shù)量的系統(tǒng)上,sche_yield()就變得至關(guān)重要了。讓我們?cè)俅慰疾樯厦?個(gè)線程的場(chǎng)景,當(dāng)線程3準(zhǔn)備向隊(duì)列中插入數(shù)據(jù):如果線程1在執(zhí)行完第1個(gè)CAS操作,在執(zhí)行第2個(gè)CAS操作之前被搶占,那么線程2,3就會(huì)一直在它們的第2個(gè)CAS操作上忙等(它們忙等,不讓出處理器,線程1也就沒(méi)機(jī)會(huì)執(zhí)行,它們就只能繼續(xù)忙等),直到線程1重新被喚醒,完成它的第2個(gè)CAS操作。這就是需要sche_yield()的場(chǎng)合了,操作系統(tǒng)應(yīng)該避免讓線程2,3處于忙等狀態(tài)。它們應(yīng)該盡快的讓出處理器讓線程1執(zhí)行,使得線程1可以把它的第2個(gè)CAS操作完成。這樣線程2和3才能繼續(xù)完成它們的操作。
也就是說(shuō),如果不適用sched_yield,一直自旋,那么可能多個(gè)線程同時(shí)阻塞在第二個(gè)CAS那兒。
(五)多讀多寫的RingBuffer存在的問(wèn)題
多于一個(gè)生產(chǎn)者線程性能提升不明顯
如果有多于一個(gè)的生產(chǎn)者線程,那么將它們很可能花費(fèi)大量的時(shí)間用于等待更新MaximumReadIndex(第2個(gè)CAS)。這個(gè)隊(duì)列最初的設(shè)計(jì)場(chǎng)景是滿足單一消費(fèi)者,所以不用懷疑在多生產(chǎn)者的情形下會(huì)比單一生產(chǎn)者有大幅的性能下降。
另外如果你只打算將此隊(duì)列用于單一生產(chǎn)者的場(chǎng)合,那么第2個(gè)CAS操作可以去除。同樣m_maximumReadIndex也可以一同被移除了,所有對(duì)m_maximumReadIndex的引用都改成m_writeIndex。所以,在這樣的場(chǎng)合下push和pop可以被改寫如下:
template <typename ELEM_T>bool ArrayLockFreeQueue::push(const ELEM_T &a_data) {uint32_t currentReadIndex;uint32_t currentWriteIndex;currentWriteIndex = m_writeIndex;currentReadIndex = m_readIndex;if (countToIndex(currentWriteIndex + 1) ==countToIndex(currentReadIndex)){// the queue is fullreturn false;}// save the date into the qm_theQueue[countToIndex(currentWriteIndex)] = a_data;// increment atomically write index. Now a consumer thread can read// the piece of data that was just storedAtomicAdd(&m_writeIndex, 1);return true;}template <typename ELEM_T>bool ArrayLockFreeQueue::pop(ELEM_T &a_data) {uint32_t currentMaximumReadIndex;uint32_t currentReadIndex;do{// m_maximumReadIndex doesn't exist when the queue is set up as// single-producer. The maximum read index is described by the current// write indexcurrentReadIndex = m_readIndex;currentMaximumReadIndex = m_writeIndex;if (countToIndex(currentReadIndex) ==countToIndex(currentMaximumReadIndex)){// the queue is empty or// a producer thread has allocate space in the queue but is// waiting to commit the data into itreturn false;}// retrieve the data from the queuea_data = m_theQueue[countToIndex(currentReadIndex)];// try to perfrom now the CAS operation on the read index. If we succeed// a_data already contains what m_readIndex pointed to before we// increased itif (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))){return true;}// it failed retrieving the element off the queue. Someone else must// have read the element stored at countToIndex(currentReadIndex)// before we could perform the CAS operation} while(1); // keep looping to try again!// Something went wrong. it shouldn't be possible to reach hereassert(0);// Add this return statement to avoid compiler warningsreturn false;}
但是如果是單讀單寫的場(chǎng)景,沒(méi)有必要用這個(gè)無(wú)鎖隊(duì)列,可以看以上單讀單寫的無(wú)鎖隊(duì)列。
與智能指針一起使用,內(nèi)存無(wú)法得到釋放
如果你打算用這個(gè)隊(duì)列來(lái)存放智能指針對(duì)象。需要注意,將一個(gè)智能指針存入隊(duì)列之后,如果它所占用的位置沒(méi)有被另一個(gè)智能指針覆蓋,那么它所指向的內(nèi)存是無(wú)法被釋放的(因?yàn)樗囊糜?jì)數(shù)器無(wú)法下降為0)。這對(duì)于一個(gè)操作頻繁的隊(duì)列來(lái)說(shuō)沒(méi)有什么問(wèn)題,但是程序員需要注意的是,一旦隊(duì)列被填滿過(guò)一次那么應(yīng)用程序所占用的內(nèi)存就不會(huì)下降,即使隊(duì)列被清空。除非自己做改動(dòng),每次pop手動(dòng)delete。
計(jì)算隊(duì)列的大小存在ABA問(wèn)題
size函數(shù)可能會(huì)返回一個(gè)不正確的值,size的實(shí)現(xiàn)如下:
template <typename ELEM_T>inline uint32_t ArrayLockFreeQueue::size() {uint32_t currentWriteIndex = m_writeIndex;uint32_t currentReadIndex = m_readIndex;if (currentWriteIndex >= currentReadIndex){return (currentWriteIndex - currentReadIndex);}else{return (m_totalSize + currentWriteIndex - currentReadIndex);}}
下面的場(chǎng)景描述了size為何會(huì)返回一個(gè)不正確的值:
當(dāng)currentWriteIndex=m_writeIndex執(zhí)行之后,m_writeIndex=3,m_readIndex=2那么實(shí)際size是1。
之后操作線程被搶占,且在它停止運(yùn)行的這段時(shí)間內(nèi),有2個(gè)元素被插入和從隊(duì)列中移除。所以m_writeIndex=5,m_readIndex=4,而size還是 1。
現(xiàn)在被搶占的線程恢復(fù)執(zhí)行,讀取m_readIndex值,這個(gè)時(shí)候 currentReadIndex=4,currentWriteIndex=3。
currentReadIndex>currentWriteIndex'所以m_totalSize+ currentWriteIndex-currentReadIndex`被返回,這個(gè)值意味著隊(duì)列幾乎是滿的,而實(shí)際上隊(duì)列幾乎是空的。
實(shí)際上也就是ABA的一個(gè)場(chǎng)景。與本文一起上傳的代碼中包含了處理這個(gè)問(wèn)題的解決方案。
解決方案:添加一個(gè)用于保存隊(duì)列中元素?cái)?shù)量的成員count。這個(gè)成員可以通過(guò)AtomicAdd/AtomicSub來(lái)實(shí)現(xiàn)原子的遞增和遞減。
但需要注意的是這增加了一定開(kāi)銷,因?yàn)樵舆f增,遞減操作比較昂貴也很難被編譯器優(yōu)化。
例如,在core 2 duo E6400 2.13 Ghz的機(jī)器上,單生產(chǎn)者單消費(fèi)者,隊(duì)列數(shù)組的初始大小是1000,測(cè)試執(zhí)行10000k次的插入,沒(méi)有count成員的版本用時(shí)2.64秒,而維護(hù)了count成員的版本用時(shí)3.42秒。而對(duì)于2消費(fèi)者,1生產(chǎn)者的情況,沒(méi)有count成員的版本用時(shí)3.98秒,維護(hù)count的版本用時(shí) 5.15秒。
這也就是為什么我把是否啟用此成員變量的選擇交給實(shí)際的使用者。使用者可以根據(jù)自己的使用場(chǎng)合選擇是否承受額外的運(yùn)行時(shí)開(kāi)銷。
在array_lock_free_queue.h中有一個(gè)名為ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE的宏變量,如果它被定義那么將啟用count變量,否則將size函數(shù)將有可能返回不正確的值。
(六)多讀多寫RingBuffer的性能
無(wú)鎖vs阻塞隊(duì)列
并發(fā)的插入和移除100W元素所花費(fèi)的時(shí)間(越小越好,隊(duì)列的數(shù)組大小初始為16384)。在單生產(chǎn)者的情況下,無(wú)鎖隊(duì)列戰(zhàn)勝了阻塞隊(duì)列。而隨著生產(chǎn)者數(shù)量的增加,無(wú)鎖隊(duì)列的效率迅速下降。因?yàn)樵诙鄠€(gè)生產(chǎn)者的情況下,第2個(gè) CAS將對(duì)性能產(chǎn)生影響。

然后我們來(lái)看代碼中的情況:
再來(lái)看看消費(fèi)者線程數(shù)量對(duì)性能的影響。
一個(gè)生產(chǎn)者線程

兩個(gè)生產(chǎn)者

三個(gè)生產(chǎn)者

(七)RingBuffer結(jié)論
CAS操作是原子的,線程并行執(zhí)行push/pop不會(huì)導(dǎo)致死鎖。
多生產(chǎn)者同時(shí)向隊(duì)列push數(shù)據(jù)的時(shí)候不會(huì)將數(shù)據(jù)寫入到同一個(gè)位置,產(chǎn)生數(shù)據(jù)覆蓋。
多消費(fèi)者同時(shí)執(zhí)行pop不會(huì)導(dǎo)致一個(gè)元素被出列多于1次。
線程不能將數(shù)據(jù)push進(jìn)已經(jīng)滿的隊(duì)列中,不能從空的隊(duì)列中pop元素。
push和pop都沒(méi)有ABA問(wèn)題。
但是,雖然這個(gè)隊(duì)列是線程安全的,但是在多生產(chǎn)者線程的環(huán)境下它的性能還是不如阻塞隊(duì)列。因此,在符合下述條件的情況下可以考慮使用這個(gè)隊(duì)列來(lái)代替阻塞隊(duì)列:
只有一個(gè)生產(chǎn)者線程。
只有一個(gè)頻繁操作隊(duì)列的生產(chǎn)者,但偶爾會(huì)有其它生產(chǎn)者向隊(duì)列push數(shù)據(jù)。
在reactor網(wǎng)絡(luò)框架中,如果只有一個(gè)reactor在處理client的話,用數(shù)組實(shí)現(xiàn)的RingBuffer來(lái)存儲(chǔ)消息是比較合適的。
(八)四種線程安全隊(duì)列實(shí)現(xiàn)性能對(duì)比
互斥鎖隊(duì)列vs互斥鎖+條件變量隊(duì)列vs內(nèi)存屏障鏈表vsRingBuffer CAS實(shí)現(xiàn)。
4寫1讀

4寫4讀

1寫4讀

可以發(fā)現(xiàn)RingBuffer的實(shí)現(xiàn)性能在幾個(gè)場(chǎng)景中都是比較好的,但是相對(duì)而言,在1寫4讀的場(chǎng)景下性能是最明顯的,幾乎是內(nèi)存屏障的3倍性能了。
為什么鏈表的方式性能相對(duì)BlockQueue沒(méi)有很大的提升呢?
鏈表的方式需要不斷的申請(qǐng)和釋放元素。當(dāng)然,用內(nèi)存池可以適當(dāng)改善這個(gè)影響,但是內(nèi)存池在分配內(nèi)存與釋放內(nèi)存的時(shí)候也會(huì)涉及到線程間的數(shù)據(jù)競(jìng)爭(zhēng),所以用鏈表的方式性能相對(duì)提升不多。
入隊(duì):
74 template <typename U>75 inline bool enqueue(U &&item)76 {77 idx_t nodeIdx = allocate_node_for(std::forward(item));7879 auto tail_ = tail.load(std::memory_order_relaxed);80 while (!tail.compare_exchange_weak(tail_, nodeIdx, std::memory_order_release, std::memory_order_relaxed))81 continue;82 get_node_at(tail_)->next.store(nodeIdx, std::memory_order_release);8384 return true;85 }
出隊(duì):
87 inline bool try_dequeue(T &item) {…….125 add_node_to_free_list(head_, headNode);}
鏈表需要不斷地去更新頭節(jié)點(diǎn)和尾節(jié)點(diǎn)指針的位置,在一個(gè)while循環(huán)里面反復(fù)去執(zhí)行。
80 while (!tail.compare_exchange_weak(tail_, nodeIdx, std::memory_order_release, std::memory_order_relaxed))81 continue;
參考資料:
1.無(wú)鎖單生產(chǎn)者-單使用者循環(huán)隊(duì)列
2.基于數(shù)組的無(wú)鎖隊(duì)列(譯)
?作者簡(jiǎn)介
劉婷
騰訊開(kāi)發(fā)工程師
騰訊C++開(kāi)發(fā)工程師,畢業(yè)于中科院,目前在融媒體項(xiàng)目中心的應(yīng)用架構(gòu)組工作。
?推薦閱讀
PyTorch分布式訓(xùn)練進(jìn)階:這些細(xì)節(jié)你都注意到了嗎?
從0到1詳解ZooKeeper的應(yīng)用場(chǎng)景及架構(gòu)原理!

