Flink 1.15 新功能架構(gòu)解析:高效穩(wěn)定的通用增量 Checkpoint
一、概述
Generic Log-Based Incremental Checkpointing 的設(shè)計(jì)初衷是我們將全量的狀態(tài)快照和增量的檢查點(diǎn)機(jī)制分隔開,通過持續(xù)上傳增量 Changelog 的方法,來確保每次 Checkpointing 可以穩(wěn)定快速的完成,從而減小 Checkpointing 之間的間隔,提升 Flink系統(tǒng)端到端的延遲。拓展開來說,主要有如下三點(diǎn)提升:
更短的端到端延遲:尤其是對(duì)于 Transactional Sink。Transactional Sink 在 Checkpoint 完成的時(shí)候才能完成兩階段提交,因此減小 Checkpointing 的間隔意味著可以更頻繁的提交,達(dá)到更短的端到端的延遲。 更穩(wěn)定的 Checkpoint 完成時(shí)間:目前 Checkpoint 完成時(shí)間很大程度上取決于在 Checkpointing 時(shí)需要持久化的(增量)狀態(tài)的大小。在新的設(shè)計(jì)中,我們通過持續(xù)上傳增量,以達(dá)到減少 Checkpoint Flush 時(shí)所需要持久化的數(shù)據(jù),來保證 Checkpoint 完成的穩(wěn)定性。 容錯(cuò)恢復(fù)需要回滾的數(shù)據(jù)量更少:Checkpointing 之間的間隔越短,每次容錯(cuò)恢復(fù)后需要重新處理的數(shù)據(jù)就越少。
Checkpoint Barrier 流動(dòng)和對(duì)齊的速度; 將狀態(tài)快照持久化到非易失性高可用存儲(chǔ)(例如 S3)上所需要的時(shí)間。


二、設(shè)計(jì)
算子在更新狀態(tài)的時(shí)候?qū)戨p份,一份更新寫入狀態(tài)表 State Table 中,一份增量寫入 State Changelog 中。 Checkpoint 變成由兩個(gè)部分組成,第一個(gè)部分是當(dāng)前已經(jīng)持久化的存在遠(yuǎn)端存儲(chǔ)上的 State Table,第二個(gè)部分是增量的 State Changelog。 State Table 的持久化和 Checkpointing 過程獨(dú)立開來,會(huì)定期由 background thread 持久化,我們稱為 Materialization(物化)的過程。 在做 Checkpoint 的時(shí)候,只要保證新增的 State Changelog 被持久化就可以了。



數(shù)據(jù)的增量更改(插入/更新/刪除)會(huì)被寫入到 Transaction Log 中。一旦這部分更改的日志被同步到持久存儲(chǔ)中,我們就可以認(rèn)為 Transaction 已經(jīng)完成了。這個(gè)過程類似于上述方法中的 Checkpointing 的過程。 同時(shí),為了方便數(shù)據(jù)查詢,數(shù)據(jù)的更改也會(huì)異步持久化在數(shù)據(jù)表(Table)中。 一旦 Transaction Log 中的相關(guān)部分也在數(shù)據(jù)表中被持久化了,Transaction Log 中相關(guān)部分就可以刪除了。這個(gè)過程類似于我們方法中的 State Table 持久化過程。
額外的網(wǎng)絡(luò) IO 和額外的 Changelog 持久存儲(chǔ)開銷; 緩存 Changelog 帶來的額外的內(nèi)存使用; 容錯(cuò)恢復(fù)需要額外的重放 Changelog 帶來的潛在的恢復(fù)時(shí)間的增加。
三、Changelog 存儲(chǔ)(DSTL)
短期持久化 State Changelog 是組成 Checkpoint 的一個(gè)部分,所以也需要能持久化存儲(chǔ)。同時(shí),State Changelog 只需要保存從最近一次持久化 State Table 到當(dāng)前做 Checkpoint 時(shí)的 Changelog,因此只需要保存很短時(shí)間(幾分鐘)的數(shù)據(jù)。
寫入頻率遠(yuǎn)遠(yuǎn)大于讀取頻率 只有在 Restore 或者 Rescale 的情況下才需要讀取 Changelog,大部分情況下只有 append 操作,并且一旦寫入,數(shù)據(jù)就不能再被修改。
很短的寫延遲 引入 State Changelog 是為了能將 Checkpoint 做得更快(1s 以內(nèi))。因此,單次寫請(qǐng)求需要至少能在期望的 Checkpoint 時(shí)間內(nèi)完成。
保證一致性 如果我們有多個(gè) State Changelog 的副本,就會(huì)產(chǎn)生多副本之間的一致性問題。一旦某個(gè)副本的 State Changelog 被持久化并被 JM 確認(rèn),恢復(fù)時(shí)需要以此副本為基準(zhǔn)保證語義一致性。
3.1 DSTL 方案的選擇
沒有額外的外部依賴:目前 Flink Checkpoint 持久化在 DFS 中,所以以 DFS 來實(shí)現(xiàn) DSTL 沒有引入額外的外部組件。 沒有額外的狀態(tài)管理:目前的設(shè)計(jì)方案中 DSTL 的狀態(tài)管理是和 Flink Checkpointing 機(jī)制整合在一起的,所以也不需要額外的狀態(tài)管理。 DFS 原生提供持久化和一致性保證:如果實(shí)現(xiàn)多副本分布式日志,這些都是額外需要考慮的成本。
更高的延遲:DFS 相比于寫入本地盤的分布式日志系統(tǒng)來講一般來說有更高的延遲。 網(wǎng)絡(luò) I/O 限制:大部分 DFS 供應(yīng)商出于成本的考慮都會(huì)對(duì)單用戶 DFS 寫入限流限速,極端情況有可能會(huì)造成網(wǎng)絡(luò)過載。
3.2 DSTL 架構(gòu)

