刨根問底 Java 并發(fā)之 CAS
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
成功路上并不擁擠,因為堅持的人不多。
編輯:業(yè)余草
推薦:https://www.xttblog.com/?p=5186
每天進步一點點,日積月累就很了不起!
前言
后端開發(fā)中大家肯定遇到過實現(xiàn)一個線程安全的計數(shù)器這種需求,根據(jù)經(jīng)驗?zāi)銘?yīng)該知道我們要在多線程中實現(xiàn) 「共享變量」 的原子性和可見性問題,于是鎖成為一個不可避免的話題,今天我們討論的是與之對應(yīng)的無鎖 CAS。本文會從怎么來的、是什么、怎么用、原理分析、遇到的問題等不同的角度帶你真正搞懂 CAS。
為什么要無鎖
我們一想到在多線程下保證安全的方式頭一個要拎出來的肯定是鎖,不管從硬件、操作系統(tǒng)層面都或多或少在使用鎖。鎖有什么缺點嗎?當然有了,不然 JDK 里為什么出現(xiàn)那么多各式各樣的鎖,就是因為每一種鎖都有其優(yōu)劣勢。

使用鎖就需要獲得鎖、釋放鎖,CPU 需要通過上下文切換和調(diào)度管理來進行這個操作,對于一個 「獨占鎖」 而言一個線程在持有鎖后沒執(zhí)行結(jié)束其他的哥們就必須在外面等著,等到前面的哥們執(zhí)行完畢 CPU 大哥就會把鎖拿出來其他的線程來搶了(非公平)。鎖的這種概念基于一種悲觀機制,它總是認為數(shù)據(jù)會被修改,所以你在操作一部分代碼塊之前先加一把鎖,操作完畢后再釋放,這樣就安全了。其實在 JDK1.5 使用 synchronized就可以做到。

