<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 1.15 新功能架構(gòu)解析:高效穩(wěn)定的通用增量 Checkpoint

          共 7455字,需瀏覽 15分鐘

           ·

          2022-06-11 22:53


          作者|梅源(Yuan Mei)&? Roman Khachatryan

          流處理系統(tǒng)最重要的特性是端到端的延遲,端到端延遲是指開始處理輸入數(shù)據(jù)到輸出該數(shù)據(jù)產(chǎn)生的結(jié)果所需的時(shí)間。Flink,作為流式計(jì)算的標(biāo)桿,其端到端延遲包括容錯(cuò)的快慢主要取決于檢查點(diǎn)機(jī)制(Checkpointing),所以如何將 Checkpoint 做得高效穩(wěn)定是 Flink 流計(jì)算的首要任務(wù)。我們?cè)?“Flink 新一代流計(jì)算和容錯(cuò)——階段總結(jié)和展望”[1]??一文中介紹了 Flink 從社區(qū) 1.12 版本開始所做的提升 Checkpointing 機(jī)制的努力,本文將著重介紹其中剛剛在 Flink 1.15 版本發(fā)布的 Generic Log-Based Incremental Checkpointing 這個(gè)功能。


          一、概述


          Generic Log-Based Incremental Checkpointing 的設(shè)計(jì)初衷是我們將全量的狀態(tài)快照和增量的檢查點(diǎn)機(jī)制分隔開,通過持續(xù)上傳增量 Changelog 的方法,來確保每次 Checkpointing 可以穩(wěn)定快速的完成,從而減小 Checkpointing 之間的間隔,提升 Flink系統(tǒng)端到端的延遲。拓展開來說,主要有如下三點(diǎn)提升:


          1. 更短的端到端延遲:尤其是對(duì)于 Transactional Sink。Transactional Sink 在 Checkpoint 完成的時(shí)候才能完成兩階段提交,因此減小 Checkpointing 的間隔意味著可以更頻繁的提交,達(dá)到更短的端到端的延遲。

          2. 更穩(wěn)定的 Checkpoint 完成時(shí)間目前 Checkpoint 完成時(shí)間很大程度上取決于在 Checkpointing 時(shí)需要持久化的(增量)狀態(tài)的大小。在新的設(shè)計(jì)中,我們通過持續(xù)上傳增量,以達(dá)到減少 Checkpoint Flush 時(shí)所需要持久化的數(shù)據(jù),來保證 Checkpoint 完成的穩(wěn)定性。

          3. 容錯(cuò)恢復(fù)需要回滾的數(shù)據(jù)量更少:Checkpointing 之間的間隔越短,每次容錯(cuò)恢復(fù)后需要重新處理的數(shù)據(jù)就越少。


          那是怎么做到的呢?我們知道影響 Flink Checkpointing 時(shí)間的主要因素有以下幾點(diǎn):

          1. Checkpoint Barrier 流動(dòng)和對(duì)齊的速度;

          2. 將狀態(tài)快照持久化到非易失性高可用存儲(chǔ)(例如 S3)上所需要的時(shí)間。


          對(duì) Flink Checkpoint 機(jī)制不太了解的讀者可以參考:

          Flink 1.12 版本引入的 Unaligned Checkpoint[2]?和 1.14 版本中引入的 Buffer Debloating[3]?主要解決了上述第 1 個(gè)問題,尤其是在反壓的情況下。更早之前引入的 Incremental Checkpoint[4]?是為了減少每次 Checkpointing 所需要持久化存儲(chǔ)狀態(tài)的大小,以減小第 2 個(gè)影響因素,但在實(shí)際中也不完全能做到:現(xiàn)有 Incremental Checkpoint 是基于 RocksDB 來完成的,RocksDB 出于空間放大和讀性能的考慮會(huì)定期做 Compaction。Compaction 會(huì)產(chǎn)生新的、相對(duì)較大的文件,會(huì)增加上傳所需要的時(shí)間。每一個(gè)執(zhí)行 Flink 作業(yè)的物理節(jié)點(diǎn)(Task)至少有一個(gè) RocksDB 實(shí)例,所以 Checkpoint 被延遲的概率會(huì)隨著物理節(jié)點(diǎn)增多而變大。這導(dǎo)致在 Flink 的大型作業(yè)中,幾乎每次完成 Checkpointing 時(shí)都有可能會(huì)因?yàn)槟硞€(gè)節(jié)點(diǎn)而延遲,如下圖所示。



          圖1: 每次 Checkpoint 都可能因?yàn)槟硞€(gè)節(jié)點(diǎn)上傳文件緩慢而延遲


          另外值得一提的是在現(xiàn)有的 Checkpointing 機(jī)制下,Task 只有在收到至少一個(gè) Checkpoint Barrier 之后,才會(huì)做狀態(tài)快照并且開始持久化狀態(tài)快照到高可用存儲(chǔ),從而增加了 Checkpoint 完成時(shí)間,如下圖所示。



          圖2: 在現(xiàn)有機(jī)制下,快照在 Checkpoint Barrier 到達(dá)之后才會(huì)開始上傳


          在新的設(shè)計(jì)中,我們通過持續(xù)上傳增量 Changelog 的方法,可以避免這個(gè)限制,加速 Checkpoint 完成時(shí)間。下面我們來看看詳細(xì)的設(shè)計(jì)。

          二、設(shè)計(jì)


          Generic Log-Based Incremental Checkpointing 的核心思想是引入 State Changelog(狀態(tài)變化日志),這樣可以更細(xì)粒度地持久化狀態(tài):


          1. 算子在更新狀態(tài)的時(shí)候?qū)戨p份,一份更新寫入狀態(tài)表 State Table 中,一份增量寫入 State Changelog 中。

          2. Checkpoint 變成由兩個(gè)部分組成,第一個(gè)部分是當(dāng)前已經(jīng)持久化的存在遠(yuǎn)端存儲(chǔ)上的 State Table,第二個(gè)部分是增量的 State Changelog。

          3. State Table 的持久化和 Checkpointing 過程獨(dú)立開來,會(huì)定期由 background thread 持久化,我們稱為 Materialization(物化)的過程。

          4. 在做 Checkpoint 的時(shí)候,只要保證新增的 State Changelog 被持久化就可以了。

          新的設(shè)計(jì)中需要在做 Checkpoint 的時(shí)候上傳的數(shù)據(jù)量變得很少,不僅可以把 Checkpoint 做得更穩(wěn)定,還可以做得更高頻。整個(gè)工作流程如下圖所示:



          圖3: Generic Log-Based Incremental Checkpointing 工作流程


          Generic Log-Based Incremental Checkpointing 類似傳統(tǒng)數(shù)據(jù)庫系統(tǒng)的 WAL 機(jī)制:

          1. 數(shù)據(jù)的增量更改(插入/更新/刪除)會(huì)被寫入到 Transaction Log 中。一旦這部分更改的日志被同步到持久存儲(chǔ)中,我們就可以認(rèn)為 Transaction 已經(jīng)完成了。這個(gè)過程類似于上述方法中的 Checkpointing 的過程。

          2. 同時(shí),為了方便數(shù)據(jù)查詢,數(shù)據(jù)的更改也會(huì)異步持久化在數(shù)據(jù)表(Table)中。 一旦 Transaction Log 中的相關(guān)部分也在數(shù)據(jù)表中被持久化了,Transaction Log 中相關(guān)部分就可以刪除了。這個(gè)過程類似于我們方法中的 State Table 持久化過程。


          這種和 WAL 類似的機(jī)制可以有效提升 Checkpoint 完成的速度,但也帶來一些額外的開銷:


          1. 額外的網(wǎng)絡(luò) IO 和額外的 Changelog 持久存儲(chǔ)開銷;

          2. 緩存 Changelog 帶來的額外的內(nèi)存使用;

          3. 容錯(cuò)恢復(fù)需要額外的重放 Changelog 帶來的潛在的恢復(fù)時(shí)間的增加。

          我們?cè)诤竺娴?Benchmark 對(duì)比中,也會(huì)對(duì)這三方面的影響進(jìn)行分析。特別對(duì)于第 3 點(diǎn),額外的重放 Changelog 所帶來的容錯(cuò)恢復(fù)時(shí)間增加會(huì)在一定程度上因?yàn)榭梢宰龈l繁的 Checkpoint 所彌補(bǔ),因?yàn)楦l繁的 Checkpoint 意味著容錯(cuò)恢復(fù)后需要回放的處理數(shù)據(jù)更少。

          三、Changelog 存儲(chǔ)(DSTL)


          Generic Log-Based Incremental Checkpointing 的很重要的一個(gè)組件是 State Changelog 存儲(chǔ)這個(gè)部分,我們稱之為 Durable Short-term Log(DSTL,短存 Log)。DSTL 需要滿足以下幾個(gè)特性:


          • 短期持久化

            State Changelog 是組成 Checkpoint 的一個(gè)部分,所以也需要能持久化存儲(chǔ)。同時(shí),State Changelog 只需要保存從最近一次持久化 State Table 到當(dāng)前做 Checkpoint 時(shí)的 Changelog,因此只需要保存很短時(shí)間(幾分鐘)的數(shù)據(jù)。


          • 寫入頻率遠(yuǎn)遠(yuǎn)大于讀取頻率

            只有在 Restore 或者 Rescale 的情況下才需要讀取 Changelog,大部分情況下只有 append 操作,并且一旦寫入,數(shù)據(jù)就不能再被修改。


          • 很短的寫延遲

            引入 State Changelog 是為了能將 Checkpoint 做得更快(1s 以內(nèi))。因此,單次寫請(qǐng)求需要至少能在期望的 Checkpoint 時(shí)間內(nèi)完成。


          • 保證一致性

            如果我們有多個(gè) State Changelog 的副本,就會(huì)產(chǎn)生多副本之間的一致性問題。一旦某個(gè)副本的 State Changelog 被持久化并被 JM 確認(rèn),恢復(fù)時(shí)需要以此副本為基準(zhǔn)保證語義一致性。

          從上面的特性也可以看出為什么我們將 Changelog 存儲(chǔ)命名為 DSTL 短存 Log。

          3.1 DSTL 方案的選擇


          DSTL 可以有多種方式實(shí)現(xiàn),例如分布式日志(Kafka)、分布式文件系統(tǒng)(DFS),甚至是數(shù)據(jù)庫。在 Flink 1.15 發(fā)布的 Generic Log-Based Incremental Checkpointing MVP 版本中,我們選擇 DFS 來實(shí)現(xiàn) DSTL,基于如下考慮:

          1. 沒有額外的外部依賴:目前 Flink Checkpoint 持久化在 DFS 中,所以以 DFS 來實(shí)現(xiàn) DSTL 沒有引入額外的外部組件。

          2. 沒有額外的狀態(tài)管理:目前的設(shè)計(jì)方案中 DSTL 的狀態(tài)管理是和 Flink Checkpointing 機(jī)制整合在一起的,所以也不需要額外的狀態(tài)管理。

          3. DFS 原生提供持久化和一致性保證:如果實(shí)現(xiàn)多副本分布式日志,這些都是額外需要考慮的成本。


          另一方面,使用 DFS 有以下缺點(diǎn):


          1. 更高的延遲:DFS 相比于寫入本地盤的分布式日志系統(tǒng)來講一般來說有更高的延遲。
          2. 網(wǎng)絡(luò) I/O 限制:大部分 DFS 供應(yīng)商出于成本的考慮都會(huì)對(duì)單用戶 DFS 寫入限流限速,極端情況有可能會(huì)造成網(wǎng)絡(luò)過載。


          經(jīng)過一些初步實(shí)驗(yàn),我們認(rèn)為目前大部分 DFS 實(shí)現(xiàn)(例如 S3,HDFS 等)的性能可以滿足 80% 的用例,后面的 Benchmark 會(huì)提供更多數(shù)據(jù)。

          3.2 DSTL 架構(gòu)


          下圖以 RocksDB 為例展示了基于 DFS 的 DSTL 架構(gòu)圖。狀態(tài)更新通過 Changelog State Backend 雙寫,一份寫到 RocksDB,另一份寫到 DSTL。RocksDB 會(huì)定期進(jìn)行 Materialization,也就是將當(dāng)前的 SST 文件 上傳到 DFS;而 DSTL 會(huì)將 state change 持續(xù)寫入 DFS,并在 Checkpointing 的時(shí)候完成 flush,這樣 Checkpoint 完成時(shí)間只取決于所需 flush 的數(shù)據(jù)量。需要注意的是 Materialization 完全獨(dú)立于 Checkpointing 的過程,并且 Materialization 也可以比 Checkpointing 的頻率慢很多,系統(tǒng)默認(rèn)值是 10 分鐘。



          圖4: 以 RocksDB 為例基于 DFS 的 DSTL 架構(gòu)圖


          這里還有幾個(gè)問題值得補(bǔ)充討論一下:

          • 狀態(tài)清理問題

            前面有提到在新的架構(gòu)中,一個(gè) Checkpoint 由兩部分組成:1)State Table 和 2)State Change Log。這兩部分都需要按需清理。1)這個(gè)部分的清理復(fù)用 Flink 已有的 Checkpoint 機(jī)制;2)這個(gè)部分的清理相對(duì)較復(fù)雜,特別是 State Change Log 在當(dāng)前的設(shè)計(jì)中為了避免小文件的問題,是以 TM 為粒度的。在當(dāng)前的設(shè)計(jì)中,我們分兩個(gè)部分來清理 State Change Log:一是 Change Log 本身的數(shù)據(jù)需要在 State Table 物化后刪除其相對(duì)應(yīng)的部分;二是 Change Log 中成為 Checkpoint 的部分的清理融合進(jìn)已有的 Flink Checkpoint 清理機(jī)制[4]?。


          • DFS 相關(guān)問題

            • 長尾延遲問題
              為了解決 DFS 高長尾延遲問題,DFS 寫入請(qǐng)求會(huì)在允許超時(shí)時(shí)間(默認(rèn)為 1 秒)內(nèi)無法完成時(shí)重試。

            • 小文件問題
              DFS 的一個(gè)問題是每個(gè) Checkpoint 會(huì)創(chuàng)建很多小文件,并且因?yàn)?Changleog State Backend 可以提供更高頻的 Checkpoint,小文件問題會(huì)成為瓶頸。為了緩解這種情況,我們將同一個(gè) Task Manager 上同一作業(yè)的所有 State Change 寫到同一個(gè)文件中。因此,同一個(gè) Task Manager 會(huì)共享同一個(gè) State Change Log。


          四、Benchmark 測(cè)試結(jié)果分析


          Generic Log-Based Incremental Checkpointing 對(duì)于 Checkpoint 速度和穩(wěn)定性的提升取決于以下幾個(gè)因素:

          1. State Change Log 增量的部分與全量狀態(tài)大小之比,增量越小越好。

          2. 不間斷上傳狀態(tài)增量的能力。這個(gè)和狀態(tài)訪問模式相關(guān),極端情況下,如果算子只在 Checkpointing 前更新 Flink State Table 的話,Changelog 起不到太大作用。

          3. 能夠?qū)碜远鄠€(gè) Task 的 changelog 分組批量上傳的能力。Changelog 分組批量寫 DFS 可以減少需要?jiǎng)?chuàng)建的文件數(shù)量并降低 DFS 負(fù)載,從而提高穩(wěn)定性。

          4. 底層 State Backend 在刷磁盤前對(duì)同一個(gè) key 的 更新的去重能力。因?yàn)?state change log 保存的是狀態(tài)更新,而不是最終值,底層 State Backend 這種能力會(huì)增大 Changelog 增量與 State Table 全量狀態(tài)大小之比。

          5. 寫持久存儲(chǔ) DFS 的速度,寫的速度越快 Changelog 所帶來的提升越不明顯。


          4.1 Benchmark 配置

          在 Benchmark 實(shí)驗(yàn)中,我們使用如下配置:
          • 算子并行度:50
          • 運(yùn)行時(shí)間:21h
          • State Backend:RocksDB (Incremental Checkpoint Enabled)
          • 持久存儲(chǔ):S3 (Presto plugin)
          • 機(jī)器型號(hào):AWS m5.xlarge(4 slots per TM)
          • Checkpoint 間隔: 10ms
          • State Table Materialization 間隔:3m
          • Input Rate:50K Events /s


          4.2 ValueState Workload


          我們第一部分的實(shí)驗(yàn),主要針對(duì)每次更新的 Key 值都不一樣的負(fù)載;這種負(fù)載因?yàn)樯鲜龅?2 點(diǎn)和第 4 點(diǎn)的原因,Changelog 的提升是比較明顯的:Checkpoint 完成時(shí)間縮短了 10 倍(99.9 pct),Checkpoint 大小增加 30%,恢復(fù)時(shí)間增加 66% - 225%,如下表所示。



          表1: 基于 ValueState Workload 的 Changelog 各項(xiàng)指標(biāo)對(duì)比




          下面我們來更詳細(xì)的看一下 Checkpoint Size 這個(gè)部分:


          表2: 基于 ValueState Workload 的 Changelog(開啟/關(guān)閉)的 Checkpoint 相關(guān)指標(biāo)對(duì)比


          • Checkpointed Data Size 是指在收到 Checkpoint Barrier,Checkpointing 過程開始后上傳數(shù)據(jù)的大小。對(duì)于 Changelog 來說,大部分?jǐn)?shù)據(jù)在 Checkpointing 過程開始前就已經(jīng)上傳了,所以這就是為什么開啟 Changelog 時(shí)這個(gè)指標(biāo)要比關(guān)閉時(shí)小得多的原因。

          • Full Checkpoint Data Size 是構(gòu)成 Checkpoint 的所有文件的總大小,也包括與之前 Checkpoint 共享的文件。與通常的 Checkpoint 相比,Changelog 的格式?jīng)]有被壓縮過也不夠緊湊,因此占用更多空間。


          4.3 Window Workload



          這里使用的是 Sliding Window。如下表所示,Changelog 對(duì) checkpoint 完成時(shí)間加速 3 倍左右;但存儲(chǔ)放大要高得多(消耗的空間接近 45 倍):


          表3: 基于 Window Workload 的 Changelog(開啟/關(guān)閉)的 Checkpoint 相關(guān)指標(biāo)對(duì)比



          Full Checkpoint Data 存儲(chǔ)空間放大主要原因來自于:


          1. 對(duì)于 Sliding Window 算子,每條數(shù)據(jù)會(huì)加到多個(gè)滑動(dòng)窗口中,因此為造成多次更新。Changelog 的寫放大問題會(huì)更大。

          2. 前面有提到,如果底層 State Backend(比如 RocksDB)在刷磁盤前對(duì)同一個(gè) key 的 更新去重能力越強(qiáng),則快照的大小相對(duì)于 Changelog 會(huì)越小。在 Sliding Window 算子的極端情況下,滑動(dòng)窗口會(huì)因?yàn)槭П磺謇?。如果更新和清理發(fā)生在同一個(gè) Checkpoint 之內(nèi),則很可能該窗口中的數(shù)據(jù)不包含在快照中。這也意味著清除窗口的速度越快,快照的大小就可能越小。

          五、結(jié)論


          Flink 1.15 版本實(shí)現(xiàn)了 Generic Log-Based Incremental Checkpointing 的 MVP 版本。這個(gè)版本基于 DFS 可以提供秒級(jí)左右的 Checkpoint 時(shí)間,并極大的提升了 Checkpoint 穩(wěn)定性,但一定程度上也增加了空間的成本,本質(zhì)上是用空間換時(shí)間。1.16 版本將進(jìn)一步完善使其生產(chǎn)可用,比如我們可以通過 Local Recovery 和文件緩存來加速恢復(fù)時(shí)間。另一個(gè)方面,Changelog State Backend 接口是通用的,我們可以用同樣的接口對(duì)接更快的存儲(chǔ)來實(shí)現(xiàn)更短的延遲,例如 Apache Bookkeeper。除此之外,我們正在研究 Changelog 的其他應(yīng)用,例如將 Changelog 應(yīng)用于 Sink 來實(shí)現(xiàn)通用的端到端的 exactly-once 等。

          附錄

          如果您想試用 Generic Log-Based Incremental Checkpointing 的話,可以在 flink-conf.yaml 中進(jìn)行如下簡(jiǎn)單的設(shè)置:

          state.backend.changelog.enabled: true
          state.backend.changelog.storage: filesystem
          dstl.dfs.base-path: <location similar to state.checkpoints.dir>
          瀏覽 29
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产电影三级在线观看 | 日本学生妹内射视频在线观看 | 久久精品在线观看 | 国产小学生妹在线观看 | 欧美成人手机在线观看 |