<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          JuiceFS 源碼閱讀-中

          共 39431字,需瀏覽 79分鐘

           ·

          2021-08-07 09:09

          JuiceFS鎖的實(shí)現(xiàn)

          JuiceFS的鎖實(shí)現(xiàn),目前同時(shí)實(shí)現(xiàn)了BSD locks(對應(yīng)Flock)和POSIX locks(對應(yīng)Setlk)。細(xì)節(jié)上最大區(qū)別就是BSD locks只能以FD為最小控制單位(簡單理解為單文件加鎖,鎖定的是文件描述符fd對應(yīng)的文件),而POSIX locks可以在一個(gè)文件中以文件的offset+length的方式進(jìn)行加鎖(按文件內(nèi)容進(jìn)行范圍加鎖)。

          BSD locks

          //pkg/vfs/vfs_unix.go

          func Flock(ctx Context, ino Ino, fh uint64, owner uint64, typ uint32, block bool) (err syscall.Errno) {
              var name string
              var reqid uint32
              defer func() { logit(ctx, "flock (%d,%d,%016X,%s,%t): %s", reqid, ino, owner, name, block, strerr(err)) }()
              switch typ {
              case syscall.F_RDLCK:
                  name = "LOCKSH"
              case syscall.F_WRLCK:
                  name = "LOCKEX"
              case syscall.F_UNLCK:
                  name = "UNLOCK"
              default:
                  err = syscall.EINVAL
                  return
              }

              if IsSpecialNode(ino) {
                  err = syscall.EPERM
                  return
              }
              h := findHandle(ino, fh)
              if h == nil {
                  err = syscall.EBADF
                  return
              }
              h.addOp(ctx)
              defer h.removeOp(ctx)
              err = m.Flock(ctx, ino, owner, typ, block) //核心在元數(shù)據(jù)部分的控制,具體參考下面部分代碼注釋
              if err == 0 {
                  h.Lock()
                  if typ == syscall.F_UNLCK {
                      h.locks &= 2
                  } else {
                      h.locks |= 1
                      h.flockOwner = owner
                  }
                  h.Unlock()
              }
              return
          }
          //pkg/meta/sql_unix.go
          func (m *dbMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
              if ltype == syscall.F_UNLCK {//如果為解鎖操作,則只需要?jiǎng)h除對應(yīng)的db記錄即可
                  return errno(m.txn(func(s *xorm.Session) error {
                      _, err := s.Delete(&flock{Inode: inode, Owner: owner, Sid: m.sid})
                      return err
                  }))
              }
              var err syscall.Errno
              //循環(huán)處理加鎖請求,分為阻塞(block=true)和非阻塞兩種類型操作
              for {
                  err = errno(m.txn(func(s *xorm.Session) error {
                  //獲取inode信息,避免鎖指向的對象不存在,成為空鎖。
                      if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                          if err == nil && !exists {
                              err = syscall.ENOENT
                          }
                          return err
                      }
                      //查詢inode關(guān)聯(lián)的全部鎖信息
                      rows, err := s.Rows(&flock{Inode: inode})
                      if err != nil {
                          return err
                      }
                      type key struct {
                          sid uint64
                          o   uint64
                      }
                      var locks = make(map[key]flock)
                      var l flock
                      for rows.Next() {
                          if rows.Scan(&l) == nil {
                          //執(zhí)行迭代,將查詢結(jié)果臨時(shí)保存到locks數(shù)據(jù)結(jié)構(gòu)中
                              locks[key{l.Sid, l.Owner}] = l
                          }
                      }
                      rows.Close()
          //判斷需要加鎖的類型是否為讀鎖,如果已經(jīng)有寫鎖則加鎖失敗
                      if ltype == syscall.F_RDLCK {
                          for _, l := range locks {
                              if l.Ltype == 'W' {
                                  return syscall.EAGAIN
                              }
                          }
                          //沒有寫鎖沖突,則通過insert記錄加上讀鎖
                          return mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'R', Sid: m.sid})
                      }

                      //加寫入鎖邏輯:先判斷是否已經(jīng)有寫入鎖(判斷l(xiāng)ocks中是否有重復(fù)鍵值),如果有則更新鎖的記錄,否則直接insert插入對應(yīng)的記錄
                      me := key{m.sid, owner}
                      _, ok := locks[me]
                      delete(locks, me)
                      if len(locks) > 0 {
                          return syscall.EAGAIN
                      }
                      if ok {
                          _, err = s.Cols("Ltype").Update(&flock{Ltype: 'W'}, &flock{Inode: inode, Owner: owner, Sid: m.sid})
                      } else {
                          err = mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'W', Sid: m.sid})
                      }
                      return err
                  }))
              //非阻塞or報(bào)錯(cuò)直接返回結(jié)果
                  if !block || err != syscall.EAGAIN {
                      break
                  }
                  //阻塞情況下加寫鎖,則等待固定時(shí)長再進(jìn)行下一輪加鎖操作
                  if ltype == syscall.F_WRLCK {
                      time.Sleep(time.Millisecond * 1)
                  } else {
                      time.Sleep(time.Millisecond * 10)
                  }
                  if ctx.Canceled() {
                      return syscall.EINTR
                  }
              }
              return err
          }

          POSIX locks

          按pid進(jìn)行范圍加鎖,實(shí)現(xiàn)起來相對比較復(fù)雜,核心算法在updateLocks中實(shí)現(xiàn)。

          func (m *dbMeta) Setlk(ctx Context, inode Ino, owner_ uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
              var err syscall.Errno
              lock := plockRecord{ltype, pid, start, end}//以pid為粒度,所以適合單機(jī)多進(jìn)/線程模型,跨節(jié)點(diǎn)不太合適
              owner := int64(owner_)
              for {
                  err = errno(m.txn(func(s *xorm.Session) error {
                      if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                          if err == nil && !exists {
                              err = syscall.ENOENT
                          }
                          return err
                      }
                      //unlock操作
                      if ltype == F_UNLCK {
                      //sid代表session ID,每個(gè)客戶端的數(shù)據(jù)庫連接都有一個(gè)獨(dú)立的ID實(shí)例
                          var l = plock{Inode: inode, Owner: owner, Sid: m.sid}
                          ok, err := m.engine.Get(&l) //按inode、owner、sid三個(gè)字段組合,查詢鎖列表
                          if err != nil {
                              return errno(err)
                          }
                          if !ok {
                              return nil
                          }
                          ls := loadLocks([]byte(l.Records)) //解析鎖列表信息
                          if len(ls) == 0 {
                              return nil
                          }
                          ls = updateLocks(ls, lock) //在已有所列表里面新增鎖記錄,有點(diǎn)復(fù)雜,之后詳細(xì)介紹
                          if len(ls) == 0 {
                              _, err = s.Delete(&plock{Inode: inode, Owner: owner, Sid: m.sid}) //刪除鎖記錄
                          } else {
                              _, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)}, l) //更新已有所記錄
                          }
                          return err
                      }
                      //以inode為關(guān)鍵字,查找已有的鎖列表
                      rows, err := s.Rows(&plock{Inode: inode})
                      if err != nil {
                          return errno(err)
                      }
                      type key struct {
                          sid   uint64
                          owner int64
                      }
                      var locks = make(map[key][]byte)
                      var l plock
                      //按查詢結(jié)果構(gòu)建鎖map
                      for rows.Next() {
                          if rows.Scan(&l) == nil {
                              locks[key{l.Sid, l.Owner}] = dup(l.Records)
                          }
                      }
                      rows.Close()
                      //遍歷map,判斷是否有沖突鎖
                      lkey := key{m.sid, owner}
                      for k, d := range locks {
                          if k == lkey {
                              continue
                          }
                          ls := loadLocks([]byte(d))
                          for _, l := range ls {
                              // find conflicted locks
                              if (ltype == F_WRLCK || l.ltype == F_WRLCK) && end > l.start && start < l.end {
                                  return syscall.EAGAIN
                              }
                          }
                      }
                      ls := updateLocks(loadLocks([]byte(locks[lkey])), lock) //更新鎖列表信息
                      var n int64
                      //保存鎖列表記錄到DB
                      if len(locks[lkey]) > 0 {
                          n, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)},
                              &plock{Inode: inode, Sid: m.sid, Owner: owner})
                      } else {
                          n, err = s.InsertOne(&plock{Inode: inode, Sid: m.sid, Owner: owner, Records: dumpLocks(ls)})
                      }
                      if err == nil && n == 0 {
                          err = fmt.Errorf("insert/update failed")
                      }
                      return err
                  }))
              //如果加鎖失敗且不進(jìn)行阻塞,則直接返回結(jié)果
                  if !block || err != syscall.EAGAIN {
                      break
                  }
                  //加鎖失敗,阻塞,進(jìn)入下一輪操作
                  if ltype == F_WRLCK {
                      time.Sleep(time.Millisecond * 1)
                  } else {
                      time.Sleep(time.Millisecond * 10)
                  }
                  if ctx.Canceled() {
                      return syscall.EINTR
                  }
              }
              return err
          }

          updateLocks 的代碼邏輯如下,通過加上debug輸出,更加容易觀察其中細(xì)節(jié)

          const (
              F_UNLCK = syscall.F_UNLCK
              F_RDLCK = syscall.F_RDLCK
              F_WRLCK = syscall.F_WRLCK
          )

          type plockRecord struct {
              ltype uint32
              pid   uint32
              start uint64
              end   uint64
          }


          func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
              //fmt.Println(i,"insertLocks before ls=",ls)
              nls := make([]plockRecord, len(ls)+1)
              copy(nls[:i], ls[:i])
              nls[i] = nl
              copy(nls[i+1:], ls[i:])
              ls = nls
              //fmt.Println(i,"insertLocks after ls=",ls)
              return ls
          }

          func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
              // ls is ordered by l.start without overlap
              var i int
              for i < len(ls) && nl.end > nl.start {
                  l := ls[i]
                  if l.end < nl.start {
                      fmt.Println("新增鎖設(shè)定的區(qū)域超過當(dāng)前鎖范圍,查找下一個(gè)")
                  } else if l.start < nl.start {
                      //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                      fmt.Println("1. 當(dāng)前鎖包含部分新鎖區(qū)域,拆分成兩個(gè)鎖,調(diào)整當(dāng)前鎖范圍從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",ls[i].start,"->",nl.start,"],并在當(dāng)前位置之后插入新鎖 [",nl.start,"->",l.end,"]")
                      //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
                      ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
                      ls[i].end = nl.start
                      i++
                      nl.start = l.end
                  } else if l.end < nl.end {
                      //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
                      fmt.Println("2. 當(dāng)前鎖區(qū)間屬于新鎖區(qū)間,縮小當(dāng)前鎖范圍,從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",nl.start,"->",ls[i].end,"]")
                      ls[i].ltype = nl.ltype
                      ls[i].start = nl.start
                      nl.start = l.end
                  } else if l.start < nl.end {
                      //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
                      ls = insertLocks(ls, i, nl)
                      fmt.Println("3. 新鎖與當(dāng)前鎖有部分內(nèi)容重疊,需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"],并調(diào)整下一個(gè)鎖的起始位置從[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
                      ls[i+1].start = nl.end
                      nl.start = nl.end
                  } else {
                      fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                      fmt.Println("4. 新鎖右側(cè)和當(dāng)前鎖沒有重疊(l.start>nl.end),僅需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"]")
                      ls = insertLocks(ls, i, nl)
                      nl.start = nl.end
                  }
                  i++
              }

              if nl.start < nl.end {
                  ls = append(ls, nl)
                  fmt.Println("5. 仍然有部分尾部內(nèi)容沒有,補(bǔ)充末尾部分的鎖內(nèi)容,補(bǔ)充后=",ls)
              }

              i = 0
              //再次遍歷鎖列表,進(jìn)行無效內(nèi)容刪除or區(qū)間合并操作。
              for i < len(ls) {
                  if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
                      // remove empty one
                      //fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容,刪除前l(fā)s=",ls)
                      //fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容",ls[i:i+1])
                      copy(ls[i:], ls[i+1:]) //從i位置開始左移1個(gè)單位
                      ls = ls[:len(ls)-1] //刪除末尾
                      fmt.Println("6-1. 刪除鎖列表從i=",i,"位置的內(nèi)容,刪除后ls=",ls)
                  } else {
                      if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                          fmt.Println("6-2. 鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作",ls)
                          // combine continuous range
                          ls[i].end = ls[i+1].end
                          ls[i+1].start = ls[i+1].end
                          //fmt.Println("鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作2",ls)
                      }
                      i++
                  }
              }
              return ls
          }

          小結(jié)

          整個(gè)POSIX locks的算法主要是通過遍歷已有的鎖列表ls(數(shù)組結(jié)構(gòu)),并按照一定規(guī)則進(jìn)行新增鎖記錄的插入(簡單理解為滑動(dòng)窗口查找),其中nl代表窗口滑動(dòng)范圍鎖,l代表當(dāng)前已經(jīng)有的鎖。

          -w900

          根據(jù)代碼注釋,大概分為4種類型的鎖處理。其中l(wèi)odlock代表已有的鎖記錄(對應(yīng)l),Newlock是新增鎖的記錄(對應(yīng)nl)

          類型1:

          -w881

          類型2:

          -w899

          類型3:

          -w899

          類型4:

          -w899

          發(fā)現(xiàn)問題

          發(fā)現(xiàn)一個(gè)pid同步的bug,當(dāng)新鎖的內(nèi)容覆蓋舊鎖時(shí),并未更新對應(yīng)的pid記錄,導(dǎo)致加鎖雖然成功,但是鎖的pid還是指向舊的pid內(nèi)容。復(fù)現(xiàn)代碼如下

          package main

          import (
              "fmt"
              "syscall"
          )

          const (
              F_UNLCK = syscall.F_UNLCK
              F_RDLCK = syscall.F_RDLCK
              F_WRLCK = syscall.F_WRLCK
          )

          type plockRecord struct {
              ltype uint32
              pid   uint32
              start uint64
              end   uint64
          }


          func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
              //fmt.Println(i,"insertLocks before ls=",ls)
              nls := make([]plockRecord, len(ls)+1)
              copy(nls[:i], ls[:i])
              nls[i] = nl
              copy(nls[i+1:], ls[i:])
              ls = nls
              //fmt.Println(i,"insertLocks after ls=",ls)
              return ls
          }

          func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
              // ls is ordered by l.start without overlap
              var i int
              for i < len(ls) && nl.end > nl.start {
                  l := ls[i]
                  if l.end < nl.start {
                      fmt.Println("新增鎖設(shè)定的區(qū)域超過當(dāng)前鎖范圍,查找下一個(gè)")
                  } else if l.start < nl.start {
                      //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                      fmt.Println("1. 當(dāng)前鎖包含部分新鎖區(qū)域,拆分成兩個(gè)鎖,調(diào)整當(dāng)前鎖范圍從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",ls[i].start,"->",nl.start,"],并在當(dāng)前位置之后插入新鎖 [",nl.start,"->",l.end,"]")
                      //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
                      ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
                      ls[i].end = nl.start
                      i++
                      nl.start = l.end
                  } else if l.end < nl.end {
                      //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
                      fmt.Println("2. 當(dāng)前鎖區(qū)間屬于新鎖區(qū)間,縮小當(dāng)前鎖范圍,從[",ls[i].start,"->",ls[i].end,"]調(diào)整為[",nl.start,"->",ls[i].end,"]")
                      ls[i].ltype = nl.ltype
                      ls[i].start = nl.start
                      //if ls[i].pid != nl.pid { //patch
                      //  ls[i].pid = nl.pid
                      //}
                      nl.start = l.end
                  } else if l.start < nl.end {
                      //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
                      ls = insertLocks(ls, i, nl)
                      fmt.Println("3. 新鎖與當(dāng)前鎖有部分內(nèi)容重疊,需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"],并調(diào)整下一個(gè)鎖的起始位置從[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
                      ls[i+1].start = nl.end
                      nl.start = nl.end
                  } else {
                      fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
                      fmt.Println("4. 新鎖右側(cè)和當(dāng)前鎖沒有重疊(l.start>nl.end),僅需要在當(dāng)前位置插入新鎖=[",nl.start,nl.end,"]")
                      ls = insertLocks(ls, i, nl)
                      nl.start = nl.end
                  }
                  i++
              }

              if nl.start < nl.end {
                  ls = append(ls, nl)
                  fmt.Println("5. 仍然有部分尾部內(nèi)容沒有,補(bǔ)充末尾部分的鎖內(nèi)容,補(bǔ)充后=",ls)
              }

              i = 0
              //再次遍歷鎖列表,進(jìn)行無效內(nèi)容刪除or區(qū)間合并操作。
              for i < len(ls) {
                  if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
                      // remove empty one
                      //fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容,刪除前l(fā)s=",ls)
                      //fmt.Println("刪除鎖列表從i=",i,"位置的內(nèi)容",ls[i:i+1])
                      copy(ls[i:], ls[i+1:]) //從i位置開始左移1個(gè)單位
                      ls = ls[:len(ls)-1] //刪除末尾
                      fmt.Println("6-1. 刪除鎖列表從i=",i,"位置的內(nèi)容,刪除后ls=",ls)
                  } else {
                      if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                          fmt.Println("6-2. 鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作",ls)
                          // combine continuous range
                          ls[i].end = ls[i+1].end
                          ls[i+1].start = ls[i+1].end
                          //fmt.Println("鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作2",ls)
                      }
                      i++
                  }
              }
              return ls
          }


          func Setlk( ltype uint32, start, end uint64, pid uint32) {

              lock := plockRecord{ltype, pid, start, end}
              //ls := []plockRecord{plockRecord{F_WRLCK, pid, 04},{F_WRLCK, pid, 710},{F_WRLCK, pid, 1316}}
              ls := []plockRecord{plockRecord{F_WRLCK, 10004},{F_WRLCK, 102710}}
              //ls := []plockRecord{plockRecord{F_WRLCK, pid, 14}}
              fmt.Println("before updateLocks=",ls)
              ls = updateLocks(ls, lock)
              fmt.Println("after updateLocks=",ls)
          }

          func main(){
              Setlk(F_WRLCK,6,13,103) //理論上加鎖以后的記錄應(yīng)該對應(yīng)pid=103,上面的patch已經(jīng)修復(fù)這個(gè)問題
          }

          輸出內(nèi)容如下:

          before updateLocks= [{3 100 0 4} {3 102 7 10}]
          新增鎖設(shè)定的區(qū)域超過當(dāng)前鎖范圍,查找下一個(gè)
          2. 當(dāng)前鎖區(qū)間屬于新鎖區(qū)間,縮小當(dāng)前鎖范圍,從[ 7 -> 10 ]調(diào)整為[ 6 -> 10 ]
          5. 仍然有部分尾部內(nèi)容沒有,補(bǔ)充末尾部分的鎖內(nèi)容,補(bǔ)充后= [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
          6-2. 鎖類型相同,且首尾相接,進(jìn)行區(qū)間合并操作 [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
          6-1. 刪除鎖列表從i= 2 位置的內(nèi)容,刪除后ls= [{3 100 0 4} {3 102 6 13}]
          after updateLocks= [{3 100 0 4} {3 102 6 13}] //理論上這里的pid=102是對應(yīng)的是舊鎖內(nèi)容,應(yīng)該被新增加的鎖記錄pid=103覆蓋


          瀏覽 75
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品永久免费观看 | 高潮视频在线免费观看 | 一区二区三区日本视频 | 欧美色色爱爱男人天堂 | 亚洲欧洲免费观看 |