<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          從零實現(xiàn)一個時序數(shù)據(jù)庫

          共 42479字,需瀏覽 85分鐘

           ·

          2021-08-06 07:10

          時序數(shù)據(jù)庫(TSDB: Time Series Database)大多數(shù)時候都是為了滿足監(jiān)控場景的需求,這里先介紹兩個概念:

          • 數(shù)據(jù)點(Point): 時序數(shù)據(jù)的數(shù)據(jù)點是一個包含 (Timestamp:int64, Value:float64) 的二元組。
          • 時間線(Series): 不同標簽(Label)的組合稱為不同的時間線,如
          series1: {"__name__": "netspeed", "host": "localhost", "iface": "eth0"}
          series2: {"__name__": "netspeed", "host": "localhost", "iface": "eth1"}

          Prometheus, InfluxDB, M3, TimescaleDB 都是時下流行的 TSDB。時序數(shù)據(jù)的壓縮算法很大程度上決定了 TSDB 的性能,以上幾個項目的實現(xiàn)都參考了 Fackbook 2015 年發(fā)表的論文《Gorilla: A fast, scalable, in-memory time series database》(http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) 中提到的差值算法,該算法平均可以將 16 字節(jié)的數(shù)據(jù)點壓縮成 1.37 字節(jié)。

          Who's mando?

          Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.

          What's mandodb?

          mandodb(https://github.com/chenjiandongx/mandodb) 是我在學(xué)習(xí)過程中實現(xiàn)的一個最小化的 TSDB,從概念上來講它還算不上是一個完整的 TSDB,因為它:

          • 沒有實現(xiàn)自己的查詢引擎(實現(xiàn)難度大)
          • 缺少磁盤歸檔文件 Compact 操作(有空的話會實現(xiàn))
          • 沒有 WAL 作為災(zāi)備保證高可用(心情好的話會實現(xiàn))

          mandodb 主要受到了兩個項目的啟發(fā)。本項目僅限于學(xué)習(xí)用途,未經(jīng)生產(chǎn)環(huán)境測試驗證!

          • nakabonne/tstorage
          • prometheus/prometheus

          prometheus 的核心開發(fā)者 Fabian Reinartz 寫了一篇文章 《Writing a Time Series Database from Scratch》(https://fabxc.org/tsdb/) 來介紹 prometheus TSDB 的演變過程,非常值得一讀,強烈推薦。

          ?? 數(shù)據(jù)模型 & API 文檔

          數(shù)據(jù)模型定義

          // Point 表示一個數(shù)據(jù)點 (ts, value) 二元組
          type Point struct {
           Ts    int64 // in seconds
           Value float64
          }

          // Label 代表一個標簽組合
          type Label struct {
           Name  string
           Value string
          }

          // Row 一行時序數(shù)據(jù) 包括數(shù)據(jù)點和標簽組合
          type Row struct {
           Metric string
           Labels LabelSet
           Point  Point
          }

          // LabelSet 表示 Label 組合
          type LabelSet []Label

          // LabelMatcher Label 匹配器 支持正則
          type LabelMatcher struct {
           Name   string
           Value  string
           IsRegx bool
          }

          // LabelMatcherSet 表示 LabelMatcher 組合
          type LabelMatcherSet []LabelMatcher

          API

          // InsertRows 寫數(shù)據(jù)
          InsertRows(rows []*Row) error 

          // QueryRange 查詢時序數(shù)據(jù)點
          QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error)

          // QuerySeries 查詢時序序列組合
          QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error)

          // QueryLabelValues 查詢標簽值
          QueryLabelValues(label string, start, end int64) []string

          ?? 配置選項

          配置項在初始化 TSDB 的時候設(shè)置。

          // WithMetaSerializerType 設(shè)置 Metadata 數(shù)據(jù)的序列化類型
          // 目前只提供了 BinaryMetaSerializer
          WithMetaSerializerType(t MetaSerializerType) Option 

          // WithMetaBytesCompressorType 設(shè)置字節(jié)數(shù)據(jù)的壓縮算法
          // 目前提供了
          // * 不壓縮: NoopBytesCompressor(默認)
          // * ZSTD: ZstdBytesCompressor
          // * Snappy: SnappyBytesCompressor
          WithMetaBytesCompressorType(t BytesCompressorType) Option

          // WithOnlyMemoryMode 設(shè)置是否默認只存儲在內(nèi)存中
          // 默認為 false
          WithOnlyMemoryMode(memoryMode bool) Option

          // WithEnabledOutdated 設(shè)置是否支持亂序?qū)懭?nbsp;此特性會增加資源開銷 但會提升數(shù)據(jù)完整性
          // 默認為 true
          WithEnabledOutdated(outdated bool) Option

          // WithMaxRowsPerSegment 設(shè)置單 Segment 最大允許存儲的點數(shù)
          // 默認為 19960412(夾雜私貨 ??)
          WithMaxRowsPerSegment(n int64) Option

          // WithDataPath 設(shè)置 Segment 持久化存儲文件夾
          // 默認為 "."
          WithDataPath(d string) Option

          // WithRetention 設(shè)置 Segment 持久化數(shù)據(jù)保存時長
          // 默認為 7d
          WithRetention(t time.Duration) Option

          // WithWriteTimeout 設(shè)置寫入超時閾值
          // 默認為 30s
          WithWriteTimeout(t time.Duration) Option

          // WithLoggerConfig 設(shè)置日志配置項
          // logger: github.com/chenjiandongx/logger
          WithLoggerConfig(opt *logger.Options) Option

          ?? 用法示例

          package main

          import (
           "fmt"
           "time"

           "github.com/chenjiandongx/mandodb"
          )

          func main() {
           store := mandodb.OpenTSDB(
            mandodb.WithOnlyMemoryMode(true),
            mandodb.WithWriteTimeout(10*time.Second),
           )
           defer store.Close()

           // 插入數(shù)據(jù)
           _ = store.InsertRows([]*mandodb.Row{
            {
             Metric: "cpu.busy",
             Labels: []mandodb.Label{
              {Name: "node", Value: "vm1"},
              {Name: "dc", Value: "gz-idc"},
             },
             Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
            },
            {
             Metric: "cpu.busy",
             Labels: []mandodb.Label{
              {Name: "node", Value: "vm2"},
              {Name: "dc", Value: "sz-idc"},
             },
             Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
            },
           })

           time.Sleep(time.Millisecond)

           // 時序數(shù)據(jù)查詢
           data, _ := store.QueryRange("cpu.busy"nil16000000001600000002)
           fmt.Printf("data: %+v\n", data)
           // output:
           // data: [{Labels:{__name__="cpu.busy", dc="gz-idc", node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}]

           // 查詢 Series
           // __name__ 是 metric 名稱在 TSDB 中的 Label Key
           ser, _ := store.QuerySeries(
                  mandodb.LabelMatcherSet{{Name: "__name__", Value: "cpu.busy"}}, 16000000001600000002)
           for _, d := range ser {
            fmt.Printf("data: %+v\n", d)
           }
           // output:
           // data: map[__name__:cpu.busy dc:gz-idc node:vm1]
           // data: map[__name__:cpu.busy dc:sz-idc node:vm2]

           // 查詢標簽值
           lvs := store.QueryLabelValues("node"16000000001600000002)
           fmt.Printf("data: %+v\n", lvs)
           // output:
           // data: [vm1 vm2]
          }

          下面是我對這段時間學(xué)習(xí)內(nèi)容的整理,嘗試完整介紹如何從零開始實現(xiàn)一個小型的 TSDB。

          我本身并沒有數(shù)據(jù)庫開發(fā)的背景,某些描述可能并不那么準確,所以歡迎 實名 diss 指正。

          ?? Gorilla 差值算法

          Gorilla 論文 4.1 小節(jié)介紹了壓縮算法,先整體看一下壓縮方案,T/V 是緊挨存儲的,'0'/'10'/'11' 表示控制位。

          Figure: Gorilla 壓縮算法

          Timestamp DOD 壓縮:

          在時序的場景中,每個時序點都有一個對應(yīng)的 Timestamp,一條時序序列中相鄰數(shù)據(jù)點的間隔是有規(guī)律可循的。一般來講,監(jiān)控數(shù)據(jù)的采集都是會以固定的時間間隔進行的,所以就可以用差值來記錄時間間隔,更進一步,我們可以用差值的差值來記錄以此來減少存儲空間。

          t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830
          --------------------------------------------------------------
          // 差值:delta
          t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10
          --------------------------------------------------------------
          // 差值的差值:delta of delta
          t1: 1627401800; dod1: 0; dod2: 0; dod3: 0

          實際環(huán)境中當然不可能每個間隔都這么均勻,由于網(wǎng)絡(luò)延遲等其他原因,差值會有波動。

          Value XOR 壓縮:

          Figure: IEEE 浮點數(shù)以及 XOR 計算結(jié)果

          當兩個數(shù)據(jù)點數(shù)值值比較接近的話,通過異或操作計算出來的結(jié)果是比較相似的,利用這點就可以通過記錄前置零和后置零個數(shù)以及數(shù)值部分來達到壓縮空間的目的。

          下面通過算法實現(xiàn)來介紹,代碼來自項目 dgryski/go-tsz。代碼完全按照論文中給出的步驟來實現(xiàn)。

          // New 初始化 block 這里會將第一個原始時間戳寫入到 block 中
          func New(t0 uint32) *Series {
           s := Series{
            T0:      t0,
            leading: ^uint8(0),
           }

           s.bw.writeBits(uint64(t0), 32)
           return &s
          }

          // Push 負責(zé)寫入時序數(shù)據(jù)
          func (s *Series) Push(t uint32, v float64) {
           // ....
           // 如果是第一個數(shù)據(jù)點的話寫入原始數(shù)據(jù)后直接返回
           if s.t == 0 {
            s.t = t
            s.val = v
            s.tDelta = t - s.T0 // 實際上這里為 0

            // The block header stores the starting time stamp, t-1(前一個時間戳),
            // which is aligned to a two hour window; the first time
            // stamp, t0, in the block is stored as a delta from t?1 in 14 bits.
                  
            // 用 14 個 bit 寫入時間戳差值
            s.bw.writeBits(uint64(s.tDelta), 14)
            // 原始數(shù)據(jù)點完整寫入
            s.bw.writeBits(math.Float64bits(v), 64)
            return
           }

           tDelta := t - s.t
           dod := int32(tDelta - s.tDelta) // 計算差值的差值 Detla of Delta

           // 下面開始就處理非第一個數(shù)據(jù)點的情況了
           switch {
            // If D is zero, then store a single ‘0’ bit
            // 如果是零的話 那直接用 '0' 一個字節(jié)就可以直接表示
           case dod == 0:
            s.bw.writeBit(zero)

            //  If D is between [-63, 64], store ‘10’ followed by the value (7 bits)
           case -63 <= dod && dod <= 64:
            s.bw.writeBits(0x022// 控制位 '10'
            s.bw.writeBits(uint64(dod), 7// 7bits 可以表示 [-63, 64] 的范圍

            // If D is between [-255, 256], store ‘110’ followed by the value (9 bits)
           case -255 <= dod && dod <= 256:
            s.bw.writeBits(0x063// 控制位 '110'
            s.bw.writeBits(uint64(dod), 9)

            // if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits)
           case -2047 <= dod && dod <= 2048:
            s.bw.writeBits(0x0e4// 控制位 '1110'
            s.bw.writeBits(uint64(dod), 12)

            // Otherwise store ‘1111’ followed by D using 32 bits
           default:
            s.bw.writeBits(0x0f4// 其余情況控制位均用 '1111'
            s.bw.writeBits(uint64(dod), 32)
           }

           // 到這里 (T, V) 中的時間戳已經(jīng)寫入完畢了 接下來是寫 V 部分

           // 先計算兩個值的異或結(jié)果
           vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)

           // If XOR with the previous is zero (same value), store single ‘0’ bit
           // 如果前后兩個值相等的話 直接用 '0' 1 個 bit 就可以表示
           // 所以如果上報的時序數(shù)據(jù)是 1 或者 0 這種的話 占用的內(nèi)存會非常少

           // zero = '0'; one = '1'
           if vDelta == 0 {
            s.bw.writeBit(zero)
           } else {    // 非 0 情況那就要把控制位置為 1
            s.bw.writeBit(one)

            // 計算前置 0 和后置 0
            leading := uint8(bits.LeadingZeros64(vDelta))
            trailing := uint8(bits.TrailingZeros64(vDelta))

            // clamp number of leading zeros to avoid overflow when encoding
            if leading >= 32 {
             leading = 31
            }

            // (Control bit ‘0’) If the block of meaningful bits
            // falls within the block of previous meaningful bits,
            // i.e., there are at least as many leading zeros and
            // as many trailing zeros as with the previous value,
            // use that information for the block position and
            // just store the meaningful XORed value.

            // 如果前置 0 不小于上一個值計算的異或結(jié)果的前置 0 且后置 0 也不小于上一個值計算的異或結(jié)果的后置 0
            if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => 控制位 '10'
             s.bw.writeBit(zero)
             // 記錄異或值非零部分
             s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
            } else { // => 控制位 '11'

             // (Control bit ‘1’) Store the length of the number
             // of leading zeros in the next 5 bits, then store the
             // length of the meaningful XORed value in the next
             // 6 bits. Finally store the meaningful bits of the XORed value.
             s.leading, s.trailing = leading, trailing

             // 其他情況控制位置為 1 并用接下來的 5bits 記錄前置 0 個數(shù)
             s.bw.writeBit(one)
             s.bw.writeBits(uint64(leading), 5)

             // 然后用接下來的 6bits 記錄異或差值中的非零部分
             sigbits := 64 - leading - trailing
             s.bw.writeBits(uint64(sigbits), 6)
             s.bw.writeBits(vDelta>>trailing, int(sigbits))
            }
           }

           // 狀態(tài)更新 至此(T, V)均已被壓縮寫入到內(nèi)存中
           s.tDelta = tDelta
           s.t = t
           s.val = v
          }

          // 每個 block 的結(jié)尾會使用特殊標記用于標識
          func finish(w *bstream) {
           // write an end-of-stream record
           w.writeBits(0x0f4)
           w.writeBits(0xffffffff32)
           w.writeBit(zero)
          }

          論文給出了不同 case 的 buckets 占比分布。

          Figure: Timestamp buckets distribution

          Figure: Value buckets distribution

          Timestamp buckets 中,前后兩個時間戳差值相同的比例高達 96.39%,而在 Value buckets 中只用一個控制位的占比也達到了 59.06%,可見其壓縮比之高。

          論文還給出了一個重要結(jié)論,數(shù)據(jù)壓縮比隨著時間的增長而增長,并在 120 個點的時候開始收斂到一個最佳值。

          Figure: 壓縮率曲線

          Gorilla 差值算法也應(yīng)用于我的另外一個項目 chenjiandongx/tszlist,一種時序數(shù)據(jù)線程安全鏈表。

          ?? 數(shù)據(jù)寫入

          時序數(shù)據(jù)具有「垂直寫,水平查」的特性,即同一時刻有多條時間線的數(shù)據(jù)不斷被追加。但查詢的時候往往是查某條時間線持續(xù)一段時間內(nèi)的數(shù)據(jù)點。

          series
            ^   
            │   . . . . . . . . . . . . . . . . .   . . . . .   {__name__="request_total", method="GET"}
            │     . . . . . . . . . . . . . . . . . . . . . .   {__name__="request_total", method="POST"}
            │         . . . . . . .
            │       . . .     . . . . . . . . . . . . . . . .                  ... 
            │     . . . . . . . . . . . . . . . . .   . . . .   
            │     . . . . . . . . . .   . . . . . . . . . . .   {__name__="errors_total", method="POST"}
            │           . . .   . . . . . . . . .   . . . . .   {__name__="errors_total", method="GET"}
            │         . . . . . . . . .       . . . . .
            │       . . .     . . . . . . . . . . . . . . . .                  ... 
            │     . . . . . . . . . . . . . . . .   . . . . 
            v
              <-------------------- time --------------------->

          時序數(shù)據(jù)跟時間是強相關(guān)的(不然還叫時序數(shù)據(jù)???),即大多數(shù)查詢其實只會查詢最近時刻的數(shù)據(jù),這里的「最近」是個相對概念。所以沒必要維護一條時間線的完整生命周期,特別是在 Kubernetes 這種云原生場景,Pod 隨時有可能會被擴縮容,也就意味著一條時間線的生命周期可能會很短。如果我們一直記錄著所有的時間線的索引信息,那么隨著時間的推移,數(shù)據(jù)庫里的時間線的數(shù)量會呈現(xiàn)一個線性增長的趨勢 ??,會極大地影響查詢效率。

          這里引入一個概念「序列分流」,這個概念描述的是一組時間序列變得不活躍,即不再接收數(shù)據(jù)點,取而代之的是有一組新的活躍的序列出現(xiàn)的場景。

          series
            ^
            │   . . . . . .
            │   . . . . . .
            │   . . . . . .
            │               . . . . . . .
            │               . . . . . . .
            │               . . . . . . .
            │                             . . . . . .
            │                             . . . . . .
            │                                         . . . . .
            │                                         . . . . .
            │                                         . . . . .
            v
              <-------------------- time --------------------->

          我們將多條時間線的數(shù)據(jù)按一定的時間跨度切割成多個小塊,每個小塊本質(zhì)就是一個獨立小型的數(shù)據(jù)庫,這種做法另外一個優(yōu)勢是清除過期操作的時候非常方便,只要將整個塊給刪了就行 ??(梭哈是一種智慧)。內(nèi)存中保留最近兩個小時的熱數(shù)據(jù)(Memory Segment),其余數(shù)據(jù)持久化到磁盤(Disk Segment)。

          Figure: 序列分塊

          DiskSegment 使用的是 AVL Tree 實現(xiàn)的列表,可在插入時排序。為什么不用更加高大上的紅黑樹?因為不好實現(xiàn)...

          當 Memory Segment 達到歸檔條件的時候,會創(chuàng)建一個新的內(nèi)存塊并異步將剛歸檔的塊寫入到磁盤,同時會使用 mmap 將磁盤文件句柄映射到內(nèi)存中。代碼實現(xiàn)如下。

          func (tsdb *TSDB) getHeadPartition() (Segment, error) {
           tsdb.mut.Lock()
           defer tsdb.mut.Unlock()

           if tsdb.segs.head.Frozen() {
            head := tsdb.segs.head

            go func() {
             tsdb.wg.Add(1)
             defer tsdb.wg.Done()

             tsdb.segs.Add(head)

             t0 := time.Now()
             dn := dirname(head.MinTs(), head.MaxTs())

             if err := writeToDisk(head.(*memorySegment)); err != nil {
              logger.Errorf("failed to flush data to disk, %v", err)
              return
             }

             fname := path.Join(dn, "data")
             mf, err := mmap.OpenMmapFile(fname)
             if err != nil {
              logger.Errorf("failed to make a mmap file %s, %v", fname, err)
              return
             }

             tsdb.segs.Remove(head)
             tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
             logger.Infof("write file %s take: %v", fname, time.Since(t0))
            }()

            tsdb.segs.head = newMemorySegment()
           }

           return tsdb.segs.head, nil
          }

          Figure: Memory Segment 兩部分數(shù)據(jù)

          寫入的時候支持數(shù)據(jù)時間回撥,也就是支持有限的亂序數(shù)據(jù)寫入,實現(xiàn)方案是在內(nèi)存中對還沒歸檔的每條時間線維護一個鏈表(同樣使用 AVL Tree 實現(xiàn)),當數(shù)據(jù)點的時間戳不是遞增的時候存儲到鏈表中,查詢的時候會將兩部分數(shù)據(jù)合并查詢,持久化的時候也會將兩者合并寫入。

          ?? Mmap 內(nèi)存映射

          mmap 是一種將磁盤文件映射到進程的虛擬地址空間來實現(xiàn)對文件讀取和修改操作的技術(shù)。

          從 Linux 角度來看,操作系統(tǒng)的內(nèi)存空間被分為「內(nèi)核空間」和「用戶空間」兩大部分,其中內(nèi)核空間和用戶空間的空間大小、操作權(quán)限以及核心功能都不相同。這里的內(nèi)核空間是指操作系統(tǒng)本身使用的內(nèi)存空間,而用戶空間則是提供給各個進程使用的內(nèi)存空間。由于用戶進程不具有訪問內(nèi)核資源的權(quán)限,例如訪問硬件資源,因此當一個用戶進程需要使用內(nèi)核資源的時候,就需要通過 系統(tǒng)調(diào)用 來完成。

          虛擬內(nèi)存細節(jié)可以閱讀 《虛擬內(nèi)存精粹》 這篇文章。

          Figure: 常規(guī)文件操作和 mmap 操作的區(qū)別

          image

          常規(guī)文件操作

          讀文件: 用戶進程首先執(zhí)行 read(2) 系統(tǒng)調(diào)用,會進行系統(tǒng)上下文環(huán)境切換,從用戶態(tài)切換到內(nèi)核態(tài),之后由 DMA 將文件數(shù)據(jù)從磁盤讀取到內(nèi)核緩沖區(qū),再將內(nèi)核空間緩沖區(qū)的數(shù)據(jù)復(fù)制到用戶空間的緩沖區(qū)中,最后 read(2) 系統(tǒng)調(diào)用返回,進程從內(nèi)核態(tài)切換到用戶態(tài),整個過程結(jié)束。

          寫文件: 用戶進程發(fā)起 write(2) 系統(tǒng)調(diào)用,從用戶態(tài)切換到內(nèi)核態(tài),將數(shù)據(jù)從用戶空間緩沖區(qū)復(fù)制到內(nèi)核空間緩沖區(qū),接著 write(2) 系統(tǒng)調(diào)用返回,同時進程從內(nèi)核態(tài)切換到用戶態(tài),數(shù)據(jù)從內(nèi)核緩沖區(qū)寫入到磁盤,整個過程結(jié)束。

          mmap 操作

          mmap 內(nèi)存映射的實現(xiàn)過程,總的來說可以分為三個階段:

          1. 進程啟動映射過程,并在虛擬地址空間中為映射創(chuàng)建虛擬映射區(qū)域。
          2. 執(zhí)行內(nèi)核空間的系統(tǒng)調(diào)用函數(shù) mmap,建立文件物理地址和進程虛擬地址的一一映射關(guān)系。
          3. 進程發(fā)起對這片映射空間的訪問,引發(fā)缺頁異常,實現(xiàn)文件內(nèi)容到物理內(nèi)存的拷貝。

          ?? 小結(jié)

          常規(guī)文件操作為了提高讀寫效率和保護磁盤,使用了頁緩存機制。這樣造成讀文件時需要先將文件頁從磁盤拷貝到頁緩存中,由于頁緩存處在內(nèi)核空間,不能被用戶進程直接尋址,所以還需要將頁緩存中數(shù)據(jù)頁再次拷貝到內(nèi)存對應(yīng)的用戶空間中。這樣,通過了兩次數(shù)據(jù)拷貝過程,才能完成進程對文件內(nèi)容的獲取任務(wù)。寫操作也是一樣,待寫入的 buffer 在內(nèi)核空間不能直接訪問,必須要先拷貝至內(nèi)核空間對應(yīng)的主存,再寫回磁盤中(延遲寫回),也是需要兩次數(shù)據(jù)拷貝。

          而使用 mmap 操作文件,創(chuàng)建新的虛擬內(nèi)存區(qū)域和建立文件磁盤地址和虛擬內(nèi)存區(qū)域映射這兩步,沒有任何文件拷貝操作。而之后訪問數(shù)據(jù)時發(fā)現(xiàn)內(nèi)存中并無數(shù)據(jù)而發(fā)起的缺頁異常過程,可以通過已經(jīng)建立好的映射關(guān)系,只使用一次數(shù)據(jù)拷貝,就從磁盤中將數(shù)據(jù)傳入內(nèi)存的用戶空間中,供進程使用。

          ?? 總而言之,常規(guī)文件操作需要從磁盤到頁緩存再到用戶主存的兩次數(shù)據(jù)拷貝。而 mmap 操控文件只需要從磁盤到用戶主存的一次數(shù)據(jù)拷貝過程。mmap 的關(guān)鍵點是實現(xiàn)了「用戶空間」和「內(nèi)核空間」的數(shù)據(jù)直接交互而省去了不同空間數(shù)據(jù)復(fù)制的開銷

          ?? 索引設(shè)計

          TSDB 的查詢,是通過 Label 組合來鎖定到具體的時間線進而確定分塊偏移檢索出數(shù)據(jù)。

          • Sid(MetricHash/-/LabelHash) 是一個 Series 的唯一標識。
          • Label(Name/-/Value) => vm="node1"; vm="node2"; iface="eth0"。

          在傳統(tǒng)的關(guān)系型數(shù)據(jù)庫,索引設(shè)計可能是這樣的。

          Sid(主鍵)Label1Label2Label3Label4...LabelN
          sid1×××
          ...×
          sid2
          ×××...×
          sid3××
          ×...×
          sid4×
          ××...×

          時序數(shù)據(jù)是 NoSchema 的,沒辦法提前建表和定義數(shù)據(jù)模型 ??,因為我們要支持用戶上報任意 Label 組合的數(shù)據(jù),這樣的話就沒辦法進行動態(tài)的擴展了。或許你會靈光一現(xiàn) ?,既然這樣,那把 Labels 放一個字段拼接起來不就可以無限擴展啦,比如下面這個樣子。

          Sid(主鍵)Labels
          sid1label1, label2, label3, ...
          sid2label2, label3, label5, ...
          sid3label4, label6, label9, ...
          sid4label2, label3, label7, ...

          喲嚯,乍一看沒毛病,靚仔竊喜。

          不對,有問題 ??,要定位到其中的某條時間線,那我是不是得全表掃描一趟。而且這種設(shè)計還有另外一個弊病,就是會導(dǎo)致內(nèi)存激增,Label 的 Name 和 Value 都可能是特別長的字符串。

          那怎么辦呢(?? 靚仔沉默...),剎那間我的腦中閃過一個帥氣的身影,沒錯,就是你,花澤類「只要倒立眼淚就不會流出來」。

          我悟了!要學(xué)會逆向思維 ??,把 Label 當做主鍵,Sid 當做其字段不就好了。這其實有點類似于 ElasticSearch 中的倒排索引,主鍵為 Keyword,字段為 DocumentID。索引設(shè)計如下。

          Label(主鍵)Sids
          label1: {vm="node1"}sid1, sid2, sid3, ...
          label2: {vm="node2"}sid2, sid3, sid5, ...
          label3: {iface="eth0"}sid3, sid5, sid9, ...
          label4: {iface="eth1"}sid2, sid3, sid7, ...

          Label 作為主鍵時會建立索引(Hashkey),查找的效率可視為 O(1),再根據(jù)鎖定的 Label 來最終確定想要的 Sid。舉個例子,我們想要查找 {vm="node1", iface="eth0"} 的時間線的話就可以快速定位到 Sids(忽略其他 ... sid)。

          sid1; sid2; sid3
          sid2; sid3; sid5

          兩者求一個交集,就可以得到最終要查詢的 Sid 為 sid2sid3。?? Nice!

          假設(shè)我們的查詢只支持相等匹配的話,格局明顯就小了 ??。查詢條件是 {vm=~"node*", iface="eth0"} 腫么辦?對 label1、label2、label3 和 label4 一起求一個并集嗎?顯然不是,因為這樣算的話那結(jié)果就是 sid3

          厘清關(guān)系就不難看出,只要對相同的 Label Name 做并集然后再對不同的 Label Name 求交集就可以了。這樣算的正確結(jié)果就是 sid3sid5。實現(xiàn)的時候用到了 Roaring Bitmap,一種優(yōu)化的位圖算法。

          Memory Segment 索引匹配

          func (mim *memoryIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []string {
           // ...
           sids := newMemorySidSet()
           var got bool
           for i := len(lms) - 1; i >= 0; i-- {
            tmp := newMemorySidSet()
            vs := lvs.Match(lms[i])
            // 對相同的 Label Name 求并集
            for _, v := range vs {
             midx := mim.idx[joinSeparator(lms[i].Name, v)]
             if midx == nil || midx.Size() <= 0 {
              continue
             }

             tmp.Union(midx.Copy())
            }

            if tmp == nil || tmp.Size() <= 0 {
             return nil
            }

            if !got {
             sids = tmp
             got = true
             continue
            }

            // 對不同的 Label Name 求交集
            sids.Intersection(tmp.Copy())
           }

           return sids.List()
          }

          Disk Segment 索引匹配

          func (dim *diskIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []uint32 {
           // ...

           lst := make([]*roaring.Bitmap, 0)
           for i := len(lms) - 1; i >= 0; i-- {
            tmp := make([]*roaring.Bitmap, 0)
            vs := lvs.Match(lms[i])

            // 對相同的 Label Name 求并集
            for _, v := range vs {
             didx := dim.label2sids[joinSeparator(lms[i].Name, v)]
             if didx == nil || didx.set.IsEmpty() {
              continue
             }

             tmp = append(tmp, didx.set)
            }

            union := roaring.ParOr(4, tmp...)
            if union.IsEmpty() {
             return nil
            }

            lst = append(lst, union)
           }

           // 對不同的 Label Name 求交集
           return roaring.ParAnd(4, lst...).ToArray()
          }

          然而,確定相同的 LabelName 也是一個問題,因為 Label 本身就代表著 Name:Value,難不成我還要遍歷所有 label 才能確定嘛,這不就又成了全表掃描???

          沒有什么問題是一個索引解決不了的,如果有,那就再增加一個索引。--- 魯迅。

          只要我們保存 Label 的 Name 對應(yīng)的 Value 列表的映射關(guān)系即可高效解決這個問題。

          LabelNameLabelValue
          vmnode1, node2, ...
          ifaceeth0, eth1, ...

          還是上面的 {vm=~"node1|node2", iface="eth0"} 查詢,第一步通過正則匹配確定匹配到 node1, node2,第二步匹配到 eth0,再將 LabelName 和 LabelValue 一拼裝,Label 就出來了,?? 完事!

          橋豆麻袋!還有一個精彩的正則匹配優(yōu)化算法沒介紹。

          fastRegexMatcher 是一種優(yōu)化的正則匹配器,算法來自 Prometheus。

          // 思路就是盡量先執(zhí)行前綴匹配和后綴匹配 能不用正則就不用正則
          // 如 label 表達式為 {vm="node*"}
          // 而我們此時內(nèi)存中有 vm=node1, vm=node2, vm=foo, vm=bar,那這個時候只需要前綴匹配就能直接把 vm=foo,vm=bar 給過濾了
          // 畢竟前綴匹配和后綴匹配的執(zhí)行效率還是比正則高不少的
          type fastRegexMatcher struct {
           re       *regexp.Regexp
           prefix   string
           suffix   string
           contains string
          }

          func newFastRegexMatcher(v string) (*fastRegexMatcher, error) {
           re, err := regexp.Compile("^(?:" + v + ")$")
           if err != nil {
            return nil, err
           }

           parsed, err := syntax.Parse(v, syntax.Perl)
           if err != nil {
            return nil, err
           }

           m := &fastRegexMatcher{
            re: re,
           }

           if parsed.Op == syntax.OpConcat {
            m.prefix, m.suffix, m.contains = optimizeConcatRegex(parsed)
           }

           return m, nil
          }

          // optimizeConcatRegex returns literal prefix/suffix text that can be safely
          // checked against the label value before running the regexp matcher.
          func optimizeConcatRegex(r *syntax.Regexp) (prefix, suffix, contains string) {
           sub := r.Sub

           // We can safely remove begin and end text matchers respectively
           // at the beginning and end of the regexp.
           if len(sub) > 0 && sub[0].Op == syntax.OpBeginText {
            sub = sub[1:]
           }
           if len(sub) > 0 && sub[len(sub)-1].Op == syntax.OpEndText {
            sub = sub[:len(sub)-1]
           }

           if len(sub) == 0 {
            return
           }

           // Given Prometheus regex matchers are always anchored to the begin/end
           // of the text, if the first/last operations are literals, we can safely
           // treat them as prefix/suffix.
           if sub[0].Op == syntax.OpLiteral && (sub[0].Flags&syntax.FoldCase) == 0 {
            prefix = string(sub[0].Rune)
           }
           if last := len(sub) - 1; sub[last].Op == syntax.OpLiteral && (sub[last].Flags&syntax.FoldCase) == 0 {
            suffix = string(sub[last].Rune)
           }

           // If contains any literal which is not a prefix/suffix, we keep the
           // 1st one. We do not keep the whole list of literals to simplify the
           // fast path.
           for i := 1; i < len(sub)-1; i++ {
            if sub[i].Op == syntax.OpLiteral && (sub[i].Flags&syntax.FoldCase) == 0 {
             contains = string(sub[i].Rune)
             break
            }
           }

           return
          }

          func (m *fastRegexMatcher) MatchString(s string) bool {
           if m.prefix != "" && !strings.HasPrefix(s, m.prefix) {
            return false
           }

           if m.suffix != "" && !strings.HasSuffix(s, m.suffix) {
            return false
           }

           if m.contains != "" && !strings.Contains(s, m.contains) {
            return false
           }
           return m.re.MatchString(s)
          }

          ?? 存儲布局

          既然是數(shù)據(jù)庫,那么自然少不了數(shù)據(jù)持久化的特性。了解完索引的設(shè)計,再看看落到磁盤的存儲布局就很清晰了。先跑個示例程序?qū)懭胍恍?shù)據(jù)熱熱身。

          package main

          import (
           "fmt"
           "math/rand"
           "strconv"
           "time"

           "github.com/chenjiandongx/mandodb"
           "github.com/satori/go.uuid"
          )

          // 模擬一些監(jiān)控指標
          var metrics = []string{
           "cpu.busy""cpu.load1""cpu.load5""cpu.load15""cpu.iowait",
           "disk.write.ops""disk.read.ops""disk.used",
           "net.in.bytes""net.out.bytes""net.in.packages""net.out.packages",
           "mem.used""mem.idle""mem.used.bytes""mem.total.bytes",
          }

          // 增加 Label 數(shù)量
          var uid1, uid2, uid3 []string

          func init() {
           for i := 0; i < len(metrics); i++ {
            uid1 = append(uid1, uuid.NewV4().String())
            uid2 = append(uid2, uuid.NewV4().String())
            uid3 = append(uid3, uuid.NewV4().String())
           }
          }

          func genPoints(ts int64, node, dc int) []*mandodb.Row {
           points := make([]*mandodb.Row, 0)
           for idx, metric := range metrics {
            points = append(points, &mandodb.Row{
             Metric: metric,
             Labels: []mandodb.Label{
              {Name: "node", Value: "vm" + strconv.Itoa(node)},
              {Name: "dc", Value: strconv.Itoa(dc)},
              {Name: "foo", Value: uid1[idx]},
              {Name: "bar", Value: uid2[idx]},
              {Name: "zoo", Value: uid3[idx]},
             },
             Point: mandodb.Point{Ts: ts, Value: float64(rand.Int31n(60))},
            })
           }

           return points
          }

          func main() {
           store := mandodb.OpenTSDB()
           defer store.Close()

           now := time.Now().Unix() - 36000 // 10h ago

           for i := 0; i < 720; i++ {
            for n := 0; n < 5; n++ {
             for j := 0; j < 1024; j++ {
              _ = store.InsertRows(genPoints(now, n, j))
             }
            }

            now += 60 //1min
           }

           fmt.Println("finished")

           select {}
          }

          每個分塊保存在名字為 seg-${mints}-${maxts} 文件夾里,每個文件夾含有 datameta.json 兩個文件。

          • data: 存儲了一個 Segment 的所有數(shù)據(jù),包括數(shù)據(jù)點和索引信息。
          • meta.json: 描述了分塊的時間線數(shù)量,數(shù)據(jù)點數(shù)量以及該塊的數(shù)據(jù)時間跨度。
          ? ?? tree -h seg-*
          seg-1627709713-1627716973
          ├── [ 28M]  data
          └── [ 110]  meta.json
          seg-1627716973-1627724233
          ├── [ 28M]  data
          └── [ 110]  meta.json
          seg-1627724233-1627731493
          ├── [ 28M]  data
          └── [ 110]  meta.json
          seg-1627731493-1627738753
          ├── [ 28M]  data
          └── [ 110]  meta.json
          seg-1627738753-1627746013
          ├── [ 28M]  data
          └── [ 110]  meta.json

          0 directories, 10 files

          ? ?? cat seg-1627709713-1627716973/meta.json -p
          {
              "seriesCount": 81920,
              "dataPointsCount": 9912336,
              "maxTs": 1627716973,
              "minTs": 1627709713
          }

          存儲 8 萬條時間線共接近 1 千萬的數(shù)據(jù)點的數(shù)據(jù)塊占用磁盤 28M。實際上在寫入的時候,一條數(shù)據(jù)是這個樣子的。

          {__name__="cpu.busy", node="vm0", dc="0", foo="bdac463d-8805-4cbe-bc9a-9bf495f87bab", bar="3689df1d-cbf3-4962-abea-6491861e62d2", zoo="9551010d-9726-4b3b-baf3-77e50655b950"1627710454 41

          這樣一條數(shù)據(jù)按照 JSON 格式進行網(wǎng)絡(luò)通信的話,大概是 200Byte,初略計算一下。

          200 * 9912336 = 1982467200Byte = 1890M

          可以選擇 ZSTD 或者 Snappy 算法進行二次壓縮(默認不開啟)。還是上面的示例代碼,不過在 TSDB 啟動的時候指定了壓縮算法。

          ZstdBytesCompressor

          func main() {
           store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.ZstdBytesCompressor))
           defer store.Close()
           // ...
          }

          // 壓縮效果 28M -> 25M
          ? ?? ll seg-1627711905-1627719165
          Permissions Size User          Date Modified Name
          .rwxr-xr-x   25M chenjiandongx  1 Aug 00:13  data
          .rwxr-xr-x   110 chenjiandongx  1 Aug 00:13  meta.json

          SnappyBytesCompressor

          func main() {
           store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.SnappyBytesCompressor))
           defer store.Close()
           // ...
          }

          // 壓縮效果 28M -> 26M
          ? ?? ll seg-1627763918-1627771178
          Permissions Size User          Date Modified Name
          .rwxr-xr-x   26M chenjiandongx  1 Aug 14:39  data
          .rwxr-xr-x   110 chenjiandongx  1 Aug 14:39  meta.json

          多多少少還是有點效果的 ??...

          壓縮是有成本的,壓縮體積的同時會增大 CPU 開銷(mbp 可以煎雞蛋了),減緩寫入速率。

          敲黑板,接下來就要來好好講講 data 文件到底寫了什么東西。 data 存儲布局如下。

          Figure: Segment Stroage

          TOC 描述了 Data Block 和 Meta Block(Series Block + Labels Block)的體積,用于后面對 data 進行解析讀取。Data Block 存儲了每條時間線具體的數(shù)據(jù)點,時間線之間數(shù)據(jù)緊挨存儲。DataContent 就是使用 Gorilla 差值算法壓縮的 block。

          Figure: Data Block

          Labels Block 記錄了具體的 Label 值以及對應(yīng) Label 與哪些 Series 相關(guān)聯(lián)。

          Figure: Labels Block

          Series Block 記錄了每條時間線的元數(shù)據(jù),字段解釋如下。

          • SidLength: Sid 的長度。
          • Sid: 時間線的唯一標識。
          • StartOffset: 時間線數(shù)據(jù)塊在 Data Block 中的起始偏移。
          • EndOffset: 時間線數(shù)據(jù)塊在 Data Block 中的終止偏移。
          • LabelCount: 時間線包含的 Label 數(shù)量。
          • Labels: 標簽在 Labels Block 中的序號(僅記錄序號,不記錄具體值)。

          Figure: Series Block

          了解完設(shè)計,再看看 Meta Block 編碼和解編碼的代碼實現(xiàn),binaryMetaSerializer 實現(xiàn)了 MetaSerializer 接口。

          type MetaSerializer interface {
           Marshal(Metadata) ([]byte, error)
           Unmarshal([]byte, *Metadata) error
          }

          編碼 Metadata

          const (
           endOfBlock uint16 = 0xffff
           uint16Size        = 2
           uint32Size        = 4
           uint64Size        = 8

           magic = "https://github.com/chenjiandongx/mandodb"
          )

          func (s *binaryMetaSerializer) Marshal(meta Metadata) ([]byte, error) {
           encf := newEncbuf()

           // labels block
           labelOrdered := make(map[string]int)
           for idx, row := range meta.Labels {
            labelOrdered[row.Name] = idx
            encf.MarshalUint16(uint16(len(row.Name)))
            encf.MarshalString(row.Name)
            encf.MarshalUint32(uint32(len(row.Sids)))
            encf.MarshalUint32(row.Sids...)
           }
           encf.MarshalUint16(endOfBlock)

           // series block
           for idx, series := range meta.Series {
            encf.MarshalUint16(uint16(len(series.Sid)))
            encf.MarshalString(series.Sid)
            encf.MarshalUint64(series.StartOffset, series.EndOffset)

            rl := meta.sidRelatedLabels[idx]
            encf.MarshalUint32(uint32(rl.Len()))

            lids := make([]uint320, rl.Len())
            for _, lb := range rl {
             lids = append(lids, uint32(labelOrdered[lb.MarshalName()]))
            }

            sort.Slice(lids, func(i, j int) bool {
             return lids[i] < lids[j]
            })
            encf.MarshalUint32(lids...)
           }
           encf.MarshalUint16(endOfBlock)

           encf.MarshalUint64(uint64(meta.MinTs))
           encf.MarshalUint64(uint64(meta.MaxTs))
           encf.MarshalString(magic)   // <-- magic here

           return ByteCompress(encf.Bytes()), nil
          }

          解碼 Metadata

          func (s *binaryMetaSerializer) Unmarshal(data []byte, meta *Metadata) error {
           data, err := ByteDecompress(data)
           if err != nil {
            return ErrInvalidSize
           }

           if len(data) < len(magic) {
            return ErrInvalidSize
           }

           decf := newDecbuf()
           // 檢驗數(shù)據(jù)完整性
           if decf.UnmarshalString(data[len(data)-len(magic):]) != magic {
            return ErrInvalidSize
           }

           // labels block
           offset := 0
           labels := make([]seriesWithLabel, 0)
           for {
            var labelName string
            labelLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
            offset += uint16Size

            if labelLen == endOfBlock {
             break
            }

            labelName = decf.UnmarshalString(data[offset : offset+int(labelLen)])
            offset += int(labelLen)
            sidCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
            offset += uint32Size

            sidLst := make([]uint32, sidCnt)
            for i := 0; i < int(sidCnt); i++ {
             sidLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
             offset += uint32Size
            }
            labels = append(labels, seriesWithLabel{Name: labelName, Sids: sidLst})
           }
           meta.Labels = labels

           // series block
           rows := make([]metaSeries, 0)
           for {
            series := metaSeries{}
            sidLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
            offset += uint16Size

            if sidLen == endOfBlock {
             break
            }

            series.Sid = decf.UnmarshalString(data[offset : offset+int(sidLen)])
            offset += int(sidLen)
            series.StartOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
            offset += uint64Size
            series.EndOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
            offset += uint64Size
            labelCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
            offset += uint32Size

            labelLst := make([]uint32, labelCnt)
            for i := 0; i < int(labelCnt); i++ {
             labelLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
             offset += uint32Size
            }
            series.Labels = labelLst
            rows = append(rows, series)
           }
           meta.Series = rows

           meta.MinTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
           offset += uint64Size
           meta.MaxTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
           offset += uint64Size

           return decf.Err()
          }

          至此,對 mandodb 的索引和存儲整體設(shè)計是不是就了然于胸。?? 文檔較長,建議 Star 收藏,畢竟來都來了...

          項目地址:https://github.com/chenjiandongx/mandodb

          瀏覽 83
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  好逼人人 | av岛国免费 | 国内视频精品在线 | 日韩成人AV电影在线 | 欧美日韩A片免费观看 |