java并發(fā)線程深入理解CAS以及ABA問題的處理
◆ 一、什么是CAS
CAS(Compare And Swap,比較并交換),通常指的是這樣一種原子操作:針對一個變量,首先比較它的內(nèi)存值與某個期望值是否相同,如果相同,就給它賦一個新值。
◆ CAS實現(xiàn)過程如下圖:
1、一個初始值變量V,值為5;一開始先讀取V實際內(nèi)存中的值賦值給E
2、比如我們需要給最原始的V+1操作,那么此時用E+1來進行操作(這是防止V在其他線程已經(jīng)被改變),這樣完成了U=E+1的操作
3、判斷E和V的值是否一致,如果一致則證明在以上操作過程中V沒有被其他線程改變則將U的值賦值給V,如果不一致那V就被其他改變了,這樣給U的+1操作就不成立,返回當前的V。

CAS 的邏輯用偽代碼描述如下:
if (value == expectedValue) {
value = newValue;以上偽代碼描述了一個由比較和賦值兩階段組成的復合操作,CAS 可以看作是它們合并后的整體——一個不可分割的原子操作,并且其原子性是直接在硬件層面得到保障的。
CAS可以看做是樂觀鎖(對比數(shù)據(jù)庫的悲觀、樂觀鎖)的一種實現(xiàn)方式,Java原子類中的遞增操作就通過CAS自旋實現(xiàn)的。
CAS是一種無鎖算法,在不使用鎖(沒有線程被阻塞)的情況下實現(xiàn)多線程之間的變量同步。
◆1-2、CAS應用
在java并發(fā)線程中,我們可以使用以下方式進行加鎖處理:\
1、synchronize 在并發(fā)競爭比較激烈的時候不推薦使用,會讓未獲得鎖的線程阻塞,因為會切換到內(nèi)核態(tài),進行park,這樣就會有性能問題
2、使用ReentrantLock lock.lock(加鎖) lock.unlock(解鎖),需要注意的是unlock需要放到finally中,防止代碼塊異常,拋出后鎖一直存在。
3、CAS也可以實現(xiàn)線程鎖機制。
在 Java 中,CAS 操作是由 Unsafe 類提供支持的,該類定義了三種針對不同類型變量的 CAS 操作,如圖

它們都是 native 方法,由 Java 虛擬機提供具體實現(xiàn),這意味著不同的 Java 虛擬機對它們的實現(xiàn)可能會略有不同。
以 compareAndSwapInt 為例,Unsafe 的 compareAndSwapInt 方法接收 4 個參數(shù),分別是:對象實例、內(nèi)存偏移量、字段期望值、字段新值。該方法會針對指定對象實例中的相應偏移量的字段執(zhí)行 CAS 操作。
◆1-3、并發(fā)鎖的實現(xiàn)
◆1-3-1、使用synchronized
我們都知道用synchronized可以實現(xiàn)鎖機制,但是在并發(fā)量大的時候,會**鎖競爭的現(xiàn)象,這樣就會涉及到用戶態(tài)到內(nèi)核態(tài)的切換**,這是極其耗費性能的,因此建議在并發(fā)量不大的情況下,可以使用synchronized進行加鎖。并發(fā)量大的時候,不建議使用
public class ThreadLockSynchronized {
private volatile static int count = 0;
static Object object = "";
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (object) {
for (int j = 0; j < 10000; j++) {
count++;
}
}
}
});
thread.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + count);
}
}以上代碼測試結(jié)果如下:

