VictorialMetrics源碼分析之插入指標(biāo)數(shù)據(jù)
為了調(diào)試方便,這里我們將 VictorialMetrics 代碼使用 Goland 打開。每個組件的入口位于 app/,比如 vmstorage 組件的入口位于 app/vmstorage/main.go:

為了對 VM 整個流暢分析,我們可以直接在 IDE 中來啟動這些組件。
直接在 vmstorage 入口的 main 函數(shù)上點擊 Run 'go build main.go' 即可啟動該組件:

通過日志記錄可以看出 vmstorage 會在 8401 端口監(jiān)聽 vmselect 的連接請求,在 8400 端口監(jiān)聽 vminsert 的連接請求,其本身的服務(wù)會通過 8482 端口進(jìn)行暴露。啟動后會在根目錄下面創(chuàng)建一個名為 vmstorage-data 的數(shù)據(jù)目錄,該目錄就是用來保存 VM 的數(shù)據(jù)的,其中 data 目錄是監(jiān)控指標(biāo)數(shù)據(jù)目錄,indexdb 目錄是索引數(shù)據(jù)目錄,snapshots 是快照目錄,flock.lock 為文件鎖文件,用于 VM 進(jìn)程鎖住文件,不允許別的進(jìn)程進(jìn)行修改目錄或文件,如下所示:

