聊聊Flink:Flink的容錯機(jī)制
共 5519字,需瀏覽 12分鐘
·
2024-05-21 07:30
一、前言
我們上一篇講了 Flink的狀態(tài)管理 ,這一篇我們來說一說Flink的容錯機(jī)制。
在分布式計(jì)算系統(tǒng)中,為了保證數(shù)據(jù)的一致性需要對數(shù)據(jù)進(jìn)行一致性快照。
Flink和Spark在做流式計(jì)算時,為了保證數(shù)據(jù)一致性都借鑒了Chandy-Lamport算法原理,Chandy-Lamport算法目標(biāo)是讓多個分布式節(jié)點(diǎn)本地數(shù)據(jù)以及通信中的數(shù)據(jù)完成local snapshot本地狀態(tài)保存最終能一起完成global snapshot保存全局狀態(tài)。只有了解分布式系統(tǒng)為了保證數(shù)據(jù)一致性的算法背景,才能更好理解Flink如何用Checkpoint來保證數(shù)據(jù)Exactly Once準(zhǔn)確一次語義和何為barrier對齊。
1.1 Chandy-Lamport算法
Chandy-Lamport的“快照”算法描述了決定分布式系統(tǒng)全局狀態(tài)的“快照”算法。該算法的目的是記錄進(jìn)程集Pi(i=1,2,…,N)的進(jìn)程狀態(tài)和通道狀態(tài)集(快照)。這里的進(jìn)程集類似Flink JobManager和TaskManager構(gòu)成分布式架構(gòu)的進(jìn)程集。這樣,即使所記錄的狀態(tài)組合可能從沒有在同一時間發(fā)生,但所記錄的全局狀態(tài)還是一致的。Flink TaskManager多任務(wù)可異步完成各自的快照,等所有的快照保存完成通知JobManager來最終保證全局狀態(tài)一致。此算法本身在進(jìn)程本地記錄狀態(tài),它沒有給出在一個場地收集全局狀態(tài)的方法。收集狀態(tài)的一個簡單方法是讓所有進(jìn)程把它們記錄的狀態(tài)發(fā)送到一個指定的收集進(jìn)程,如Flink JobManager中CheckPoint Coordinator檢查點(diǎn)協(xié)調(diào)器類似指定的所有進(jìn)程的狀態(tài)收集進(jìn)程。
1.2 容錯機(jī)制Checkpoint檢查點(diǎn)理解
首先狀態(tài)State與檢查點(diǎn)Checkpoint之間關(guān)系:Checkpoint將某個時刻應(yīng)用狀態(tài)State進(jìn)行快照Snapshot保存。
State:維護(hù)/存儲的是某一個Operator的運(yùn)行的狀態(tài)/歷史值,是維護(hù)在內(nèi)存中。
Checkpoint:某一時刻,F(xiàn)link中所有的Operator的當(dāng)前State的全局快照,一般存在磁盤上。
二、Barrier
Checkpoint是Flink實(shí)現(xiàn)容錯機(jī)制最核心的功能,它能夠周期性地基于流中各個算子的狀態(tài)來生成快照,從而將這些狀態(tài)數(shù)據(jù)定期持久化存儲下來。當(dāng)Flink程序一旦意外崩潰時,重新運(yùn)行程序時可以從這些快照進(jìn)行恢復(fù),從而修正因?yàn)楣收蠋淼某绦驍?shù)據(jù)異常。
Flink分布式快照的一個核心元素是流Barrier(屏障或柵欄)。這些Barrier被注入數(shù)據(jù)流中,并將記錄作為數(shù)據(jù)流的一部分進(jìn)行流處理。Barrier永遠(yuǎn)不會超過記錄,它們嚴(yán)格地在一條線上流動。Barrier將數(shù)據(jù)流中的記錄隔離成一系列的記錄集合,即一個Barrier將數(shù)據(jù)流中的記錄分離為進(jìn)入當(dāng)前快照的記錄集和進(jìn)入下一個快照的記錄集。
每個Barrier都攜帶快照的ID,并且Barrier之前的記錄都進(jìn)入了該快照。Barrier不會中斷數(shù)據(jù)流,非常輕量。來自不同快照的多個Barrier可以同時在流中,這意味著多個快照可能同時并發(fā)發(fā)生。單流Barrier在流中的位置如下圖所示。
Barrier在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中。快照n的Barrier被注入的位置(用Sn表示)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中的最大位置。例如,在Apache Kafka中,這個位置將是分區(qū)中最后一條記錄的偏移量。這個位置Sn會被報告給Checkpoint協(xié)調(diào)器(Flink的JobManager)。
然后這些Barrier就會順流而下。當(dāng)中間算子從所有輸入流接收到快照n的Barrier時,它會向所有輸出流發(fā)出快照n的Barrier。一旦Sink算子(流DAG的末端)從所有輸入流接收到Barrier n,它就向Checkpoint協(xié)調(diào)器確認(rèn)快照n完成。在所有的Sink都確認(rèn)了一個快照之后,快照就被認(rèn)為完成了。
一旦快照n完成,作業(yè)將不再向數(shù)據(jù)源請求Sn之前的記錄,因?yàn)榇藭r這些記錄(及其后續(xù)記錄)已經(jīng)通過整個數(shù)據(jù)流拓?fù)洌匆呀?jīng)被處理完畢。
如下圖所示,接收多個輸入流的算子需要基于快照Barrier對齊輸入流。
一旦算子從一個輸入流接收到快照Barrier n,它就不能再處理來自該流的任何記錄(新到來的來自該流的其他記錄不會被處理,而是輸入緩沖區(qū)中),直到它從其他輸入流接收到Barrier n。否則,它將弄混屬于快照n的記錄和屬于快照n+1的記錄。一旦最后一個流接收到Barrier n,算子就會發(fā)出所有掛起的(緩沖區(qū)中的)向后傳送的記錄,然后發(fā)出快照Barrier n本身。之后,開始恢復(fù)處理來自所有輸入流的記錄,在處理來自流的記錄之前,優(yōu)先處理來自輸入緩沖區(qū)的記錄。最后,算子將狀態(tài)異步寫入狀態(tài)后端。
三、Checkpoint 執(zhí)行流程
是不是還是有點(diǎn)抽象?我們結(jié)合上面的算法以及Barrier的相關(guān)概念畫一下Checkpoint總體的執(zhí)行流程。
再來看下詳細(xì)版的Checkpoint 執(zhí)行流程:
如上圖所示,Checkpoint在執(zhí)行過程中,可以簡化為可以簡化為以下四大步:
四、重啟與故障恢復(fù)策略
當(dāng)Task出現(xiàn)故障時,可以對故障的Task以及其他受影響的Task進(jìn)行重啟,以使作業(yè)恢復(fù)到正常執(zhí)行狀態(tài)。Flink通過重啟策略和故障恢復(fù)策略來控制Task重啟,重啟策略控制是否可以重啟以及重啟的時間間隔,故障恢復(fù)策略控制哪些Task需要重啟。
4.1 重啟策略
每個重啟策略都有自己的一組配置選項(xiàng)來控制其行為。這些選項(xiàng)可以在Flink的配置文件flink-conf.yaml中設(shè)置。restart-strategy用于定義在作業(yè)失敗時使用的重啟策略,可接受的值有:
none、off、disable:不重啟策略。
fixeddelay、fixed-delay:固定延遲重啟策略。
failurerate、failure-rate:故障率重啟策略。
例如,配置固定延遲重啟策略(默認(rèn)使用該策略),代碼如下:
restart-strategy: fixed-delay
也可以通過在應(yīng)用程序中調(diào)用StreamExecutionEnvironment對象的setRestartStrategy方法進(jìn)行設(shè)置。當(dāng)然對于ExecutionEnvironment也同樣適用。
如果配置了Checkpoint而沒有配置重啟策略,那么將默認(rèn)使用固定延遲重啟策略,程序會無限重啟,此時最大嘗試重啟次數(shù)由Integer.MAX_VALUE參數(shù)決定;如果沒有配置Checkpoint,則使用“不重啟”策略;如果提交作業(yè)時設(shè)置了重啟策略,該策略將覆蓋掉集群的默認(rèn)策略。
4.2 故障恢復(fù)策略
Flink中支持兩種不同的故障恢復(fù)策略,可以在配置文件flink-conf.yaml中對屬性jobmanager.execution.failover-strategy進(jìn)行設(shè)置。該屬性有兩個值:full和region(默認(rèn))。
full:Task(任務(wù))發(fā)生故障時重啟作業(yè)中的所有Task進(jìn)行故障恢復(fù)。
region:將作業(yè)中的Task分為數(shù)個不相交的Region(區(qū)域)。當(dāng)有Task發(fā)生故障時,將計(jì)算進(jìn)行故障恢復(fù)需要重啟的最小Region集合。與重啟所有Task相比,對于某些作業(yè)可能要重啟的Task數(shù)量更少。
需要重啟的最小Region集合的計(jì)算邏輯如下:
發(fā)生錯誤的Task所在的Region需要重啟,就會重啟該Region的所有Task。
如果要重啟的Region需要消費(fèi)的數(shù)據(jù)有部分無法訪問(丟失或損壞),那么產(chǎn)出該部分?jǐn)?shù)據(jù)的Region也需要重啟。
為了保障數(shù)據(jù)的一致性,需要重啟的Region的下游Region也需要重啟。因?yàn)閷τ谝恍┓谴_定性的計(jì)算或者分發(fā)會導(dǎo)致同一個結(jié)果分區(qū)每次產(chǎn)生時包含的數(shù)據(jù)都不相同。
Region中的Task的數(shù)據(jù)交換是以Pipelined形式的,而非Batch形式,即Batch形式的Task數(shù)據(jù)交換不存在Region恢復(fù)策略。DataStream和流式Table/SQL作業(yè)的所有數(shù)據(jù)交換都是Pipelined形式的,而批處理式Table/SQL作業(yè)的所有數(shù)據(jù)交換默認(rèn)都是Batch形式的。
五、Savepoint
Savepoint(保存點(diǎn))是用戶手動觸發(fā)的Checkpoint,由用戶手動創(chuàng)建、擁有和刪除,它獲取狀態(tài)的快照并將其寫入狀態(tài)后端。Savepoint主要用于手動的狀態(tài)數(shù)據(jù)備份和恢復(fù),常用于在升級和維護(hù)集群的過程中保存狀態(tài)數(shù)據(jù),避免系統(tǒng)無法恢復(fù)到原有的計(jì)算狀態(tài)。例如,需要升級程序之前先執(zhí)行一次Savepoint,升級后可以繼續(xù)從升級前的那個點(diǎn)進(jìn)行計(jì)算,保證數(shù)據(jù)不中斷。
Savepoint底層其實(shí)使用的也是Checkpoint機(jī)制。Savepoint與Checkpoint的對比如下表所示。
當(dāng)需要對Flink作業(yè)進(jìn)行停止、重啟或者更新時,可以進(jìn)行一次Savepoint,保存流作業(yè)的執(zhí)行狀態(tài)。
Flink的Savepoint與Checkpoint的不同之處類似于傳統(tǒng)數(shù)據(jù)庫中的備份與恢復(fù)日志。Checkpoint的主要目的是為意外失敗的作業(yè)提供恢復(fù)機(jī)制。Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建、管理和刪除,無須用戶交互。Savepoint由用戶創(chuàng)建、擁有和刪除,是一種有計(jì)劃的手動備份和恢復(fù)。Savepoint的生成、恢復(fù)成本可能更高一些,Savepoint更多地關(guān)注可移植性和對作業(yè)更改的支持。
5.1 觸發(fā)Savepoint
可以使用命令行客戶端來觸發(fā)Savepoint、觸發(fā)Savepoint并取消作業(yè)、從Savepoint恢復(fù)以及刪除Savepoint。從Flink 1.2.0開始,還可以使用WebUI從Savepoint恢復(fù)。
例如,觸發(fā)ID為jobId的作業(yè)的Savepoint,并返回創(chuàng)建的Savepoint路徑(需要此路徑來還原和刪除Savepoint),代碼如下:
$ bin/flink savepoint jobId [targetDirectory]
targetDirectory表示需要創(chuàng)建的用于存儲Savepoint數(shù)據(jù)的目標(biāo)路徑。如果不指定,則使用配置文件中state.savepoints.dir屬性指定的默認(rèn)路徑:
state.savepoints.dir: hdfs://centos01:9000/savepoints
目標(biāo)路徑必須是JobManager和TaskManager可訪問的位置,例如分布式文件系統(tǒng)上的位置。如果既未設(shè)置默認(rèn)路徑又未在執(zhí)行命令時手動指定目標(biāo)路徑,則觸發(fā)Savepoint將失敗。
如果是基于YARN搭建的集群,則可以使用以下命令來觸發(fā)Savepoint:
$ bin/flink savepoint jobId [targetDirectory] -yid yarnAppId
參數(shù)解析如下:
jobId:要觸發(fā)的作業(yè)ID。·
targetDirectory:可選的用于存儲Savepoint數(shù)據(jù)的目標(biāo)路徑,如果不指定,則使用配置的默認(rèn)路徑。
yarnAppId:YARN的應(yīng)用程序ID。
5.2 觸發(fā)Savepoint并取消作業(yè)
如果希望觸發(fā)ID為jobId的作業(yè)的Savepoint并取消作業(yè),代碼如下:
$ bin/flink cancel -s [targetDirectory] jobId
5.3 通過Savepoint恢復(fù)作業(yè)
如果希望提交作業(yè),并從Savepoint中恢復(fù)作業(yè)的狀態(tài)數(shù)據(jù),則可以使用以下代碼:
$ bin/flink run -s savepointPath
savepointPath表示Savepoint數(shù)據(jù)存儲的路徑。Flink將從該路徑中讀取已經(jīng)備份的狀態(tài)數(shù)據(jù)。
5.4 刪除Savepoint
如果希望刪除存儲在savepointPath中的Savepoint數(shù)據(jù),則可以使用以下代碼:
$ bin/flink savepoint -d savepointPath
當(dāng)然,還可以通過常規(guī)文件系統(tǒng)手動刪除Savepoint數(shù)據(jù),而不會影響其他Savepoint或Checkpoint。
歡迎大家關(guān)注我的公眾號【老周聊架構(gòu)】,AI、大數(shù)據(jù)、云原生、物聯(lián)網(wǎng)等相關(guān)領(lǐng)域的技術(shù)知識分享。