◆1-3-2、使用ReentrantLock
使用ReentrantLock的時候一定要注意,要將unlock()放到finally代碼塊中,防止業(yè)務代碼異常,無法釋放鎖
public class ThreadLockReentrantLock {
private volatile static int count = 0;
static ReentrantLock reentrantLock=new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//使用reentrantLock.lock();進行加鎖操作
reentrantLock.lock();
try {
for (int j = 0; j < 10000; j++) {
count++;
}
} finally {
//reentrantLock.unlock();一定要放到finally中,防止業(yè)務代碼異常,導致鎖不釋放
reentrantLock.unlock();
}
}
});
thread.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + count);
}
}◆1-3-3、使用CAS
◆1-3-3-1、通過反射獲得Unsafe
Unsafe是jdk提供的工具類,我們要使用需要通過反射機制取到,同時獲得其獲取偏移量的操作
public class UnsafeFactory {
/**
* 獲取 Unsafe 對象
* @return
*/
public static Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 獲取字段的內(nèi)存偏移量
* @param unsafe
* @param clazz
* @param fieldName
* @return
*/
public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
try {
return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
}◆1-3-3-2、通過CAS實現(xiàn)對變量的修改
首先定義一個對象Entity,其中定義一個int類型變量X,然后通過CAS對X進行修改。
public class CASTest {
public static void main(String[] args) {
Entity entity = new Entity();
//通過反射機制獲得Unsafe
Unsafe unsafe = UnsafeFactory.getUnsafe();
//獲得x內(nèi)存中的偏移量
long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
System.out.println(offset);
boolean successful;
// 4個參數(shù)分別是:對象實例、字段的內(nèi)存偏移量、字段原值、字段更新值
//通過CAS將x由0改為3
successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
System.out.println(successful + "\t" + entity.x);
//通過CAS將x由3改為5
successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
System.out.println(successful + "\t" + entity.x);
//通過CAS將x由3改為8--本條是不成立的,上面已經(jīng)將x改為5
successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
System.out.println(successful + "\t" + entity.x);
}
}
class Entity{
int x;
}運行結(jié)果如下:
首先獲得X的偏移量為12,其次就是對x進行修改的結(jié)果打印。第三次修改失敗,因為第二次已經(jīng)將x改為5,如果在用第一次的3作為原值去修改,就會修改失敗。

◆1-3-3-3、CAS特點
根據(jù)以上執(zhí)行結(jié)果,就證明了CAS的特點:先比較、后更新,這兩步,底層會幫助我們實現(xiàn)原子操作,有序性和可見性避免上線文切換。
◆1-4、CAS源碼分析
◆1-4-1、java層代碼
Unsafe類中提供了三種CAS操作如下,這三種都是native方法,都是Hotspot代碼,

下面用compareAndSwapInt方法來舉例說明
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);方法參數(shù):
1、對象實例
2、字段內(nèi)存值的偏移量(根據(jù)對象實例和偏移量就可以獲得對應的變量)
3、期望對象實例中的原值
4、字段更新值
◆1-4-2、Hotspot層
◆1-4-2-1、Unsafe_CompareAndSwapInt方法
調(diào)用Hotspot方法源碼如下:
#unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 根據(jù)偏移量,計算value的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// Atomic::cmpxchg(x, addr, e) cas邏輯 x:要交換的值 e:要比較的值
//cas成功,返回期望值e,等于e,此方法返回true
//cas失敗,返回內(nèi)存中的value值,不等于e,此方法返回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;◆1-4-2-1-1、方法解析
首先調(diào)用的方法:
Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)
前面兩個值是hotspot傳入的值,后面四個值為java方法傳入進來的。
其次根據(jù)偏移量,計算value的地址
//獲得對象
oop p = JNIHandles::resolve(obj);
// 根據(jù)偏移量,計算value的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);然后調(diào)用Atomic::cmpxchg實現(xiàn)CAS操作
// Atomic::cmpxchg(x, addr, e) cas邏輯 x:要交換的值 e:要比較的值
//cas成功,返回期望值e,等于e,此方法返回true
//cas失敗,返回內(nèi)存中的value值,不等于e,此方法返回false
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;◆1-4-2-2、Atomic::cmpxchg方法
需要注意下面的代碼是Linux_x86,不同系統(tǒng)處理CAS是不同的
#atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
//判斷當前執(zhí)行環(huán)境是否為多處理器環(huán)境
int mp = os::is_MP();
//LOCK_IF_MP(%4) 在多處理器環(huán)境下,為 cmpxchgl 指令添加 lock 前綴,以達到內(nèi)存屏障的效果
//cmpxchgl 指令是包含在 x86 架構(gòu)及 IA-64 架構(gòu)中的一個原子條件指令,
//它會首先比較 dest 指針指向的內(nèi)存值是否和 compare_value 的值相等,
//如果相等,則雙向交換 dest 與 exchange_value,否則就單方面地將 dest 指向的內(nèi)存值交給exchange_value。
//這條指令完成了整個 CAS 操作,因此它也被稱為 CAS 指令。
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value; cmpxchgl的詳細執(zhí)行過程:
首先,輸入是"r" (exchange_value), “a” (compare_value), “r” (dest), “r” (mp),表示compare_value存入eax寄存器,而exchange_value、dest、mp的值存入任意的通用寄存器。嵌入式匯編規(guī)定把輸出和輸入寄存器按統(tǒng)一順序編號,順序是從輸出寄存器序列從左到右從上到下以“%0”開始,分別記為%0、%1···%9。也就是說,輸出的eax是%0,輸入的exchange_value、compare_value、dest、mp分別是%1、%2、%3、%4。
因此,cmpxchg %1,(%3)實際上表示cmpxchg exchange_value,(dest)
需要注意的是cmpxchg有個隱含操作數(shù)eax,其實際過程是先比較eax的值(也就是compare_value)和dest地址所存的值是否相等,
輸出是"=a" (exchange_value),表示把eax中存的值寫入exchange_value變量中。
Atomic::cmpxchg這個函數(shù)最終返回值是exchange_value,也就是說,如果cmpxchgl執(zhí)行時compare_value和dest指針指向內(nèi)存值相等則會使得dest指針指向內(nèi)存值變成exchange_value,最終eax存的compare_value賦值給了exchange_value變量,即函數(shù)最終返回的值是原先的compare_value。此時Unsafe_CompareAndSwapInt的返回值(jint)(Atomic::cmpxchg(x, addr, e)) == e就是true,表明CAS成功。如果cmpxchgl執(zhí)行時compare_value和(dest)不等則會把當前dest指針指向內(nèi)存的值寫入eax,最終輸出時賦值給exchange_value變量作為返回值,導致(jint)(Atomic::cmpxchg(x, addr, e)) == e得到false,表明CAS失敗。
現(xiàn)代處理器指令集架構(gòu)基本上都會提供 CAS 指令,例如 x86 和 IA-64 架構(gòu)中的 cmpxchgl 指令和 comxchgq 指令,sparc 架構(gòu)中的 cas 指令和 casx 指令。
不管是 Hotspot 中的 Atomic::cmpxchg 方法,還是 Java 中的 compareAndSwapInt 方法,它們本質(zhì)上都是對相應平臺的 CAS 指令的一層簡單封裝。CAS 指令作為一種硬件原語,有著天然的原子性,這也正是 CAS 的價值所在。
◆1-5、通過CAS實現(xiàn)鎖
我們再回到一開始的代碼中,該如何實現(xiàn)鎖呢
public class ThreadLockCAS {
private volatile static int count = 0;
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
count++;
}
}
});
thread.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + count);
}
}◆1-5-1、實現(xiàn)思路
通過上對CAS的理解,我們就可以這么操作:
1、設置一個中間值為0
2、第一個線程進來的時候通過CAS將中間值改為1
3、其他線程進來 再次準備將0改為1的時候就會失敗
4、第一個線程業(yè)務操作完畢,再次通過cas將中間值改為0
5、下一個線程再次通過CAS將0改為1就會成功,獲得鎖
◆1-5-2、實現(xiàn)一個CAS比較與交換的幫助類
◆1-5-2-1、實現(xiàn)加鎖的類
其中UnsafeFactory類的代碼在上面已經(jīng)貼過,這個地方就不再貼了。這個類中主要就是cas()這個方法。這就是CAS方式加鎖的操作。
state就是作為改變交換的值。
public class CASLock {
private volatile int state;
private static final Unsafe UNSAFE;
private static final long OFFSET;
static {
try {
UNSAFE= UnsafeFactory.getUnsafe();
OFFSET=UnsafeFactory.getFieldOffset(UNSAFE,CASLock.class,"state");
} catch (Exception e) {
throw new Error(e);
}
}
//設置CAS操作
public boolean cas(){
return UNSAFE.compareAndSwapInt(this,OFFSET,0,1);
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
}◆1-5-2-2、使用鎖實現(xiàn)
如下代碼,在第一個for循環(huán)中創(chuàng)建線程,然后線程中實現(xiàn)了一個自旋操作,這樣第一個線程進入的鎖之后,其他線程都在進行自旋操作,等第一個線程通過casLock.setState(0),釋放鎖的時候,下一個線程casLock.getState()==0 && casLock.cas()就可以成立。
public class ThreadLockCAS {
private volatile static int count = 0;
static CASLock casLock = new CASLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (; ; ) {
//state=0
if (casLock.getState() == 0 && casLock.cas()) {
try {
for (int j = 0; j < 10000; j++) {
count++;
}
} finally {
casLock.setState(0);
}
break;
}
}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
}雖然通過以上代碼實現(xiàn)了加鎖操作,由于使用了自旋操作,這樣等待的線程就會一直空轉(zhuǎn),消耗CPU資源,顯然這樣得實現(xiàn)方式是不太友好的
◆1-6、CAS缺陷
CAS 雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現(xiàn)在三個方面:
自旋 CAS 長時間地不成功,則會給 CPU 帶來非常大的開銷
只能保證一個共享變量原子操作
ABA 問題
◆1-6-1、ABA問題及解決方案
CAS算法實現(xiàn)一個重要前提需要取出內(nèi)存中某時刻的數(shù)據(jù),而在下時刻比較并替換,那么在這個時間差類會導致數(shù)據(jù)的變化。
◆1-6-1-1、什么是ABA問題
當有多個線程對一個原子類進行操作的時候,某個線程在短時間內(nèi)將原子類的值A修改為B,又馬上將其修改為A,此時其他線程不感知,還是會修改成功。

◆測試ABA問題
代碼運行過程:
第一個線程要從1改為3,進入線程之后等待1秒。
第二個線程將1改為2,然后又將2改為1
第一個線程從1改為3,(因為原值還是1,這樣就修改成功了)-實際上此1非彼1
public class ABATest {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(() -> {
int value = atomicInteger.get();
System.out.println("Thread1 read value: " + value);
// 阻塞1s
LockSupport.parkNanos(1000000000L);
// Thread1通過CAS修改value值為3
if (atomicInteger.compareAndSet(value, 3)) {
System.out.println("Thread1 update from " + value + " to 3");
} else {
System.out.println("Thread1 update fail!");
}
}, "Thread1").start();
new Thread(() -> {
int value = atomicInteger.get();
System.out.println("Thread2 read value: " + value);
// Thread2通過CAS修改value值為2
if (atomicInteger.compareAndSet(value, 2)) {
System.out.println("Thread2 update from " + value + " to 2");
// do something
value = atomicInteger.get();
System.out.println("Thread2 read value: " + value);
// Thread2通過CAS修改value值為1
if (atomicInteger.compareAndSet(value, 1)) {
System.out.println("Thread2 update from " + value + " to 1");
}
}
}, "Thread2").start();
}
}運行結(jié)果如下:
Thread1不清楚Thread2對value的操作,誤以為value=1沒有修改過

