VictorialMetrics存儲原理之索引存儲格式
前文我們介紹了當(dāng)插入數(shù)據(jù)的時候會先去添加索引數(shù)據(jù),索引構(gòu)建完成后又是如何去持久化數(shù)據(jù)的呢?保存的數(shù)據(jù)又是怎樣的格式呢?本節(jié)我們將對此進(jìn)行詳細(xì)講解。
添加索引數(shù)據(jù)
索引構(gòu)建完成后會調(diào)用 AddItems 函數(shù)將索引添加到 Table 中去:
//?lib/mergeset/table.go
//?AddItems?添加指定的?items?到?table?中去
func?(tb?*Table)?AddItems(items?[][]byte)?error?{??
???if?err?:=?tb.rawItems.addItems(tb,?items);?err?!=?nil?{??
??????return?fmt.Errorf("cannot?insert?data?into?%q:?%w",?tb.path,?err)??
???}??
???return?nil??
}
Table 的結(jié)構(gòu)如下所示:
//?lib/mergeset/table.go
//?Table?代表?mergeset?table.??
type?Table?struct?{??
???activeMerges???uint64??
???mergesCount????uint64??
???itemsMerged????uint64??
???assistedMerges?uint64??
???//?merge?索引
???mergeIdx?uint64??
???//?路徑
???path?string??
???//?flush回調(diào)
???flushCallback?????????func()??
???flushCallbackWorkerWG?sync.WaitGroup??
???needFlushCallbackCall?uint32??
???//?在將指定項的整個塊刷新到持久存儲之前,在合并期間調(diào)用的回調(diào)
???prepareBlock?PrepareBlockCallback??
???//?parts?列表
???partsLock?sync.Mutex??
???parts?????[]*partWrapper??
??
???//?rawItems?包含最近添加的尚未轉(zhuǎn)換為?parts?的數(shù)據(jù)
???//?出于性能原因,未在搜索中使用?rawItems
???rawItems?rawItemsShards??
??
???snapshotLock?sync.RWMutex??
??
???flockF?*os.File??
??
???stopCh?chan?struct{}??
??
???partMergersWG?syncwg.WaitGroup??
???rawItemsFlusherWG?sync.WaitGroup??
???convertersWG?sync.WaitGroup??
???rawItemsPendingFlushesWG?syncwg.WaitGroup??
}
一個索引 Table 就對應(yīng)著一個 indexDB,也就是數(shù)據(jù)目錄 indexdb 下面的文件夾:
其中核心的是 parts 和 rawItems 兩個屬性。
parts主要是存儲 merge 后的 blocks,一個part與文件系統(tǒng)上的一個目錄對應(yīng),比如上圖中的24_1_16F4A862471C1DC9目錄就是一個part。rawItems是用于預(yù)處理Items的,是一個rawItemsShards對象。
rawItemsShards 結(jié)構(gòu)體定義如下所示:
//?lib/mergeset/table.go
type?rawItemsShards?struct?{??
???shardIdx?uint32??
??
???//?在多?cpu?系統(tǒng)上添加?rows?數(shù)據(jù)時,shards?分片可以減少鎖競爭?
???shards?[]rawItemsShard??
}
//?每個?table?的?rawItems?分片數(shù)?
var?rawItemsShardsPerTable?=?cgroup.AvailableCPUs()??
//?每個分片最大的Block數(shù)
const?maxBlocksPerShard?=?512
//?當(dāng)在打開Table的時候就會調(diào)用該函數(shù)進(jìn)行初始化
func?(riss?*rawItemsShards)?init()?{??
???riss.shards?=?make([]rawItemsShard,?rawItemsShardsPerTable)??
}
//?添加?items?元素
func?(riss?*rawItemsShards)?addItems(tb?*Table,?items?[][]byte)?error?{??
???n?:=?atomic.AddUint32(&riss.shardIdx,?1)??
???shards?:=?riss.shards??
???idx?:=?n?%?uint32(len(shards))??
???shard?:=?&shards[idx]??
???return?shard.addItems(tb,?items)??
}
rawItemsShards 其實就是加了一個分片功能用于保存索引數(shù)據(jù),addItems 函數(shù)就是將要添加的數(shù)據(jù)添加到對應(yīng)的分片上去,最終執(zhí)行的邏輯是 shard.addItems。
//?lib/mergeset/table.go
type?rawItemsShard?struct?{??
???mu????????????sync.Mutex??
???ibs???????????[]*inmemoryBlock??
???lastFlushTime?uint64??
}??
//?添加items元素
func?(ris?*rawItemsShard)?addItems(tb?*Table,?items?[][]byte)?error?{??
???var?err?error??
???var?blocksToFlush?[]*inmemoryBlock??
??
???ris.mu.Lock()??
???ibs?:=?ris.ibs??
???if?len(ibs)?==?0?{??
??????ib?:=?getInmemoryBlock()??
??????ibs?=?append(ibs,?ib)??
??????ris.ibs?=?ibs??
???}??
???//?取最后一個內(nèi)存塊
???ib?:=?ibs[len(ibs)-1]??
???for?_,?item?:=?range?items?{?
??????//?添加索引item到內(nèi)存塊?
??????if?!ib.Add(item)?{??//?超過了內(nèi)存塊大小
?????????//?重新獲取一個內(nèi)存塊,此時肯定為空
?????????ib?=?getInmemoryBlock()??
?????????//?重新添加
?????????if?!ib.Add(item)?{??
????????????putInmemoryBlock(ib)??
????????????err?=?fmt.Errorf("cannot?insert?an?item?%q?into?an?empty?inmemoryBlock;?it?looks?like?the?item?is?too?large??len(item)=%d",?item,?len(item))??
????????????break??
?????????}??
?????????ibs?=?append(ibs,?ib)??
?????????ris.ibs?=?ibs??
??????}??
???}??
???//?超過了每個分片的最大內(nèi)存塊的數(shù)量
???if?len(ibs)?>=?maxBlocksPerShard?{??
??????//?將內(nèi)存塊放到待刷新的內(nèi)存塊列表中去
??????blocksToFlush?=?append(blocksToFlush,?ibs...)??
??????//?釋放前面的內(nèi)存塊資源
??????for?i?:=?range?ibs?{??
?????????ibs[i]?=?nil??
??????}??
??????ris.ibs?=?ibs[:0]??
??????ris.lastFlushTime?=?fasttime.UnixTimestamp()??
???}??
???ris.mu.Unlock()??
???//?執(zhí)行merge合并操作
???tb.mergeRawItemsBlocks(blocksToFlush,?false)??
???return?err??
}
//?lib/mergeset/encoding.go
//?內(nèi)存中的一個Block塊結(jié)構(gòu)
type?inmemoryBlock?struct?{??
???commonPrefix?[]byte??
???data?????????[]byte??//?用來存儲數(shù)據(jù)
???items????????[]Item??//?用來存儲每個item數(shù)據(jù)的起始偏移量
}
//?Item?表示用于存儲在?mergeset?中的單個?item?數(shù)據(jù)
type?Item?struct?{??
???//?數(shù)據(jù)的開始偏移量
???Start?uint32??
???//?數(shù)據(jù)的結(jié)束偏移量
???End?uint32??
}
//?maxInmemoryBlockSize?是?memoryblock.data?的最大值。
//??
//?它必須適合?CPU?緩存大小,即當(dāng)前?CPU?的緩存大小為64kb。
const?maxInmemoryBlockSize?=?64?*?1024
//?Add?將?x?添加到內(nèi)存卡?ib?的末尾
//??
//?如果由于塊大小限制,x?未添加到?ib,則返回?false
func?(ib?*inmemoryBlock)?Add(x?[]byte)?bool?{??
???data?:=?ib.data??
???//?操過塊大小限制了
???if?len(x)+len(data)?>?maxInmemoryBlockSize?{??
??????return?false??
???}??
???if?cap(data)?==?0?{??
??????//?預(yù)分配?data?和?items?以減少內(nèi)存分配
??????data?=?make([]byte,?0,?maxInmemoryBlockSize)??
??????ib.items?=?make([]Item,?0,?512)??
???}??
???dataLen?:=?len(data)??
???data?=?append(data,?x...)??//?將?x?添加到?data
???ib.items?=?append(ib.items,?Item{??//?更新?items
??????Start:?uint32(dataLen),??
??????End:???uint32(len(data)),??
???})??
???ib.data?=?data??
???return?true??
}
rawItemsShard 表示保存索引數(shù)據(jù)的一個分片,里面其實就是一個 inmemoryBlock 的內(nèi)存塊切片,每個分片最多有 512 個內(nèi)存塊,每個內(nèi)存塊占用 64KB 的容量,當(dāng)每個分片中的內(nèi)存塊數(shù)量超過最大數(shù)量(512)會去將內(nèi)存塊數(shù)據(jù)刷新為 Part。
如果分片中的內(nèi)存塊數(shù)量沒超過上限,則會通過一個任務(wù)去定時(1s)將 rawItem 數(shù)據(jù)刷新(轉(zhuǎn)換)為 Part,以便它們對搜索可見。
//?lib/mergeset/table.go
//?將最近的?rawItem?刷新(轉(zhuǎn)換)為?Part,以便它們對搜索可見。
const?rawItemsFlushInterval?=?time.Second
//?啟動?rawItems?Flusher?任務(wù)
func?(tb?*Table)?startRawItemsFlusher()?{??
???tb.rawItemsFlusherWG.Add(1)??
???go?func()?{??
??????tb.rawItemsFlusher()??
??????tb.rawItemsFlusherWG.Done()??
???}()??
}??
??
func?(tb?*Table)?rawItemsFlusher()?{??
???ticker?:=?time.NewTicker(rawItemsFlushInterval)??
???defer?ticker.Stop()??
???for?{??
??????select?{??
??????case?<-tb.stopCh:??
?????????return??
??????case?<-ticker.C:??
?????????tb.flushRawItems(false)??
??????}??
???}??
}
合并內(nèi)存數(shù)據(jù)
將內(nèi)存塊數(shù)據(jù)轉(zhuǎn)換為 Part 都是通過 mergeRawItemsBlocks 函數(shù)去實現(xiàn)的。
//?lib/mergeset/table.go
//?一次合并的默認(rèn)?parts?數(shù)
//??
//?這個數(shù)字是根據(jù)經(jīng)驗得出的,它提供了盡可能低的開銷
//?有關(guān)詳細(xì)信息,請參閱?appendPartsToMerge?test
const?defaultPartsToMerge?=?15
//?merge?內(nèi)存塊數(shù)據(jù)
func?(tb?*Table)?mergeRawItemsBlocks(ibs?[]*inmemoryBlock,?isFinal?bool)?{??
???if?len(ibs)?==?0?{??
??????return??
???}??
???tb.partMergersWG.Add(1)??
???defer?tb.partMergersWG.Done()??
??
???pws?:=?make([]*partWrapper,?0,?(len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)??
???var?pwsLock?sync.Mutex??
???var?wg?sync.WaitGroup??
???for?len(ibs)?>?0?{??
??????//?一次最大合并的內(nèi)存塊數(shù)量
??????n?:=?defaultPartsToMerge??
??????if?n?>?len(ibs)?{??
?????????n?=?len(ibs)??
??????}??
??????wg.Add(1)??
??????go?func(ibsPart?[]*inmemoryBlock)?{??
?????????defer?wg.Done()??
?????????//?merge?inmemoryBlock
?????????pw?:=?tb.mergeInmemoryBlocks(ibsPart)??
?????????if?pw?==?nil?{??
????????????return??
?????????}??
?????????pw.isInMerge?=?true??
?????????pwsLock.Lock()??
?????????pws?=?append(pws,?pw)??
?????????pwsLock.Unlock()??
??????}(ibs[:n])??
??????ibs?=?ibs[n:]??
???}??
???wg.Wait()??
???if?len(pws)?>?0?{??
??????if?err?:=?tb.mergeParts(pws,?nil,?true);?err?!=?nil?{??
?????????logger.Panicf("FATAL:?cannot?merge?raw?parts:?%s",?err)??
??????}??
??????if?tb.flushCallback?!=?nil?{??
?????????if?isFinal?{??
????????????tb.flushCallback()??
?????????}?else?{??
????????????atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall,?0,?1)??
?????????}??
??????}??
???}??
??
???for?{??
??????tb.partsLock.Lock()??
??????ok?:=?len(tb.parts)?<=?maxParts??
??????tb.partsLock.Unlock()??
??????if?ok?{??
?????????return??
??????}??
??
??????//?The?added?part?exceeds?maxParts?count.?Assist?with?merging?other?parts.??
??????//??????
??????//?Prioritize?assisted?merges?over?searches.??????
??????storagepacelimiter.Search.Inc()??
??????err?:=?tb.mergeExistingParts(false)??
??????storagepacelimiter.Search.Dec()??
??????if?err?==?nil?{??
?????????atomic.AddUint64(&tb.assistedMerges,?1)??
?????????continue??
??????}??
??????if?errors.Is(err,?errNothingToMerge)?||?errors.Is(err,?errForciblyStopped)?{??
?????????return??
??????}??
??????logger.Panicf("FATAL:?cannot?merge?small?parts:?%s",?err)??
???}??
}
mergeRawItemsBlocks 函數(shù)將指定的內(nèi)存塊進(jìn)行 merge 合并操作,一次合并最大的內(nèi)存塊數(shù)量為 15,然后在獨立的 goroutine 中去進(jìn)行合并操作,使用 mergeInmemoryBlocks 函數(shù)。
//?lib/mergeset/table.go
//?merge?InmemoryBlocks
func?(tb?*Table)?mergeInmemoryBlocks(ibs?[]*inmemoryBlock)?*partWrapper?{??
???//?將?InmemoryBlock?列表轉(zhuǎn)換成?inmemoryPart?列表?
???//?inmemoryPart?表示內(nèi)存中的Part
???mps?:=?make([]*inmemoryPart,?0,?len(ibs))??
???for?_,?ib?:=?range?ibs?{??
??????if?len(ib.items)?==?0?{??
?????????continue??
??????}??
??????mp?:=?getInmemoryPart()??
??????mp.Init(ib)?//?將inmemoryBlock轉(zhuǎn)換為inmemoryPart
??????putInmemoryBlock(ib)??
??????mps?=?append(mps,?mp)??
???}??
???if?len(mps)?==?0?{??
??????return?nil??
???}??
???if?len(mps)?==?1?{??
??????//?沒有要合并的內(nèi)容。只需返回單個?inmemory?part。
??????mp?:=?mps[0]??
??????p?:=?mp.NewPart()??
??????return?&partWrapper{??
?????????p:????????p,??
?????????mp:???????mp,??
?????????refCount:?1,??
??????}??
???}??
???defer?func()?{??
??????for?_,?mp?:=?range?mps?{??
?????????putInmemoryPart(mp)??
??????}??
???}()??
??
???atomic.AddUint64(&tb.mergesCount,?1)??
???atomic.AddUint64(&tb.activeMerges,?1)??
???defer?atomic.AddUint64(&tb.activeMerges,?^uint64(0))??
??
???//?為每個?`inmemoryPart`?構(gòu)造?`blockStreamReader`,?用于迭代讀取?items
???bsrs?:=?make([]*blockStreamReader,?0,?len(mps))??
???for?_,?mp?:=?range?mps?{??
??????bsr?:=?getBlockStreamReader()??
??????bsr.InitFromInmemoryPart(mp)??
??????bsrs?=?append(bsrs,?bsr)??
???}??
??
???//?準(zhǔn)備一個?blockStreamWriter?用于合并寫入的?part
???bsw?:=?getBlockStreamWriter()??
???//?不要通過?getInmemoryPart()?獲取?mpDst,因為與池中的其他條目相比,它的大小可能太大。?
???//?這可能會導(dǎo)致內(nèi)存使用量增加,因為存在大量的碎片。?
???//?創(chuàng)建一個新的?inmemoryPart,接收合并的數(shù)據(jù)
???mpDst?:=?&inmemoryPart{}??
???bsw.InitFromInmemoryPart(mpDst)??
??
???//?開始?merge?數(shù)據(jù)
???//?該?merge?不應(yīng)該被?stopCh?中斷,因為它可能是?stopCh?關(guān)閉后的最終結(jié)果
???err?:=?mergeBlockStreams(&mpDst.ph,?bsw,?bsrs,?tb.prepareBlock,?nil,?&tb.itemsMerged)??
???if?err?!=?nil?{??
??????logger.Panicf("FATAL:?cannot?merge?inmemoryBlocks:?%s",?err)??
???}??
???putBlockStreamWriter(bsw)??
???for?_,?bsr?:=?range?bsrs?{??
??????putBlockStreamReader(bsr)??
???}??
??
???p?:=?mpDst.NewPart()??
???return?&partWrapper{??
??????p:????????p,??
??????mp:???????mpDst,??
??????refCount:?1,??
???}??
}
上面的函數(shù)會將指定的內(nèi)存塊轉(zhuǎn)換成 partWrapper,該結(jié)構(gòu)就是一個包含 part 和 inmemoryPart 的包裝器。
//?lib/mergeset/table.go
type?partWrapper?struct?{??
???p?*part??
??
???mp?*inmemoryPart??
??
???refCount?uint64??
??
???isInMerge?bool??
}
part 的結(jié)構(gòu)如下所示:
//?lib/mergeset/part.go
type?part?struct?{??
???ph?partHeader??
??
???path?string??
??
???size?uint64??
??
???mrs?[]metaindexRow??
??
???indexFile?fs.MustReadAtCloser??
???itemsFile?fs.MustReadAtCloser??
???lensFile??fs.MustReadAtCloser??
}
一個 part 就是 Table 下面的一個數(shù)據(jù)目錄。

