<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Loki 源碼分析之日志寫入

          共 30780字,需瀏覽 62分鐘

           ·

          2021-05-25 21:38

          前面我們介紹了 Loki 的一些基本使用配置,但是對(duì) Loki 還是了解不夠深入,官方文檔寫得較為凌亂,而且沒有跟上新版本,為了能夠?qū)?Loki 有一個(gè)更深入的認(rèn)識(shí),做到有的放矢,這里面我們嘗試對(duì) Loki 的源碼進(jìn)行一些簡單的分析,由于有很多模塊和實(shí)現(xiàn)細(xì)節(jié),這里我們主要是對(duì)核心功能進(jìn)行分析,希望對(duì)大家有所幫助。本文首先對(duì)日志的寫入過程進(jìn)行簡單分析。

          Distributor Push API

          Promtail 通過 Loki 的 Push API 接口推送日志數(shù)據(jù),該接口在初始化 Distributor 的時(shí)候進(jìn)行初始化,在控制器基礎(chǔ)上包裝了兩個(gè)中間件,其中的 HTTPAuthMiddleware 就是獲取租戶 ID,如果開啟了認(rèn)證配置,則從 X-Scope-OrgID 這個(gè)請(qǐng)求 Header 頭里面獲取,如果沒有配置則用默認(rèn)的 fake 代替。

          // pkg/loki/modules.go
          func (t *Loki) initDistributor() (services.Service, error) {
           ......
           if t.cfg.Target != All {
            logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
           }

           pushHandler := middleware.Merge(
            serverutil.RecoveryHTTPMiddleware,
            t.HTTPAuthMiddleware,
           ).Wrap(http.HandlerFunc(t.distributor.PushHandler))

           t.Server.HTTP.Handle("/api/prom/push", pushHandler)
           t.Server.HTTP.Handle("/loki/api/v1/push", pushHandler)
           return t.distributor, nil
          }

          Push API 處理器實(shí)現(xiàn)如下所示,首先通過 ParseRequest 函數(shù)將 Http 請(qǐng)求轉(zhuǎn)換成 logproto.PushRequest,然后直接調(diào)用 Distributor 下面的 Push 函數(shù)來推送日志數(shù)據(jù):

          // pkg/distributor/http.go

          // PushHandler 從 HTTP body 中讀取一個(gè) snappy 壓縮的 proto
          func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
           logger := util_log.WithContext(r.Context(), util_log.Logger)
           userID, _ := user.ExtractOrgID(r.Context())
           req, err := ParseRequest(logger, userID, r)
           ......
           _, err = d.Push(r.Context(), req)
           ......
          }

          func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) {
           var body lokiutil.SizeReader
           contentEncoding := r.Header.Get(contentEnc)
           switch contentEncoding {
           case "":
            body = lokiutil.NewSizeReader(r.Body)
           case "snappy":
            body = lokiutil.NewSizeReader(r.Body)
           case "gzip":
            gzipReader, err := gzip.NewReader(r.Body)
            if err != nil {
             return nil, err
            }
            defer gzipReader.Close()
            body = lokiutil.NewSizeReader(gzipReader)
           default:
            return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding)
           }

           contentType := r.Header.Get(contentType)
           var req logproto.PushRequest
           ......
           switch contentType {
           case applicationJSON:
            var err error
            if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
             err = unmarshal.DecodePushRequest(body, &req)
            } else {
             err = unmarshal_legacy.DecodePushRequest(body, &req)
            }
            if err != nil {
             return nil, err
            }
           default:
            // When no content-type header is set or when it is set to
            // `application/x-protobuf`: expect snappy compression.
            if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
             return nil, err
            }
           }
           return &req, nil
          }

          首先我們先了解下 PushRequest 的結(jié)構(gòu),PushRequest 就是一個(gè) Stream 集合:

          // pkg/logproto/logproto.pb.go
          type PushRequest struct {
           Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"`
          }

          // pkg/logproto/types.go
          // Stream 流包含一個(gè)唯一的標(biāo)簽集,作為一個(gè)字符串,然后還包含一組日志條目
          type Stream struct {
           Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
           Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
          }

          // Entry 是一個(gè)帶有時(shí)間戳的日志條目
          type Entry struct {
           Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
           Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
          }

          然后查看 Distributor 下的 Push 函數(shù)實(shí)現(xiàn):

          // pkg/distributor/distributor.go
          // Push 日志流集合
          func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
           // 獲取租戶ID
           userID, err := user.ExtractOrgID(ctx)
           ......

           // 首先把請(qǐng)求平鋪成一個(gè)樣本的列表
           streams := make([]streamTracker, 0len(req.Streams))
           keys := make([]uint320len(req.Streams))
           var validationErr error
           validatedSamplesSize := 0
           validatedSamplesCount := 0

           validationContext := d.validator.getValidationContextFor(userID)

           for _, stream := range req.Streams {
            // 解析日志流標(biāo)簽
            stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
            ......
            n := 0
            for _, entry := range stream.Entries {
             // 校驗(yàn)一個(gè)日志Entry實(shí)體
             if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
              validationErr = err
              continue
             }
             stream.Entries[n] = entry
             n++
             // 校驗(yàn)成功的樣本大小和個(gè)數(shù)
             validatedSamplesSize += len(entry.Line)
             validatedSamplesCount++
            }
            // 去掉校驗(yàn)失敗的實(shí)體
            stream.Entries = stream.Entries[:n]

            if len(stream.Entries) == 0 {
             continue
            }
            // 為當(dāng)前日志流生成用于hash換的token值
            keys = append(keys, util.TokenFor(userID, stream.Labels))
            streams = append(streams, streamTracker{
             stream: stream,
            })
           }

           if len(streams) == 0 {
            return &logproto.PushResponse{}, validationErr
           }

           now := time.Now()
           // 每個(gè)租戶有一個(gè)限速器,判斷可以正常傳輸?shù)娜罩敬笮∈欠駪?yīng)該被限制
           if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
            // 返回429表明客戶端被限速了
            ......
            return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
           }

           const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
           var descs [maxExpectedReplicationSet]ring.InstanceDesc

           samplesByIngester := map[string][]*streamTracker{}
           ingesterDescs := map[string]ring.InstanceDesc{}
           for i, key := range keys {
            // ReplicationSet 描述了一個(gè)指定的鍵與哪些 Ingesters 進(jìn)行對(duì)話,以及可以容忍多少個(gè)錯(cuò)誤
            // 根據(jù) label hash 到 hash 環(huán)上獲取對(duì)應(yīng)的 ingester 節(jié)點(diǎn),一個(gè)節(jié)點(diǎn)可能有多個(gè)對(duì)等的 ingester 副本來做 HA
            replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nilnil)
            ......
            // 最小成功的實(shí)例樹
            streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
            // 可容忍的最大故障實(shí)例數(shù)
            streams[i].maxFailures = replicationSet.MaxErrors
            // 將 Stream 按對(duì)應(yīng)的 ingester 進(jìn)行分組
            for _, ingester := range replicationSet.Ingesters {
             // 配置每個(gè) ingester 副本對(duì)應(yīng)的日志流數(shù)據(jù)
             samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i])
             ingesterDescs[ingester.Addr] = ingester
            }
           }

           tracker := pushTracker{
            done: make(chan struct{}),
            err:  make(chan error),
           }
           tracker.samplesPending.Store(int32(len(streams)))
           // 循環(huán)Ingesters
           for ingester, samples := range samplesByIngester {
            // 讓ingester并行處理通過hash環(huán)對(duì)應(yīng)的日志流列表
            go func(ingester ring.InstanceDesc, samples []*streamTracker) {
             ......
             // 將日志流樣本數(shù)據(jù)下發(fā)給對(duì)應(yīng)的 ingester 節(jié)點(diǎn)
             d.sendSamples(localCtx, ingester, samples, &tracker)
            }(ingesterDescs[ingester], samples)
           }
           ......
          }

          Push 函數(shù)的核心就是根據(jù)日志流的標(biāo)簽來計(jì)算一個(gè) Token 值,根據(jù)這個(gè) Token 值去哈希環(huán)上獲取對(duì)應(yīng)的處理日志的 Ingester 實(shí)例,然后并行通過 Ingester 處理日志流數(shù)據(jù),通過 sendSamples 函數(shù)為單個(gè) ingester 去發(fā)送日志樣本數(shù)據(jù):

          // pkg/distributor/distributor.go

          func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
           err := d.sendSamplesErr(ctx, ingester, streamTrackers)
           ......
          }

          func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
           // 根據(jù) ingester 地址獲取 client
           c, err := d.pool.GetClientFor(ingester.Addr)
           ......
           // 重新構(gòu)造 PushRequest
           req := &logproto.PushRequest{
            Streams: make([]logproto.Stream, len(streams)),
           }
           for i, s := range streams {
            req.Streams[i] = s.stream
           }
           // 通過 Ingester 客戶端請(qǐng)求數(shù)據(jù)
           _, err = c.(logproto.PusherClient).Push(ctx, req)
           ......
          }

          Ingester 寫入日志

          Ingester 客戶端中的 Push 函數(shù)實(shí)際上就是一個(gè) gRPC 服務(wù)的客戶端:

          // pkg/ingester/ingester.go

          // Push 實(shí)現(xiàn) logproto.Pusher.
          func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
           // 獲取租戶ID
           instanceID, err := user.ExtractOrgID(ctx)
           ......
           // 根據(jù)租戶ID獲取 instance 對(duì)象
           instance := i.getOrCreateInstance(instanceID)
           // 直接調(diào)用 instance 對(duì)象 Push 數(shù)據(jù)
           err = instance.Push(ctx, req)
           return &logproto.PushResponse{}, err
          }

          instance 下的 Push 函數(shù):

          // pkg/ingester/instance.go

          func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
           record := recordPool.GetRecord()
           record.UserID = i.instanceID
           defer recordPool.PutRecord(record)

           i.streamsMtx.Lock()
           defer i.streamsMtx.Unlock()

           var appendErr error
           for _, s := range req.Streams {
            // 獲取一個(gè) stream 對(duì)象
            stream, err := i.getOrCreateStream(s, false, record)
            if err != nil {
             appendErr = err
             continue
            }
            // 真正用于數(shù)據(jù)處理的是 stream 對(duì)象中的 Push 函數(shù)
            if _, err := stream.Push(ctx, s.Entries, record); err != nil {
             appendErr = err
             continue
            }
           }
           ......
           return appendErr
          }

          func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, record *WALRecord) (*stream, error) {
           if lock {
            i.streamsMtx.Lock()
            defer i.streamsMtx.Unlock()
           }
           // 如果 streams 中包含當(dāng)前標(biāo)簽列表對(duì)應(yīng)的 stream 對(duì)象,則直接返回
           stream, ok := i.streams[pushReqStream.Labels]
           if ok {
            return stream, nil
           }
           // record 只在重放 WAL 時(shí)為 nil
           // 我們不希望在重放 WAL 后丟掉數(shù)據(jù)
           // 為 instance 降低 stream 流限制
           var err error
           if record != nil {
            // 限流器判斷
            // AssertMaxStreamsPerUser 確保與當(dāng)前輸入的流數(shù)量沒有達(dá)到限制
            err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
           }
           ......
           // 解析日志流標(biāo)簽集
           labels, err := logql.ParseLabels(pushReqStream.Labels)
           ......
           // 獲取對(duì)應(yīng)標(biāo)簽集的指紋
           fp := i.getHashForLabels(labels)
           // 重新實(shí)例化一個(gè) stream 對(duì)象,這里還會(huì)維護(hù)日志流的倒排索引
           sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(labels), fp)
           stream = newStream(i.cfg, fp, sortedLabels, i.metrics)
           // 將stream設(shè)置到streams中去
           i.streams[pushReqStream.Labels] = stream
           i.streamsByFP[fp] = stream

           // 當(dāng)重放 wal 的時(shí)候 record 是 nil (我們不希望在重放時(shí)重寫 wal entries).
           if record != nil {
            record.Series = append(record.Series, tsdb_record.RefSeries{
             Ref:    uint64(fp),
             Labels: sortedLabels,
            })
           } else {
            // 如果 record 為 nil,這就是一個(gè) WAL 恢復(fù)
            i.metrics.recoveredStreamsTotal.Inc()
           }
           ......
           i.addTailersToNewStream(stream)
           return stream, nil
          }

          這個(gè)里面涉及到 WAL 這一塊的設(shè)計(jì),比較復(fù)雜,我們可以先看 stream 下面的 Push 函數(shù)實(shí)現(xiàn),主要就是將收到的 []Entry 先 Append 到內(nèi)存中的 Chunk 流([]chunkDesc) 中:

          // pkg/ingester/stream.go
          func (s *stream) Push(ctx context.Context, entries []logproto.Entry, record *WALRecord) (int, error) {
           s.chunkMtx.Lock()
           defer s.chunkMtx.Unlock()
           var bytesAdded int
           prevNumChunks := len(s.chunks)
           var lastChunkTimestamp time.Time
           // 如果之前的 chunks 列表為空,則創(chuàng)建一個(gè)新的 chunk
           if prevNumChunks == 0 {
            s.chunks = append(s.chunks, chunkDesc{
             chunk: s.NewChunk(),
            })
            chunksCreatedTotal.Inc()
           } else {
            // 獲取最新一個(gè)chunk的日志時(shí)間戳
            _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds()
           }

           var storedEntries []logproto.Entry
           failedEntriesWithError := []entryWithError{}

           for i := range entries {
            // 如果這個(gè)日志條目與我們最后 append 的一行的時(shí)間戳和內(nèi)容相匹配,則忽略它
            if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content {
             continue
            }

            // 最新的一個(gè) chunk
            chunk := &s.chunks[len(s.chunks)-1]
            // 如果當(dāng)前chunk已經(jīng)關(guān)閉 或者 已經(jīng)達(dá)到設(shè)置的最大 Chunk 大小
            if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
             // 如果 chunk 沒有更多的空間,則調(diào)用 Close 來以確保 head block 中的數(shù)據(jù)都被切割和壓縮。
             err := chunk.chunk.Close()
             ......
             chunk.closed = true
             ......
             // Append 一個(gè)新的 Chunk
             s.chunks = append(s.chunks, chunkDesc{
              chunk: s.NewChunk(),
             })
             chunk = &s.chunks[len(s.chunks)-1]
             lastChunkTimestamp = time.Time{}
            }
            // 往 chunk 里面 Append 日志數(shù)據(jù)
            if err := chunk.chunk.Append(&entries[i]); err != nil {
             failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
            } else {
             // 存儲(chǔ)添加到 chunk 中的日志數(shù)據(jù)
             storedEntries = append(storedEntries, entries[i])
             // 配置最后日志行的數(shù)據(jù)
             lastChunkTimestamp = entries[i].Timestamp
             s.lastLine.ts = lastChunkTimestamp
             s.lastLine.content = entries[i].Line
             // 累計(jì)大小
             bytesAdded += len(entries[i].Line)
            }
            chunk.lastUpdated = time.Now()
           }

           if len(storedEntries) != 0 {
            // 當(dāng)重放 wal 的時(shí)候 record 將為 nil(我們不希望在重放的時(shí)候重寫wal日志條目)
            if record != nil {
             record.AddEntries(uint64(s.fp), storedEntries...)
            }
            // 后續(xù)是用與tail日志的處理
            ......
           }
           ......
           // 如果新增了chunks
           if len(s.chunks) != prevNumChunks {
            memoryChunks.Add(float64(len(s.chunks) - prevNumChunks))
           }
           return bytesAdded, nil
          }

          Chunk 其實(shí)就是多條日志構(gòu)成的壓縮包,將日志壓成 Chunk 的可以直接存入對(duì)象存儲(chǔ), 一個(gè) Chunk 到達(dá)指定大小之前會(huì)不斷 Append 新的日志到里面,而在達(dá)到大小之后, Chunk 就會(huì)關(guān)閉等待持久化(強(qiáng)制持久化也會(huì)關(guān)閉 Chunk, 比如關(guān)閉 ingester 實(shí)例時(shí)就會(huì)關(guān)閉所有的 Chunk 并持久化)。Chunk 的大小控制很重要:

          • 假如 Chunk 容量過小: 首先是導(dǎo)致壓縮效率不高,同時(shí)也會(huì)增加整體的 Chunk 數(shù)量, 導(dǎo)致倒排索引過大,最后, 對(duì)象存儲(chǔ)的操作次數(shù)也會(huì)變多, 帶來額外的性能開銷
          • 假如 Chunk 過大: 一個(gè) Chunk 的 open 時(shí)間會(huì)更長, 占用額外的內(nèi)存空間, 同時(shí), 也增加了丟數(shù)據(jù)的風(fēng)險(xiǎn),Chunk 過大也會(huì)導(dǎo)致查詢讀放大

          (圖片來源: https://aleiwu.com/post/grafana-loki/)

          在將日志流追加到 Chunk 中過后,在 Ingester 初始化時(shí)會(huì)啟動(dòng)兩個(gè)循環(huán)去處理 Chunk 數(shù)據(jù),分別從 chunks 數(shù)據(jù)取出存入優(yōu)先級(jí)隊(duì)列,另外一個(gè)循環(huán)定期檢查從內(nèi)存中刪除已經(jīng)持久化過后的數(shù)據(jù)。

          首先是 Ingester 中定義了一個(gè) flushQueues 屬性,是一個(gè)優(yōu)先級(jí)隊(duì)列數(shù)組,該隊(duì)列中存放的是 flushOp

          // pkg/ingester/ingester.go
          type Ingester struct {
           services.Service
           ......
           // 每個(gè) flush 線程一個(gè)隊(duì)列,指紋用來選擇隊(duì)列
           flushQueues     []*util.PriorityQueue  // 優(yōu)先級(jí)隊(duì)列數(shù)組
           flushQueuesDone sync.WaitGroup
           ......
          }

          // pkg/ingester/flush.go
          // 優(yōu)先級(jí)隊(duì)列中存放的數(shù)據(jù)
          type flushOp struct {
           from      model.Time
           userID    string
           fp        model.Fingerprint
           immediate bool
          }

          在初始化 Ingester 的時(shí)候會(huì)根據(jù)傳遞的 ConcurrentFlushes 參數(shù)來實(shí)例化 flushQueues 的大小:

          // pkg/ingester/ingester.go
          func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
           ......
           i := &Ingester{
            ......
            flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
            ......
           }
           ......
           i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
           return i, nil
          }

          然后通過 services.NewBasicService 實(shí)例化 Service 的時(shí)候指定了服務(wù)的 Starting、Running、Stopping 3 個(gè)狀態(tài),在其中的 staring 狀態(tài)函數(shù)中會(huì)啟動(dòng)協(xié)程去消費(fèi)優(yōu)先級(jí)隊(duì)列中的數(shù)據(jù)

          // pkg/ingester/ingester.go
          func (i *Ingester) starting(ctx context.Context) error {
           // todo,如果開啟了 WAL 的處理
           ......
           // 初始化 flushQueues
           i.InitFlushQueues()
           ......
           // 啟動(dòng)循環(huán)檢查chunk數(shù)據(jù)
           i.loopDone.Add(1)
           go i.loop()
           return nil
          }

          初始化 flushQueues 實(shí)現(xiàn)如下所示,其中 flushQueuesDone 是一個(gè) WaitGroup,根據(jù)配置的并發(fā)數(shù)量并發(fā)執(zhí)行 flushLoop 操作:

          // pkg/ingester/flush.go
          func (i *Ingester) InitFlushQueues() {
           i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
           for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
            // 為每個(gè)協(xié)程構(gòu)造一個(gè)優(yōu)先級(jí)隊(duì)列
            i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
            go i.flushLoop(j)
           }
          }

          每一個(gè)優(yōu)先級(jí)隊(duì)列循環(huán)消費(fèi)數(shù)據(jù):

          // pkg/ingester/flush.go
          func (i *Ingester) flushLoop(j int) {
           ......
           for {
            // 從隊(duì)列中根據(jù)優(yōu)先級(jí)取出數(shù)據(jù)
            o := i.flushQueues[j].Dequeue()
            if o == nil {
             return
            }
            op := o.(*flushOp)
            // 執(zhí)行真正的刷新用戶序列數(shù)據(jù)
            err := i.flushUserSeries(op.userID, op.fp, op.immediate)
            ......
            // 如果退出時(shí)刷新失敗了,把失敗的操作放回到隊(duì)列中去。
            if op.immediate && err != nil {
             op.from = op.from.Add(flushBackoff)
             i.flushQueues[j].Enqueue(op)
            }
           }
          }

          刷新用戶的序列操作,也就是要保存到存儲(chǔ)中去:

          // pkg/ingester/flush.go
          // 根據(jù)用戶ID刷新用戶日志序列
          func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
           instance, ok := i.getInstanceByID(userID)
           ......
           // 根據(jù)instance和fp指紋數(shù)據(jù)獲取需要刷新的chunks
           chunks, labels, chunkMtx := i.collectChunksToFlush(instance, fp, immediate)
           ......
           // 執(zhí)行真正的刷新 chunks 操作
           err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
           ......
          }

          // 收集需要刷新的 chunks
          func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) {
           instance.streamsMtx.Lock()
           // 根據(jù)指紋數(shù)據(jù)獲取 stream
           stream, ok := instance.streamsByFP[fp]
           instance.streamsMtx.Unlock()
           if !ok {
            return nilnilnil
           }

           var result []*chunkDesc
           stream.chunkMtx.Lock()
           defer stream.chunkMtx.Unlock()
           // 循環(huán)所有chunks
           for j := range stream.chunks {
            // 判斷是否應(yīng)該刷新當(dāng)前chunk
            shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
            if immediate || shouldFlush {
             // 確保不再對(duì)該塊進(jìn)行寫操作(如果沒有關(guān)閉,則設(shè)置為關(guān)閉狀態(tài))
             if !stream.chunks[j].closed {
              stream.chunks[j].closed = true
             }
             // 如果該 chunk 還沒有被成功刷新,則刷新這個(gè)塊
             if stream.chunks[j].flushed.IsZero() {
              result = append(result, &stream.chunks[j])
              ......
             }
            }
           }
           return result, stream.labels, &stream.chunkMtx
          }

          下面是判斷一個(gè)具體的 chunk 是否應(yīng)該被刷新的邏輯:

          // pkg/ingester/flush.go
          func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) (boolstring) {
           // chunk關(guān)閉了也應(yīng)該刷新了
           if chunk.closed {
            if chunk.synced {
             return true, flushReasonSynced
            }
            return true, flushReasonFull
           }
           // chunk最后更新的時(shí)間超過了配置的 chunk 空閑時(shí)間 MaxChunkIdle
           if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
            return true, flushReasonIdle
           }

           // chunk的邊界時(shí)間操過了配置的 chunk  最大時(shí)間 MaxChunkAge
           if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge {
            return true, flushReasonMaxAge
           }
           return false""
          }

          真正將 chunks 數(shù)據(jù)刷新保存到存儲(chǔ)中是 flushChunks 函數(shù)實(shí)現(xiàn)的:

          // pkg/ingester/flush.go
          func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error {
           ......
           wireChunks := make([]chunk.Chunk, len(cs))
           // 下面的匿名函數(shù)用于生成保存到存儲(chǔ)中的chunk數(shù)據(jù)
           err = func() error {
            chunkMtx.Lock()
            defer chunkMtx.Unlock()

            for j, c := range cs {
             if err := c.chunk.Close(); err != nil {
              return err
             }
             firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
             ch := chunk.NewChunk(
              userID, fp, metric,
              chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
              firstTime,
              lastTime,
             )

             chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
             start := time.Now()
             if err := ch.EncodeTo(bytes.NewBuffer(make([]byte0, chunkSize))); err != nil {
              return err
             }
             wireChunks[j] = ch
            }
            return nil
           }()


           // 通過 store 接口保存 chunk 數(shù)據(jù)
           if err := i.store.Put(ctx, wireChunks); err != nil {
            return err
           }

           ......

           chunkMtx.Lock()
           defer chunkMtx.Unlock()
           for i, wc := range wireChunks {
            // flush 成功,寫入刷新時(shí)間
            cs[i].flushed = time.Now()
            // 下是一些監(jiān)控?cái)?shù)據(jù)更新
            ......
           }

           return nil
          }

          chunk 數(shù)據(jù)被寫入到存儲(chǔ)后,還有有一個(gè)協(xié)程會(huì)去定時(shí)清理本地的這些 chunk 數(shù)據(jù),在上面的 Ingester 的 staring 函數(shù)中最后有一個(gè) go i.loop(),在這個(gè) loop() 函數(shù)中會(huì)每隔 FlushCheckPeriod(默認(rèn) 30s,可以通過 --ingester.flush-check-period 進(jìn)行配置)時(shí)間就會(huì)去去調(diào)用 sweepUsers 函數(shù)進(jìn)行垃圾回收:

          // pkg/ingester/ingester.go
          func (i *Ingester) loop() {
           defer i.loopDone.Done()

           flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
           defer flushTicker.Stop()

           for {
            select {
            case <-flushTicker.C:
             i.sweepUsers(falsetrue)
            case <-i.loopQuit:
             return
            }
           }
          }

          sweepUsers 函數(shù)用于執(zhí)行將日志流數(shù)據(jù)加入到優(yōu)先級(jí)隊(duì)列中,并對(duì)沒有序列的用戶進(jìn)行垃圾回收:

          // pkg/ingester/flush.go
          // sweepUsers 定期執(zhí)行 flush 操作,并對(duì)沒有序列的用戶進(jìn)行垃圾回收
          func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {
           instances := i.getInstances()
           for _, instance := range instances {
            i.sweepInstance(instance, immediate, mayRemoveStreams)
           }
          }

          func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) {
           instance.streamsMtx.Lock()
           defer instance.streamsMtx.Unlock()
           for _, stream := range instance.streams {
            i.sweepStream(instance, stream, immediate)
            i.removeFlushedChunks(instance, stream, mayRemoveStreams)
           }
          }

          // must hold streamsMtx
          func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {
           stream.chunkMtx.RLock()
           defer stream.chunkMtx.RUnlock()
           if len(stream.chunks) == 0 {
            return
           }
           // 最新的chunk
           lastChunk := stream.chunks[len(stream.chunks)-1]
           // 判斷是否應(yīng)該被flush
           shouldFlush, _ := i.shouldFlushChunk(&lastChunk)
           // 如果只有一個(gè)chunk并且不是強(qiáng)制持久化切最新的chunk還不應(yīng)該被flush,則直接返回
           if len(stream.chunks) == 1 && !immediate && !shouldFlush {
            return
           }
           // 根據(jù)指紋獲取用與處理的優(yōu)先級(jí)隊(duì)列索引
           flushQueueIndex := int(uint64(stream.fp) % uint64(i.cfg.ConcurrentFlushes))
           firstTime, _ := stream.chunks[0].chunk.Bounds()
           // 加入到優(yōu)先級(jí)隊(duì)列中去
           i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
            model.TimeFromUnixNano(firstTime.UnixNano()), instance.instanceID,
            stream.fp, immediate,
           })
          }

          // 移除已經(jīng)flush過后的chunks數(shù)據(jù)
          func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRemoveStream bool) {
           now := time.Now()

           stream.chunkMtx.Lock()
           defer stream.chunkMtx.Unlock()
           prevNumChunks := len(stream.chunks)
           var subtracted int
           for len(stream.chunks) > 0 {
            // 如果chunk還沒有被刷新到存儲(chǔ) 或者 chunk被刷新到存儲(chǔ)到現(xiàn)在的時(shí)間還沒操過 RetainPeriod(默認(rèn)15分鐘,可以通過--ingester.chunks-retain-period 進(jìn)行配置)則忽略
            if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {
             break
            }
            subtracted += stream.chunks[0].chunk.UncompressedSize()
            // 刪除引用,以便該塊可以被垃圾回收起來
            stream.chunks[0].chunk = nil
            // 移除chunk
            stream.chunks = stream.chunks[1:]
           }
           ......
           // 如果stream中的所有chunk都被清空了,則清空該 stream 的相關(guān)數(shù)據(jù)
           if mayRemoveStream && len(stream.chunks) == 0 {
            delete(instance.streamsByFP, stream.fp)
            delete(instance.streams, stream.labelsString)
            instance.index.Delete(stream.labels, stream.fp)
            ......
           }
          }

          關(guān)于存儲(chǔ)或者查詢等模塊的實(shí)現(xiàn)在后文再繼續(xù)探索,包括 WAL 的實(shí)現(xiàn)也較為復(fù)雜。


          K8S 進(jìn)階訓(xùn)練營


           點(diǎn)擊屏末  | 即刻學(xué)習(xí)
          瀏覽 63
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人操人人射人人色 | 日本动漫操逼一区二区 | 亚洲第一中文字幕在线播放 | 日韩在线aaa | 伊人大香蕉电影网 |