鎖、CAS操作和無鎖隊(duì)列的實(shí)現(xiàn)
來源:yishizuofei
blog.csdn.net/yishizuofei/article/details/78353722
鎖的機(jī)制
鎖和人很像,有的人樂觀,總會(huì)想到好的一方面,所以只要越努力,就會(huì)越幸運(yùn);有的人悲觀,總會(huì)想到不好的一方面,患得患失,所以經(jīng)常會(huì)做不好事。
我一直把前一個(gè)當(dāng)作為我前進(jìn)的動(dòng)力和方向,快樂充實(shí)的過好每一天。
常用的鎖機(jī)制也有兩種:
1、樂觀鎖:假設(shè)不會(huì)發(fā)生并發(fā)沖突,每次不加鎖而去完成某項(xiàng)操作,只在提交操作時(shí),檢查是否違反數(shù)據(jù)完整性。如果因?yàn)闆_突失敗就繼續(xù)重試,直到成功為止。而樂觀鎖用到的機(jī)制就是CAS。
樂觀鎖大多是基于數(shù)據(jù)版本記錄機(jī)制實(shí)現(xiàn)。?為數(shù)據(jù)增加一個(gè)版本標(biāo)識(shí),比如在基于數(shù)據(jù)庫表的版本解決方案中,一般是通過微數(shù)據(jù)庫表增加一個(gè)“version”字段來實(shí)現(xiàn)。讀取數(shù)據(jù)時(shí),將此版本號(hào)一同讀出,之后更新時(shí),對此版本號(hào)加一。
此時(shí),?將提交數(shù)據(jù)的版本數(shù)據(jù)與數(shù)據(jù)庫表對應(yīng)記錄的當(dāng)前版本信息進(jìn)行比對,如果提交的數(shù)據(jù)版本號(hào)大于數(shù)據(jù)庫表當(dāng)前版本號(hào),則予以更新,否則認(rèn)為是過期數(shù)據(jù)。樂觀鎖的缺點(diǎn)是不能解決臟讀的問題。
注意:?在實(shí)際生產(chǎn)環(huán)境里邊,如果并發(fā)量不大且不允許臟讀,可以使用悲觀鎖解決并發(fā)問題。但如果系統(tǒng)的并發(fā)非常大的話,悲觀鎖定會(huì)帶來非常大的性能問題,所以我們就要選擇樂觀鎖。
2、悲觀鎖:假定會(huì)發(fā)生并發(fā)沖突,屏蔽一切可能違反數(shù)據(jù)完整性的操作。悲觀鎖的實(shí)現(xiàn),往往依靠底層提供的鎖機(jī)制。
悲觀鎖會(huì)導(dǎo)致其他所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。如果所有線程都在等待其他線程釋放鎖,而不能主動(dòng)釋放鎖資源,那么也會(huì)造成死鎖問題。
鎖的機(jī)制存在以下問題:
(1)在多線程競爭下,加鎖、釋放鎖會(huì)導(dǎo)致比較多的上下文切換和調(diào)度延時(shí),引起性能問題。
(2)一個(gè)線程持有鎖會(huì)導(dǎo)致其他所有需要次所的線程掛起。
(3)如果一個(gè)優(yōu)先級(jí)搞得線程等待一個(gè)優(yōu)先級(jí)低的線程釋放鎖會(huì)導(dǎo)致優(yōu)先級(jí)倒置,引起性能風(fēng)險(xiǎn)。
CAS操作
Compare And Set(或Compare And Swap),CAS是解決多線程并行情況下使用鎖造成性能損耗的一種機(jī)制,CAS操作包含三個(gè)操作數(shù)——內(nèi)存位置(V)、預(yù)期原值(A)、新值(B)。
如果內(nèi)存位置的值與預(yù)期原值相同,那么處理器會(huì)自動(dòng)將內(nèi)存的值更新為新值。否則,處理器不做任何操作。無論哪種情況,處理器都會(huì)在CAS指令之前返回該位置的值。
CAS有效地說明了 “我認(rèn)為位置V應(yīng)該包含值A(chǔ);如果包含該值,則將B放到這個(gè)位置;否則,不要更新該位置,只告訴我這個(gè)位置現(xiàn)在的值即可。”
現(xiàn)在幾乎所有的CPU指令都支持CAS的原子操作,X86下對應(yīng)的是CMPXCHG匯編指令。有了這個(gè)操作,我們就可以用其來實(shí)現(xiàn)各種無鎖的數(shù)據(jù)結(jié)構(gòu)。
這個(gè)操作可以用以下的例子來描述:
意思是,看一看內(nèi)存*reg里的值是不是oldval,如果是的話,則對其賦值newval,并返回true,表示更新成功,如果返回false,則表示修改失敗。
bool?compare_and_swap(int?*reg,int?oldval,int?newval)
{
????int?reg_val?=?*reg;
????if(reg_val?==?oldval)
????{
????????*reg?=?newval;
????????return?true;
????}
????return?false;
}
CAS操作無鎖隊(duì)列的實(shí)現(xiàn)(參考)
Q:CAS的實(shí)現(xiàn)
A:gcc提供了兩個(gè)函數(shù)
bool?__sync_bool_compare_and_swap?(type?*ptr,?type?oldval,?
????????????????????????????????????type?newval, ...);
type?__sync_val_compare_and_swap?(type?*ptr,?type?oldval,?
????????????????????????????type?newval, ...);
這兩個(gè)函數(shù)提供原子的比較和交換,如果*ptr == oldval,就將newval寫入*ptr,
第一個(gè)函數(shù)在相等并寫入的情況下返回true,這個(gè)函數(shù)比第二個(gè)好在,返回bool值可以知道有沒有更新成功。
第二個(gè)函數(shù)在返回操作之前的值。
第二個(gè)函數(shù)用c語言描述:
type?__sync_val_compare_and_swap?(type?*ptr,?type?oldval,?
????????????????????????????type?newval,?...)
{
????type?cur?=?*ptr;
????if?(cur?==?oldval)
????{
????????*ptr?=?newval;
????}
????return?cur;//?返回操作之前的值
}
type只能是1,2,4或8字節(jié)長度的int類型,否則會(huì)發(fā)生下面的錯(cuò)誤https://img-blog.csdn.net/20180812085312545?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lpc2hpenVvZmVp/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70
Q: 操作系統(tǒng)級(jí)別是如何實(shí)現(xiàn)的
A: X86中有一個(gè)CMPXCHG的匯編指令
Q: CAS指令有什么缺點(diǎn)
A: 1.存在ABA問題因?yàn)镃AS需要在操作值的時(shí)候檢查下值有沒有發(fā)生變化,如果沒有發(fā)生變化則更新,但是如果一個(gè)值原來是A,變成了B,又變成了A,那么使用CAS進(jìn)行檢查時(shí)會(huì)發(fā)現(xiàn)它的值沒有發(fā)生變化,但是實(shí)際上卻變化了。
ABA問題的解決思路就是使用版本號(hào)。在變量前面追加上版本號(hào),每次變量更新的時(shí)候把版本號(hào)加一,那么A-B-A 就會(huì)變成1A-2B-3A。
2.循環(huán)時(shí)間長開銷大自旋CAS如果長時(shí)間不成功,會(huì)給CPU帶來非常大的執(zhí)行開銷。
3.只能保證一個(gè)共享變量的原子操作對一個(gè)共享變量執(zhí)行操作時(shí),我們可以使用循環(huán)CAS的方式來保證原子操作,但是對多個(gè)共享變量操作時(shí),循環(huán)CAS就無法保證操作的原子性,這個(gè)時(shí)候就可以用鎖,或者有一個(gè)取巧的辦法,就是把多個(gè)共享變量合并成一個(gè)共享變量來操作。比如有兩個(gè)共享變量i=2,j=a,合并一下ij=2a,然后用CAS來操作ij。
gcc從4.1.2提供了__sync_*系列的built-in函數(shù),用于提供加減和邏輯運(yùn)算的原子操作。
其聲明如下:
原子操作的后置加加 type __sync_fetch_and_add (type *ptr, type value, …)
原子操作的前置加加 type __sync_add_and_fetch (type *ptr, type value, …)
其他類比
type?__sync_fetch_and_sub(type?*ptr,?type?value,?…)??
type?__sync_fetch_and_or(type?*ptr,?type?value,?…)??
type?__sync_fetch_and_and(type?*ptr,?type?value,?…)??
type?__sync_fetch_and_xor(type?*ptr,?type?value,?…)??
type?__sync_fetch_and_nand(type?*ptr,?type?value,?…)
type?__sync_sub_and_fetch(type?\*ptr,?type?value,?…)??
type?__sync_or_and_fetch(type?*ptr,?type?value,?…)??
type?__sync_and_and_fetch(type?*ptr,?type?value,?…)??
type?__sync_xor_and_fetch(type?*ptr,?type?value,?…)??
type?__sync_nand_and_fetch(type?*ptr,?type?value,?…)?
這兩組函數(shù)的區(qū)別在于第一組返回更新前的值,第二組返回更新后的值。
關(guān)于CAS函數(shù),參考:?
http://blog.csdn.net/youfuchen/article/details/23179799
在多線程環(huán)境下有以下情景:
比如對同一鏈隊(duì)列進(jìn)行入隊(duì)操作時(shí)一個(gè)線程正在將新的隊(duì)列節(jié)點(diǎn) 掛載到 隊(duì)尾節(jié)點(diǎn)的next上,可是還沒來的及更新隊(duì)尾節(jié)點(diǎn) 但同一時(shí)刻另一個(gè)線程也在進(jìn)行入隊(duì)操作將新的隊(duì)列節(jié)點(diǎn)也掛在到了沒更新的隊(duì)尾節(jié)點(diǎn)那么先掛載的節(jié)點(diǎn)就丟失了。
為了解決多線程環(huán)境下的這些問題,我們第一時(shí)間肯定想到了加上互斥鎖控制同一時(shí)刻只能有一個(gè)線程可以對隊(duì)列進(jìn)行寫操作,但是加鎖的操作太消耗系統(tǒng)資源了很繁重 。
因?yàn)閷εR界區(qū)的操作只有一步 就是對隊(duì)列的尾節(jié)點(diǎn)進(jìn)行更新,只要讓這一步進(jìn)行的是原子操作就可以了,所以使用到了CAS操作。
為了有一個(gè)對比 寫了一份thread_queue.c是用鎖對臨界區(qū)進(jìn)行控制訪問的
另一份是lock_free_queue.c是用CAS確保對臨界區(qū)的操作是原子操作
“queue.h”
#ifndef?QUEUE_H_
#define?QUEUE_H_
#include?
#include?
/*
普通的
鏈?zhǔn)疥?duì)列
*/
typedef?struct?QNode
{
????int?data;
????struct?QNode?*next;
}QNode,?*QueuePtr;
typedef?struct?LinkQueue
{
????QueuePtr?front;
????QueuePtr?rear;
}LinkQueue;
void?init_Queue(LinkQueue?*q);//初始化隊(duì)列
void?push_Queue(LinkQueue?*q,?int?e);//隊(duì)尾入隊(duì)
int?pop_Queue(LinkQueue?*q,?int?*e);//隊(duì)頭出隊(duì)
int?is_Empty(LinkQueue?*q);
void?show(LinkQueue?*q);
#endif?/*?QUEUE_H_?*/
“queue.c”
#include?"queue.h"
/*
初始化
為隊(duì)列構(gòu)建一個(gè)頭結(jié)點(diǎn)
讓front和rear都指向這個(gè)頭結(jié)點(diǎn)
*/
void?init_Queue(LinkQueue?*q)
{
????q->front?=?q->rear?=?(QNode?*)malloc(sizeof(QNode));
????q->front->next?=?NULL;
}
/*
普通的入隊(duì)操作
*/
void?push_Queue(LinkQueue?*q,?int?e)
{
????QueuePtr?newNode?=?(QueuePtr)malloc(sizeof(QNode));
????newNode->data?=?e;
????newNode->next?=?NULL;
????q->rear->next?=?newNode;
????q->rear?=?newNode;
}
/*
cas的入隊(duì)操作
和普通的入隊(duì)操作一樣
新建節(jié)點(diǎn)后
要將新節(jié)點(diǎn)掛在隊(duì)尾時(shí)需要進(jìn)行cas操作
因?yàn)楣俜轿臋n:The definition given in?the?Intel?documentation?allows?only?for?the?use?of?the?types?int,?long,?long?long?as?well?as?their?unsigned?counterparts
只能用?int,?long,?long?long
所以要把指針類型?QueuePtr?變成?long
用long的另一個(gè)原因就是:屏蔽32位和64位的差異 long在32位是4字節(jié) 64位是8字節(jié)
*/
void?cas_push(LinkQueue?*q,?int?e)
{
????QueuePtr?newNode?=?(QueuePtr)malloc(sizeof(QNode));
????newNode->data?=?e;
????newNode->next?=?NULL;
????QueuePtr?tmp;
????do
????{
????????tmp?=?q->rear;
????}while?(!__sync_bool_compare_and_swap((long?*)(&(tmp->next)),?NULL,?(long)newNode));
????q->rear?=?newNode;
}
/*
以前的判空是?q->front?==?q->rear
但是這樣子會(huì)增加出隊(duì)的操作?當(dāng)出的是最后一個(gè)元素時(shí),?q->rear需要指向?q->front
我把這一步省了?暫時(shí)沒有發(fā)現(xiàn)有什么副作用
所以我改成了?q->front->next?==?NULL
*/
int?is_Empty(LinkQueue?*q)
{
????if?(q->front->next?==?NULL)
????{
????????return(1);
????}
????return(0);
}
/*
普通的出隊(duì)操作
如果隊(duì)空?返回0?也就是false
e作為接受元素的緩沖
*/
int?pop_Queue(LinkQueue?*q,?int?*e)
{
????if?(is_Empty(q))
????{
????????return(0);
????}
????QueuePtr?tmp;
????tmp?=?q->front->next;
????q->front->next?=?tmp->next;
????*e?=?tmp->data;
????free(tmp);
????return(1);
}
/*
cas的出隊(duì)操作
每一次都要判斷這個(gè)隊(duì)列是不是空
然后執(zhí)行cas的出隊(duì)操作:
(1)tmp = q->rear 把舊的隊(duì)頭存起來
(2)執(zhí)行原子操作:看?舊的隊(duì)頭?是否等于?現(xiàn)在的隊(duì)頭 tmp ==?*(&(q->front))?如果相等執(zhí)行?*(&(q->front))?= tmp->next 返回true?
????否則,即執(zhí)行這一步原子操作的時(shí)候,別的線程修改了隊(duì)列,導(dǎo)致隊(duì)尾指向改變了,返回false?,while(!false)回到第一步重新執(zhí)行
*/
int?cas_pop(LinkQueue?*q,?int?*e)
{
????QueuePtr?tmp;
????do?{
????????if?(is_Empty(q))
????????{
????????????return(0);
????????}
????????//printf("cas_pop...\n");
????????tmp?=?q->front->next;
????}?while?(!__sync_bool_compare_and_swap((long?*)(&(q->front->next)),?(long)tmp,?(long)tmp->next));
????*e?=?tmp->data;
????free(tmp);
????return(1);
}
/*
遍歷隊(duì)列?打印里面的元素?為了求證隊(duì)列里面的元素
*/
void?show(LinkQueue?*q)
{
????printf("void?show(LinkQueue?*q)\n");
????QueuePtr?tmp?=?q->front->next;
????while?(tmp)
????{
????????printf("%d?",?tmp->data);
????????tmp?=?tmp->next;
????}
????printf("\n");
}
“thread_queue.c”
#include?"queue.h"
#include?
#include?
#include?
#include?
#define?THREAD_NUMBER?4//開啟的線程數(shù),電腦是4核,所以用4
//sem_t?queue_sem;//信號(hào)量
pthread_mutex_t?mutex;//互斥鎖
void?*thread_push(void?*arg);
void?*thread_pop(void?*arg);
int?main()
{
????LinkQueue?que;
????init_Queue(&que);
????/*初始化二進(jìn)制信號(hào)量?初始值為1?代表每一次只有1個(gè)線程可以訪問?
????本來更加應(yīng)該用互斥量?比較貼合情景?但是不太熟?就用了信號(hào)量
????*/
????//int?res?=?sem_init(&queue_sem,?0,?1);
????//assert(res?!=?-1);
????int?i;
????pthread_t?threadArr[THREAD_NUMBER];
????for?(i?=?0;?i?????{
????????pthread_create(&threadArr[i],?NULL,?thread_push,?(void?*)&que);
????}
????for?(i?=?0;?i?????{
????????pthread_join(threadArr[i],?NULL);
????}
????show(&que);
????for?(i?=?0;?i?????{
????????pthread_create(&threadArr[i],?NULL,?thread_pop,?(void?*)&que);
????}
????for?(i?=?0;?i?????{
????????pthread_join(threadArr[i],?NULL);
????}
????//sem_destroy(&queue_sem);
????exit(EXIT_SUCCESS);
}
void?*thread_push(void?*arg)
{
????printf("start?push\n");
????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
????int?i;
????for?(i?=?0;?i?20;?++i)
????{
????????//sem_wait(&queue_sem);
????????pthread_mutex_lock(&mutex);
????????push_Queue(quePtr,?i);
????????pthread_mutex_unlock(&mutex);
????????//sem_post(&queue_sem);
????}
????printf("finish?push\n");
????pthread_exit(NULL);
}
void?*thread_pop(void?*arg)
{
????printf("start?pop\n");
????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
????int?tmp;
????int?res;
????while?(1)
????{
????????//sem_wait(&queue_sem);
????????pthread_mutex_lock(&mutex);
????????res?=?pop_Queue(quePtr,?&tmp);
????????pthread_mutex_unlock(&mutex);
????????//sem_post(&queue_sem);
????????if?(!res)
????????{
????????????break;
????????}
????????printf("%d?",?tmp);
????}
????printf("finish?pop\n");
????pthread_exit(NULL);
}
“l(fā)ock_free_queue.c”
#include?"queue.h"
#include?
#include?
#include?
#define?THREAD_NUMBER?4//開啟的線程數(shù),電腦是4核,所以用4
void?*thread_push(void?*arg);
void?*thread_pop(void?*arg);
/*
初始化空隊(duì)列
為了模擬線程對資源的搶占
開啟4個(gè)線程?每個(gè)線程push?20個(gè)元素?0~19
等待4個(gè)線程結(jié)束
打印隊(duì)列元素?驗(yàn)證push
開啟四個(gè)線程?每個(gè)線程都對隊(duì)列進(jìn)行?pop操作
*/
int?main()
{
????LinkQueue?que;
????init_Queue(&que);
????int?i;
????/*
????創(chuàng)造四個(gè)新線程?每個(gè)線程都執(zhí)行?thread_push(&que)
????*/
????pthread_t?threadArr[THREAD_NUMBER];
????for?(i?=?0;?i?????{
????????pthread_create(&threadArr[i],?NULL,?thread_push,?(void?*)&que);
????}
????/*
????等待四個(gè)線程都執(zhí)行完
????要不然主線程一下子就跑完了?程序就結(jié)束了
????還有就是?為了show函數(shù)?可以驗(yàn)證元素是不是都push進(jìn)去了
????*/
????for?(i?=?0;?i?????{
????????pthread_join(threadArr[i],?NULL);
????}
????show(&que);
????/*
????創(chuàng)造四個(gè)新線程?每個(gè)線程都執(zhí)行?thread_pop(&que)
????*/
????for?(i?=?0;?i?????{
????????pthread_create(&threadArr[i],?NULL,?thread_pop,?(void?*)&que);
????}
????for?(i?=?0;?i?????{
????????pthread_join(threadArr[i],?NULL);
????}
????exit(EXIT_SUCCESS);
}
void?*thread_push(void?*arg)
{
????printf("start?push\n");
????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
????int?i;
????for?(i?=?0;?i?20;?++i)
????{
????????cas_push(quePtr,?i);
????}
????printf("finish?push\n");
????pthread_exit(NULL);
}
void?*thread_pop(void?*arg)
{
????printf("start?pop\n");
????LinkQueue?*?quePtr?=?(LinkQueue?*)arg;
????int?tmp;
????int?res;
????while?(1)
????{
????????res?=?cas_pop(quePtr,?&tmp);
????????if?(!res)
????????{
????????????break;
????????}
????????printf("%d?",?tmp);
????????//sleep(1);
????}
????printf("finish?pop\n");
????pthread_exit(NULL);
}