golang并發(fā)底層實(shí)現(xiàn)竟然都是它?。?!
《手摸手系列》把go sync包中的并發(fā)組件已經(jīng)寫完了,本文作為完結(jié)篇,最后再來(lái)探討下go運(yùn)行時(shí)鎖的實(shí)現(xiàn)。記得在《手摸手Go 并發(fā)編程的基建Semaphore》那篇中我們聊過sync.Mutex最終是依賴sema.go中實(shí)現(xiàn)的sleep和wakeup原語(yǔ)來(lái)實(shí)現(xiàn)的。如果細(xì)心的小伙伴會(huì)發(fā)現(xiàn):
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
semaRoot里也定義了一個(gè)mutex。正所謂“醫(yī)者不能自醫(yī)”,故而Go針對(duì)運(yùn)行時(shí)的并發(fā)實(shí)現(xiàn)了一個(gè)簡(jiǎn)版的互斥鎖mutex。具體怎么 玩的呢?先來(lái)回顧一下:
基本使用
我們以sema.go中的semacquire1方法為例,mutex的使用如下:
// Easy case.
if cansemacquire(addr) {
return
}
... ...
lockWithRank(&root.lock, lockRankRoot)
... ...
if cansemacquire(addr) {
....
unlock(&root.lock)
break
}
... ...
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
// goparkunlock里的鉤子里藏著unlock方法調(diào)用
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
unlock((*mutex)(lock))
return true
}
上述邏輯正如之前所說:
先通過 CAS快速判斷是否獲取信號(hào)量,成功直接返回;若 CAS失敗,則嘗試獲取排他鎖,因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">sync.Mutex是基于sema.go實(shí)現(xiàn),故而這里使用了運(yùn)行時(shí)的mutex來(lái)進(jìn)入臨界區(qū)獲得運(yùn)行時(shí)鎖則再次嘗試 CAS成功則解除運(yùn)行時(shí)排他鎖返回;否則調(diào)用gopark掛起當(dāng)前協(xié)程等待喚醒
那么Go的運(yùn)行時(shí)排他鎖是如何實(shí)現(xiàn)的呢?先來(lái)看看其數(shù)據(jù)結(jié)構(gòu)
數(shù)據(jù)結(jié)構(gòu)
mutex結(jié)構(gòu)體比較簡(jiǎn)單,只有一個(gè)用于運(yùn)行時(shí)靜態(tài)鎖排名的lockRankStruct和一個(gè)uintpter的key(根據(jù)不同平臺(tái)實(shí)現(xiàn)方案用處不同)。在無(wú)競(jìng)爭(zhēng)的情況下,它跟自旋鎖一樣快,類似的也是在用戶空間利用atomic.Cas來(lái)嘗試搶占鎖,失敗才會(huì)掉進(jìn)系統(tǒng)調(diào)用,進(jìn)行內(nèi)核中休眠。
// 互斥鎖。
// 零值mutex是未上鎖狀態(tài)(不需要初始化鎖)
// 初始化有利于靜態(tài)鎖定級(jí) 但是不是必須的
type mutex struct {
// 如果禁用了鎖定級(jí)則為空結(jié)構(gòu)體否則包括鎖定等級(jí)
lockRankStruct
// 基于futex的實(shí)現(xiàn)將其看作uint32的key,
// 而基于sema的實(shí)現(xiàn)將其看作M* waitm
// 過去曾經(jīng)是一個(gè)union,但是unions破壞了精確的GC
key uintptr
}
lockRankStruct提供了一種運(yùn)行時(shí)的靜態(tài)鎖排名的機(jī)制。靜態(tài)鎖排名會(huì)建立文件化的鎖之間的總排序順序,如果違反總的排序,則會(huì)報(bào)錯(cuò)。只要鎖排序是按照文檔設(shè)計(jì)的順序,鎖排序死鎖就不會(huì)發(fā)生。如果要做Go運(yùn)行時(shí)使用這個(gè)機(jī)制,你需要設(shè)置
GOEXPERIMENT=staticlockranking。默認(rèn)未開啟,此時(shí)
lockRanStruct是一個(gè)空結(jié)構(gòu)體。lockWithRank()等效于lock()
mutex跟常規(guī)的鎖一樣提供了Lock和Unlock方法,不過底層根據(jù)平臺(tái)不同實(shí)現(xiàn)也不同,總結(jié)起來(lái)大致分了三類:
dragonfly freebsd linux平臺(tái)下,根據(jù)具體平臺(tái)實(shí)現(xiàn)下面兩個(gè)方法
// 從原子上講,如果 *addr == val { sleep },可能發(fā)生虛假喚醒,但是這是被允許的
// 休眠時(shí)間不超過ns,ns<0表示永遠(yuǎn)休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果有任何進(jìn)程在addr上休眠,則最多喚醒cnt
futexwakeup(addr *uint32, cnt uint32)
aix darwin nacl netbsd openbsd plan9 solaris windows平臺(tái)下,根據(jù)具體平臺(tái)實(shí)現(xiàn)下面三個(gè)方法
// 如果尚未創(chuàng)建mp信號(hào)量,則為其創(chuàng)建一個(gè)
func semacreate(mp *m)
// 如果ns<0,則獲取m的信號(hào)量并返回0
// 如果ns>=0,則在ns納秒內(nèi)嘗試獲取m的信號(hào)量
// 如果獲取了信號(hào)量返回0 如果中斷或超時(shí)返回-1
func semasleep(ns int64) int32
//喚醒已經(jīng)或即將在其信號(hào)量上休眠的mp
func semawakeup(mp *m)
js,wasm平臺(tái)下的實(shí)現(xiàn),因?yàn)閖s/wasm還不支持線程,所以不存在搶占。
因?yàn)榻^大多數(shù)生產(chǎn)環(huán)境都是Linux平臺(tái)下,所以今天主要來(lái)看看Linux的具體實(shí)現(xiàn),其主要利用系統(tǒng)內(nèi)核提供的futex來(lái)實(shí)現(xiàn),好了 上代碼:
Lock
// +build !goexperiment.staticlockranking即staticlockranking未開啟的情況下,// +build dragonfly freebsd linux的實(shí)現(xiàn)如下
//lock_futex.go
func lock(l *mutex) {
lockWithRank(l, getLockRank(l))
}
//go:nosplit
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg()
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
//g綁定的m的lock屬性自增加一
gp.m.locks++
// 投機(jī)搶占鎖 運(yùn)氣好搶到直接返回 不需要進(jìn)行內(nèi)核調(diào)用
v := atomic.Xchg(key32(&l.key), mutex_locked)
if v == mutex_unlocked {
return
}
// wait為MUTEX_LOCKED還是MUTEX_SLEEPING取決于此互斥對(duì)象mutex是否存在線程在休眠
// 如果我們?cè)?jīng)將l->key從MUTEX_SLEEPING更改為其他值,則在返回之前必須小心將其更改回MUTEX_SLEEPING,
// 以確保休眠線程得到其喚醒調(diào)用
wait := v
// 單處理器上 無(wú)自旋
// 多處理器上,自旋進(jìn)行ACTIVE_SPIN嘗試
spin := 0
if ncpu > 1 {
spin = active_spin
}
for {
// 嘗試鎖定 自旋
for i := 0; i < spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
procyield(active_spin_cnt)
}
// 嘗試鎖定 重新調(diào)度
for i := 0; i < passive_spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
osyield()
}
// 休眠
v = atomic.Xchg(key32(&l.key), mutex_sleeping)
if v == mutex_unlocked {
return
}
wait = mutex_sleeping
futexsleep(key32(&l.key), mutex_sleeping, -1)
}
}
procyield只是執(zhí)行30次PAUSE
active_spin_cnt = 30
procyield(active_spin_cnt)
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX // 將30放入AX寄存器
again:
PAUSE
SUBL $1, AX // 每次減一
JNZ again
RET
linux系統(tǒng)下osyield()為系統(tǒng)調(diào)用的封裝
osyield()
//sys_linux_amd64.s
TEXT runtime·osyield(SB),NOSPLIT,$0
MOVL $SYS_sched_yield, AX
SYSCALL
RET
可以看到Lock的邏輯也并不復(fù)雜:
首先給當(dāng)前g綁定的m的lock屬性自增加一,禁止P被搶占 atomic.Xchg嘗試修改mutex.key為mutex_locked,獲取原始值為mutex_unlocked表明獲取鎖,直接返回;否則失敗繼續(xù)執(zhí)行進(jìn)入for死循環(huán) 首先如果時(shí)多核機(jī)器,嘗試進(jìn)行自旋+ atomic.Cas獲取鎖,嘗試自旋4次每次失敗執(zhí)行30次PAUSE, 成功則返回調(diào)用 osyield()嘗試一次重新調(diào)度嘗試休眠,如果此時(shí)鎖資源被釋放 則獲取鎖直接返回 調(diào)用 futexsleep()進(jìn)入休眠,休眠失敗在從步驟1繼續(xù)嘗試 直到獲取鎖 或 休眠成功為止
可以看出Lock還是盡量先通過自旋、CAS、osyield(),實(shí)在沒辦法才進(jìn)入futexsleep(),為什么呢?下面說
接下來(lái)我們來(lái)一起看看關(guān)鍵的futexsleep()如何實(shí)現(xiàn)的:
//src/runtime/os_linux.go
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
if ns < 0 {
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
}
var ts timespec
ts.setNsec(ns)
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
//go:noescape
func futex(addr unsafe.Pointer, op int32, val uint32, ts, addr2 unsafe.Pointer, val3 uint32) int32
可以看到,futex_sleep()最終調(diào)用了沒有函數(shù)體的futex()方法,其匯編實(shí)現(xiàn)如下:
// src/runtime/sys_linux_amd64.s
// int64 futex(int32 *uaddr, int32 op, int32 val,
// struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
MOVQ addr+0(FP), DI
MOVL op+8(FP), SI
MOVL val+12(FP), DX
MOVQ ts+16(FP), R10
MOVQ addr2+24(FP), R8
MOVL val3+32(FP), R9
MOVL $SYS_futex, AX
SYSCALL
MOVL AX, ret+40(FP)
RET
Linux環(huán)境下,系統(tǒng)調(diào)用是原生支持的(darwin上需要通過cgo來(lái)完成),所以runtime.futex直接通過匯編來(lái)實(shí)現(xiàn)了,通過將$SYS_futex,也就是202號(hào)系統(tǒng)調(diào)用編號(hào)送入了AX,然后通過SYSCALL來(lái)完成對(duì)futex的系統(tǒng)調(diào)用。
linux下futex是如何實(shí)現(xiàn)的呢?
從Linux內(nèi)核看futex
啥是futex?
futex(Fast Userspace muTEX快速用戶空間互斥體)是低級(jí)同步原語(yǔ),為高級(jí)API提支持。它的目標(biāo)是在無(wú)競(jìng)爭(zhēng)的情況下不進(jìn)入內(nèi)核或不分配內(nèi)核資源。它第一次出現(xiàn)在Linux內(nèi)核開發(fā)的2.5.7版本,其語(yǔ)義在2.5.40版本固定下來(lái),然后在2.6.x系列穩(wěn)定版內(nèi)核中出現(xiàn)。
futex出現(xiàn)之前,linux的同步機(jī)制主要分為兩類:
用戶態(tài)的同步機(jī)制 利用原子指令實(shí)現(xiàn)自旋鎖,但不適用于臨界區(qū)執(zhí)行時(shí)間過長(zhǎng)的場(chǎng)景 內(nèi)核同步機(jī)制 如semaphore等,使用自旋+等待的形式,lock和unlock都是系統(tǒng)調(diào)用,即使沒有沖突也要通過系統(tǒng)調(diào)用進(jìn)入內(nèi)核之后才能識(shí)別。
理想狀態(tài)下,在無(wú)沖突的情況下在用戶空間通過自旋鎖解決,如果仍未拿到鎖資源需要等待時(shí)再通過系統(tǒng)調(diào)用完成sleep和wake的語(yǔ)義。那么自旋失敗的情況下,將當(dāng)前線程掛起,然后是不是由持有鎖的線程釋放鎖的時(shí)候再將其喚醒呢?帶著這個(gè)疑問我們一起粗略看看linux內(nèi)核如何實(shí)現(xiàn)futex的吧
數(shù)據(jù)結(jié)構(gòu)
Linux內(nèi)核實(shí)現(xiàn)主要在kernel/futex.c和include/linux/futex.h,翻了下3.10版本內(nèi)核源碼,futex大體是所以它的結(jié)構(gòu)跟Java中的HashMap有些類似。futex_hash_bucket為hash桶,futex_q為存儲(chǔ)的元素實(shí)體結(jié)構(gòu)。如下圖
/**
* struct futex_q - futex等待隊(duì)列實(shí)體,每個(gè)等待任務(wù)分配一個(gè)
* @list: 等待在futex上帶優(yōu)先級(jí)鏈表
* @task: 等待在futex上的任務(wù)
* @lock_ptr: 哈希桶自旋鎖指針
* @key: the key the futex is hashed on
* @pi_state: optional priority inheritance state
* @rt_waiter: rt_waiter storage for use with requeue_pi
* @requeue_pi_key: the requeue_pi target futex key
* @bitset: bitset for the optional bitmasked wakeup
*
* We use this hashed waitqueue, instead of a normal wait_queue_t, so
* we can wake only the relevant ones (hashed queues may be shared).
*
* A futex_q has a woken state, just like tasks have TASK_RUNNING.
* It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
* The order of wakeup is always to make the first condition true, then
* the second.
*
* PI futexes are typically woken before they are removed from the hash list via
* the rt_mutex code. See unqueue_me_pi().
*/
struct futex_q {
struct plist_node list;// 等待在futex上按照優(yōu)先級(jí)排序的任務(wù)列表
struct task_struct *task; //等待在futex上的任務(wù)
spinlock_t *lock_ptr; //哈希桶自旋鎖指針
union futex_key key; //futex進(jìn)行hash的key
struct futex_pi_state *pi_state; //可選的 優(yōu)先級(jí)繼承狀態(tài) 控制優(yōu)先級(jí)繼承
struct rt_mutex_waiter *rt_waiter;//
union futex_key *requeue_pi_key;
u32 bitset; //掩碼匹配
};
/*
* Hash buckets are shared by all the futex_keys that hash to the same
* location. Each key may have multiple futex_q structures, one for each task
* waiting on a futex.
*/
struct futex_hash_bucket {
spinlock_t lock;
struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];

