Flink 1.11 Unaligned Checkpoint 解析
作為 Flink 最基礎(chǔ)也是最關(guān)鍵的容錯機制,Checkpoint 快照機制很好地保證了 Flink 應(yīng)用從異常狀態(tài)恢復(fù)后的數(shù)據(jù)準(zhǔn)確性。同時 Checkpoint 相關(guān)的 metrics 也是診斷 Flink 應(yīng)用健康狀態(tài)最為重要的指標(biāo),成功且耗時較短的 Checkpoint 表明作業(yè)運行狀況良好,沒有異常或反壓。然而,由于 Checkpoint 與反壓的耦合,反壓反過來也會作用于 Checkpoint,導(dǎo)致 Checkpoint 的種種問題。針對于此,F(xiàn)link 在 1.11 引入 Unaligned Checkpint 來解耦 Checkpoint 機制與反壓機制,優(yōu)化高反壓情況下的 Checkpoint 表現(xiàn)。
當(dāng)前 Checkpoint 機制簡述
相信不少讀者對 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已經(jīng)比較熟悉,該節(jié)簡單回顧下算法的基礎(chǔ)邏輯,熟悉算法的讀者可放心跳過。
Chandy-Lamport 算法將分布式系統(tǒng)抽象成 DAG(暫時不考慮有閉環(huán)的圖),節(jié)點表示進(jìn)程,邊表示兩個進(jìn)程間通信的管道。分布式快照的目的是記錄下整個系統(tǒng)的狀態(tài),即可以分為節(jié)點的狀態(tài)(進(jìn)程的狀態(tài))和邊的狀態(tài)(信道的狀態(tài),即傳輸中的數(shù)據(jù))。因為系統(tǒng)狀態(tài)是由輸入的消息序列驅(qū)動變化的,我們可以將輸入的消息序列分為多個較短的子序列,圖的每個節(jié)點或邊先后處理完某個子序列后,都會進(jìn)入同一個穩(wěn)定的全局統(tǒng)狀態(tài)。利用這個特性,系統(tǒng)的進(jìn)程和信道在子序列的邊界點分別進(jìn)行本地快照,即使各部分的快照時間點不同,最終也可以組合成一個有意義的全局快照。

圖1. Checkpoint Barrier
從實現(xiàn)上看,F(xiàn)link 通過在 DAG 數(shù)據(jù)源定時向數(shù)據(jù)流注入名為 Barrier 的特殊元素,將連續(xù)的數(shù)據(jù)流切分為多個有限序列,對應(yīng)多個 Checkpoint 周期。每當(dāng)接收到 Barrier,算子進(jìn)行本地的 Checkpoint 快照,并在完成后異步上傳本地快照,同時將 Barrier 以廣播方式發(fā)送至下游。當(dāng)某個 Checkpoint 的所有 Barrier 到達(dá) DAG 末端且所有算子完成快照,則標(biāo)志著全局快照的成功。

圖2. Barrier Alignment
在有多個輸入 Channel 的情況下,為了數(shù)據(jù)準(zhǔn)確性,算子會等待所有流的 Barrier 都到達(dá)之后才會開始本地的快照,這種機制被稱為 Barrier 對齊。在對齊的過程中,算子只會繼續(xù)處理的來自未出現(xiàn) Barrier Channel 的數(shù)據(jù),而其余 Channel 的數(shù)據(jù)會被寫入輸入隊列,直至在隊列滿后被阻塞。當(dāng)所有 Barrier 到達(dá)后,算子進(jìn)行本地快照,輸出 Barrier 到下游并恢復(fù)正常處理。
比起其他分布式快照,該算法的優(yōu)勢在于輔以 Copy-On-Write 技術(shù)的情況下不需要 “Stop The World” 影響應(yīng)用吞吐量,同時基本不用持久化處理中的數(shù)據(jù),只用保存進(jìn)程的狀態(tài)信息,大大減小了快照的大小。
Checkpoint 與反壓的耦合
目前的 Checkpoint 算法在大多數(shù)情況下運行良好,然而當(dāng)作業(yè)出現(xiàn)反壓時,阻塞式的 Barrier 對齊反而會加劇作業(yè)的反壓,甚至導(dǎo)致作業(yè)的不穩(wěn)定。
首先, Chandy-Lamport 分布式快照的結(jié)束依賴于 Marker 的流動,而反壓則會限制 Marker 的流動,導(dǎo)致快照的完成時間變長甚至超時。無論是哪種情況,都會導(dǎo)致 Checkpoint 的時間點落后于實際數(shù)據(jù)流較多。這時作業(yè)的計算進(jìn)度是沒有被持久化的,處于一個比較脆弱的狀態(tài),如果作業(yè)出于異常被動重啟或者被用戶主動重啟,作業(yè)會回滾丟失一定的進(jìn)度。如果 Checkpoint 連續(xù)超時且沒有很好的監(jiān)控,回滾丟失的進(jìn)度可能高達(dá)一天以上,對于實時業(yè)務(wù)這通常是不可接受的。更糟糕的是,回滾后的作業(yè)落后的 Lag 更大,通常帶來更大的反壓,形成一個惡性循環(huán)。
其次,Barrier 對齊本身可能成為一個反壓的源頭,影響上游算子的效率,而這在某些情況下是不必要的。比如典型的情況是一個的作業(yè)讀取多個 Source,分別進(jìn)行不同的聚合計算,然后將計算完的結(jié)果分別寫入不同的 Sink。通常來說,這些不同的 Sink 會復(fù)用公共的算子以減少重復(fù)計算,但并不希望不同 Source 間相互影響。