但是像上面的操作在多線程下會讓 CPU 不斷的切換,非常消耗資源,我們知道可以使用具體的某一類鎖來避免部分問題。那除了鎖的方式還有其他的嗎?當然,有人就提出了無鎖算法,比較有名的就是我們今天要說的 CAS(compare and swap),和鎖不同的是它是一種樂觀的機制,它認為別人去拿數(shù)據(jù)的時候不會修改,但是在修改數(shù)據(jù)的時候去判斷一下數(shù)據(jù)此時的狀態(tài),這樣的話 CPU 不會切換,在讀多的情況下性能將得到大幅提升。當前我們使用的大部分 CPU 都有 CAS 指令了,從硬件層面支持無鎖,這樣開發(fā)的時候去調(diào)用就可以了。
不論是鎖還是無鎖都有其優(yōu)劣勢,后面我們也會通過例子說明 CAS 的問題。
什么是CAS?
前面提了無鎖的 CAS,那到底 CAS 是個啥呢?我已經(jīng)迫不及待了,我們來看看維基百科的解釋
比較并交換(compare and swap, CAS),是原子操作的一種,可用于在多線程編程中實現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時改寫某一數(shù)據(jù)時由于執(zhí)行順序不確定性以及中斷的不可預(yù)知性產(chǎn)生的數(shù)據(jù)不一致問題。該操作通過將內(nèi)存中的值與指定數(shù)據(jù)進行比較,當數(shù)值一樣時將內(nèi)存中的數(shù)據(jù)替換為新的值。
CAS 給我們提供了一種思路,通過 「比較」 和 「替換」 來完成原子性,來看一段代碼:
int cas(long *addr, long old, long new) {
/* 原子執(zhí)行 */
if(*addr != old)
return 0;
*addr = new;
return 1;
}
這是一段 c 語言代碼,可以看到有 3 個參數(shù),分別是:
*addr: 進行比較的值old: 內(nèi)存當前值new: 準備修改的新值,寫入到內(nèi)存
只要我們當前傳入的進行比較的值和內(nèi)存里的值相等,就將新值修改成功,否則返回 0 告訴比較失敗了。學(xué)過數(shù)據(jù)庫的同學(xué)都知道悲觀鎖和樂觀鎖,樂觀鎖總是認為數(shù)據(jù)不會被修改。基于這種假設(shè) CAS 的操作也認為內(nèi)存里的值和當前值是相等的,所以操作總是能成功,我們可以不需要加鎖就實現(xiàn)多線程下的原子性操作。
在多線程情況下使用 CAS 同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程并不會被阻塞掛起,而是告訴它這次修改失敗了,你可以重新嘗試,于是可以寫這樣的代碼。
while (!cas(&addr, old, newValue)) {
}
// success
printf("new value = %ld", addr);
不過這樣的代碼相信你可能看出其中的蹊蹺了,這個我們后面來分析,下面來看看 Java 里是怎么用 CAS 的。
Java 中的CAS
還是前面的問題,如果讓你用 Java 的 API 來實現(xiàn)你可能會想到兩種方式,一種是加鎖(可能是 synchronized 或者其他種類的鎖),另一種是使用 atomic 類,如 AtomicInteger,這一系列類是在 JDK1.5 的時候出現(xiàn)的,在我們常用的 java.util.concurrent.atomic 包下,我們來看個例子:
ExecutorService executorService = Executors.newCachedThreadPool();
AtomicInteger atomicInteger = new AtomicInteger(0);
for (int i = 0; i < 5000; i++) {
executorService.execute(atomicInteger::incrementAndGet);
}
System.out.println(atomicInteger.get());
executorService.shutdown();
這個例子開啟了 5000 個線程去進行累加操作,不管你執(zhí)行多少次答案都是 5000。這么神奇的操作是如何實現(xiàn)的呢?就是依靠 CAS 這種技術(shù)來完成的,我們揭開 AtomicInteger 的老底看看它的代碼:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
/**
* Creates a new AtomicInteger with the given initial value.
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* Gets the current value.
* @return the current value
*/
public final int get() {
return value;
}
/**
* Atomically increments by one the current value.
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
}
這里我只帖出了我們前面例子相關(guān)的代碼,其他都是類似的,可以看到 incrementAndGet 調(diào)用了 unsafe.getAndAddInt 方法。Unsafe 這個類是 JDK 提供的一個比較底層的類,它不讓我們程序員直接使用,主要是怕操作不當把機器玩壞了。。。(其實可以通過反射的方式獲取到這個類的實例)你會在 JDK 源碼的很多地方看到這家伙,我們先說說它有什么能力:
內(nèi)存管理:包括分配內(nèi)存、釋放內(nèi)存
操作類、對象、變量:通過獲取對象和變量偏移量直接修改數(shù)據(jù)
掛起與恢復(fù):將線程阻塞或者恢復(fù)阻塞狀態(tài)
CAS:調(diào)用 CPU 的 CAS 指令進行比較和交換
內(nèi)存屏障:定義內(nèi)存屏障,避免指令重排序
這里只是大致提一下常用的操作,具體細節(jié)可以在文末的參考鏈接中查看。下面我們繼續(xù)看 unsafe 的 getAndAddInt 在做什么。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public native int getIntVolatile(Object var1, long var2);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
其實很簡單,先通過 getIntVolatile 獲取到內(nèi)存的當前值,然后進行比較,展開 compareAndSwapInt 方法的幾個參數(shù):
var1: 當前要操作的對象(其實就是AtomicInteger實例)var2: 當前要操作的變量偏移量(可以理解為 CAS 中的內(nèi)存當前值)var4: 期望內(nèi)存中的值var5: 要修改的新值
所以 this.compareAndSwapInt(var1, var2, var5, var5 + var4) 的意思就是,比較一下 var2 和內(nèi)存當前值 var5 是否相等,如果相等那我就將內(nèi)存值 var5 修改為 var5 + var4(var4 就是 1,也可以是其他數(shù))。
這里我們還需要解釋一下 「偏移量」 是個啥?你在前面的代碼中可能看到這么一段:
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
可以看出在靜態(tài)代碼塊執(zhí)行的時候?qū)?AtomicInteger 類的 value 這個字段的偏移量獲取出來,拿這個 long 數(shù)據(jù)干嘛呢?在 Unsafe 類里很多地方都需要傳入 obj 和偏移量,結(jié)合我們說 Unsafe 的諸多能力,其實就是直接通過更底層的方式將對象字段在內(nèi)存的數(shù)據(jù)修改掉。
使用上面的方式就可以很好的解決多線程下的原子性和可見性問題。由于代碼里使用了 do while 這種循環(huán)結(jié)構(gòu),所以 CPU 不會被掛起,比較失敗后重試,就不存在上下文切換了,實現(xiàn)了無鎖并發(fā)編程
CAS存在的問題
自旋的劣勢
你留意上面的代碼會發(fā)現(xiàn)一個問題,while 循環(huán)如果在最壞情況下總是失敗怎么辦?會導(dǎo)致 CPU 在不斷處理。像這種 while(!compareAndSwapInt) 的操作我們稱之為自旋,CAS 是樂觀的,認為大家來并不都是修改數(shù)據(jù)的,現(xiàn)實可能出現(xiàn)非常多的線程過來都要修改這個數(shù)據(jù),此時隨著并發(fā)量的增加會導(dǎo)致 CAS 操作長時間不成功,CPU 也會有很大的開銷。所以我們要清楚,如果是讀多寫少的情況也就滿足樂觀,性能是非常好的。
ABA 問題
提到 CAS 不得不說 ABA 問題,它是說假如內(nèi)存的值原來是 A,被一個線程修改為了 B,此時又有一個線程把它修改為了 A,那么 CAS 肯定是操作成功的。真的這樣做的話代碼可能就有 bug 了,對于修改數(shù)據(jù)為 B 的那個線程它應(yīng)該讀取到 B 而不是 A,如果你做過數(shù)據(jù)庫相關(guān)的樂觀鎖機制可能會想到我們在比較的時候使用一個版本號 version 來進行判斷就可以搞定。在 JDK 里提供了一個 AtomicStampedReference 類來解決這個問題,來看一個例子:
int stamp = 10001;
AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(0, stamp);
stampedReference.compareAndSet(0, 10, stamp, stamp + 1);
System.out.println("value: " + stampedReference.getReference());
System.out.println("stamp: " + stampedReference.getStamp());
它的構(gòu)造函數(shù)是 2 個參數(shù),多傳入了一個初始 時間戳,用這個戳來給數(shù)據(jù)加了一個版本,這樣的話多個線程來修改如果提供的戳不同。在修改數(shù)據(jù)的時候除了提供一個新的值之外還要提供一個新的戳,這樣在多線程情況下只要數(shù)據(jù)被修改了那么戳一定會發(fā)生改變,另一個線程拿到的是舊的戳所以會修改失敗。
嘗試應(yīng)用
既然 CAS 提供了這么好的 API,我們不妨用它來實現(xiàn)一個簡易版的獨占鎖。思路是當某個線程進入 lock 方法就比較鎖對象的內(nèi)存值是否是 false,如果是則代表這把鎖它可以獲取,獲取后將內(nèi)存之修改為 true,獲取不到就自旋。在 unlock 的時候?qū)?nèi)存值再修改為 false 即可,代碼如下:
public class SpinLock {
private AtomicBoolean mutex = new AtomicBoolean(false);
public void lock() {
while (!mutex.compareAndSet(false, true)) {
// System.out.println(Thread.currentThread().getName()+ " wait lock release");
}
}
public void unlock() {
while (!mutex.compareAndSet(true, false)) {
// System.out.println(Thread.currentThread().getName()+ " wait lock release");
}
}
}
這里使用了 AtomicBoolean 這個類,當然用 AtomicInteger 也是可以的,因為我們只保存一個狀態(tài) boolean 占用比較小就用它了。這個鎖的實現(xiàn)比較簡單,缺點非常明顯,由于 while 循環(huán)導(dǎo)致的自旋會讓其他線程都在占用 CPU,但是也可以使用,關(guān)于鎖的優(yōu)化版本實現(xiàn)我會在后續(xù)的文章中進行改進和說明,正因為這些問題我們也會在后續(xù)研究 AQS 這把利器的優(yōu)點。
CAS 源碼
看了上面的這些代碼和解釋相信你對 CAS 已經(jīng)理解了,下面我們要說的原理是前面的 native 方法中的 C++ 代碼寫了什么,在 openjdk 的 /hotspot/src/share/vm/prims 目錄中有一個 Unsafe.cpp 文件中有這樣一段代碼:
注意:這里以 hotspot 實現(xiàn)為例
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 通過偏移量獲取對象變量地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 執(zhí)行一個原子操作
// 如果結(jié)果和現(xiàn)在不同,就直接返回,因為有其他人修改了;否則會一直嘗試去修改。直到成功。
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_ENDC
好了,關(guān)于CAS今天就分享到這里了,希望對你有所幫助。