狀態(tài)清理問題 前面有提到在新的架構(gòu)中,一個(gè) Checkpoint 由兩部分組成:1)State Table 和 2)State Change Log。這兩部分都需要按需清理。1)這個(gè)部分的清理復(fù)用 Flink 已有的 Checkpoint 機(jī)制;2)這個(gè)部分的清理相對(duì)較復(fù)雜,特別是 State Change Log 在當(dāng)前的設(shè)計(jì)中為了避免小文件的問題,是以 TM 為粒度的。在當(dāng)前的設(shè)計(jì)中,我們分兩個(gè)部分來清理 State Change Log:一是 Change Log 本身的數(shù)據(jù)需要在 State Table 物化后刪除其相對(duì)應(yīng)的部分;二是 Change Log 中成為 Checkpoint 的部分的清理融合進(jìn)已有的 Flink Checkpoint 清理機(jī)制[4]?。
DFS 相關(guān)問題 長尾延遲問題 為了解決 DFS 高長尾延遲問題,DFS 寫入請(qǐng)求會(huì)在允許超時(shí)時(shí)間(默認(rèn)為 1 秒)內(nèi)無法完成時(shí)重試。 小文件問題 DFS 的一個(gè)問題是每個(gè) Checkpoint 會(huì)創(chuàng)建很多小文件,并且因?yàn)?Changleog State Backend 可以提供更高頻的 Checkpoint,小文件問題會(huì)成為瓶頸。為了緩解這種情況,我們將同一個(gè) Task Manager 上同一作業(yè)的所有 State Change 寫到同一個(gè)文件中。因此,同一個(gè) Task Manager 會(huì)共享同一個(gè) State Change Log。
四、Benchmark 測(cè)試結(jié)果分析
State Change Log 增量的部分與全量狀態(tài)大小之比,增量越小越好。 不間斷上傳狀態(tài)增量的能力。這個(gè)和狀態(tài)訪問模式相關(guān),極端情況下,如果算子只在 Checkpointing 前更新 Flink State Table 的話,Changelog 起不到太大作用。 能夠?qū)碜远鄠€(gè) Task 的 changelog 分組批量上傳的能力。Changelog 分組批量寫 DFS 可以減少需要?jiǎng)?chuàng)建的文件數(shù)量并降低 DFS 負(fù)載,從而提高穩(wěn)定性。 底層 State Backend 在刷磁盤前對(duì)同一個(gè) key 的 更新的去重能力。因?yàn)?state change log 保存的是狀態(tài)更新,而不是最終值,底層 State Backend 這種能力會(huì)增大 Changelog 增量與 State Table 全量狀態(tài)大小之比。 寫持久存儲(chǔ) DFS 的速度,寫的速度越快 Changelog 所帶來的提升越不明顯。
4.1 Benchmark 配置
算子并行度:50 運(yùn)行時(shí)間:21h State Backend:RocksDB (Incremental Checkpoint Enabled) 持久存儲(chǔ):S3 (Presto plugin) 機(jī)器型號(hào):AWS m5.xlarge(4 slots per TM) Checkpoint 間隔: 10ms State Table Materialization 間隔:3m Input Rate:50K Events /s
4.2 ValueState Workload


表2: 基于 ValueState Workload 的 Changelog(開啟/關(guān)閉)的 Checkpoint 相關(guān)指標(biāo)對(duì)比
Checkpointed Data Size 是指在收到 Checkpoint Barrier,Checkpointing 過程開始后上傳數(shù)據(jù)的大小。對(duì)于 Changelog 來說,大部分?jǐn)?shù)據(jù)在 Checkpointing 過程開始前就已經(jīng)上傳了,所以這就是為什么開啟 Changelog 時(shí)這個(gè)指標(biāo)要比關(guān)閉時(shí)小得多的原因。 Full Checkpoint Data Size 是構(gòu)成 Checkpoint 的所有文件的總大小,也包括與之前 Checkpoint 共享的文件。與通常的 Checkpoint 相比,Changelog 的格式?jīng)]有被壓縮過也不夠緊湊,因此占用更多空間。
4.3 Window Workload

表3: 基于 Window Workload 的 Changelog(開啟/關(guān)閉)的 Checkpoint 相關(guān)指標(biāo)對(duì)比
對(duì)于 Sliding Window 算子,每條數(shù)據(jù)會(huì)加到多個(gè)滑動(dòng)窗口中,因此為造成多次更新。Changelog 的寫放大問題會(huì)更大。 前面有提到,如果底層 State Backend(比如 RocksDB)在刷磁盤前對(duì)同一個(gè) key 的 更新去重能力越強(qiáng),則快照的大小相對(duì)于 Changelog 會(huì)越小。在 Sliding Window 算子的極端情況下,滑動(dòng)窗口會(huì)因?yàn)槭П磺謇?。如果更新和清理發(fā)生在同一個(gè) Checkpoint 之內(nèi),則很可能該窗口中的數(shù)據(jù)不包含在快照中。這也意味著清除窗口的速度越快,快照的大小就可能越小。
五、結(jié)論
state.backend.changelog.enabled: truestate.backend.changelog.storage: filesystemdstl.dfs.base-path: <location similar to state.checkpoints.dir>
評(píng)論
圖片
表情
