JuiceFS 源碼閱讀-中
JuiceFS鎖的實(shí)現(xiàn)
JuiceFS的鎖實(shí)現(xiàn),目前同時(shí)實(shí)現(xiàn)了BSD locks(對應(yīng)Flock)和POSIX locks(對應(yīng)Setlk)。細(xì)節(jié)上最大區(qū)別就是BSD locks只能以FD為最小控制單位(簡單理解為單文件加鎖,鎖定的是文件描述符fd對應(yīng)的文件),而POSIX locks可以在一個(gè)文件中以文件的offset+length的方式進(jìn)行加鎖(按文件內(nèi)容進(jìn)行范圍加鎖)。
BSD locks
//pkg/vfs/vfs_unix.go
func Flock(ctx Context, ino Ino, fh uint64, owner uint64, typ uint32, block bool) (err syscall.Errno) {
var name string
var reqid uint32
defer func() { logit(ctx, "flock (%d,%d,%016X,%s,%t): %s", reqid, ino, owner, name, block, strerr(err)) }()
switch typ {
case syscall.F_RDLCK:
name = "LOCKSH"
case syscall.F_WRLCK:
name = "LOCKEX"
case syscall.F_UNLCK:
name = "UNLOCK"
default:
err = syscall.EINVAL
return
}
if IsSpecialNode(ino) {
err = syscall.EPERM
return
}
h := findHandle(ino, fh)
if h == nil {
err = syscall.EBADF
return
}
h.addOp(ctx)
defer h.removeOp(ctx)
err = m.Flock(ctx, ino, owner, typ, block) //核心在元數(shù)據(jù)部分的控制,具體參考下面部分代碼注釋
if err == 0 {
h.Lock()
if typ == syscall.F_UNLCK {
h.locks &= 2
} else {
h.locks |= 1
h.flockOwner = owner
}
h.Unlock()
}
return
}
//pkg/meta/sql_unix.go
func (m *dbMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
if ltype == syscall.F_UNLCK {//如果為解鎖操作,則只需要?jiǎng)h除對應(yīng)的db記錄即可
return errno(m.txn(func(s *xorm.Session) error {
_, err := s.Delete(&flock{Inode: inode, Owner: owner, Sid: m.sid})
return err
}))
}
var err syscall.Errno
//循環(huán)處理加鎖請求,分為阻塞(block=true)和非阻塞兩種類型操作
for {
err = errno(m.txn(func(s *xorm.Session) error {
//獲取inode信息,避免鎖指向的對象不存在,成為空鎖。
if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
if err == nil && !exists {
err = syscall.ENOENT
}
return err
}
//查詢inode關(guān)聯(lián)的全部鎖信息
rows, err := s.Rows(&flock{Inode: inode})
if err != nil {
return err
}
type key struct {
sid uint64
o uint64
}
var locks = make(map[key]flock)
var l flock
for rows.Next() {
if rows.Scan(&l) == nil {
//執(zhí)行迭代,將查詢結(jié)果臨時(shí)保存到locks數(shù)據(jù)結(jié)構(gòu)中
locks[key{l.Sid, l.Owner}] = l
}
}
rows.Close()
//判斷需要加鎖的類型是否為讀鎖,如果已經(jīng)有寫鎖則加鎖失敗
if ltype == syscall.F_RDLCK {
for _, l := range locks {
if l.Ltype == 'W' {
return syscall.EAGAIN
}
}
//沒有寫鎖沖突,則通過insert記錄加上讀鎖
return mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'R', Sid: m.sid})
}
//加寫入鎖邏輯:先判斷是否已經(jīng)有寫入鎖(判斷l(xiāng)ocks中是否有重復(fù)鍵值),如果有則更新鎖的記錄,否則直接insert插入對應(yīng)的記錄
me := key{m.sid, owner}
_, ok := locks[me]
delete(locks, me)
if len(locks) > 0 {
return syscall.EAGAIN
}
if ok {
_, err = s.Cols("Ltype").Update(&flock{Ltype: 'W'}, &flock{Inode: inode, Owner: owner, Sid: m.sid})
} else {
err = mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'W', Sid: m.sid})
}
return err
}))
//非阻塞or報(bào)錯(cuò)直接返回結(jié)果
if !block || err != syscall.EAGAIN {
break
}
//阻塞情況下加寫鎖,則等待固定時(shí)長再進(jìn)行下一輪加鎖操作
if ltype == syscall.F_WRLCK {
time.Sleep(time.Millisecond * 1)
} else {
time.Sleep(time.Millisecond * 10)
}
if ctx.Canceled() {
return syscall.EINTR
}
}
return err
}
POSIX locks
按pid進(jìn)行范圍加鎖,實(shí)現(xiàn)起來相對比較復(fù)雜,核心算法在updateLocks中實(shí)現(xiàn)。
func (m *dbMeta) Setlk(ctx Context, inode Ino, owner_ uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
var err syscall.Errno
lock := plockRecord{ltype, pid, start, end}//以pid為粒度,所以適合單機(jī)多進(jìn)/線程模型,跨節(jié)點(diǎn)不太合適
owner := int64(owner_)
for {
err = errno(m.txn(func(s *xorm.Session) error {
if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
if err == nil && !exists {
err = syscall.ENOENT
}
return err
}
//unlock操作
if ltype == F_UNLCK {
//sid代表session ID,每個(gè)客戶端的數(shù)據(jù)庫連接都有一個(gè)獨(dú)立的ID實(shí)例
var l = plock{Inode: inode, Owner: owner, Sid: m.sid}
ok, err := m.engine.Get(&l) //按inode、owner、sid三個(gè)字段組合,查詢鎖列表
if err != nil {
return errno(err)
}
if !ok {
return nil
}
ls := loadLocks([]byte(l.Records)) //解析鎖列表信息
if len(ls) == 0 {
return nil
}
ls = updateLocks(ls, lock) //在已有所列表里面新增鎖記錄,有點(diǎn)復(fù)雜,之后詳細(xì)介紹
if len(ls) == 0 {
_, err = s.Delete(&plock{Inode: inode, Owner: owner, Sid: m.sid}) //刪除鎖記錄
} else {
_, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)}, l) //更新已有所記錄
}
return err
}
//以inode為關(guān)鍵字,查找已有的鎖列表
rows, err := s.Rows(&plock{Inode: inode})
if err != nil {
return errno(err)
}
type key struct {
sid uint64
owner int64
}
var locks = make(map[key][]byte)
var l plock
//按查詢結(jié)果構(gòu)建鎖map
for rows.Next() {
if rows.Scan(&l) == nil {
locks[key{l.Sid, l.Owner}] = dup(l.Records)
}
}
rows.Close()
//遍歷map,判斷是否有沖突鎖
lkey := key{m.sid, owner}
for k, d := range locks {
if k == lkey {
continue
}
ls := loadLocks([]byte(d))
for _, l := range ls {
// find conflicted locks
if (ltype == F_WRLCK || l.ltype == F_WRLCK) && end > l.start && start < l.end {
return syscall.EAGAIN
}
}
}
ls := updateLocks(loadLocks([]byte(locks[lkey])), lock) //更新鎖列表信息
var n int64
//保存鎖列表記錄到DB
if len(locks[lkey]) > 0 {
n, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)},
&plock{Inode: inode, Sid: m.sid, Owner: owner})
} else {
n, err = s.InsertOne(&plock{Inode: inode, Sid: m.sid, Owner: owner, Records: dumpLocks(ls)})
}
if err == nil && n == 0 {
err = fmt.Errorf("insert/update failed")
}
return err
}))
//如果加鎖失敗且不進(jìn)行阻塞,則直接返回結(jié)果
if !block || err != syscall.EAGAIN {
break
}
//加鎖失敗,阻塞,進(jìn)入下一輪操作
if ltype == F_WRLCK {
time.Sleep(time.Millisecond * 1)
} else {
time.Sleep(time.Millisecond * 10)
}
if ctx.Canceled() {
return syscall.EINTR
}
}
return err
}
updateLocks 的代碼邏輯如下,通過加上debug輸出,更加容易觀察其中細(xì)節(jié)
const (
F_UNLCK = syscall.F_UNLCK
F_RDLCK = syscall.F_RDLCK
F_WRLCK = syscall.F_WRLCK
)
type plockRecord struct {
ltype uint32
pid uint32
start uint64
end uint64
}
func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
//fmt.Println(i,"insertLocks before ls=",ls)
nls := make([]plockRecord, len(ls)+1)
copy(nls[:i], ls[:i])
nls[i] = nl
copy(nls[i+1:], ls[i:])
ls = nls
//fmt.Println(i,"insertLocks after ls=",ls)
return ls
}
func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
// ls is ordered by l.start without overlap
var i int
for i < len(ls) && nl.end > nl.start {
l := ls[i]
if l.end < nl.start {
fmt.Println("新增鎖設(shè)定的區(qū)域超過當(dāng)前鎖范圍,查找下一個(gè)")
} else if l.start < nl.start {
//fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
fmt.Println("1. 當(dāng)前鎖包含部分新鎖區(qū)域,拆分成兩個(gè)鎖,調(diào)整當(dāng)前鎖范圍從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",ls[i].start,"->",nl.start,"],并在當(dāng)前位置之后插入新鎖 [",nl.start,"->",l.end,"]")
//fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
ls[i].end = nl.start
i++
nl.start = l.end
} else if l.end < nl.end {
//fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
fmt.Println("2. 當(dāng)前鎖區(qū)間屬于新鎖區(qū)間,縮小當(dāng)前鎖范圍,從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",nl.start,"->",ls[i].end,"]")
ls[i].ltype = nl.ltype
ls[i].start = nl.start
nl.start = l.end
} else if l.start < nl.end {
//fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
ls = insertLocks(ls, i, nl)
fmt.Println("3. 新鎖與當(dāng)前鎖有部分內(nèi)容重疊,需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"],并調(diào)整下一個(gè)鎖的起始位置從[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
ls[i+1].start = nl.end
nl.start = nl.end
} else {
fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
fmt.Println("4. 新鎖右側(cè)和當(dāng)前鎖沒有重疊(l.start>nl.end),僅需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"]")
ls = insertLocks(ls, i, nl)
nl.start = nl.end
}
i++
}
if nl.start < nl.end {
ls = append(ls, nl)
fmt.Println("5. 仍然有部分尾部內(nèi)容沒有,補(bǔ)充末尾部分的鎖內(nèi)容,補(bǔ)充后=",ls)
}
i = 0
//再次遍歷鎖列表,進(jìn)行無效內(nèi)容刪除or區(qū)間合并操作。
for i < len(ls) {
if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
// remove empty one
//fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容,刪除前l(fā)s=",ls)
//fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容",ls[i:i+1])
copy(ls[i:], ls[i+1:]) //從i位置開始左移1個(gè)單位
ls = ls[:len(ls)-1] //刪除末尾
fmt.Println("6-1. 刪除鎖列表從i=",i,"位置的內(nèi)容,刪除后ls=",ls)
} else {
if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
fmt.Println("6-2. 鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作",ls)
// combine continuous range
ls[i].end = ls[i+1].end
ls[i+1].start = ls[i+1].end
//fmt.Println("鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作2",ls)
}
i++
}
}
return ls
}
小結(jié)
整個(gè)POSIX locks的算法主要是通過遍歷已有的鎖列表ls(數(shù)組結(jié)構(gòu)),并按照一定規(guī)則進(jìn)行新增鎖記錄的插入(簡單理解為滑動(dòng)窗口查找),其中nl代表窗口滑動(dòng)范圍鎖,l代表當(dāng)前已經(jīng)有的鎖。