外部調(diào)用入口
從mutex源碼分析,我們得出Go最終會(huì)通過系統(tǒng)調(diào)用內(nèi)核提供的futex。futex原型為:
#include <linux/futex.h>
#include <stdint.h>
#include <sys/time.h>
long futex(uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
futex方法有6個(gè)入?yún)ⅲ?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">*uaddr表示futex字的指針;futex表示針對(duì)futex的具體操作類型;val的含義依賴于flags參數(shù);*timeout表示超時(shí)操作的超時(shí)時(shí)間;*uaddr2跟操作有關(guān),必填時(shí)指向操作使用的第二個(gè)futex字;val3取決于具體操作。futex_op取值有很多,這里我們關(guān)心的主要是FUTEX_WAIT和FUTEX_WAKE。
從kernel/futex.c中可以發(fā)現(xiàn)系統(tǒng)調(diào)用最終會(huì)調(diào)用do_futex,而do_futex最終通過op位運(yùn)算生成cmd來(lái)確定具體的調(diào)用的futex的形式。
//kernel/futex.c
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
struct timespec __user *, utime, u32 __user *, uaddr2,
u32, val3)
{
struct timespec ts;
ktime_t t, *tp = NULL;
u32 val2 = 0;
int cmd = op & FUTEX_CMD_MASK;
if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
cmd == FUTEX_WAIT_BITSET ||
cmd == FUTEX_WAIT_REQUEUE_PI)) {
if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
return -EFAULT;
if (!timespec_valid(&ts))
return -EINVAL;
t = timespec_to_ktime(ts);
if (cmd == FUTEX_WAIT)
t = ktime_add_safe(ktime_get(), t);
tp = &t;
}
/*
* requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
* number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
*/
if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)
val2 = (u32) (unsigned long) utime;
return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
u32 __user *uaddr2, u32 val2, u32 val3)
{
int cmd = op & FUTEX_CMD_MASK;
unsigned int flags = 0;
if (!(op & FUTEX_PRIVATE_FLAG))
flags |= FLAGS_SHARED;
if (op & FUTEX_CLOCK_REALTIME) {
flags |= FLAGS_CLOCKRT;
if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_LOCK_PI:
case FUTEX_UNLOCK_PI:
case FUTEX_TRYLOCK_PI:
case FUTEX_WAIT_REQUEUE_PI:
case FUTEX_CMP_REQUEUE_PI:
if (!futex_cmpxchg_enabled)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_WAIT:
val3 = FUTEX_BITSET_MATCH_ANY;
case FUTEX_WAIT_BITSET:
// 若*uaddr==val則阻塞當(dāng)前進(jìn)程/線程 放入阻塞隊(duì)列
return futex_wait(uaddr, flags, val, timeout, val3);
case FUTEX_WAKE:
val3 = FUTEX_BITSET_MATCH_ANY;
case FUTEX_WAKE_BITSET:
// 最多喚醒val個(gè)在uaddr等待的線程/進(jìn)程
return futex_wake(uaddr, flags, val, val3);
case FUTEX_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
case FUTEX_CMP_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
case FUTEX_WAKE_OP:
return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
case FUTEX_LOCK_PI:
return futex_lock_pi(uaddr, flags, val, timeout, 0);
case FUTEX_UNLOCK_PI:
return futex_unlock_pi(uaddr, flags);
case FUTEX_TRYLOCK_PI:
return futex_lock_pi(uaddr, flags, 0, timeout, 1);
case FUTEX_WAIT_REQUEUE_PI:
val3 = FUTEX_BITSET_MATCH_ANY;
return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
uaddr2);
case FUTEX_CMP_REQUEUE_PI:
return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
}
return -ENOSYS;
}
故而,根據(jù)參數(shù)分析,go運(yùn)行時(shí)調(diào)用的futexsleep()和futexwakeup()方法
// 原子上,如果 *addr == val { sleep } ;虛假喚醒是被允許的;不會(huì)阻塞超過ns,ns<0表示永遠(yuǎn)休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果任何線程阻塞在addr上,則喚醒至少cnt個(gè)阻塞的任務(wù)
futexwakeup(addr *uint32, cnt uint32)
實(shí)則對(duì)應(yīng)了futex()中futex_op為FUTEX_WAIT和FUTEX_WAKE的調(diào)用,接下來(lái),我們主要來(lái)分析下這兩種操作的大概流程。
futex_wait
futex()方法中的futex_op為FUTEX_WAIT時(shí)會(huì)調(diào)用``futex_wait()方法。主要邏輯為判斷*uaddr指向的值和val值若相等,則將當(dāng)前線程/進(jìn)程阻塞在uaddr所指向的futex`的值上,放入等待隊(duì)列。
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
...
// 設(shè)置定時(shí)任務(wù),超時(shí)abs_time后線程仍未被喚醒,則由定時(shí)任務(wù)喚醒它
if (abs_time) {
to = &timeout;
hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
CLOCK_REALTIME : CLOCK_MONOTONIC,
HRTIMER_MODE_ABS);
hrtimer_init_sleeper(to, current);
hrtimer_set_expires_range_ns(&to->timer, *abs_time,
current->timer_slack_ns);
}
retry:
// 準(zhǔn)備等待在uaddr上 如果成功則持有hash_bucket的鎖
// 比較uaddr和期望值val是否相等 并初始化q.key
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)//val被改變直接返回
goto out;
// 插入futex等待隊(duì)列,等待超時(shí)或通過信號(hào)來(lái)喚醒
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
/* unqueue_me() drops q.key ref */
// 若unqueue_me成功,則說明是超時(shí)觸發(fā)(因?yàn)閒utex_wake喚醒時(shí),會(huì)將該進(jìn)程移除等待隊(duì)列,所以這里會(huì)失敗)
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;
ret = -ERESTARTSYS;
if (!abs_time)
goto out;
restart = ¤t_thread_info()->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = abs_time->tv64;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
ret = -ERESTART_RESTARTBLOCK;
out:
if (to) {
// 取消定時(shí)任務(wù)
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
futex_wait()首先設(shè)置定時(shí)任務(wù),若超時(shí)abs_time后線程仍未被喚醒,則由定時(shí)任務(wù)喚醒它;然后進(jìn)入goto retry循環(huán):
調(diào)用
futex_wait_setup(uaddr, val, flags, &q, &hb)根據(jù)uaddrhash找到futex_hash_bucket,并初始化futex_q.key。調(diào)用
futex_wait_queue_me(hb, &q, to)插入futex等待隊(duì)列,等待超時(shí)或被信號(hào)喚醒
/**
* futex_wait_setup() - 準(zhǔn)備等待在一個(gè)futex變量上
* @uaddr: futex用戶空間地址
* @val: 期望的值
* @flags: futex標(biāo)識(shí) (FLAGS_SHARED, etc.)
* @q: 關(guān)聯(lián)的futex_q
* @hb: hash_bucket的指針 返回給調(diào)用者
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* 0 - uaddr contains val and hb has been locked;
* <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
/*
* Access the page AFTER the hash-bucket is locked.
* Order is important:
*
* Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
* Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
*
* The basic logical guarantee of a futex is that it blocks ONLY
* if cond(var) is known to be true at the time of blocking, for
* any cond. If we locked the hash-bucket after testing *uaddr, that
* would open a race condition where we could block indefinitely with
* cond(var) false, which would violate the guarantee.
*
* On the other hand, we insert q and release the hash-bucket only
* after testing *uaddr. This guarantees that futex_wait() will NOT
* absorb a wakeup if *uaddr does not match the desired values
* while the syscall executes.
*/
retry:
//初始化futex_q 填充q->key
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//1.對(duì)q->key進(jìn)行hash 然后通過&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket
//2. spin_lock(&hb->lock)獲得自旋鎖
*hb = queue_lock(q);
//原子的將uaddr的值拷貝到uval中
ret = get_futex_value_locked(&uval, uaddr);
if (ret) { //拷貝操作失敗 重試
...
goto retry;
}
// 如果uval的值(即uaddr)不等于期望值val 則表明其他線程在修改
// 直接返回?zé)o需等待
if (uval != val) {
queue_unlock(q, *hb);
ret = -EWOULDBLOCK;
}
out:
if (ret)
put_futex_key(&q->key);
return ret;
}
futex_wait_setup()方法主要是為阻塞在uaddr上做準(zhǔn)備,主要步驟:
初始化 futex_q,并初始化futex_q.key的引用,對(duì) futex_q.key進(jìn)行hash通過&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket調(diào)用spin_lock(&hb->lock)嘗試獲得自旋鎖 失敗則進(jìn)行重試回到步驟1 判斷 *uaddr的值跟val是否相等,不相等說明其他線程在修改則釋放持有的hb.lock自旋鎖,返回-EWOULDBLOCK(\#define EWOULDBLOCK 246 /* Operation would block (Linux returns EAGAIN) */即linux中的EAGAIN)
/**
* futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
* @hb: the futex hash bucket, must be locked by the caller
* @q: the futex_q to queue up on
* @timeout: the prepared hrtimer_sleeper, or null for no timeout
*/
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
// task狀態(tài)保證在另一個(gè)task喚醒它之前被設(shè)置,set_current_state利用set_mb()實(shí)現(xiàn)
// 設(shè)置task狀態(tài)為TASK_INTERRUPTIBLE CPU只會(huì)調(diào)度狀態(tài)為TASK_RUNNING的任務(wù)
set_current_state(TASK_INTERRUPTIBLE);
//將q插入到等待隊(duì)列中然后釋放自旋鎖
queue_me(q, hb);
//啟動(dòng)定時(shí)器
if (timeout) {
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
if (!hrtimer_active(&timeout->timer))
timeout->task = NULL;
}
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
* is no timeout, or if it has yet to expire.
*/
// 未設(shè)置過期時(shí)間 或過期時(shí)間還未到期才進(jìn)行調(diào)度
if (!timeout || timeout->task)
//系統(tǒng)重新進(jìn)行調(diào)度,此時(shí)CPU會(huì)去執(zhí)行其他任務(wù),當(dāng)前任務(wù)會(huì)被阻塞
schedule();
}
// 走到這里說明當(dāng)前任務(wù)被CPU選中執(zhí)行
__set_current_state(TASK_RUNNING);
}
futex_wait_queue_me主要做了幾件事:
將設(shè)置task狀態(tài)為 TASK_INTERRUPTIBLE(CPU只會(huì)調(diào)度狀態(tài)為TASK_RUNNING的任務(wù))調(diào)用 queueme()將futex_q插入到等待隊(duì)列啟動(dòng)定時(shí)任務(wù) 若未設(shè)置過期時(shí)間或過期時(shí)間未到期 重新調(diào)度進(jìn)程 獲的執(zhí)行資格 設(shè)置task狀態(tài)為 TASK_RUNNING
總結(jié)一下futex_wait的工作機(jī)制:futex_wait_setup()方法會(huì)根據(jù)futex_q.keyhash找到對(duì)應(yīng)futex_hash_bucket,并通過spin_lock(futex_hash_bucket.lock)獲取自旋鎖;futex_wait_queue_me()方法先是將task設(shè)置為TASK_INTERRUPTIBLE然后調(diào)用queue_me()將futex_q入隊(duì),然后才spin_unlock(&hb->lock);釋放持有的自旋鎖。也就是說檢查*uaddr值和進(jìn)程/線程掛起放在了一個(gè)臨界區(qū)中,保證了條件和等待之間的原子性。
futex_wake
futex_wake()喚醒阻塞在*uaddr上的任務(wù)。
/*
* Wake up waiters matching bitset queued on this futex (uaddr).
*/
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key = FUTEX_KEY_INIT;
int ret;
if (!bitset)
return -EINVAL;
//根據(jù)uaddr的值填充&key的內(nèi)容
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
if (unlikely(ret != 0))
goto out;
//根據(jù)&key獲取對(duì)應(yīng)uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
//對(duì)該hb加自旋鎖
spin_lock(&hb->lock);
head = &hb->chain;
//遍歷該hb的鏈表 注意鏈表中存儲(chǔ)的節(jié)點(diǎn)時(shí)plist_node類型,而這里的this卻是futex_q類型
//這種類型轉(zhuǎn)換是通過c中的container_of機(jī)制實(shí)現(xiàn)的
plist_for_each_entry_safe(this, next, head, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
//喚醒對(duì)應(yīng)進(jìn)程
wake_futex(this);
if (++ret >= nr_wake)
break;
}
}
//釋放自旋鎖
spin_unlock(&hb->lock);
put_futex_key(&key);
out:
return ret;
}
關(guān)于wake_futex,調(diào)用wake_futex必須持有futex_hash_bucket的自旋鎖,之后不能訪問入?yún)⒌?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">futex_q
static void wake_futex(struct futex_q *q)
{
struct task_struct *p = q->task;
if (WARN(q->pi_state || q->rt_waiter, "refusing to wake PI futex\n"))
return;
// 在喚醒任務(wù)之前將q->lock_ptr設(shè)置為NULL
// 如果另一個(gè)CPU發(fā)生了非futex方式的喚醒,則任務(wù)可能退出。p解引用則是一個(gè)不存在的task結(jié)構(gòu)體
// 為避免這種case,需要持有p引用來(lái)進(jìn)行喚醒
get_task_struct(p);
//將q出隊(duì)列
__unqueue_futex(q);
/*
* 只要寫入q-> lock_ptr = NULL,等待的任務(wù)就可以釋放futex_q,而無(wú)需獲取任何鎖。
* 這里需要一個(gè)內(nèi)存屏障,以防止lock_ptr的后續(xù)存儲(chǔ)超越plist_del。
*/
smp_wmb();
q->lock_ptr = NULL;
// 將TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE狀態(tài)的task喚醒
wake_up_state(p, TASK_NORMAL);
// task釋放futex_q
put_task_struct(p);
}
futex_wake()流程如下:
根據(jù) *uaddr填充futex_q->key,調(diào)用hash_futex(&key);查找對(duì)應(yīng)的futex_hash_bucket調(diào)用 spin_lock(&hb->lock);嘗試獲取hb的自旋鎖遍歷 futex_hash_bucket掛載的鏈表,找到uaddr對(duì)應(yīng)的節(jié)點(diǎn)調(diào)用 wake_futex喚起等待的進(jìn)程,其實(shí)就是將task設(shè)置為TASK_RUNNING并放入調(diào)度隊(duì)列中等待CPU調(diào)度同時(shí)釋放futex_q釋放自旋鎖
此外futex同步機(jī)制即可用于線程之間同步,也可用于進(jìn)程之間同步。
用于線程比較簡(jiǎn)單,因?yàn)榫€程共享虛擬內(nèi)存空間,futex變量由唯一的虛擬地址表示,線程即可用虛擬地址訪問futex變量 用于進(jìn)程稍微復(fù)雜一些,因?yàn)檫M(jìn)程間是分配的獨(dú)立的虛擬內(nèi)存地址空間。需要通過 mmap讓進(jìn)程可以共享一段地址空間來(lái)使用futex變量。故而每個(gè)進(jìn)程訪問futex的虛擬地址不一樣,但是操作系統(tǒng)知道所有這些虛擬地址映射到同一個(gè)表示futex變量的物理地址。
Unlock
與Lock對(duì)應(yīng)操作,幫助喚醒阻塞的協(xié)程。
func unlock(l *mutex) {
unlockWithRank(l)
}
//go:nosplit
func unlockWithRank(l *mutex) {
unlock2(l)
}
func unlock2(l *mutex) {
v := atomic.Xchg(key32(&l.key), mutex_unlocked)
if v == mutex_unlocked {
throw("unlock of unlocked lock")
}
if v == mutex_sleeping {
futexwakeup(key32(&l.key), 1)
}
gp := getg()
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}
其邏輯相比Lock要更簡(jiǎn)單一些:
通過 atomic.Xchg更改mutex.key值 若為mutex_unlocked說明當(dāng)前沒有鎖占用直接panic;若為 mutex_sleeping則調(diào)用futexwakeup()喚醒阻塞的任務(wù)將當(dāng)前g綁定的m的locks屬性減一 解除禁止搶占
關(guān)于futexwakeup()最終也是調(diào)用了futex()方法進(jìn)而通過系統(tǒng)調(diào)用linux的futex實(shí)現(xiàn)。
//src/runtime/os_linux.go
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
if ret >= 0 {
return
}
// I don't know that futex wakeup can return
// EAGAIN or EINTR, but if it does, it would be
// safe to loop and call futex again.
systemstack(func() {
print("futexwakeup addr=", addr, " returned ", ret, "\n")
})
*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
具體futexwakeup的語(yǔ)義內(nèi)核實(shí)現(xiàn)可以參考,futex的原理分析。
總結(jié)
理想狀態(tài)下,在無(wú)沖突的情況下,在用戶空間通過自旋鎖(yield)即可解決資源競(jìng)爭(zhēng)同步問題,這也是我們所期盼的畢竟用戶態(tài)的操作開銷相對(duì)較?。坏且坏_突加劇或持有鎖的操作過長(zhǎng),自旋鎖的方式會(huì)讓CPU空轉(zhuǎn)浪費(fèi)寶貴的計(jì)算資源;進(jìn)而需要陷入系統(tǒng)調(diào)用將當(dāng)前線程/進(jìn)程進(jìn)行休眠,等鎖資源釋放再喚醒它們。關(guān)于futex,也有這么個(gè)說法
剛剛不是用戶態(tài)的自旋鎖已經(jīng)解決不了問題了嗎,你可能會(huì)想我們可以這么玩:
func lock() {
for !tryLock(state) {
wait() //系統(tǒng)調(diào)用 進(jìn)入休眠
}
}
func tryLock(state) bool {
i:=0
for !compareAndSwap(state, 0, 1) {
i++
if i>3 {
return false
}
}
return true
}
func unlock() {
compareAndSet(state,1,0)
wakeup() //系統(tǒng)調(diào)用喚醒
}
沒錯(cuò)思路很好,但是tryLock和wait中間存在一個(gè)時(shí)間窗口問題。比如,我們?cè)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">lock()之后調(diào)用wait()之前調(diào)用了unlock(),這個(gè)時(shí)候當(dāng)前線程調(diào)用wait()后就沒人管它了。怎么辦呢?為了解決這個(gè)問題,隨即引入了futex,多數(shù)語(yǔ)言的玩法都大同小異。
如有異議 請(qǐng)輕拍 歡迎交流。
如果閱讀過程中發(fā)現(xiàn)本文存疑或錯(cuò)誤的地方,可以關(guān)注公眾號(hào)留言。點(diǎn)贊在看人燦爛??