圖3. Barrier Alignment 阻塞上游 Task
假設(shè)一個作業(yè)要分別統(tǒng)計 A 和 B 兩個業(yè)務(wù)線的以天為粒度指標(biāo),同時還需要統(tǒng)計所有業(yè)務(wù)線以周為單位的指標(biāo),拓?fù)淙缟蠄D所示。如果 B 業(yè)務(wù)線某天的業(yè)務(wù)量突漲,使得 Checkpoint Barrier 有延遲,那么會導(dǎo)致公用的 Window Aggregate 進(jìn)行 Barrier 對齊,進(jìn)而阻塞業(yè)務(wù) A 的 FlatMap,最終令業(yè)務(wù) A 的計算也出現(xiàn)延遲。
當(dāng)然這種情況可以通過拆分作業(yè)等方式優(yōu)化,但難免引入更多開發(fā)維護(hù)成本,而且更重要的是這本來就符合 Flink 用戶常規(guī)的開發(fā)思路,應(yīng)該在框架內(nèi)盡量減小出現(xiàn)用戶意料之外的行為的可能性。
Unaligned Checkpoint
為了解決這個問題,F(xiàn)link 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 論文中對于 Marker 處理規(guī)則的描述:

圖4. Chandy-Lamport Marker 處理
其中關(guān)鍵是 if q has not recorded its state,也就是接收到 Marker 時算子是否已經(jīng)進(jìn)行過本地快照。一直以來 Flink 的 Aligned Checkpoint 通過 Barrier 對齊,將本地快照延遲至所有 Barrier 到達(dá),因而這個條件是永真的,從而巧妙地避免了對算子輸入隊列的狀態(tài)進(jìn)行快照,但代價是比較不可控的 Checkpoint 時長和吞吐量的降低。實際上這和 Chandy-Lamport 算法是有一定出入的。
舉個例子,假設(shè)我們對兩個數(shù)據(jù)流進(jìn)行 equal-join,輸出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系統(tǒng)的狀態(tài)變化如下(圖中不同顏色的元素代表屬于不同的 Checkpoint 周期):

圖5. Aligned Checkpoint 狀態(tài)變化
圖 a: 輸入 Channel 1 存在 3 個元素,其中
2在 Barrier 前面;Channel 2 存在 4 個元素,其中2、9、7在 Barrier 前面。圖 b: 算子分別讀取 Channel 一個元素,輸出
2。隨后接收到 Channel 1 的 Barrier,停止處理 Channel 1 后續(xù)的數(shù)據(jù),只處理 Channel 2 的數(shù)據(jù)。圖 c: 算子再消費 2 個自 Channel 2 的元素,接收到 Barrier,開始本地快照并輸出 Barrier。
對于相同的情況,Chandy-Lamport 算法的狀態(tài)變化如下:

圖6. Chandy-Lamport 狀態(tài)變化
圖 a: 同上。
圖 b: 算子分別處理兩個 Channel 一個元素,輸出結(jié)果
2。此后接收到 Channel 1 的 Barrier,算子開始本地快照記錄自己的狀態(tài),并輸出 Barrier。圖 c: 算子繼續(xù)正常處理兩個 Channel 的輸入,輸出
9。特別的地方是 Channel 2 后續(xù)元素會被保存下來,直到 Channel 2 的 Barrier 出現(xiàn)(即 Channel 2 的9和7)。保存的數(shù)據(jù)會作為 Channel 的狀態(tài)成為快照的一部分。
兩者的差異主要可以總結(jié)為兩點:
快照的觸發(fā)是在接收到第一個 Barrier 時還是在接收到最后一個 Barrier 時。
是否需要阻塞已經(jīng)接收到 Barrier 的 Channel 的計算。
從這兩點來看,新的 Unaligned Checkpoint 將快照的觸發(fā)改為第一個 Barrier 且取消阻塞 Channel 的計算,算法上與 Chandy-Lamport 基本一致,同時在實現(xiàn)細(xì)節(jié)方面結(jié)合 Flink 的定位做了幾個改進(jìn)。
首先,不同于 Chandy-Lamport 模型的只需要考慮算子輸入 Channel 的狀態(tài),F(xiàn)link 的算子有輸入和輸出兩種 Channel,在快照時兩者的狀態(tài)都需要被考慮。
其次,無論在 Chandy-Lamport 還是 Flink Aligned Checkpoint 算法中,Barrier 都必須遵循其在數(shù)據(jù)流中的位置,算子需要等待 Barrier 被實際處理才開始快照。而 Unaligned Checkpoint 改變了這個設(shè)定,允許算子優(yōu)先攝入并優(yōu)先輸出 Barrier。如此一來,第一個到達(dá) Barrier 會在算子的緩存數(shù)據(jù)隊列(包括輸入 Channel 和輸出 Channel)中往前跳躍一段距離,而被”插隊”的數(shù)據(jù)和其他輸入 Channel 在其 Barrier 之前的數(shù)據(jù)會被寫入快照中(圖中黃色部分)。