part 中包含一個 partHeader,該屬性中包含當(dāng)前 part 的一些 Meta 信息,一共有多少個 items、有多少 blocks、第一個和最后一個 item,對應(yīng)著 part 目錄下面的 metadata.json 文件。
//?lib/mergeset/part_header.go
type?partHeader?struct?{??
???//?part?包含的?items?數(shù)
???itemsCount?uint64??
??
???//?part?包含的?blocks?數(shù)
???blocksCount?uint64??
??
???//?part?中的第一個?item
???firstItem?[]byte??
??
???//?part?中的最后一個?item
???lastItem?[]byte??
}
part 中另外的屬性 path 表示當(dāng)前 part 的路徑,size 表示大小,另外三個屬性 indexFile、itemsFile、lensFile 對應(yīng)中 part 目錄下面的三個文件:index.bin、items.bin、lens.bin。此外 part 結(jié)構(gòu)中還有最后一個 mrs 屬性,是一個 []metaindexRow。
//?lib/mergeset/metaindex_row.go
//?metaindexRow?描述了一個?blockHeaders?即索引塊。?
type?metaindexRow?struct?{??
???//?第一個?block?中的第一個?item?元素
???//?它用于快速查找所需的索引塊
???firstItem?[]byte??
??
???//?塊包含的?blockHeaders?的數(shù)量
???blockHeadersCount?uint32??
??
???//?索引文件中塊的偏移量
???indexBlockOffset?uint64??
??
???//?索引文件中塊的大小
???indexBlockSize?uint32??
}
除了 part 之外還有一個內(nèi)存中的 inmemoryPart 結(jié)構(gòu),其基本結(jié)構(gòu)和 part 類似,不同的是幾個相關(guān)的屬性不是文件對象,而是 ByteBuffer,因為是內(nèi)存中的結(jié)構(gòu)。
//?lib/mergeset/inmemory_part.go
//?在內(nèi)存中的?Part?結(jié)構(gòu)
type?inmemoryPart?struct?{??
???//?partHeader?記錄?itemsCount,?blocksCount,?firstItem,?lastItem?信息,?最后會序列化到?metadata.json
???ph?partHeader??
???//?當(dāng)前?block?的?header?信息,有?commonPrefix,?firstItem,?marshalType,?itemsCount,?itemsBlockOffset,?lenBlockOffset,?itemsBlockSize,?lenBlockSize
???bh?blockHeader??
???//?當(dāng)前?block?的?metaindex?信息,存儲了當(dāng)前?blockHeader?的?firstItem,?blockHeaderCount,?indexBlockOffset,?indexBlockSize
???mr?metaindexRow??
???
???//?用于序列化后寫入內(nèi)存/磁盤文件使用
???metaindexData?bytesutil.ByteBuffer??//?->?metaindex.bin
???indexData?????bytesutil.ByteBuffer??//?->?index.bin
???itemsData?????bytesutil.ByteBuffer??//?->?items.bin
???lensData??????bytesutil.ByteBuffer??//?->?lens.bin
}
其他幾個屬性上面介紹過,blockHeader 結(jié)構(gòu)如下所示,用于記錄 block 頭信息:
//?lib/mergeset/block_header.go
type?blockHeader?struct?{??
???//?塊中所有?items?的公用前綴??
???commonPrefix?[]byte??
??
???//?第一個?item
???firstItem?[]byte??
??
???//?用于塊壓縮的?Marshal?類型
???marshalType?marshalType??
??
???//?塊中的?items?數(shù),不包括第一個?item
???itemsCount?uint32??
??
???//?items?block?的偏移量
???itemsBlockOffset?uint64??
??
???//?lens?block?的偏移量
???lensBlockOffset?uint64??
??
???//?items?block?的大小
???itemsBlockSize?uint32??
??
???//?lens?block?的大小
???lensBlockSize?uint32??
}
整個 part 的結(jié)構(gòu)看上去確實比較復(fù)雜,為什么需要設(shè)計這些屬性?核心肯定就是為了快速索引,我們先往下分析,待會再回過頭來看。
inmemoryPart 是 part 讀入內(nèi)存中的結(jié)構(gòu), 在 inmemoryBlock merge 之前,每個 inmemoryBlock 都會先通過 mp.Init 轉(zhuǎn)換成一個 inmemoryPart 的結(jié)構(gòu),inmemoryPart 中 metaindexData、indexData、itemsData、lensData 數(shù)據(jù)結(jié)構(gòu)與磁盤對應(yīng)的文件內(nèi)容一致。
序列化數(shù)據(jù)
現(xiàn)在我們再回到上面的 mergeInmemoryBlocks 函數(shù),流程如下所示:
1.將所有的 inmemoryBlock轉(zhuǎn)換為inmemoryPart結(jié)構(gòu)2.為每個 inmemoryPart構(gòu)造blockStreamReader,用于迭代讀取 items3.創(chuàng)建一個新的 inmemoryPart,并構(gòu)造一個blockSteamWriter用于合并寫入的數(shù)據(jù)4.然后調(diào)用 mergeBlockStreams函數(shù)執(zhí)行真正的 merge 操作
首先通過 Init 函數(shù)將 inmemoryBlock 轉(zhuǎn)換為 inmemoryPart 結(jié)構(gòu)。
//?lib/mergeset/inmemory_part.go
//?Init?初始化?mp?從?ib.?
func?(mp?*inmemoryPart)?Init(ib?*inmemoryBlock)?{??
???mp.Reset()??
???
???sb?:=?&storageBlock{}??
???sb.itemsData?=?mp.itemsData.B[:0]??
???sb.lensData?=?mp.lensData.B[:0]??
??
???//?使用盡可能小的壓縮等級來壓縮?inmemoryPart,因為它很快就會被合并到文件?part?去。
???compressLevel?:=?-5??
???//?序列化亂序的數(shù)據(jù)
???mp.bh.firstItem,?mp.bh.commonPrefix,?mp.bh.itemsCount,?mp.bh.marshalType?=?ib.MarshalUnsortedData(sb,?mp.bh.firstItem[:0],?mp.bh.commonPrefix[:0],?compressLevel)??
???//?獲取?partHeader?值
???mp.ph.itemsCount?=?uint64(len(ib.items))??
???mp.ph.blocksCount?=?1??
???mp.ph.firstItem?=?append(mp.ph.firstItem[:0],?ib.items[0].String(ib.data)...)??
???mp.ph.lastItem?=?append(mp.ph.lastItem[:0],?ib.items[len(ib.items)-1].String(ib.data)...)??
???//?獲取itemsData,更新blockHeader的items偏移和數(shù)量
???mp.itemsData.B?=?sb.itemsData??
???mp.bh.itemsBlockOffset?=?0??
???mp.bh.itemsBlockSize?=?uint32(len(mp.itemsData.B))??
???//?獲取lensData,更新blockHeader的lens偏移和數(shù)量
???mp.lensData.B?=?sb.lensData??
???mp.bh.lensBlockOffset?=?0??
???mp.bh.lensBlockSize?=?uint32(len(mp.lensData.B))??
???//?獲取?indexData,blockHeader序列化的值
???bb?:=?inmemoryPartBytePool.Get()??
???bb.B?=?mp.bh.Marshal(bb.B[:0])??
???mp.indexData.B?=?encoding.CompressZSTDLevel(mp.indexData.B[:0],?bb.B,?0)??
???//?獲取?metaindexData,metaindexRow序列化的值
???mp.mr.firstItem?=?append(mp.mr.firstItem[:0],?mp.bh.firstItem...)??
???mp.mr.blockHeadersCount?=?1??
???mp.mr.indexBlockOffset?=?0??
???mp.mr.indexBlockSize?=?uint32(len(mp.indexData.B))??
???bb.B?=?mp.mr.Marshal(bb.B[:0])??
???mp.metaindexData.B?=?encoding.CompressZSTDLevel(mp.metaindexData.B[:0],?bb.B,?0)??
???inmemoryPartBytePool.Put(bb)??
}
上面的函數(shù)將 inmemoryBlock 轉(zhuǎn)換成 inmemoryPart,首先會通過一個 MarshalUnsortedData 函數(shù)來序列化未排序的數(shù)據(jù)。
//?MarshalUnsortedData?序列化未排序的?items?從?ib?到?sb.
//??
//?It?also:??
//?-?將第一個?item?追加到?firstItemDst?并返回結(jié)果??
//?-?將所有?item?的公共前綴附加到?commonPrefixDst?并返回結(jié)果??
//?-?返回包含第一個?item?的編碼項的數(shù)量??
//?-?返回用于編碼的?marshal?類型??
func?(ib?*inmemoryBlock)?MarshalUnsortedData(sb?*storageBlock,?firstItemDst,?commonPrefixDst?[]byte,?compressLevel?int)?([]byte,?[]byte,?uint32,?marshalType)?{??
???if?!ib.isSorted()?{??
??????sort.Sort(ib)?//?排序??
???}??
???//?更新內(nèi)存塊的公共前綴??
???ib.updateCommonPrefix()??
???//?序列化數(shù)據(jù)??
???return?ib.marshalData(sb,?firstItemDst,?commonPrefixDst,?compressLevel)??
}
上面的序列化函數(shù)中首先會對未排序的數(shù)據(jù)進(jìn)行排序,然后更新內(nèi)存塊的公共前綴:
//?lib/mergeset/encoding.go
//?更新公共前綴??
func?(ib?*inmemoryBlock)?updateCommonPrefix()?{??
???ib.commonPrefix?=?ib.commonPrefix[:0]??//?公共前綴
???if?len(ib.items)?==?0?{??
??????return??
???}??
???items?:=?ib.items??????????//?數(shù)據(jù)前后位置??
???data?:=?ib.data????????????//?數(shù)據(jù)??
???cp?:=?items[0].Bytes(data)?//?第一段數(shù)據(jù)??
???if?len(cp)?==?0?{??
??????return??
???}??
???for?_,?it?:=?range?items[1:]?{?//?后面的數(shù)據(jù)??
??????//?計算公共前綴的長度??
??????cpLen?:=?commonPrefixLen(cp,?it.Bytes(data))??
??????if?cpLen?==?0?{??
?????????return??
??????}??
??????//?截取公共前綴數(shù)據(jù)??
??????cp?=?cp[:cpLen]??
???}??
???//?設(shè)置內(nèi)存塊的公共前綴??
???ib.commonPrefix?=?append(ib.commonPrefix[:0],?cp...)??
}
公共前綴就是把每段數(shù)據(jù)包含的共同前綴提取出來,這樣存儲的時候后面就可以不需要存儲共同的部分了,減少存儲空間。
公共前綴提取出來后,接下來調(diào)用 marshalData 函數(shù)去序列化數(shù)據(jù)。
//?lib/mergeset/encoding.go
//?前提條件:??
//?-?ib.items?必須排序??
//?-?updateCommonPrefix?必須被調(diào)用??
//?序列化數(shù)據(jù)??
func?(ib?*inmemoryBlock)?marshalData(sb?*storageBlock,?firstItemDst,?commonPrefixDst?[]byte,?compressLevel?int)?([]byte,?[]byte,?uint32,?marshalType)?{??
???......??
???//?拷貝?inmemoryBlock?數(shù)據(jù)塊的?firstItem(排序后的第一條數(shù)據(jù))??
???data?:=?ib.data??????????????????????//?內(nèi)存塊數(shù)據(jù)??
???firstItem?:=?ib.items[0].Bytes(data)?//?第一條數(shù)據(jù)??
???firstItemDst?=?append(firstItemDst,?firstItem...)??
???//?最大公共前綴??
???commonPrefixDst?=?append(commonPrefixDst,?ib.commonPrefix...)?
???//?內(nèi)存塊數(shù)據(jù)小于2段或(數(shù)據(jù)大小-公共前綴長度*數(shù)據(jù)段大小?64)?則定義為小塊??
???if?len(data)-len(ib.commonPrefix)*len(ib.items)?64?||?len(ib.items)?2?{??
??????//?對small?block使用普通序列化,因為它更便宜??
??????ib.marshalDataPlain(sb)??
??????return?firstItemDst,?commonPrefixDst,?uint32(len(ib.items)),?marshalTypePlain??
???}??
??
???bbItems?:=?bbPool.Get()??
???bItems?:=?bbItems.B[:0]?//?保存目的?items?數(shù)據(jù)的內(nèi)存?buffer??
??
???bbLens?:=?bbPool.Get()??
???bLens?:=?bbLens.B[:0]?//?保存目的?lens?數(shù)據(jù)的內(nèi)存buffer??
??
???//?序列化?items?數(shù)據(jù)??
???//?第一項數(shù)據(jù)不需要存儲,所以獲取的?Uint64s?大小要減1??
???xs?:=?encoding.GetUint64s(len(ib.items)?-?1)?
???defer?encoding.PutUint64s(xs)??
??
???cpLen?:=?len(ib.commonPrefix)?//?公共前綴的長度??
???prevItem?:=?firstItem[cpLen:]?//?第一項數(shù)據(jù)(排除公共前綴)??
???prevPrefixLen?:=?uint64(0)??
???//?從第二個元素開始遍歷(第一個?firstItem?單獨存儲)??
???for?i,?it?:=?range?ib.items[1:]?{?
??????//?偏移到公共前綴之后的位置
??????it.Start?+=?uint32(cpLen)???
??????//?Bytes(data)?得到的數(shù)據(jù)不包含公共前綴的部分???????????????????????????
??????item?:=?it.Bytes(data)??
??????//?計算第?N?項和?N-1?項的公共前綴長度???????????????????????????????
??????prefixLen?:=?uint64(commonPrefixLen(prevItem,?item))???
??????//?僅僅只把差異的部分拷貝到目的buffer?
??????bItems?=?append(bItems,?item[prefixLen:]...)?????
??????//?第一次,與0異或,還是等于原值。異或后,兩個整數(shù)值前面相同的部分都為0了,數(shù)值變得更短,能夠便于壓縮。?????
??????xLen?:=?prefixLen?^?prevPrefixLen???????
??????//?上次的除去公共前綴的item?????????????????
??????prevItem?=?item??????????????????
??????//?上次計算得到的公共前綴長度??????????????????????
??????prevPrefixLen?=?prefixLen????????????????????????????
??
??????xs.A[i]?=?xLen?//?異或后的公共前綴值??
???}
???//?對N-1個長度進(jìn)行序列化(將uint64數(shù)組序列化成byte數(shù)組)??
???bLens?=?encoding.MarshalVarUint64s(bLens,?xs.A)????????????????????????????????????
???//?將items數(shù)據(jù)(只有差異的部分)ZSTD壓縮后,寫入storageBlock?
???sb.itemsData?=?encoding.CompressZSTDLevel(sb.itemsData[:0],?bItems,?compressLevel)??
??
???bbItems.B?=?bItems??
???bbPool.Put(bbItems)??
??
???//?序列化?lens?數(shù)據(jù)??
???//?第一項數(shù)據(jù)大小(排除公共前綴)
???prevItemLen?:=?uint64(len(firstItem)?-?cpLen)???
???for?i,?it?:=?range?ib.items[1:]?{?????????????//?從第二個元素開始遍歷?
??????//?item長度?=?End-Start-公共前綴大小???
??????itemLen?:=?uint64(int(it.End-it.Start)?-?cpLen)?
??????//?與前面一個元素長度異或?
??????xLen?:=?itemLen?^?prevItemLen????
??????//?上次去除公共前綴的長度??????????????????
??????prevItemLen?=?itemLen???????????????????????????
??
??????xs.A[i]?=?xLen?//?異或后的元素長度??
???}??
???//?前面記錄的是兩兩相對的長度,這里記錄的是數(shù)據(jù)的真實長度??
???//?長度信息包含兩種,相對長度和總長度??
???bLens?=?encoding.MarshalVarUint64s(bLens,?xs.A)???
???//?將lens數(shù)據(jù)進(jìn)行ZSTD壓縮后,寫入storageBlock????????????????????????????????
???sb.lensData?=?encoding.CompressZSTDLevel(sb.lensData[:0],?bLens,?compressLevel)?
??
???bbLens.B?=?bLens??
???bbPool.Put(bbLens)??
??
???//?如果壓縮不到90%則選擇不壓縮??
???if?float64(len(sb.itemsData))?>?0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items))?{??
??????//?壓縮率不高的時候,選擇不壓縮??
??????ib.marshalDataPlain(sb)??
??????return?firstItemDst,?commonPrefixDst,?uint32(len(ib.items)),?marshalTypePlain??
???}??
??
???//?很好的壓縮率??
???return?firstItemDst,?commonPrefixDst,?uint32(len(ib.items)),?marshalTypeZSTD??
}
上面的序列化函數(shù)看上去比較復(fù)雜,實際上核心的一點就是想辦法盡可能減少存儲空間。首先將數(shù)據(jù)塊的第一個數(shù)據(jù)拷貝出來放入 firstItemDst,然后后面就從第二個元素開始去循環(huán)處理,首先計算第 N 項和 N-1 項的公共前綴長度,然后將差異的數(shù)據(jù)部分保存起來,為了能夠反序列化回數(shù)據(jù),還需要將兩兩之間公共前綴的長度保存下來,為了能夠便于壓縮,使用異或的方式來計算兩兩之間的公共前綴長度值。
循環(huán)計算后,將保存的兩兩之間的公共前綴長度進(jìn)行序列化,下面的函數(shù)將一個 uint64 類型的切片轉(zhuǎn)換成字節(jié)切片,如果數(shù)據(jù)小于 128 直接轉(zhuǎn)換即可,如果大于 127 則用一個 7bit 來表示數(shù)值的內(nèi)容,最高位后面的一個字節(jié)用來表示長度,這樣就可以用變長長度來序列化數(shù)值,而不是每個數(shù)值都占用固定的長度。
//?lib/encoding/int.go
//?將uint64切片轉(zhuǎn)成字節(jié)切片
func?MarshalVarUint64s(dst?[]byte,?us?[]uint64)?[]byte?{??
???for?_,?u?:=?range?us?{??
??????if?u?0x80?{?//?小于128,直接加入到?dst,能直接存到?byte?中去??
?????????//?Fast?path??
?????????dst?=?append(dst,?byte(u))??
?????????continue??
??????}??
??????for?u?>?0x7f?{?//?大于127,則超過的部分保留為?0x80,低位右移7位繼續(xù)計算??
?????????dst?=?append(dst,?0x80|byte(u))??
?????????u?>>=?7??
??????}??
??????dst?=?append(dst,?byte(u))??
???}??
???return?dst??
}
長度數(shù)據(jù)序列化后,將 items 數(shù)據(jù)(只有差異的部分)進(jìn)行 ZSTD 壓縮后,寫入 storageBlock。
只記錄兩兩之間的公共前綴長度還不夠,還需要記錄數(shù)據(jù)的真實長度,最后同樣再將 lens 數(shù)據(jù)進(jìn)行 ZSTD 壓縮后,寫入 storageBlock。
如果最后的結(jié)果壓縮不到 90% 則選擇不壓縮,不壓縮則使用 marshalDataPlain 函數(shù)進(jìn)行序列化:
//?lib/mergeset/encoding.go
//?普通序列化數(shù)據(jù)??
func?(ib?*inmemoryBlock)?marshalDataPlain(sb?*storageBlock)?{??
???data?:=?ib.data??
??
???//?序列化?items?數(shù)據(jù)??
???//?不需要序列化第一項數(shù)據(jù),因為它會在?marshalData?中返回給調(diào)用者。??
???cpLen?:=?len(ib.commonPrefix)?//?公共前綴長度??
???b?:=?sb.itemsData[:0]??
???for?_,?it?:=?range?ib.items[1:]?{?//?第一項之后的數(shù)據(jù)??
??????it.Start?+=?uint32(cpLen)?????????//?跳過公共前綴??
??????b?=?append(b,?it.String(data)...)?//?添加移出公共前綴的數(shù)據(jù)??
???}??
???sb.itemsData?=?b?//?itemsData數(shù)據(jù)??
??
???//?序列化?lens?數(shù)據(jù)??
???b?=?sb.lensData[:0]??
???for?_,?it?:=?range?ib.items[1:]?{?//?第一項之后的數(shù)據(jù)??
??????//?原始的End-Start-公共前綴長度??
??????b?=?encoding.MarshalUint64(b,?uint64(int(it.End-it.Start)-cpLen))?
???}??
???sb.lensData?=?b??
}
經(jīng)過上面的序列化過后就可以得到第一個數(shù)據(jù)、公共前綴、items 個數(shù)以及序列化類型,然后將這些數(shù)據(jù)存入 blockHeader 中去,后面就是一些比較簡單的常規(guī)操作。
轉(zhuǎn)換成 inmemoryPart 后,再包裝成 blockStreamReader,創(chuàng)建一個新的 inmemoryPart,并構(gòu)造一個 blockSteamWriter 用于合并寫入的數(shù)據(jù),然后調(diào)用 mergeBlockStreams 函數(shù)執(zhí)行真正的 merge 操作。
//?lib/mergeset/merge.go
//?mergeBlockStreams?合并?bsrs?并將結(jié)果寫入?bsw
//??
//?也填充了?ph??
//??
//?prepareBlock?是可選的?
//??
//?當(dāng)?stopCh?關(guān)閉時,該函數(shù)立即返回
//??
//?它還以原子方式將合并的?items?添加到?itemsMerged
func?mergeBlockStreams(ph?*partHeader,?bsw?*blockStreamWriter,?bsrs?[]*blockStreamReader,?prepareBlock?PrepareBlockCallback,?stopCh?<-chan?struct{},??
???itemsMerged?*uint64)?error?{??
???//?將多個?blockStreamReader?構(gòu)造成一個?blockStreamMerger?結(jié)構(gòu)
???bsm?:=?bsmPool.Get().(*blockStreamMerger)??
???if?err?:=?bsm.Init(bsrs,?prepareBlock);?err?!=?nil?{??
??????return?fmt.Errorf("cannot?initialize?blockStreamMerger:?%w",?err)??
???}??
???err?:=?bsm.Merge(bsw,?ph,?stopCh,?itemsMerged)??
???bsm.reset()??
???bsmPool.Put(bsm)??
???bsw.MustClose()??
???if?err?==?nil?{??
??????return?nil??
???}??
???return?fmt.Errorf("cannot?merge?%d?block?streams:?%s:?%w",?len(bsrs),?bsrs,?err)??
}
首先把多個 blockStreamReader 構(gòu)造成一個 blockStreamMerger 結(jié)構(gòu), merger 里面主要是一個 bsrHeap 堆用于維護(hù) bsrs,用于 merge 數(shù)據(jù)時的排序。首先通過 merger 的 Init 函數(shù)構(gòu)造堆排序的結(jié)構(gòu),然后核心是調(diào)用 merger 的 Merge 函數(shù)進(jìn)行處理。
//?lib/mergeset/merge.go
func?(bsm?*blockStreamMerger)?Merge(bsw?*blockStreamWriter,?ph?*partHeader,?stopCh?<-chan?struct{},?itemsMerged?*uint64)?error?{??
again:??
???if?len(bsm.bsrHeap)?==?0?{??
??????//?將最后的?inmemoryBlock(可能不完整)寫入?bsw
??????bsm.flushIB(bsw,?ph,?itemsMerged)??
??????return?nil??
???}??
??
???select?{??
???case?<-stopCh:??
??????return?errForciblyStopped??
???default:??
???}??
???//?取出?blockStreamReader
???bsr?:=?heap.Pop(&bsm.bsrHeap).(*blockStreamReader)??
??
???var?nextItem?[]byte??//?下一個?blockStreamReader
???hasNextItem?:=?false??
???if?len(bsm.bsrHeap)?>?0?{??
??????nextItem?=?bsm.bsrHeap[0].bh.firstItem??
??????hasNextItem?=?true??
???}??
???items?:=?bsr.Block.items??
???data?:=?bsr.Block.data??
???//?循環(huán)所有的?items
???for?bsr.blockItemIdx?len(bsr.Block.items)?{??
??????item?:=?items[bsr.blockItemIdx].Bytes(data)??
??????if?hasNextItem?&&?string(item)?>?string(nextItem)?{??
?????????break??
??????}??
??????//?添加元素
??????if?!bsm.ib.Add(item)?{??
?????????//?bsm.ib?已滿,將其刷新到?bsw?并繼續(xù)
?????????bsm.flushIB(bsw,?ph,?itemsMerged)??
?????????continue??
??????}??
??????bsr.blockItemIdx++??
???}??
???if?bsr.blockItemIdx?==?len(bsr.Block.items)?{??
??????//?bsr.Block?已完全讀取,處理下一個?block
??????if?bsr.Next()?{??
?????????heap.Push(&bsm.bsrHeap,?bsr)??
?????????goto?again??
??????}??
??????if?err?:=?bsr.Error();?err?!=?nil?{??
?????????return?fmt.Errorf("cannot?read?storageBlock:?%w",?err)??
??????}??
??????goto?again??
???}??
???//?bsr.Block?中的下一個?item?超過了?nextItem
???//?調(diào)整?bsr.bh.firstItem?并將?bsr?返回到堆
???bsr.bh.firstItem?=?append(bsr.bh.firstItem[:0],?bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)??
???heap.Push(&bsm.bsrHeap,?bsr)??
???goto?again??
}
這里主要解決的問題是多個有序的字節(jié)數(shù)組(inmemoryPart),按照字節(jié)序排序,合成一個 inmemoryPart 的過程,在 merge 的過程中,每 64KB 會單獨創(chuàng)建一個 blockHeader,用于快速索引該 block 里面的 Items。
持久化數(shù)據(jù)
最后重復(fù)上面的過程,將 n 個 inmemoryBlock 合并成 (n-1)/defaultPartsToMerge+1 個 inmemoryPart,最后再調(diào)用 mergeParts 函數(shù)完成索引持久化操作,持久化后生成的索引 part,主要包含 metaindex.bin、index.bin、lens.bin、items.bin、metadata.json 等 5 個文件。
這幾個文件的關(guān)系如下圖所示, metaindex.bin 文件通過 metaindexRow 索引 index.bin 文件,index.bin 文件通過 indexBlock 中的 blockHeader 同時索引 items.bin 文件和 items.bin 文件。
metaindex.bin:文件包含一系列的 metaindexRow 數(shù)據(jù),每個 metaindexRow 中包含第一條數(shù)據(jù) firstItem、索引塊包含的塊頭部數(shù) blockHeadersCount、索引塊偏移 indexBlockOffset 以及索引塊大小 indexBlockSize。
metaindexRow在文件中按照firstItem的大小的字典序排序存儲,以支持二分查找metaindex.bin文件使用 ZSTD 進(jìn)行壓縮metaindex.bin文件中的內(nèi)容在 part 打開時,會全部讀出加載至內(nèi)存中,以加速查詢過濾metaindexRow包含的firstItem為其索引的indexBlock中所有blockHeader中字典序最小的firstItem查找時根據(jù) firstItem進(jìn)行二分檢索
index.bin:文件中包含一系列的 indexBlock, 每個 indexBlock 又包含一系列 blockHeader,每個 blockHeader 包含 item 的公共前綴 commonPrefix、第一項數(shù)據(jù) firstItem、itemsData 的序列化類型 marshalType、itemsData 包含的 item 數(shù)、item 塊的偏移 itemsBlockOffset 等內(nèi)容,就是前面使用將 inmemoryBlock 轉(zhuǎn)換為 inmemoryPart 結(jié)構(gòu)的 Init 函數(shù)得到的。
每個 indexBlock使用ZSTD壓縮算法進(jìn)行壓縮在 indexBlock中查找時,根據(jù)firstItem進(jìn)行二分檢索blockHeader
items.bin 文件中,包含一系列的 itemsData, 每個 itemsData 又包含一系列的 Item。
itemsData會視情況而定來是否使用 ZTSD 壓縮,當(dāng) item 個數(shù)小于 2 時,或者itemsData的長度小于 64 字節(jié)時,不壓縮;當(dāng)itemsData使用 ZSTD 壓縮后的壓縮率大于90%的時候也不壓縮每個 item 在存儲時,去掉了 blockHeader中的公共前綴commonPrefix以提高壓縮率
lens.bin 文件中,包含一系列的 lensData, 每個 lensData 又包含一系列 8 字節(jié)的長度 len, 長度 len 標(biāo)識 items.bin 文件中對應(yīng) item 的長度。在讀取或者需要解析 itemsData 中的 item 時,先要讀取對應(yīng)的 lensData 中對應(yīng)的長度 len。 當(dāng) itemsData 進(jìn)行壓縮時,lensData 會先使用異或算法進(jìn)行壓縮,然后再使用 ZSTD 算法進(jìn)一步壓縮。
到這里我們就了解了索引數(shù)據(jù)是實現(xiàn)和存儲原理了,那么真正的指標(biāo)數(shù)據(jù)又是如何去存儲的呢?未完待續(xù)......