數(shù)據(jù)目錄 data 下面包含兩個最主要的目錄big 目錄和small 目錄,這兩個目錄的結(jié)構(gòu)是一樣的。
small 目錄:內(nèi)存中的數(shù)據(jù)先持久化到目錄,壓縮比例高,會定期檢測判斷是否滿足 merge 條件,合并多個小文件。 big 目錄:small 過大后會合并到 big 目錄,壓縮比例極高。
索引目錄 indexdb 下面包含兩個目錄 16F29B51EDD96911、16F29B51EDD96912,這兩個目錄分別表示當(dāng)前正在使用的索引目錄,和前面一次使用的索引目錄,為什么需要保留前面一次使用的呢?
這是因為 VM 中會配置自動輪換的周期,比如可以配置1天、1周、1月等等,那么這個周期到了后索引數(shù)據(jù)就要輪換,就相當(dāng)于會創(chuàng)建一個新的目錄作為最新的索引數(shù)據(jù)目錄,但是如果你直接將前面一個到期的索引刪除,那么現(xiàn)在就沒有任何索引了,此時如果有大量的插入或者查詢操作的話比如就需要去生成大量的索引,而生成索引的是非常消耗資源的,索引會造成系統(tǒng)性能急劇下降,保留前面一個索引可以來判斷新的數(shù)據(jù)是否能命中前面的緩存,如果命中了則直接將之前的索引拷貝到最新的索引中來,這樣就大大提高了索引的效率,索引我們需要保留兩個索引,之前的索引則會刪除掉。
索引的名稱是根據(jù)系統(tǒng)的納秒時間戳原子+1后生成的16進(jìn)制數(shù)據(jù):
//?lib/storage/storage.go
func?nextIndexDBTableName()?string?{??
???n?:=?atomic.AddUint64(&indexDBTableIdx,?1)??
???return?fmt.Sprintf("%016X",?n)??
}??
??
var?indexDBTableIdx?=?uint64(time.Now().UnixNano())
啟動 vmstorage 的時候就會去打開索引,默認(rèn)路徑為 :
//?lib/storage/storage.go
//?打開索引數(shù)據(jù)表?path=vmstorage-data/indexdb??
func?(s?*Storage)?openIndexDBTables(path?string)?(curr,?prev?*indexDB,?err?error)?{??
???//索引目錄不存在則創(chuàng)建??
???if?err?:=?fs.MkdirAllIfNotExist(path);?err?!=?nil?{??
??????return?nil,?nil,?fmt.Errorf("cannot?create?directory?%q:?%w",?path,?err)??
???}??
??
???d,?err?:=?os.Open(path)??
???if?err?!=?nil?{??
??????return?nil,?nil,?fmt.Errorf("cannot?open?directory:?%w",?err)??
???}??
???defer?fs.MustClose(d)??
??
???//?搜索最近的兩個表,最后一個表示活躍狀態(tài)的,前面一個包含備份數(shù)據(jù)??
???fis,?err?:=?d.Readdir(-1)??
???if?err?!=?nil?{??
??????return?nil,?nil,?fmt.Errorf("cannot?read?directory:?%w",?err)??
???}??
???var?tableNames?[]string??
???for?_,?fi?:=?range?fis?{??
??????if?!fs.IsDirOrSymlink(fi)?{??
?????????//?不是目錄則跳過??
?????????continue??
??????}??
??????tableName?:=?fi.Name()??
??????if?!indexDBTableNameRegexp.MatchString(tableName)?{??
?????????//?名稱不符合規(guī)范也有跳過??
?????????continue??
??????}??
??????//?剩下的就是所有的表名稱了??
??????tableNames?=?append(tableNames,?tableName)??
???}??
???//?對表名進(jìn)行排序??
???sort.Slice(tableNames,?func(i,?j?int)?bool?{??
??????return?tableNames[i]????})??
???//?如果表名個數(shù)小于2,則創(chuàng)建??
???if?len(tableNames)?2?{??
??????//?如果沒有表名,則先創(chuàng)建前面一個表名??
??????if?len(tableNames)?==?0?{??
?????????//?生成前面一個表名??
?????????prevName?:=?nextIndexDBTableName()??
?????????tableNames?=?append(tableNames,?prevName)??
??????}??
??????//生成后面的一個表名(在前面表名的基礎(chǔ)上做原子+1操作的16進(jìn)制數(shù)據(jù))??
??????currName?:=?nextIndexDBTableName()??
??????tableNames?=?append(tableNames,?currName)??
???}??
??
???//?Invariant:?len(tableNames)?>=?2??
??
???//?如果操過2個表,則只保留最后兩個表,其他不需要了,沒意義,因為過期了??
???for?_,?tn?:=?range?tableNames[:len(tableNames)-2]?{??
??????pathToRemove?:=?path?+?"/"?+?tn??
??????logger.Infof("removing?obsolete?indexdb?dir?%q...",?pathToRemove)??
??????fs.MustRemoveAll(pathToRemove)??
??????logger.Infof("removed?obsolete?indexdb?dir?%q",?pathToRemove)??
???}??
??
???//?持久化變更??
???fs.MustSyncPath(path)??
??
???//?打開最后兩個表??
???currPath?:=?path?+?"/"?+?tableNames[len(tableNames)-1]??
???logger.Infof("1.prepare?open?index?db?currPath?%s",?currPath)??
???curr,?err?=?openIndexDB(currPath,?s,?0)??
???if?err?!=?nil?{??
??????return?nil,?nil,?fmt.Errorf("cannot?open?curr?indexdb?table?at?%q:?%w",?currPath,?err)??
???}??
??
???prevPath?:=?path?+?"/"?+?tableNames[len(tableNames)-2]??
???logger.Infof("2.prepare?open?index?db?prevPath?%s",?prevPath)??
???prev,?err?=?openIndexDB(prevPath,?s,?0)??
???if?err?!=?nil?{??
??????curr.MustClose()??
??????return?nil,?nil,?fmt.Errorf("cannot?open?prev?indexdb?table?at?%q:?%w",?prevPath,?err)??
???}??
??
???return?curr,?prev,?nil??
}
當(dāng)索引目錄不存在的時候會創(chuàng)建該目錄,然后去該目錄中查找最近的兩個索引,如果沒有兩個索引,則去生成對應(yīng)的索引目錄,索引的名稱就是上面的納秒時間戳原子+1后的16進(jìn)制數(shù)據(jù),然后通過 openIndexDB 函數(shù)分別打開這兩個索引。
openIndexDB 函數(shù)用于打開指定路徑的索引,其實就是生成一個 indexDB 對象,indexDB 結(jié)構(gòu)體定義如下所示:
//?lib/storage/index_db.go
//?indexDB?代表一個?index?db.??
type?indexDB?struct?{??
???//?原子計數(shù)器必須位于結(jié)構(gòu)體的頂部,以便在32位架構(gòu)上正確對齊8個字節(jié)
???//?See?https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212?.??
???refCount?uint64??
??
???//?新創(chuàng)建的時間序列的計數(shù)器,可用于確定時間序列的 churn rate。?
???newTimeseriesCreated?uint64??
???//?在輪換后從以前的 indexDB 重新填充的時間序列的計數(shù)器。?
???timeseriesRepopulated?uint64??
???//?MetricID?->?TSID?條目?miss?的數(shù)量?
???//?該值比率如果較高則證明?indexDB?損壞了
???missingTSIDsForMetricID?uint64??
???//?date?range?搜索的調(diào)用數(shù)
???dateRangeSearchCalls?uint64??
???//?date?range?搜索的命中數(shù)
???dateRangeSearchHits?uint64??
???//?全局搜索調(diào)用次數(shù)
???globalSearchCalls?uint64??
???//?MetricID?->?MetricName?條目?miss?的數(shù)量
???//?高比率可能意味著由于不干凈的關(guān)機(jī)導(dǎo)致索引數(shù)據(jù)庫損壞。?
???//?之后必須自動恢復(fù)db
???missingMetricNamesForMetricID?uint64??
???//?標(biāo)記為刪除
???mustDrop?uint64??
??
???//?標(biāo)識索引的?生成?ID(可以看成是第幾代索引),并用于同步來自不同?indexDB?的數(shù)據(jù)
???generation?uint64??
???// indexDB 輪換的unix時間戳(以秒為單位)。?
???rotationTimestamp?uint64??
???//?索引名稱
???name?string??
???//?Table?表結(jié)構(gòu)
???tb???*mergeset.Table??
???//?相當(dāng)于之前的一個?indexDB
???extDB?????*indexDB??
???extDBLock?sync.Mutex??
??
???//?用于快速查找?TagFilters?->?TSIDs?的緩存
???tagFiltersCache?*workingsetcache.Cache??
??
???//?父級存儲引用
???s?*Storage??
????
???//?(date,?tagFilter)?->?loopsCount?的緩存
???//?用于減少匹配一組過濾器時的工作量。
???loopsPerDateTagFilterCache?*workingsetcache.Cache??
???//?索引搜索的對象池
???indexSearchPool?sync.Pool??
}
openIndexDB 函數(shù)實現(xiàn)代碼如下所示,整體比較簡單,就是去構(gòu)造一個 indexDB 對象,索引路徑的最后一段(也就是文件夾的名稱)轉(zhuǎn)換成10進(jìn)制的數(shù)據(jù)就會用來表示 indexDB 的 generation:
//?lib/storage/index_db.go
//?openIndexDB?從指定路徑打開索引?db?文件??
//??
//?path?路徑的最后一段應(yīng)該是一個唯一的16進(jìn)制數(shù)據(jù),會被用作?indexDB.generation??
//??
//?當(dāng)在?indexdb?輪換期間創(chuàng)建新的?indexdb?時,ipenIndexDB?被調(diào)用時??
// rotationTimestamp 必須設(shè)置為當(dāng)前的 unix 時間戳。??
func?openIndexDB(path?string,?s?*Storage,?rotationTimestamp?uint64)?(*indexDB,?error)?{??
???if?s?==?nil?{??
??????logger.Panicf("BUG:?Storage?must?be?nin-nil")??
???}??
??
???//?獲取路徑的最后一段,也就是索引表(文件夾)的名稱??
???name?:=?filepath.Base(path)??
???//?將16進(jìn)制數(shù)據(jù)轉(zhuǎn)換成10進(jìn)制的數(shù)據(jù),用來表示?indexDB.generation???
???gen,?err?:=?strconv.ParseUint(name,?16,?64)??
???logger.Infof("Open?Index?DB?path?%s,?and?gen?%d",?name,?gen)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("failed?to?parse?indexdb?path?%q:?%w",?path,?err)??
???}??
??
???tb,?err?:=?mergeset.OpenTable(path,?invalidateTagFiltersCache,?mergeTagToMetricIDsRows)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?open?indexDB?%q:?%w",?path,?err)??
???}??
??
???//?不要將 tagFiltersCache 保存在文件中,因為它非常不穩(wěn)定。?
???mem?:=?memory.Allowed()??
??
???db?:=?&indexDB{??
??????refCount:??????????1,??
??????generation:????????gen,??
??????rotationTimestamp:?rotationTimestamp,??
??????tb:????????????????tb,??
??????name:??????????????name,??
??
??????tagFiltersCache:????????????workingsetcache.New(mem?/?32),??
??????s:??????????????????????????s,??
??????loopsPerDateTagFilterCache:?workingsetcache.New(mem?/?128),??
???}??
???return?db,?nil??
}
構(gòu)造 indexDB 對象中最核心部分就是獲取 Table 表對象了,通過 mergeset.OpenTable 函數(shù)來實現(xiàn)。要搞清楚這個 Table 表是什么,首先我們需要去看下其結(jié)構(gòu)定義:
//?lib/mergeset/table.go
//?Table?代表?mergeset?表.??
type?Table?struct?{??
???//?原子更新的計數(shù)器必須在結(jié)構(gòu)體最前面,這樣在32位架構(gòu)上可以正確地對齊到8字節(jié)。??
???//?See?https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212??
???activeMerges???uint64??
???mergesCount????uint64??
???itemsMerged????uint64??
???assistedMerges?uint64??
??
???mergeIdx?uint64??
??
???path?string??
???//?將數(shù)據(jù)刷新到存儲的回調(diào)
???flushCallback?????????func()????
???flushCallbackWorkerWG?sync.WaitGroup??
???needFlushCallbackCall?uint32??
???//?block?準(zhǔn)備好的回調(diào)
???prepareBlock?PrepareBlockCallback??
??
???partsLock?sync.Mutex??
???//?包含的?part?列表
???parts?????[]*partWrapper??
??
???// rawItems 包含最近添加的尚未轉(zhuǎn)換為 parts 的數(shù)據(jù)。
???//?出于性能原因,未在搜索中使用?rawItems??
???rawItems?rawItemsShards??
??
???snapshotLock?sync.RWMutex??
??
???flockF?*os.File??
??
???stopCh?chan?struct{}??
??
???//?使用 syncwg 而不是sync,因為可以從并發(fā) goroutine 調(diào)用 Add/Wait。
???partMergersWG?syncwg.WaitGroup??
???
???rawItemsFlusherWG?sync.WaitGroup??
???convertersWG?sync.WaitGroup??
??
???//?使用 syncwg 而不是sync,因為可以從并發(fā) goroutine 調(diào)用 Add/Wait。
???rawItemsPendingFlushesWG?syncwg.WaitGroup??
}
OpenTable 函數(shù)實現(xiàn)如下所示,首先會判斷表目錄是否存在,不存在就創(chuàng)建這個目錄,然后創(chuàng)建 flock.lock 文件防止并發(fā)打開,然后就是核心的 openParts 函數(shù)打開表的 part 列表:
//?lib/mergeset/table.go
//?OpenTable?在指定路徑上打開一個?table
//??
//?每次將新數(shù)據(jù)批次刷新到底層存儲并對搜索可見時,都會調(diào)用可選的 flushCallback 回調(diào)。
//??
//?在將準(zhǔn)備好的 block 塊刷新到持久存儲之前,在合并期間調(diào)用可選的 prepareBlock 回調(diào)。
//??
//?如果該表還不存在,則創(chuàng)建該表。
func?OpenTable(path?string,?flushCallback?func(),?prepareBlock?PrepareBlockCallback)?(*Table,?error)?{??
???path?=?filepath.Clean(path)??
???logger.Infof("opening?table?%q...",?path)??
???startTime?:=?time.Now()??
??
???//?如果表還不存在,那么為它創(chuàng)建一個目錄
???if?err?:=?fs.MkdirAllIfNotExist(path);?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?create?directory?%q:?%w",?path,?err)??
???}??
??
???//?創(chuàng)建?flock.lock?文件,防止并發(fā)打開
???flockF,?err?:=?fs.CreateFlockFile(path)??
???if?err?!=?nil?{??
??????return?nil,?err??
???}??
??
???//?打開表?parts
???pws,?err?:=?openParts(path)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?open?table?parts?at?%q:?%w",?path,?err)??
???}??
??
???tb?:=?&Table{??
??????path:??????????path,??
??????flushCallback:?flushCallback,??
??????prepareBlock:??prepareBlock,??
??????parts:?????????pws,??
??????mergeIdx:??????uint64(time.Now().UnixNano()),??
??????flockF:????????flockF,??
??????stopCh:????????make(chan?struct{}),??
???}??
???//?初始化?rawItems
???tb.rawItems.init()??
???//?開始執(zhí)行?partMerges?的工作
???tb.startPartMergers()??
???//?開始執(zhí)行?rawItems?刷新的工作
???tb.startRawItemsFlusher()??
???//?更新表相關(guān)的指標(biāo)數(shù)據(jù)
???var?m?TableMetrics??
???tb.UpdateMetrics(&m)??
???logger.Infof("table?%q?has?been?opened?in?%.3f?seconds;?partsCount:?%d;?blocksCount:?%d,?itemsCount:?%d;?sizeBytes:?%d",??
??????path,?time.Since(startTime).Seconds(),?m.PartsCount,?m.BlocksCount,?m.ItemsCount,?m.SizeBytes)??
??
???tb.convertersWG.Add(1)??
???go?func()?{??
??????tb.convertToV1280()??
??????tb.convertersWG.Done()??
???}()??
???//?如果有刷新回調(diào)則執(zhí)行回調(diào)
???if?flushCallback?!=?nil?{??
??????tb.flushCallbackWorkerWG.Add(1)??
??????go?func()?{??
?????????//?每10秒調(diào)用一次?flushCallback,以提高緩存的效率
?????????//?緩存由?flushCallback?重置
?????????tc?:=?time.NewTicker(10?*?time.Second)??
?????????for?{??
????????????select?{??
????????????case?<-tb.stopCh:??//?停止
???????????????tb.flushCallback()??
???????????????tb.flushCallbackWorkerWG.Done()??
???????????????return??
????????????case?<-tc.C:??
???????????????//?如果需要刷新,則調(diào)用刷新回調(diào)
???????????????if?atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall,?1,?0)?{??
??????????????????tb.flushCallback()??
???????????????}??
????????????}??
?????????}??
??????}()??
???}??
??
???return?tb,?nil??
}
openParts 返回的就是一個包裝的 part 列表 partWrapper,里面除了 part 的引用之外,還包括在內(nèi)存中的 inmemoryPart 的引用。
//?lib/mergeset/table.go
type?partWrapper?struct?{??
???p?*part??
??
???mp?*inmemoryPart??
??
???refCount?uint64??
??
???isInMerge?bool??
}
func?openParts(path?string)?([]*partWrapper,?error)?{??
???//?從備份還原后,可能會丟失路徑,所以需要的時候就創(chuàng)建它
???if?err?:=?fs.MkdirAllIfNotExist(path);?err?!=?nil?{??
??????return?nil,?err??
???}??
???d,?err?:=?os.Open(path)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?open?difrectory:?%w",?err)??
???}??
???defer?fs.MustClose(d)??
??
???//?執(zhí)行剩余的事務(wù)和清理?/txn 和?/tmp 目錄。??
???//?尚未創(chuàng)建快照,使用?fakeSnapshotLock
???var?fakeSnapshotLock?sync.RWMutex??
???if?err?:=?runTransactions(&fakeSnapshotLock,?path);?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?run?transactions:?%w",?err)??
???}??
???//?清理事務(wù)目錄?txn,然后重新創(chuàng)建
???txnDir?:=?path?+?"/txn"??
???fs.MustRemoveAll(txnDir)??
???if?err?:=?fs.MkdirAllFailIfExist(txnDir);?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?create?%q:?%w",?txnDir,?err)??
???}??
???//?清理臨時數(shù)據(jù)目錄?tmp,然后重新創(chuàng)建
???tmpDir?:=?path?+?"/tmp"??
???fs.MustRemoveAll(tmpDir)??
???if?err?:=?fs.MkdirAllFailIfExist(tmpDir);?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?create?%q:?%w",?tmpDir,?err)??
???}??
??
???fs.MustSyncPath(path)??
??
???//?獲取所有的?parts?
???fis,?err?:=?d.Readdir(-1)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("cannot?read?directory:?%w",?err)??
???}??
???var?pws?[]*partWrapper??
???for?_,?fi?:=?range?fis?{??
??????if?!fs.IsDirOrSymlink(fi)?{??
?????????//?跳過非目錄的
?????????continue??
??????}??
??????fn?:=?fi.Name()??
??????if?isSpecialDir(fn)?{??
?????????//?跳過一些特殊的目錄??
?????????continue??
??????}??
??????partPath?:=?path?+?"/"?+?fn??
??????if?fs.IsEmptyDir(partPath)?{?//?如果為空目錄
?????????//?刪除空目錄,該目錄可以在NFS上不干凈關(guān)閉后保留下來。?
?????????//?See?https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142?????????
?????????fs.MustRemoveAll(partPath)??
?????????continue??
??????}??
??????//?打開?Part
??????p,?err?:=?openFilePart(partPath)??
??????if?err?!=?nil?{??
?????????mustCloseParts(pws)??
?????????return?nil,?fmt.Errorf("cannot?open?part?%q:?%w",?partPath,?err)??
??????}??
??????//?將?Part?放進(jìn)包裝的?partWrapper?中去
??????pw?:=?&partWrapper{??
?????????p:????????p,??
?????????refCount:?1,??
??????}??
??????pws?=?append(pws,?pw)??
???}??
??
???return?pws,?nil??
}
openParts 的過程其實就是去構(gòu)造表的過程,比如重置事務(wù)目錄 txn、臨時數(shù)據(jù)目錄 tmp,當(dāng)?shù)谝淮螁拥臅r候可以看出 parts 是為空的,索引 openParts 會返回一個空的切片。那么什么時候才會有 part 數(shù)據(jù)產(chǎn)生呢?自然要等到有數(shù)據(jù)寫入的時候,所以接下來我們要去啟動 vminsert 這個組件。
首先同樣需要在 IDE 中來啟動 vminsert,但是在啟動之前需要配置下啟動參數(shù),因為 vminsert 需要將數(shù)據(jù)傳輸?shù)?vmstorage 中去的,在 app/vminsert/main.go 文件上右鍵選擇 Modify Run Configuration...:

在配置對話框中的 Program arguments 行添加需要配置的參數(shù),比如我們這里添加 -storageNode=127.0.0.1:8401,意思就是 vminert 接收到數(shù)據(jù)后會發(fā)送到后面的 storageNode 節(jié)點去:

配置好后和前面一樣再次去啟動 app/vminsert/main.go 即可,如下所示。可以看到 vminsert 成功和 127.0.0.1:8400 建立了連接,也就是上面的 vmstorage 節(jié)點:

同樣當(dāng)連接建立后在 vmstorage 節(jié)點這邊也有相應(yīng)的日志體現(xiàn),如下所示:

vmstorage 在 8400 端口上接收 vminsert 的請求,8401 端口上接收 vmselect 的請求,通過 transport.NewServer 去初始化 Server,然后分別在一個 goroutine 中去啟動監(jiān)聽 vminsert、vmselect 的請求:
//?app/vmstorage/main.go
srv,?err?:=?transport.NewServer(*vminsertAddr,?*vmselectAddr,?strg)??
if?err?!=?nil?{??
???logger.Fatalf("cannot?create?a?server?with?vminsertAddr=%s,?vmselectAddr=%s:?%s",?*vminsertAddr,?*vmselectAddr,?err)??
}??
??
go?srv.RunVMInsert()??
go?srv.RunVMSelect()
我們可以先看看這里的 Server 是如何定義的:
//?app/vmstorage/transport/server.go
//?Server?用于處理來自?vminsert?和?vmselect?的連接
type?Server?struct?{??
???//?將 stopFlag 移動到結(jié)構(gòu)體頂部,以便在32位架構(gòu)上修復(fù)對它的原子訪問(內(nèi)存對齊)。
???//?See?https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212???
???stopFlag?uint64??
???//?存儲引用
???storage?*storage.Storage??
???//?vminsert和vmselect的網(wǎng)絡(luò)監(jiān)聽器
???vminsertLN?net.Listener??
???vmselectLN?net.Listener??
??
???vminsertWG?sync.WaitGroup??
???vmselectWG?sync.WaitGroup??
???
???//?用于跟蹤vminsert與vmselect的活躍連接
???vminsertConnsMap?ingestserver.ConnsMap??
???vmselectConnsMap?ingestserver.ConnsMap??
}??
//?NewServer?實例化?Server.??
func?NewServer(vminsertAddr,?vmselectAddr?string,?storage?*storage.Storage)?(*Server,?error)?{??
???//?初始化網(wǎng)絡(luò)監(jiān)聽器
???vminsertLN,?err?:=?netutil.NewTCPListener("vminsert",?vminsertAddr,?nil)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("unable?to?listen?vminsertAddr?%s:?%w",?vminsertAddr,?err)??
???}??
???vmselectLN,?err?:=?netutil.NewTCPListener("vmselect",?vmselectAddr,?nil)??
???if?err?!=?nil?{??
??????return?nil,?fmt.Errorf("unable?to?listen?vmselectAddr?%s:?%w",?vmselectAddr,?err)??
???}??
???if?err?:=?encoding.CheckPrecisionBits(uint8(*precisionBits));?err?!=?nil?{??
??????return?nil,?fmt.Errorf("invalid?-precisionBits:?%w",?err)??
???}??
???s?:=?&Server{??
??????storage:?storage,??
??
??????vminsertLN:?vminsertLN,??
??????vmselectLN:?vmselectLN,??
???}??
???//?初始化活躍連接Map
???s.vminsertConnsMap.Init()??
???s.vmselectConnsMap.Init()??
???return?s,?nil??
}
//?lib/ingestserver/conns_map.go
//?ConnsMap?用于跟蹤活躍的連接
type?ConnsMap?struct?{??
???mu???????sync.Mutex??
???m????????map[net.Conn]struct{}??
???isClosed?bool??
}
Server 里面主要了包含 vminsert 和 vmselect 的監(jiān)聽器,還有專門用來跟蹤活躍連接的 ConnsMap,其實就是一個 Map,Server 初始化后會通過一個 goroutine 執(zhí)行 RunVMInsert:
//?app/vmstorage/transport/server.go
//?RunVMInsert?運(yùn)行接受?vminsert?連接的服務(wù)器
func?(s?*Server)?RunVMInsert()?{??
???logger.Infof("accepting?vminsert?conns?at?%s",?s.vminsertLN.Addr())??
???for?{??
??????//?等待并返回到監(jiān)聽器的下一個連接
??????c,?err?:=?s.vminsertLN.Accept()??
??????if?err?!=?nil?{??
?????????if?pe,?ok?:=?err.(net.Error);?ok?&&?pe.Temporary()?{??
????????????continue??
?????????}??
?????????if?s.isStopping()?{??
????????????return??
?????????}??
?????????logger.Panicf("FATAL:?cannot?process?vminsert?conns?at?%s:?%s",?s.vminsertLN.Addr(),?err)??
??????}??
??????logger.Infof("accepted?vminsert?conn?from?%s",?c.RemoteAddr())??
??????//?將該連接c添加到ConnsMap中
??????if?!s.vminsertConnsMap.Add(c)?{??
??????????//?關(guān)閉連接
?????????_?=?c.Close()??
?????????return??
??????}??
??????//?vminsert連接數(shù)+1
??????vminsertConns.Inc()??
??????s.vminsertWG.Add(1)??
??????go?func()?{??
?????????defer?func()?{??
????????????//?處理完過后清理連接
????????????s.vminsertConnsMap.Delete(c)??
????????????vminsertConns.Dec()??
????????????s.vminsertWG.Done()??
?????????}()??
??????????//?不需要響應(yīng)壓縮
?????????//?vmstorage?只會發(fā)送小的?packets?給?vminsert
?????????compressionLevel?:=?0??
?????????//?VMInsertServer?為?vminsert?執(zhí)行服務(wù)器端握手的協(xié)議
?????????//?得到的是一個帶?buffer?的?net.Conn(BufferedConn)
?????????bc,?err?:=?handshake.VMInsertServer(c,?compressionLevel)??
?????????if?err?!=?nil?{??
????????????if?s.isStopping()?{??
???????????????//?c?在服務(wù)器內(nèi)停止,必須關(guān)閉
???????????????return??
????????????}??
????????????logger.Errorf("cannot?perform?vminsert?handshake?with?client?%q:?%s",?c.RemoteAddr(),?err)??
????????????_?=?c.Close()??
????????????return??
?????????}??
?????????defer?func()?{??
????????????if?!s.isStopping()?{??
???????????????logger.Infof("closing?vminsert?conn?from?%s",?c.RemoteAddr())??
????????????}??
????????????_?=?bc.Close()??
?????????}()??
?????????//?真正處理?vminsert?連接的邏輯
?????????logger.Infof("processing?vminsert?conn?from?%s",?c.RemoteAddr())??
?????????if?err?:=?s.processVMInsertConn(bc);?err?!=?nil?{??
????????????if?s.isStopping()?{??
???????????????return??
????????????}??
????????????vminsertConnErrors.Inc()??
????????????logger.Errorf("cannot?process?vminsert?conn?from?%s:?%s",?c.RemoteAddr(),?err)??
?????????}??
??????}()??
???}??
}
RunVMInsert 用來不斷接收監(jiān)聽器的連接,獲取到連接 c 過后記得添加到 ConnsMap 中去,表示當(dāng)前連接是活躍連接,然后要開另外一個 goroutine 去處理連接,在連接處理完成后要在 goroutine 退出之前要記得清理連接,從 ConnsMap 移出掉,真正處理連接的過程是先通過 handshake.VMInsertServer 創(chuàng)建一個帶有 buffer 的 net.Conn 連接,真正處理連接的邏輯是通過 processVMInsertConn 來完成的。
//?app/vmstorage/transport/server.go
func?(s?*Server)?processVMInsertConn(bc?*handshake.BufferedConn)?error?{??
???return?clusternative.ParseStream(bc,?func(rows?[]storage.MetricRow)?error?{??
??????vminsertMetricsRead.Add(len(rows))??
??????return?s.storage.AddRows(rows,?uint8(*precisionBits))??
???},?s.storage.IsReadOnly)??
}
可以看到上面的函數(shù)是通過 clusternative.ParseStream 來進(jìn)行處理的,該函數(shù)解析從 vminsert 發(fā)送到 bc 的數(shù)據(jù),并對解析的行數(shù)據(jù)執(zhí)行回調(diào)。我們可以先來看下這個函數(shù)的具體實現(xiàn):
//?lib/protoparser/clusternative/streamparser.go
// ParseStream 解析從 vminsert 發(fā)送到 bc 的數(shù)據(jù),并對解析的行數(shù)據(jù)執(zhí)行回調(diào)。
//?如果存儲無法接受新數(shù)據(jù),則可選函數(shù) isReadOnly 必須返回 true。在這種情況下,從 bc 讀取的數(shù)據(jù)不被接受,只讀狀態(tài)被發(fā)回 bc。
//??
//?對于來自 req 的流數(shù)據(jù),可以多次并發(fā)調(diào)用回調(diào)。
//??
//?回調(diào)在返回后不應(yīng)阻塞。
func?ParseStream(bc?*handshake.BufferedConn,?callback?func(rows?[]storage.MetricRow)?error,?isReadOnly?func()?bool)?error?{??
???var?wg?sync.WaitGroup??
???var?(??
??????callbackErrLock?sync.Mutex??
??????callbackErr?????error??
???)??
???for?{???????
??????//?不要使用 unmarshalWork pool,因為每個 unmarshalWork 結(jié)構(gòu)通常占用大量內(nèi)存(超過 consts.MaxInsertPacketSize 字節(jié))。該 pool 將導(dǎo)致內(nèi)存使用量增加。??
??????uw?:=?&unmarshalWork{}??
??????//?設(shè)置回調(diào)?callback
??????uw.callback?=?func(rows?[]storage.MetricRow)?{??
?????????//?執(zhí)行回調(diào)
?????????if?err?:=?callback(rows);?err?!=?nil?{??
????????????processErrors.Inc()??
????????????callbackErrLock.Lock()??
????????????if?callbackErr?==?nil?{??
???????????????callbackErr?=?fmt.Errorf("error?when?processing?native?block:?%w",?err)??
????????????}??
????????????callbackErrLock.Unlock()??
?????????}??
??????}??
??????uw.wg?=?&wg??
??????var?err?error??
??????// readBlock 從 vminsert 的 bc 連接中讀取下一個數(shù)據(jù)塊。
??????uw.reqBuf,?err?=?readBlock(uw.reqBuf[:0],?bc,?isReadOnly)??
??????if?err?!=?nil?{??
?????????wg.Wait()??
?????????if?err?==?io.EOF?{??
????????????//?Remote?end?gracefully?closed?the?connection.??
????????????return?nil??
?????????}??
?????????return?err??
??????}??
??????blocksRead.Inc()??
??????wg.Add(1)??
??????//?獲取數(shù)據(jù)后將數(shù)據(jù)傳遞到?unmarshalWorkCh?通道中,unmarshal?workers?會在其他?goroutine?中進(jìn)行處理
??????common.ScheduleUnmarshalWork(uw)??
???}??
}
在上面的 ParseStream 函數(shù)中會通過 readBlock 函數(shù)不斷從 bc 連接中讀取數(shù)據(jù)塊,readBlock 中獲取到數(shù)據(jù)后會發(fā)送 ack 給到客戶端的 vminsert,表示傳遞的網(wǎng)絡(luò)數(shù)據(jù)已經(jīng)正確獲取到。當(dāng)獲取到數(shù)據(jù)后會傳遞到 unmarshalWorkCh 通道中,unmarshal workers 會在其他 goroutine 中去進(jìn)行處理。
//?lib/protoparser/common/unmarshal_work.go
//?StartUnmarshalWorkers?啟動?unmarshal?workers.??
func?StartUnmarshalWorkers()?{??
???if?unmarshalWorkCh?!=?nil?{??
??????logger.Panicf("BUG:?it?looks?like?startUnmarshalWorkers()?has?been?alread?called?without?stopUnmarshalWorkers()")??
???}??
???gomaxprocs?:=?cgroup.AvailableCPUs()???????????????????//獲取?CUP?核數(shù)??
???unmarshalWorkCh?=?make(chan?UnmarshalWork,?gomaxprocs)?//?初始化channel通道,長度與核數(shù)相等??
???unmarshalWorkersWG.Add(gomaxprocs)??
???for?i?:=?0;?i???????go?func()?{?//?啟動N個?goroutine,數(shù)量與?CPU?核數(shù)一樣??
?????????defer?unmarshalWorkersWG.Done()?//?waitgroup?完成??
?????????for?uw?:=?range?unmarshalWorkCh?{??
????????????uw.Unmarshal()?//?執(zhí)行具體的業(yè)務(wù)邏輯??
?????????}??
??????}()??
???}??
}
而上面的 StartUnmarshalWorkers() 函數(shù)在 vmstorage 的 main 函數(shù)中就調(diào)用了,所以我們只需要做的就是往 unmarshalWorkCh 通道傳數(shù)據(jù)過去即可。
//?app/vmstorage/main.go
func?main()?{
??......
??common.StartUnmarshalWorkers()??
??srv,?err?:=?transport.NewServer(*vminsertAddr,?*vmselectAddr,?strg)
??......
}
真正執(zhí)行具體的業(yè)務(wù)邏輯是 Unmarshal() 函數(shù):
//?lib/protoparser/clusternative/streamparser.go
//?真正處理?vminsert?傳過來的數(shù)據(jù)的業(yè)務(wù)邏輯
func?(uw?*unmarshalWork)?Unmarshal()?{??
???reqBuf?:=?uw.reqBuf??//?vminsert?傳過來的數(shù)據(jù)
???for?len(reqBuf)?>?0?{????
??????//?限制傳遞給回調(diào)的行數(shù),以減少處理大行數(shù)據(jù)包時的內(nèi)存使用。????
??????//?將?reqBuf?轉(zhuǎn)換成插入存儲中的指標(biāo)數(shù)據(jù)列表?[]MetricRow
??????mrs,?tail,?err?:=?storage.UnmarshalMetricRows(uw.mrs[:0],?reqBuf,?maxRowsPerCallback)??
??????uw.mrs?=?mrs??
??????if?err?!=?nil?{??
?????????parseErrors.Inc()??
?????????logger.Errorf("cannot?unmarshal?MetricRow?from?clusternative?block?with?size?%d?(remaining?%d?bytes):?%s",?len(reqBuf),?len(tail),?err)??
?????????break??
??????}??
??????rowsRead.Add(len(mrs))??
??????//?調(diào)用回調(diào)
??????uw.callback(mrs)??
??????reqBuf?=?tail??
???}??
???wg?:=?uw.wg??
???wg.Done()??
}??
??
const?maxRowsPerCallback?=?10000
上面的函數(shù)中先將從 vminsert 傳過來的數(shù)據(jù)通過 storage.UnmarshalMetricRows 函數(shù)轉(zhuǎn)換成可以直接存入到 vmstorage 存儲中的 MetricRow 列表,轉(zhuǎn)換完成后調(diào)用 callback 去進(jìn)行處理,這樣就可以回到前面的 processVMInsertConn 函數(shù)中了,clusternative.ParseStream 的第二個參數(shù)就是回調(diào)函數(shù)。
//?app/vmstorage/transport/server.go
func?(s?*Server)?processVMInsertConn(bc?*handshake.BufferedConn)?error?{??
???return?clusternative.ParseStream(bc,?func(rows?[]storage.MetricRow)?error?{??
??????vminsertMetricsRead.Add(len(rows))??
??????return?s.storage.AddRows(rows,?uint8(*precisionBits))??
???},?s.storage.IsReadOnly)??
}
最后就是通過 s.storage.AddRows 函數(shù)去處理添加轉(zhuǎn)換過后的 MetricRow 列表,這也是真正的將數(shù)據(jù)存入到本地存儲的入口函數(shù)了。
現(xiàn)在我們知道了服務(wù)的 vmstorage 如何去接收客戶端 vminsert 傳過來的數(shù)據(jù)了,那么 vminsert 中是如何來發(fā)送網(wǎng)絡(luò)請求的呢?未完待續(xù).....