圖8. Barrier 越過數(shù)據(jù)
這樣的主要好處是,如果本身算子的處理就是瓶頸,Chandy-Lamport 的 Barrier 仍會被阻塞,但 Unaligned Checkpoint 則可以在 Barrier 進(jìn)入輸入 Channel 就馬上開始快照。這可以從很大程度上加快 Barrier 流經(jīng)整個 DAG 的速度,從而降低 Checkpoint 整體時長。
回到之前的例子,用 Unaligned Checkpoint 來實現(xiàn),狀態(tài)變化如下:

圖8. Unaligned-Checkpoint 狀態(tài)變化
圖 a: 輸入 Channel 1 存在 3 個元素,其中
2在 Barrier 前面;Channel 2 存在 4 個元素,其中2、9、7在 Barrier 前面。輸出 Channel 已存在結(jié)果數(shù)據(jù)1。圖 b: 算子優(yōu)先處理輸入 Channel 1 的 Barrier,開始本地快照記錄自己的狀態(tài),并將 Barrier 插到輸出 Channel 末端。
圖 c: 算子繼續(xù)正常處理兩個 Channel 的輸入,輸出
2、9。同時算子會將 Barrier 越過的數(shù)據(jù)(即輸入 Channel 1 的2和輸出 Channel 的1)寫入 Checkpoint,并將輸入 Channel 2 后續(xù)早于 Barrier 的數(shù)據(jù)(即2、9、7)持續(xù)寫入 Checkpoint。
比起 Aligned Checkpoint 中不同 Checkpoint 周期的數(shù)據(jù)以算子快照為界限分隔得很清晰,Unaligned Checkpoint 進(jìn)行快照和輸出 Barrier 時,部分本屬于當(dāng)前 Checkpoint 的輸入數(shù)據(jù)還未計算(因此未反映到當(dāng)前算子狀態(tài)中),而部分屬于當(dāng)前 Checkpoint 的輸出數(shù)據(jù)卻落到 Barrier 之后(因此未反映到下游算子的狀態(tài)中)。這也正是 Unaligned 的含義: 不同 Checkpoint 周期的數(shù)據(jù)沒有對齊,包括不同輸入 Channel 之間的不對齊,以及輸入和輸出間的不對齊。而這部分不對齊的數(shù)據(jù)會被快照記錄下來,以在恢復(fù)狀態(tài)時重放。換句話說,從 Checkpoint 恢復(fù)時,不對齊的數(shù)據(jù)并不能由 Source 端重放的數(shù)據(jù)計算得出,同時也沒有反映到算子狀態(tài)中,但因為它們會被 Checkpoint 恢復(fù)到對應(yīng) Channel 中,所以依然能提供只計算一次的準(zhǔn)確結(jié)果。
當(dāng)然,Unaligned Checkpoint 并不是百分百優(yōu)于 Aligned Checkpoint,它會帶來的已知問題就有:
由于要持久化緩存數(shù)據(jù),State Size 會有比較大的增長,磁盤負(fù)載會加重。
隨著 State Size 增長,作業(yè)恢復(fù)時間可能增長,運維管理難度增加。
目前看來,Unaligned Checkpoint 更適合容易產(chǎn)生高反壓同時又比較重要的復(fù)雜作業(yè)。對于像數(shù)據(jù) ETL 同步等簡單作業(yè),更輕量級的 Aligned Checkpoint 顯然是更好的選擇。
總結(jié)
Flink 1.11 的 Unaligned Checkpoint 主要解決在高反壓情況下作業(yè)難以完成 Checkpoint 的問題,同時它以磁盤資源為代價,避免了 Checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源利用率。隨著流計算的普及,未來的 Flink 應(yīng)用大概會越來越復(fù)雜,在未來經(jīng)過實戰(zhàn)打磨完善后 Unaligned Checkpoint 很有可能會取代 Aligned Checkpoint 成為 Flink 的默認(rèn) Checkpoint 策略。
