Go 項目實戰(zhàn):實現(xiàn)一個 Redis(4) 之AOF 持久化與AOF重寫
本文是使用 golang 實現(xiàn) redis 系列的第四篇文章,將介紹如何使用 golang 實現(xiàn) Append Only File 持久化及 AOF 文件重寫。
本文完整源代碼在作者Github:https://github.com/HDT3213/godis/blob/master/src/db/aof.go
AOF 文件
AOF 持久化是典型的異步任務,主協(xié)程(goroutine) 可以使用 channel 將數(shù)據(jù)發(fā)送到異步協(xié)程由異步協(xié)程執(zhí)行持久化操作。
在 DB 中定義相關(guān)字段:
type?DB?struct?{
????//?主線程使用此channel將要持久化的命令發(fā)送到異步協(xié)程
????aofChan?????chan?*reply.MultiBulkReply?
????//?append?file?文件描述符
????aofFile?????*os.File??
????//?append?file?路徑
??aofFilename?string?
????//?aof?重寫需要的緩沖區(qū),將在AOF重寫一節(jié)詳細介紹
????aofRewriteChan?chan?*reply.MultiBulkReply?
????//?在必要的時候使用此字段暫停持久化操作
??pausingAof?????sync.RWMutex?
}
在進行持久化時需要注意兩個細節(jié):
get 之類的讀命令并不需要進行持久化 expire 命令要用等效的 expireat 命令替換。舉例說明,10:00 執(zhí)行 expire a 3600表示鍵 a 在 11:00 過期,在 10:30 載入AOF文件時執(zhí)行expire a 3600就成了 11:30 過期與原數(shù)據(jù)不符。
我們在命令處理方法中返回 AOF 需要的額外信息:
type?extra?struct?{
????//?表示該命令是否需要持久化
????toPersist??bool?
????//?如上文所述?expire?之類的命令不能直接持久化
????//?若?specialAof?==?nil?則將命令原樣持久化,否則持久化?specialAof?中的指令
??specialAof?[]*reply.MultiBulkReply?
}
type?CmdFunc?func(db?*DB,?args?[][]byte)?(redis.Reply,?*extra)
以 SET 命令為例:
func?Set(db?*DB,?args?[][]byte)?(redis.Reply,?*extra)?{
????//....
????var?result?int
????switch?policy?{
????case?upsertPolicy:
????????result?=?db.Put(key,?entity)
????case?insertPolicy:
????????result?=?db.PutIfAbsent(key,?entity)
????case?updatePolicy:
????????result?=?db.PutIfExists(key,?entity)
????}
????extra?:=?&extra{toPersist:?result?>?0}?//?若實際寫入了數(shù)據(jù)則toPresist=true,?若因為XX或NX選項沒有實際寫入數(shù)據(jù)則toPresist=false
????if?result?>?0?{
????????if?ttl?!=?unlimitedTTL?{?//?使用了?EX?或?NX?選項
????????????expireTime?:=?time.Now().Add(time.Duration(ttl)?*?time.Millisecond)
????????????db.Expire(key,?expireTime)
????????????//?持久化時使用?set?key?value?和?pexpireat?命令代替?set?key?value?EX?ttl?命令
????????????extra.specialAof?=?[]*reply.MultiBulkReply{?
????????????????reply.MakeMultiBulkReply([][]byte{
????????????????????[]byte("SET"),
????????????????????args[0],
????????????????????args[1],
????????????????}),
????????????????makeExpireCmd(key,?expireTime),
????????????}
????????}?else?{
????????????db.Persist(key)?//?override?ttl
????????}
????}
????return?&reply.OkReply{},?extra
}
var?pExpireAtCmd?=?[]byte("PEXPIREAT")
func?makeExpireCmd(key?string,?expireAt?time.Time)?*reply.MultiBulkReply?{
??args?:=?make([][]byte,?3)
??args[0]?=?pExpireAtCmd
??args[1]?=?[]byte(key)
??args[2]?=?[]byte(strconv.FormatInt(expireAt.UnixNano()/1e6,?10))
??return?reply.MakeMultiBulkReply(args)
}
在處理命令的調(diào)度方法中將 aof 命令發(fā)送到 channel:
func?(db?*DB)?Exec(c?redis.Client,?args?[][]byte)?(result?redis.Reply)?{
??//?....
??//?normal?commands
??var?extra?*extra
??cmdFunc,?ok?:=?router[cmd]?//?找到命令對應的處理函數(shù)
??if?!ok?{
????return?reply.MakeErrReply("ERR?unknown?command?'"?+?cmd?+?"'")
????}
????//?使用處理函數(shù)執(zhí)行命令
??if?len(args)?>?1?{
????result,?extra?=?cmdFunc(db,?args[1:])
??}?else?{
????result,?extra?=?cmdFunc(db,?[][]byte{})
??}
??//?AOF?持久化
??if?config.Properties.AppendOnly?{
????if?extra?!=?nil?&&?extra.toPersist?{
????????????//?寫入?specialAof
??????if?extra.specialAof?!=?nil?&&?len(extra.specialAof)?>?0?{
????????for?_,?r?:=?range?extra.specialAof?{
??????????db.addAof(r)
????????}
??????}?else?{
????????????????//?寫入原始命令
????????r?:=?reply.MakeMultiBulkReply(args)
????????db.addAof(r)
??????}
????}
??}
??return
}
在異步協(xié)程中寫入命令:
func?(db?*DB)?handleAof()?{
??for?cmd?:=?range?db.aofChan?{
????????//?異步協(xié)程在持久化之前會嘗試獲取鎖,若其他協(xié)程持有鎖則會暫停持久化操作
????????//?鎖也保證了每次寫入完整的一條指令不會格式錯誤
????db.pausingAof.RLock()?
????if?db.aofRewriteChan?!=?nil?{
??????db.aofRewriteChan?<-?cmd
????}
????_,?err?:=?db.aofFile.Write(cmd.ToBytes())
????if?err?!=?nil?{
??????logger.Warn(err)
????}
????db.pausingAof.RUnlock()
??}
}
讀取過程與協(xié)議解析器[1]一節(jié)基本相同,不在正文中贅述:loadAof https://github.com/HDT3213/godis/blob/master/src/db/aof.go。
AOF 重寫
若我們對鍵a賦值100次會在AOF文件中產(chǎn)生100條指令但只有最后一條指令是有效的,為了減少持久化文件的大小需要進行AOF重寫以刪除無用的指令。
重寫必須在固定不變的數(shù)據(jù)集上進行,不能直接使用內(nèi)存中的數(shù)據(jù)。Redis 重寫的實現(xiàn)方式是進行 fork 并在子進程中遍歷數(shù)據(jù)庫內(nèi)的數(shù)據(jù)重新生成AOF文件。由于 golang 不支持 fork 操作,我們只能采用讀取AOF文件生成副本的方式來代替fork。
在進行AOF重寫操作時需要滿足兩個要求:
若 AOF 重寫失敗或被中斷,AOF 文件需保持重寫之前的狀態(tài)不能丟失數(shù)據(jù) 進行 AOF 重寫期間執(zhí)行的命令必須保存到新的AOF文件中, 不能丟失
因此我們設(shè)計了一套比較復雜的流程:
暫停AOF寫入 -> 更改狀態(tài)為重寫中 -> 準備重寫 -> 恢復AOF寫入 在重寫過程中,持久化協(xié)程在將命令寫入文件的同時也將其寫入內(nèi)存中的重寫緩存區(qū) 重寫協(xié)程讀取 AOF 文件中的前一部分(重寫開始前的數(shù)據(jù),不包括讀寫過程中寫入的數(shù)據(jù))并重寫到臨時文件(tmp.aof)中 暫停AOF寫入 -> 將重寫緩沖區(qū)中的命令寫入tmp.aof -> 使用臨時文件tmp.aof覆蓋AOF文件(使用文件系統(tǒng)的mv命令保證安全)-> 清空重寫緩沖區(qū) -> 恢復AOF寫入
在不阻塞在線服務的同時進行其它操作是一項必需的能力,AOF重寫的思路在解決這類問題時具有重要的參考價值。比如Mysql Online DDL: gh-ost[2]采用了類似的策略保證數(shù)據(jù)一致。
首先準備開始重寫操作:
func?(db?*DB)?startRewrite()?(*os.File,?int64,?error)?{
??//?暫停AOF寫入,?數(shù)據(jù)會在?db.aofChan?中暫時堆積
??db.pausingAof.Lock()?
??defer?db.pausingAof.Unlock()
??//?創(chuàng)建重寫緩沖區(qū)??
??db.aofRewriteChan?=?make(chan?*reply.MultiBulkReply,?aofQueueSize)
??//?讀取當前?aof?文件大小,?不讀取重寫過程中新寫入的內(nèi)容
??fileInfo,?_?:=?os.Stat(db.aofFilename)
??filesize?:=?fileInfo.Size()
??//?創(chuàng)建臨時文件
??file,?err?:=?ioutil.TempFile("",?"aof")
??if?err?!=?nil?{
????logger.Warn("tmp?file?create?failed")
????return?nil,?0,?err
??}
??return?file,?filesize,?nil
}
在重寫過程中,持久化協(xié)程進行雙寫:
func?(db?*DB)?handleAof()?{
??for?cmd?:=?range?db.aofChan?{
????db.pausingAof.RLock()?
????if?db.aofRewriteChan?!=?nil?{
??????//?數(shù)據(jù)寫入重寫緩沖區(qū)
??????db.aofRewriteChan?<-?cmd
????}
????_,?err?:=?db.aofFile.Write(cmd.ToBytes())
????if?err?!=?nil?{
??????logger.Warn(err)
????}
????db.pausingAof.RUnlock()
??}
}
執(zhí)行重寫:
func?(db?*DB)?aofRewrite()?{
??file,?fileSize,?err?:=?db.startRewrite()
??if?err?!=?nil?{
????logger.Warn(err)
????return
??}
??//?load?aof?file
??tmpDB?:=?&DB{
????Data:?????dict.MakeSimple(),
????TTLMap:???dict.MakeSimple(),
????Locker:???lock.Make(lockerSize),
????interval:?5?*?time.Second,
????aofFilename:?db.aofFilename,
??}
??//?只讀取開始重寫前?aof?文件的內(nèi)容
??tmpDB.loadAof(int(fileSize))
??//?rewrite?aof?file
??tmpDB.Data.ForEach(func(key?string,?raw?interface{})?bool?{
????var?cmd?*reply.MultiBulkReply
????entity,?_?:=?raw.(*DataEntity)
????switch?val?:=?entity.Data.(type)?{
????case?[]byte:
??????cmd?=?persistString(key,?val)
????case?*List.LinkedList:
??????cmd?=?persistList(key,?val)
????case?*set.Set:
??????cmd?=?persistSet(key,?val)
????case?dict.Dict:
??????cmd?=?persistHash(key,?val)
????case?*SortedSet.SortedSet:
??????cmd?=?persistZSet(key,?val)
????}
????if?cmd?!=?nil?{
??????_,?_?=?file.Write(cmd.ToBytes())
????}
????return?true
??})
??tmpDB.TTLMap.ForEach(func(key?string,?raw?interface{})?bool?{
????expireTime,?_?:=?raw.(time.Time)
????cmd?:=?makeExpireCmd(key,?expireTime)
????if?cmd?!=?nil?{
??????_,?_?=?file.Write(cmd.ToBytes())
????}
????return?true
??})
??db.finishRewrite(file)
}
重寫完畢后寫入緩沖區(qū)中的數(shù)據(jù)并替換正式文件:
func?(db?*DB)?finishRewrite(tmpFile?*os.File)?{
????//?暫停AOF寫入
??db.pausingAof.Lock()?
??defer?db.pausingAof.Unlock()
????//?將重寫緩沖區(qū)內(nèi)的數(shù)據(jù)寫入臨時文件
??//?因為handleAof已被暫停,在遍歷期間aofRewriteChan中不會有新數(shù)據(jù)
????loop:
??for?{
????select?{
????case?cmd?:=?<-db.aofRewriteChan:
??????_,?err?:=?tmpFile.Write(cmd.ToBytes())
??????if?err?!=?nil?{
????????logger.Warn(err)
??????}
????default:
??????//?只有?channel?為空時才會進入此分支
??????break?loop
????}
????}
????//?釋放重寫緩沖區(qū)
??close(db.aofRewriteChan)
??db.aofRewriteChan?=?nil
??//?使用臨時文件代替aof文件
??_?=?db.aofFile.Close()
??_?=?os.Rename(tmpFile.Name(),?db.aofFilename)
??//?重新打開文件描述符以保證正常寫入
??aofFile,?err?:=?os.OpenFile(db.aofFilename,?os.O_APPEND|os.O_CREATE|os.O_RDWR,?0600)
??if?err?!=?nil?{
????panic(err)
??}
??db.aofFile?=?aofFile
}
作者:finley
出處:https://www.cnblogs.com/Finley/p/12663636.html
版權(quán):本作品采用「署名-非商業(yè)性使用-相同方式共享 4.0 國際[3]」許可協(xié)議進行許可。
參考資料
協(xié)議解析器: https://www.cnblogs.com/Finley/p/11923168.html
[2]Mysql Online DDL: gh-ost: https://github.com/github/gh-ost
[3]署名-非商業(yè)性使用-相同方式共享 4.0 國際: https://creativecommons.org/licenses/by-nc-sa/4.0/
推薦閱讀