根據(jù)代碼注釋,大概分為4種類型的鎖處理。其中l(wèi)odlock代表已有的鎖記錄(對應(yīng)l),Newlock是新增鎖的記錄(對應(yīng)nl)
類型1:

類型2:

類型3:

類型4:

發(fā)現(xiàn)問題
發(fā)現(xiàn)一個(gè)pid同步的bug,當(dāng)新鎖的內(nèi)容覆蓋舊鎖時(shí),并未更新對應(yīng)的pid記錄,導(dǎo)致加鎖雖然成功,但是鎖的pid還是指向舊的pid內(nèi)容。復(fù)現(xiàn)代碼如下
package main
import (
"fmt"
"syscall"
)
const (
F_UNLCK = syscall.F_UNLCK
F_RDLCK = syscall.F_RDLCK
F_WRLCK = syscall.F_WRLCK
)
type plockRecord struct {
ltype uint32
pid uint32
start uint64
end uint64
}
func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
//fmt.Println(i,"insertLocks before ls=",ls)
nls := make([]plockRecord, len(ls)+1)
copy(nls[:i], ls[:i])
nls[i] = nl
copy(nls[i+1:], ls[i:])
ls = nls
//fmt.Println(i,"insertLocks after ls=",ls)
return ls
}
func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
// ls is ordered by l.start without overlap
var i int
for i < len(ls) && nl.end > nl.start {
l := ls[i]
if l.end < nl.start {
fmt.Println("新增鎖設(shè)定的區(qū)域超過當(dāng)前鎖范圍,查找下一個(gè)")
} else if l.start < nl.start {
//fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
fmt.Println("1. 當(dāng)前鎖包含部分新鎖區(qū)域,拆分成兩個(gè)鎖,調(diào)整當(dāng)前鎖范圍從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",ls[i].start,"->",nl.start,"],并在當(dāng)前位置之后插入新鎖 [",nl.start,"->",l.end,"]")
//fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
ls[i].end = nl.start
i++
nl.start = l.end
} else if l.end < nl.end {
//fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
fmt.Println("2. 當(dāng)前鎖區(qū)間屬于新鎖區(qū)間,縮小當(dāng)前鎖范圍,從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",nl.start,"->",ls[i].end,"]")
ls[i].ltype = nl.ltype
ls[i].start = nl.start
//if ls[i].pid != nl.pid { //patch
// ls[i].pid = nl.pid
//}
nl.start = l.end
} else if l.start < nl.end {
//fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
ls = insertLocks(ls, i, nl)
fmt.Println("3. 新鎖與當(dāng)前鎖有部分內(nèi)容重疊,需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"],并調(diào)整下一個(gè)鎖的起始位置從[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
ls[i+1].start = nl.end
nl.start = nl.end
} else {
fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
fmt.Println("4. 新鎖右側(cè)和當(dāng)前鎖沒有重疊(l.start>nl.end),僅需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"]")
ls = insertLocks(ls, i, nl)
nl.start = nl.end
}
i++
}
if nl.start < nl.end {
ls = append(ls, nl)
fmt.Println("5. 仍然有部分尾部內(nèi)容沒有,補(bǔ)充末尾部分的鎖內(nèi)容,補(bǔ)充后=",ls)
}
i = 0
//再次遍歷鎖列表,進(jìn)行無效內(nèi)容刪除or區(qū)間合并操作。
for i < len(ls) {
if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
// remove empty one
//fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容,刪除前l(fā)s=",ls)
//fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容",ls[i:i+1])
copy(ls[i:], ls[i+1:]) //從i位置開始左移1個(gè)單位
ls = ls[:len(ls)-1] //刪除末尾
fmt.Println("6-1. 刪除鎖列表從i=",i,"位置的內(nèi)容,刪除后ls=",ls)
} else {
if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
fmt.Println("6-2. 鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作",ls)
// combine continuous range
ls[i].end = ls[i+1].end
ls[i+1].start = ls[i+1].end
//fmt.Println("鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作2",ls)
}
i++
}
}
return ls
}
func Setlk( ltype uint32, start, end uint64, pid uint32) {
lock := plockRecord{ltype, pid, start, end}
//ls := []plockRecord{plockRecord{F_WRLCK, pid, 0, 4},{F_WRLCK, pid, 7, 10},{F_WRLCK, pid, 13, 16}}
ls := []plockRecord{plockRecord{F_WRLCK, 100, 0, 4},{F_WRLCK, 102, 7, 10}}
//ls := []plockRecord{plockRecord{F_WRLCK, pid, 1, 4}}
fmt.Println("before updateLocks=",ls)
ls = updateLocks(ls, lock)
fmt.Println("after updateLocks=",ls)
}
func main(){
Setlk(F_WRLCK,6,13,103) //理論上加鎖以后的記錄應(yīng)該對應(yīng)pid=103,上面的patch已經(jīng)修復(fù)這個(gè)問題
}
輸出內(nèi)容如下:
before updateLocks= [{3 100 0 4} {3 102 7 10}]
新增鎖設(shè)定的區(qū)域超過當(dāng)前鎖范圍,查找下一個(gè)
2. 當(dāng)前鎖區(qū)間屬于新鎖區(qū)間,縮小當(dāng)前鎖范圍,從[ 7 -> 10 ]調(diào)整為[ 6 -> 10 ]
5. 仍然有部分尾部內(nèi)容沒有,補(bǔ)充末尾部分的鎖內(nèi)容,補(bǔ)充后= [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
6-2. 鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作 [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
6-1. 刪除鎖列表從i= 2 位置的內(nèi)容,刪除后ls= [{3 100 0 4} {3 102 6 13}]
after updateLocks= [{3 100 0 4} {3 102 6 13}] //理論上這里的pid=102是對應(yīng)的是舊鎖內(nèi)容,應(yīng)該被新增加的鎖記錄pid=103覆蓋