<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 項目實戰(zhàn):實現(xiàn)一個 Redis(4) 之AOF 持久化與AOF重寫

          共 4490字,需瀏覽 9分鐘

           ·

          2020-12-11 10:36

          點擊上方藍色“Go語言中文網(wǎng)”關(guān)注,每天一起學 Go

          本文是使用 golang 實現(xiàn) redis 系列的第四篇文章,將介紹如何使用 golang 實現(xiàn) Append Only File 持久化及 AOF 文件重寫。

          本文完整源代碼在作者Github:https://github.com/HDT3213/godis/blob/master/src/db/aof.go

          AOF 文件

          AOF 持久化是典型的異步任務,主協(xié)程(goroutine) 可以使用 channel 將數(shù)據(jù)發(fā)送到異步協(xié)程由異步協(xié)程執(zhí)行持久化操作。

          在 DB 中定義相關(guān)字段:

          type?DB?struct?{
          ????//?主線程使用此channel將要持久化的命令發(fā)送到異步協(xié)程
          ????aofChan?????chan?*reply.MultiBulkReply?
          ????//?append?file?文件描述符
          ????aofFile?????*os.File??
          ????//?append?file?路徑
          ??aofFilename?string?

          ????//?aof?重寫需要的緩沖區(qū),將在AOF重寫一節(jié)詳細介紹
          ????aofRewriteChan?chan?*reply.MultiBulkReply?
          ????//?在必要的時候使用此字段暫停持久化操作
          ??pausingAof?????sync.RWMutex?
          }

          在進行持久化時需要注意兩個細節(jié):

          1. get 之類的讀命令并不需要進行持久化
          2. expire 命令要用等效的 expireat 命令替換。舉例說明,10:00 執(zhí)行 expire a 3600 表示鍵 a 在 11:00 過期,在 10:30 載入AOF文件時執(zhí)行 expire a 3600 就成了 11:30 過期與原數(shù)據(jù)不符。

          我們在命令處理方法中返回 AOF 需要的額外信息:

          type?extra?struct?{
          ????//?表示該命令是否需要持久化
          ????toPersist??bool?
          ????//?如上文所述?expire?之類的命令不能直接持久化
          ????//?若?specialAof?==?nil?則將命令原樣持久化,否則持久化?specialAof?中的指令
          ??specialAof?[]*reply.MultiBulkReply?
          }

          type?CmdFunc?func(db?*DB,?args?[][]byte)?(redis.Reply,?*extra)

          以 SET 命令為例:

          func?Set(db?*DB,?args?[][]byte)?(redis.Reply,?*extra)?{
          ????//....
          ????var?result?int
          ????switch?policy?{
          ????case?upsertPolicy:
          ????????result?=?db.Put(key,?entity)
          ????case?insertPolicy:
          ????????result?=?db.PutIfAbsent(key,?entity)
          ????case?updatePolicy:
          ????????result?=?db.PutIfExists(key,?entity)
          ????}
          ????extra?:=?&extra{toPersist:?result?>?0}?//?若實際寫入了數(shù)據(jù)則toPresist=true,?若因為XX或NX選項沒有實際寫入數(shù)據(jù)則toPresist=false
          ????if?result?>?0?{
          ????????if?ttl?!=?unlimitedTTL?{?//?使用了?EX?或?NX?選項
          ????????????expireTime?:=?time.Now().Add(time.Duration(ttl)?*?time.Millisecond)
          ????????????db.Expire(key,?expireTime)
          ????????????//?持久化時使用?set?key?value?和?pexpireat?命令代替?set?key?value?EX?ttl?命令
          ????????????extra.specialAof?=?[]*reply.MultiBulkReply{?
          ????????????????reply.MakeMultiBulkReply([][]byte{
          ????????????????????[]byte("SET"),
          ????????????????????args[0],
          ????????????????????args[1],
          ????????????????}),
          ????????????????makeExpireCmd(key,?expireTime),
          ????????????}
          ????????}?else?{
          ????????????db.Persist(key)?//?override?ttl
          ????????}
          ????}
          ????return?&reply.OkReply{},?extra
          }

          var?pExpireAtCmd?=?[]byte("PEXPIREAT")

          func?makeExpireCmd(key?string,?expireAt?time.Time)?*reply.MultiBulkReply?{
          ??args?:=?make([][]byte,?3)
          ??args[0]?=?pExpireAtCmd
          ??args[1]?=?[]byte(key)
          ??args[2]?=?[]byte(strconv.FormatInt(expireAt.UnixNano()/1e6,?10))
          ??return?reply.MakeMultiBulkReply(args)
          }

          在處理命令的調(diào)度方法中將 aof 命令發(fā)送到 channel:

          func?(db?*DB)?Exec(c?redis.Client,?args?[][]byte)?(result?redis.Reply)?{
          ??//?....
          ??//?normal?commands
          ??var?extra?*extra
          ??cmdFunc,?ok?:=?router[cmd]?//?找到命令對應的處理函數(shù)
          ??if?!ok?{
          ????return?reply.MakeErrReply("ERR?unknown?command?'"?+?cmd?+?"'")
          ????}
          ????//?使用處理函數(shù)執(zhí)行命令
          ??if?len(args)?>?1?{
          ????result,?extra?=?cmdFunc(db,?args[1:])
          ??}?else?{
          ????result,?extra?=?cmdFunc(db,?[][]byte{})
          ??}

          ??//?AOF?持久化
          ??if?config.Properties.AppendOnly?{
          ????if?extra?!=?nil?&&?extra.toPersist?{
          ????????????//?寫入?specialAof
          ??????if?extra.specialAof?!=?nil?&&?len(extra.specialAof)?>?0?{
          ????????for?_,?r?:=?range?extra.specialAof?{
          ??????????db.addAof(r)
          ????????}
          ??????}?else?{
          ????????????????//?寫入原始命令
          ????????r?:=?reply.MakeMultiBulkReply(args)
          ????????db.addAof(r)
          ??????}
          ????}
          ??}
          ??return
          }

          在異步協(xié)程中寫入命令:

          func?(db?*DB)?handleAof()?{
          ??for?cmd?:=?range?db.aofChan?{
          ????????//?異步協(xié)程在持久化之前會嘗試獲取鎖,若其他協(xié)程持有鎖則會暫停持久化操作
          ????????//?鎖也保證了每次寫入完整的一條指令不會格式錯誤
          ????db.pausingAof.RLock()?
          ????if?db.aofRewriteChan?!=?nil?{
          ??????db.aofRewriteChan?<-?cmd
          ????}
          ????_,?err?:=?db.aofFile.Write(cmd.ToBytes())
          ????if?err?!=?nil?{
          ??????logger.Warn(err)
          ????}
          ????db.pausingAof.RUnlock()
          ??}
          }

          讀取過程與協(xié)議解析器[1]一節(jié)基本相同,不在正文中贅述:loadAof https://github.com/HDT3213/godis/blob/master/src/db/aof.go。

          AOF 重寫

          若我們對鍵a賦值100次會在AOF文件中產(chǎn)生100條指令但只有最后一條指令是有效的,為了減少持久化文件的大小需要進行AOF重寫以刪除無用的指令。

          重寫必須在固定不變的數(shù)據(jù)集上進行,不能直接使用內(nèi)存中的數(shù)據(jù)。Redis 重寫的實現(xiàn)方式是進行 fork 并在子進程中遍歷數(shù)據(jù)庫內(nèi)的數(shù)據(jù)重新生成AOF文件。由于 golang 不支持 fork 操作,我們只能采用讀取AOF文件生成副本的方式來代替fork。

          在進行AOF重寫操作時需要滿足兩個要求:

          1. 若 AOF 重寫失敗或被中斷,AOF 文件需保持重寫之前的狀態(tài)不能丟失數(shù)據(jù)
          2. 進行 AOF 重寫期間執(zhí)行的命令必須保存到新的AOF文件中, 不能丟失

          因此我們設(shè)計了一套比較復雜的流程:

          1. 暫停AOF寫入 -> 更改狀態(tài)為重寫中 -> 準備重寫 -> 恢復AOF寫入
          2. 在重寫過程中,持久化協(xié)程在將命令寫入文件的同時也將其寫入內(nèi)存中的重寫緩存區(qū)
          3. 重寫協(xié)程讀取 AOF 文件中的前一部分(重寫開始前的數(shù)據(jù),不包括讀寫過程中寫入的數(shù)據(jù))并重寫到臨時文件(tmp.aof)中
          4. 暫停AOF寫入 -> 將重寫緩沖區(qū)中的命令寫入tmp.aof -> 使用臨時文件tmp.aof覆蓋AOF文件(使用文件系統(tǒng)的mv命令保證安全)-> 清空重寫緩沖區(qū) -> 恢復AOF寫入

          在不阻塞在線服務的同時進行其它操作是一項必需的能力,AOF重寫的思路在解決這類問題時具有重要的參考價值。比如Mysql Online DDL: gh-ost[2]采用了類似的策略保證數(shù)據(jù)一致。

          首先準備開始重寫操作:

          func?(db?*DB)?startRewrite()?(*os.File,?int64,?error)?{
          ??//?暫停AOF寫入,?數(shù)據(jù)會在?db.aofChan?中暫時堆積
          ??db.pausingAof.Lock()?
          ??defer?db.pausingAof.Unlock()

          ??//?創(chuàng)建重寫緩沖區(qū)??
          ??db.aofRewriteChan?=?make(chan?*reply.MultiBulkReply,?aofQueueSize)

          ??//?讀取當前?aof?文件大小,?不讀取重寫過程中新寫入的內(nèi)容
          ??fileInfo,?_?:=?os.Stat(db.aofFilename)
          ??filesize?:=?fileInfo.Size()

          ??//?創(chuàng)建臨時文件
          ??file,?err?:=?ioutil.TempFile("",?"aof")
          ??if?err?!=?nil?{
          ????logger.Warn("tmp?file?create?failed")
          ????return?nil,?0,?err
          ??}
          ??return?file,?filesize,?nil
          }

          在重寫過程中,持久化協(xié)程進行雙寫:

          func?(db?*DB)?handleAof()?{
          ??for?cmd?:=?range?db.aofChan?{
          ????db.pausingAof.RLock()?
          ????if?db.aofRewriteChan?!=?nil?{
          ??????//?數(shù)據(jù)寫入重寫緩沖區(qū)
          ??????db.aofRewriteChan?<-?cmd
          ????}
          ????_,?err?:=?db.aofFile.Write(cmd.ToBytes())
          ????if?err?!=?nil?{
          ??????logger.Warn(err)
          ????}
          ????db.pausingAof.RUnlock()
          ??}
          }

          執(zhí)行重寫:

          func?(db?*DB)?aofRewrite()?{
          ??file,?fileSize,?err?:=?db.startRewrite()
          ??if?err?!=?nil?{
          ????logger.Warn(err)
          ????return
          ??}

          ??//?load?aof?file
          ??tmpDB?:=?&DB{
          ????Data:?????dict.MakeSimple(),
          ????TTLMap:???dict.MakeSimple(),
          ????Locker:???lock.Make(lockerSize),
          ????interval:?5?*?time.Second,

          ????aofFilename:?db.aofFilename,
          ??}
          ??//?只讀取開始重寫前?aof?文件的內(nèi)容
          ??tmpDB.loadAof(int(fileSize))

          ??//?rewrite?aof?file
          ??tmpDB.Data.ForEach(func(key?string,?raw?interface{})?bool?{
          ????var?cmd?*reply.MultiBulkReply
          ????entity,?_?:=?raw.(*DataEntity)
          ????switch?val?:=?entity.Data.(type)?{
          ????case?[]byte:
          ??????cmd?=?persistString(key,?val)
          ????case?*List.LinkedList:
          ??????cmd?=?persistList(key,?val)
          ????case?*set.Set:
          ??????cmd?=?persistSet(key,?val)
          ????case?dict.Dict:
          ??????cmd?=?persistHash(key,?val)
          ????case?*SortedSet.SortedSet:
          ??????cmd?=?persistZSet(key,?val)

          ????}
          ????if?cmd?!=?nil?{
          ??????_,?_?=?file.Write(cmd.ToBytes())
          ????}
          ????return?true
          ??})
          ??tmpDB.TTLMap.ForEach(func(key?string,?raw?interface{})?bool?{
          ????expireTime,?_?:=?raw.(time.Time)
          ????cmd?:=?makeExpireCmd(key,?expireTime)
          ????if?cmd?!=?nil?{
          ??????_,?_?=?file.Write(cmd.ToBytes())
          ????}
          ????return?true
          ??})

          ??db.finishRewrite(file)
          }

          重寫完畢后寫入緩沖區(qū)中的數(shù)據(jù)并替換正式文件:

          func?(db?*DB)?finishRewrite(tmpFile?*os.File)?{
          ????//?暫停AOF寫入
          ??db.pausingAof.Lock()?
          ??defer?db.pausingAof.Unlock()


          ????//?將重寫緩沖區(qū)內(nèi)的數(shù)據(jù)寫入臨時文件
          ??//?因為handleAof已被暫停,在遍歷期間aofRewriteChan中不會有新數(shù)據(jù)
          ????loop:
          ??for?{
          ????select?{
          ????case?cmd?:=?<-db.aofRewriteChan:
          ??????_,?err?:=?tmpFile.Write(cmd.ToBytes())
          ??????if?err?!=?nil?{
          ????????logger.Warn(err)
          ??????}
          ????default:
          ??????//?只有?channel?為空時才會進入此分支
          ??????break?loop
          ????}
          ????}
          ????//?釋放重寫緩沖區(qū)
          ??close(db.aofRewriteChan)
          ??db.aofRewriteChan?=?nil

          ??//?使用臨時文件代替aof文件
          ??_?=?db.aofFile.Close()
          ??_?=?os.Rename(tmpFile.Name(),?db.aofFilename)

          ??//?重新打開文件描述符以保證正常寫入
          ??aofFile,?err?:=?os.OpenFile(db.aofFilename,?os.O_APPEND|os.O_CREATE|os.O_RDWR,?0600)
          ??if?err?!=?nil?{
          ????panic(err)
          ??}
          ??db.aofFile?=?aofFile
          }

          作者:finley

          出處:https://www.cnblogs.com/Finley/p/12663636.html

          版權(quán):本作品采用「署名-非商業(yè)性使用-相同方式共享 4.0 國際[3]」許可協(xié)議進行許可。

          參考資料

          [1]

          協(xié)議解析器: https://www.cnblogs.com/Finley/p/11923168.html

          [2]

          Mysql Online DDL: gh-ost: https://github.com/github/gh-ost

          [3]

          署名-非商業(yè)性使用-相同方式共享 4.0 國際: https://creativecommons.org/licenses/by-nc-sa/4.0/



          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學習資料禮包,包含學習建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復 ebook 獲?。贿€可以回復「進群」,和數(shù)萬 Gopher 交流學習。


          瀏覽 87
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产视频福利论坛 | 国产熟妇毛多 久久久久 | 国产三区视频 | 亚洲视屏在线观看 | 久久亚洲伊人 |