詳解并發(fā)編程基礎之原子操作(atomic包)
前言
最近想寫一個并發(fā)編程系列的文章,使用
Go也有一段時間了,但是對并發(fā)的理解不是很透徹,借著這次總結,希望能更進一步。我們以"原子操作"開篇,對于并發(fā)操作而言,原子操作是個非?,F(xiàn)實的問題,比較典型的應用的就是i++操作,并發(fā)情況下,同時對內存中的i進行讀取,就會產(chǎn)生與預期不符的結果,所以Go語言中的sync/atomic就是解決這個問題的,接下來我們一起來看一看Go的原子操作。
什么是原子性、原子操作
原子(atomic)本意是"不能被進一步分割的最小粒子",而原子操作(atomic operation)意為"不可中斷的一個或一系列操作"。其實用大白話說出來就是讓多個線程對同一塊內存的操作是串行的,不會因為并發(fā)操作把內存寫的不符合預期。我們來看這樣一個例子:假設現(xiàn)在是一個銀行賬戶系統(tǒng),用戶A想要自己從自己的賬戶中轉1萬元到用戶B的賬戶上,直到轉帳成功完成一個事務,主要做這兩件事:
從A的賬戶中減去1萬元,如果A的賬戶原來就有2萬元,現(xiàn)在就變成了1萬元 給B的賬戶添加1萬元,如果B的賬戶原來有2萬元,那么現(xiàn)在就變成了3萬元
假設在操作一的時候,系統(tǒng)發(fā)生了故障,導致給B賬戶添加款項失敗了,那么就要進行回滾。回滾就是回到事務之前的狀態(tài),我們把這種要么一起成功的操作叫做原子操作,而原子性就是要么完整的被執(zhí)行、要么完全不執(zhí)行。
如何保證原子性
鎖機制
在處理器層面,可以采用總線加鎖或者對緩存加鎖的方式來實現(xiàn)多處理器之間的原子操作。通過加鎖保證從系統(tǒng)內存中讀取或寫入一個字節(jié)是原子的,也就是當一個處理器讀取一個字節(jié)時,其他處理器不能訪問這個字節(jié)的內存地址。
總線鎖:處理器提供一個Lock#信號,當一個處理器上在總線上輸出此信號時,其他處理器的請求將被阻塞住,那么該處理器可以獨占共享內存??偩€鎖會把CPU和內存之間的通信鎖住了,在鎖定期間,其他處理就不能操作其他內存地址的數(shù)據(jù),所以總線鎖定的開銷比較大,所以處理會在某些場合使用緩存鎖進行優(yōu)化。緩存鎖:內存區(qū)域如果被緩存在處理器上的緩存行中,并且在Lock#操作期間,那么當它執(zhí)行操作回寫到內存時,處理不在總線上聲言Lock#信號,而是修改內部的內存地址,并允許它的緩存一致機制來保證操作的原子性,因為緩存一致性機制會阻止同時修改由兩個以上處理器緩存的內存區(qū)域的數(shù)據(jù),其他處理器回寫已被鎖定的緩存行的數(shù)據(jù)時,就會使緩存無效。
鎖機制雖然可以保證原子性,但是鎖機制會存在以下問題:
多線程競爭的情況下,頻繁的加鎖、釋放鎖會導致較多的上下文切換和調度延時,性能會很差 當一個線程占用時間比較長時,就導致其他需要此鎖的線程掛起.
上面我們說的都是悲觀鎖,要解決這種低效的問題,我們可以采用樂觀鎖,每次不加鎖,而是假設沒有沖突去完成某項操作,如果因為沖突失敗就重試,直到成功為止。也就是我們接下來要說的CAS(compare and swap).
CAS(compare and swap)
CAS的全稱為Compare And Swap,直譯就是比較交換。是一條CPU的原子指令,其作用是讓CPU先進行比較兩個值是否相等,然后原子地更新某個位置的值,其實現(xiàn)方式是給予硬件平臺的匯編指令,在intel的CPU中,使用的cmpxchg指令,就是說CAS是靠硬件實現(xiàn)的,從而在硬件層面提升效率。簡述過程是這樣:
假設包含3個參數(shù)內存位置(V)、預期原值(A)和新值(B)。
V表示要更新變量的值,E表示預期值,N表示新值。僅當V值等于E值時,才會將V的值設為N,如果V值和E值不同,則說明已經(jīng)有其他線程在做更新,則當前線程什么都不做,最后CAS返回當前V的真實值。CAS操作時抱著樂觀的態(tài)度進行的,它總是認為自己可以成功完成操作?;谶@樣的原理,CAS操作即使沒有鎖,也可以發(fā)現(xiàn)其他線程對于當前線程的干擾。
偽代碼可以這樣寫:
func CompareAndSwap(int *addr,int oldValue,int newValue) bool{
if *addr == nil{
return false
}
if *addr == oldValue {
*addr = newValue
return true
}
return false
}
不過上面的代碼可能會發(fā)生一個問題,也就是ABA問題,因為CAS需要在操作值的時候檢查下值有沒有發(fā)生變化,如果沒有發(fā)生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發(fā)現(xiàn)它的值沒有發(fā)生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就會變成1A-2B-3A。
go語言中如何進行原子操作
在Go語言標準庫中,sync/atomic包將底層硬件提供的原子操作封裝成了Go的函數(shù),主要分為5個系列的函數(shù),分別是:
func SwapXXXX(addr *int32, new int32) (old int32)系列:其實就是原子性的將 new值保存到*addr并返回舊值。代碼表示:
old = *addr
*addr = new
return old
func CompareAndSwapXXXX((addr *int64, old, new int64) (swapped bool)系列:其就是原子性的比較 *addr和old的值,如果相同則將new賦值給*addr并返回真,代碼表示:
if *addr == old{
*addr = new
return ture
}
return false
func AddXXXX(addr *int64, delta int64) (new int64)系列:原子性的將 val的值添加到*addr并返回新值。代碼表示:
*addr += delta
return *addr
func LoadXXXX(addr *uint32) (val uint32)系列:原子性的獲取 *addr的值func StoreXXXX(addr *int32, val int32)原子性的將val值保存到 *addr
Go語言在1.4版本時添加一個新的類型Value,此類型的值就相當于一個容器,可以被用來"原子地"存儲(store)和加載(Load)任意類型的值。這些使用起來都還比較簡單,就不寫例子了,接下來我們一起看一看這些方法是如何實現(xiàn)的。
源碼解析
由于系列比較多。底層實現(xiàn)的方法也大同小異,這里就主要分析一下Value的實現(xiàn)方法吧。為什么不分析其他系列的呢?因為原子操作由底層硬件支持,所以看其他系列實現(xiàn)都要看匯編,Go的匯編是基于Plan9的,這個匯編語言真的資料甚少,我也是真的不懂,水平不夠,也不自討苦吃了,等后面真的能看懂這些匯編了,再來分析吧。這個網(wǎng)站有一些關于plan9匯編的知識,有興趣可以看一看:http://doc.cat-v.org/plan_9/4th_edition/papers/asm。
Value結構
我們先來看一下Value的結構:
type Value struct {
v interface{}
}
Value結構里就只有一個字段,是interface類型,雖然這里是interface類型,但是這里要注意,第一次Store寫入的類型就確定了之后寫入的類型,否則會發(fā)生panic。因為這里是interface類型,所以為了之后寫入與讀取操作方便,又在這個包里定義了一個ifaceWords結構,其實他就是一個空interface,他的作用就是將interface分解成類型和數(shù)值。結構如下:
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
Value的寫入操作
我們一起來看一看他是如何實現(xiàn)寫入操作的:
// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
if x == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != xp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}
// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()
這段代碼中的注釋集已經(jīng)告訴了我們,調用Store方法寫入的類型必須與原類型相同,不一致便會發(fā)生panic。接下來分析代碼實現(xiàn):
首先判斷條件寫入?yún)?shù)不能為 nil,否則觸發(fā)panic通過使用 unsafe.Pointer將oldValue和newValue轉換成ifaceWords類型。方便我們獲取他的原始類型(typ)和值(data).為了保證原子性,所以這里使用一個 for換來處理,當已經(jīng)有Store正在進行寫入時,會進行等待.如果還沒寫入過數(shù)據(jù),那么獲取不到原始類型,就會開始第一次寫入操作,這里會把先調用 runtime_procPin()方法禁止調度器對當前 goroutine 的搶占(preemption),這樣也可以防止GC線程看到假類型。調用 CAS方法來判斷當前地址是否有被搶占,這里大家可能對unsafe.Pointer(^uintptr(0))這一句話有點不明白,因為是第一個寫入數(shù)據(jù),之前是沒有數(shù)據(jù)的,所以通過這樣一個中間值來做判斷,如果失敗就會解除搶占鎖,解除禁止調度器,繼續(xù)循環(huán)等待.設置中間值成功后,我們接下來就可以安全的把 v設為傳入的新值了,這里會先寫入值,在寫入類型(typ),因為我們會根據(jù)ty來做完成判斷。第一次寫入沒完成,我們還會通過 uintptr(typ) == ^uintptr(0)來進行判斷,因為還是第一次放入的中間類型,他依然會繼續(xù)等待第一次完成。如果第一次寫入完成,會檢查上一次寫入的類型與這次寫入的類型是否一致,不一致則會拋出 panic.
這里代碼量沒有多少,相信大家一定看懂了吧~。
Value的讀操作
先看一下代碼:
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
讀取操作的代碼就很簡單了:1.第一步使用unsafe.Pointer將oldValue轉換成ifaceWords類型,然后獲取他的類型,如果沒有類型或者類型出去中間值,那么說明現(xiàn)在還沒數(shù)據(jù)或者第一次寫入還沒有完成。2. 通過檢查后,調用LoadPointer方法可以獲取他的值,然后構造一個新interface的typ和data返回。
小彩蛋
前面我們在說CAS時,說到了ABA問題,所以我就寫了demo試一試Go標準庫atomic.CompareAndSwapXXX方法是否有解決這個問題,看運行結果是沒有,所以這里大家使用的時候要注意一下(雖然我也沒想到什么現(xiàn)在什么業(yè)務場景會出現(xiàn)這個問題,但是還是要注意一下,需要自己評估)。
func main() {
var share uint64 = 1
wg := sync.WaitGroup{}
wg.Add(3)
// 協(xié)程1,期望值是1,欲更新的值是2
go func() {
defer wg.Done()
swapped := atomic.CompareAndSwapUint64(&share,1,2)
fmt.Println("goroutine 1",swapped)
}()
// 協(xié)程2,期望值是1,欲更新的值是2
go func() {
defer wg.Done()
time.Sleep(5 * time.Millisecond)
swapped := atomic.CompareAndSwapUint64(&share,1,2)
fmt.Println("goroutine 2",swapped)
}()
// 協(xié)程3,期望值是2,欲更新的值是1
go func() {
defer wg.Done()
time.Sleep(1 * time.Millisecond)
swapped := atomic.CompareAndSwapUint64(&share,2,1)
fmt.Println("goroutine 3",swapped)
}()
wg.Wait()
fmt.Println("main exit")
}
總結
原子操作是并發(fā)編程的一個基礎,也是為我學習sync.once打基礎,好啦,現(xiàn)在你們應該知道下篇文章的內容是什么啦,敬請期待~。
推薦閱讀