◆1-6-1-2、ABA問題的解決方案
數(shù)據(jù)庫有個鎖稱為樂觀鎖,是一種基于數(shù)據(jù)版本實現(xiàn)數(shù)據(jù)同步的機制,每次修改一次數(shù)據(jù),版本就會進行累加。
AtomicStampedReference解決CAS的ABA問題
同樣,Java也提供了相應的原子引用類AtomicStampedReference

reference即我們實際存儲的變量,stamp是版本,每次修改可以通過+1保證版本唯一性。這樣就可以保證每次修改后的版本也會往上遞增。
AtomicMarkableReference解決CAS的ABA問題
可以理解為上面AtomicStampedReference的簡化版,就是不關心修改過幾次,僅僅關心是否修改過。因此變量mark是boolean類型,僅記錄值是否有過修改。

使用AtomicStampedReference,利用版本號解決ABA問題
public class AtomicStampedReferenceTest {
public static void main(String[] args) {
// 定義AtomicStampedReference Pair.reference值為1, Pair.stamp為1
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);
new Thread(()->{
int[] stampHolder = new int[1];
int value = (int) atomicStampedReference.get(stampHolder);
int stamp = stampHolder[0];
System.out.println("Thread1 read value: " + value + ", stamp: " + stamp);
// 阻塞1s
LockSupport.parkNanos(1000000000L);
// Thread1通過CAS修改value值為3 stamp是版本,每次修改可以通過+1保證版本唯一性
if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) {
System.out.println("Thread1 update from " + value + " to 3");
} else {
System.out.println("Thread1 update fail!");
}
},"Thread1").start();
new Thread(()->{
int[] stampHolder = new int[1];
int value = (int)atomicStampedReference.get(stampHolder);
int stamp = stampHolder[0];
System.out.println("Thread2 read value: " + value+ ", stamp: " + stamp);
// Thread2通過CAS修改value值為2
if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) {
System.out.println("Thread2 update from " + value + " to 2");
// do something
value = (int) atomicStampedReference.get(stampHolder);
stamp = stampHolder[0];
System.out.println("Thread2 read value: " + value+ ", stamp: " + stamp);
// Thread2通過CAS修改value值為1
if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) {
System.out.println("Thread2 update from " + value + " to 1");
}
}
},"Thread2").start();
}
}執(zhí)行結(jié)果

總結(jié):
本篇文章主要講述CAS的加鎖的實現(xiàn)方式,以及通過使用AtomicStampedRefrence和AtomicMarkableReference解決ABA的問題。
來源:
https://www.toutiao.com/article/7150893378815656486/?log_from=566667aa45dbb_1666055565317

