<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊Flink:Flink的容錯機(jī)制

          共 5519字,需瀏覽 12分鐘

           ·

          2024-05-21 07:30

          一、前言

          我們上一篇講了 Flink的狀態(tài)管理 ,這一篇我們來說一說Flink的容錯機(jī)制。

          在分布式計(jì)算系統(tǒng)中,為了保證數(shù)據(jù)的一致性需要對數(shù)據(jù)進(jìn)行一致性快照。

          Flink和Spark在做流式計(jì)算時,為了保證數(shù)據(jù)一致性都借鑒了Chandy-Lamport算法原理,Chandy-Lamport算法目標(biāo)是讓多個分布式節(jié)點(diǎn)本地數(shù)據(jù)以及通信中的數(shù)據(jù)完成local snapshot本地狀態(tài)保存最終能一起完成global snapshot保存全局狀態(tài)。只有了解分布式系統(tǒng)為了保證數(shù)據(jù)一致性的算法背景,才能更好理解Flink如何用Checkpoint來保證數(shù)據(jù)Exactly Once準(zhǔn)確一次語義和何為barrier對齊。

          1.1 Chandy-Lamport算法

          Chandy-Lamport的“快照”算法描述了決定分布式系統(tǒng)全局狀態(tài)的“快照”算法。該算法的目的是記錄進(jìn)程集Pi(i=1,2,…,N)的進(jìn)程狀態(tài)和通道狀態(tài)集(快照)。這里的進(jìn)程集類似Flink JobManager和TaskManager構(gòu)成分布式架構(gòu)的進(jìn)程集。這樣,即使所記錄的狀態(tài)組合可能從沒有在同一時間發(fā)生,但所記錄的全局狀態(tài)還是一致的。Flink TaskManager多任務(wù)可異步完成各自的快照,等所有的快照保存完成通知JobManager來最終保證全局狀態(tài)一致。此算法本身在進(jìn)程本地記錄狀態(tài),它沒有給出在一個場地收集全局狀態(tài)的方法。收集狀態(tài)的一個簡單方法是讓所有進(jìn)程把它們記錄的狀態(tài)發(fā)送到一個指定的收集進(jìn)程,如Flink JobManager中CheckPoint Coordinator檢查點(diǎn)協(xié)調(diào)器類似指定的所有進(jìn)程的狀態(tài)收集進(jìn)程。

          1.2 容錯機(jī)制Checkpoint檢查點(diǎn)理解

          首先狀態(tài)State與檢查點(diǎn)Checkpoint之間關(guān)系:Checkpoint將某個時刻應(yīng)用狀態(tài)State進(jìn)行快照Snapshot保存。

          • State:維護(hù)/存儲的是某一個Operator的運(yùn)行的狀態(tài)/歷史值,是維護(hù)在內(nèi)存中。

          • Checkpoint:某一時刻,F(xiàn)link中所有的Operator的當(dāng)前State的全局快照,一般存在磁盤上。

          二、Barrier

          Checkpoint是Flink實(shí)現(xiàn)容錯機(jī)制最核心的功能,它能夠周期性地基于流中各個算子的狀態(tài)來生成快照,從而將這些狀態(tài)數(shù)據(jù)定期持久化存儲下來。當(dāng)Flink程序一旦意外崩潰時,重新運(yùn)行程序時可以從這些快照進(jìn)行恢復(fù),從而修正因?yàn)楣收蠋淼某绦驍?shù)據(jù)異常。

          Flink分布式快照的一個核心元素是流Barrier(屏障或柵欄)。這些Barrier被注入數(shù)據(jù)流中,并將記錄作為數(shù)據(jù)流的一部分進(jìn)行流處理。Barrier永遠(yuǎn)不會超過記錄,它們嚴(yán)格地在一條線上流動。Barrier將數(shù)據(jù)流中的記錄隔離成一系列的記錄集合,即一個Barrier將數(shù)據(jù)流中的記錄分離為進(jìn)入當(dāng)前快照的記錄集和進(jìn)入下一個快照的記錄集。

          每個Barrier都攜帶快照的ID,并且Barrier之前的記錄都進(jìn)入了該快照。Barrier不會中斷數(shù)據(jù)流,非常輕量。來自不同快照的多個Barrier可以同時在流中,這意味著多個快照可能同時并發(fā)發(fā)生。單流Barrier在流中的位置如下圖所示。


          Barrier在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中。快照n的Barrier被注入的位置(用Sn表示)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中的最大位置。例如,在Apache Kafka中,這個位置將是分區(qū)中最后一條記錄的偏移量。這個位置Sn會被報告給Checkpoint協(xié)調(diào)器(Flink的JobManager)。

          然后這些Barrier就會順流而下。當(dāng)中間算子從所有輸入流接收到快照n的Barrier時,它會向所有輸出流發(fā)出快照n的Barrier。一旦Sink算子(流DAG的末端)從所有輸入流接收到Barrier n,它就向Checkpoint協(xié)調(diào)器確認(rèn)快照n完成。在所有的Sink都確認(rèn)了一個快照之后,快照就被認(rèn)為完成了。

          一旦快照n完成,作業(yè)將不再向數(shù)據(jù)源請求Sn之前的記錄,因?yàn)榇藭r這些記錄(及其后續(xù)記錄)已經(jīng)通過整個數(shù)據(jù)流拓?fù)洌匆呀?jīng)被處理完畢。

          如下圖所示,接收多個輸入流的算子需要基于快照Barrier對齊輸入流。

          一旦算子從一個輸入流接收到快照Barrier n,它就不能再處理來自該流的任何記錄(新到來的來自該流的其他記錄不會被處理,而是輸入緩沖區(qū)中),直到它從其他輸入流接收到Barrier n。否則,它將弄混屬于快照n的記錄和屬于快照n+1的記錄。一旦最后一個流接收到Barrier n,算子就會發(fā)出所有掛起的(緩沖區(qū)中的)向后傳送的記錄,然后發(fā)出快照Barrier n本身。之后,開始恢復(fù)處理來自所有輸入流的記錄,在處理來自流的記錄之前,優(yōu)先處理來自輸入緩沖區(qū)的記錄。最后,算子將狀態(tài)異步寫入狀態(tài)后端。

          三、Checkpoint 執(zhí)行流程

          是不是還是有點(diǎn)抽象?我們結(jié)合上面的算法以及Barrier的相關(guān)概念畫一下Checkpoint總體的執(zhí)行流程。

          再來看下詳細(xì)版的Checkpoint 執(zhí)行流程:


          如上圖所示,Checkpoint在執(zhí)行過程中,可以簡化為可以簡化為以下四大步:

          四、重啟與故障恢復(fù)策略

          當(dāng)Task出現(xiàn)故障時,可以對故障的Task以及其他受影響的Task進(jìn)行重啟,以使作業(yè)恢復(fù)到正常執(zhí)行狀態(tài)。Flink通過重啟策略和故障恢復(fù)策略來控制Task重啟,重啟策略控制是否可以重啟以及重啟的時間間隔,故障恢復(fù)策略控制哪些Task需要重啟。

          4.1 重啟策略
          每個重啟策略都有自己的一組配置選項(xiàng)來控制其行為。這些選項(xiàng)可以在Flink的配置文件flink-conf.yaml中設(shè)置。restart-strategy用于定義在作業(yè)失敗時使用的重啟策略,可接受的值有:

          • none、off、disable:不重啟策略。

          • fixeddelay、fixed-delay:固定延遲重啟策略。

          • failurerate、failure-rate:故障率重啟策略。

          例如,配置固定延遲重啟策略(默認(rèn)使用該策略),代碼如下:

          restart-strategy: fixed-delay

          也可以通過在應(yīng)用程序中調(diào)用StreamExecutionEnvironment對象的setRestartStrategy方法進(jìn)行設(shè)置。當(dāng)然對于ExecutionEnvironment也同樣適用。

          如果配置了Checkpoint而沒有配置重啟策略,那么將默認(rèn)使用固定延遲重啟策略,程序會無限重啟,此時最大嘗試重啟次數(shù)由Integer.MAX_VALUE參數(shù)決定;如果沒有配置Checkpoint,則使用“不重啟”策略;如果提交作業(yè)時設(shè)置了重啟策略,該策略將覆蓋掉集群的默認(rèn)策略。

          4.2 故障恢復(fù)策略

          Flink中支持兩種不同的故障恢復(fù)策略,可以在配置文件flink-conf.yaml中對屬性jobmanager.execution.failover-strategy進(jìn)行設(shè)置。該屬性有兩個值:full和region(默認(rèn))。

          • full:Task(任務(wù))發(fā)生故障時重啟作業(yè)中的所有Task進(jìn)行故障恢復(fù)。

          • region:將作業(yè)中的Task分為數(shù)個不相交的Region(區(qū)域)。當(dāng)有Task發(fā)生故障時,將計(jì)算進(jìn)行故障恢復(fù)需要重啟的最小Region集合。與重啟所有Task相比,對于某些作業(yè)可能要重啟的Task數(shù)量更少。

          需要重啟的最小Region集合的計(jì)算邏輯如下:

          • 發(fā)生錯誤的Task所在的Region需要重啟,就會重啟該Region的所有Task。

          • 如果要重啟的Region需要消費(fèi)的數(shù)據(jù)有部分無法訪問(丟失或損壞),那么產(chǎn)出該部分?jǐn)?shù)據(jù)的Region也需要重啟。

          • 為了保障數(shù)據(jù)的一致性,需要重啟的Region的下游Region也需要重啟。因?yàn)閷τ谝恍┓谴_定性的計(jì)算或者分發(fā)會導(dǎo)致同一個結(jié)果分區(qū)每次產(chǎn)生時包含的數(shù)據(jù)都不相同。

          Region中的Task的數(shù)據(jù)交換是以Pipelined形式的,而非Batch形式,即Batch形式的Task數(shù)據(jù)交換不存在Region恢復(fù)策略。DataStream和流式Table/SQL作業(yè)的所有數(shù)據(jù)交換都是Pipelined形式的,而批處理式Table/SQL作業(yè)的所有數(shù)據(jù)交換默認(rèn)都是Batch形式的。

          五、Savepoint

          Savepoint(保存點(diǎn))是用戶手動觸發(fā)的Checkpoint,由用戶手動創(chuàng)建、擁有和刪除,它獲取狀態(tài)的快照并將其寫入狀態(tài)后端。Savepoint主要用于手動的狀態(tài)數(shù)據(jù)備份和恢復(fù),常用于在升級和維護(hù)集群的過程中保存狀態(tài)數(shù)據(jù),避免系統(tǒng)無法恢復(fù)到原有的計(jì)算狀態(tài)。例如,需要升級程序之前先執(zhí)行一次Savepoint,升級后可以繼續(xù)從升級前的那個點(diǎn)進(jìn)行計(jì)算,保證數(shù)據(jù)不中斷。

          Savepoint底層其實(shí)使用的也是Checkpoint機(jī)制。Savepoint與Checkpoint的對比如下表所示。


          當(dāng)需要對Flink作業(yè)進(jìn)行停止、重啟或者更新時,可以進(jìn)行一次Savepoint,保存流作業(yè)的執(zhí)行狀態(tài)。

          Flink的Savepoint與Checkpoint的不同之處類似于傳統(tǒng)數(shù)據(jù)庫中的備份與恢復(fù)日志。Checkpoint的主要目的是為意外失敗的作業(yè)提供恢復(fù)機(jī)制。Checkpoint的生命周期由Flink管理,即Flink創(chuàng)建、管理和刪除,無須用戶交互。Savepoint由用戶創(chuàng)建、擁有和刪除,是一種有計(jì)劃的手動備份和恢復(fù)。Savepoint的生成、恢復(fù)成本可能更高一些,Savepoint更多地關(guān)注可移植性和對作業(yè)更改的支持。

          5.1 觸發(fā)Savepoint

          可以使用命令行客戶端來觸發(fā)Savepoint、觸發(fā)Savepoint并取消作業(yè)、從Savepoint恢復(fù)以及刪除Savepoint。從Flink 1.2.0開始,還可以使用WebUI從Savepoint恢復(fù)。

          例如,觸發(fā)ID為jobId的作業(yè)的Savepoint,并返回創(chuàng)建的Savepoint路徑(需要此路徑來還原和刪除Savepoint),代碼如下:

          $ bin/flink savepoint jobId [targetDirectory]

          targetDirectory表示需要創(chuàng)建的用于存儲Savepoint數(shù)據(jù)的目標(biāo)路徑。如果不指定,則使用配置文件中state.savepoints.dir屬性指定的默認(rèn)路徑:

          state.savepoints.dir: hdfs://centos01:9000/savepoints

          目標(biāo)路徑必須是JobManager和TaskManager可訪問的位置,例如分布式文件系統(tǒng)上的位置。如果既未設(shè)置默認(rèn)路徑又未在執(zhí)行命令時手動指定目標(biāo)路徑,則觸發(fā)Savepoint將失敗。

          如果是基于YARN搭建的集群,則可以使用以下命令來觸發(fā)Savepoint:

          $ bin/flink savepoint jobId [targetDirectory] -yid yarnAppId

          參數(shù)解析如下:

          • jobId:要觸發(fā)的作業(yè)ID。·

          • targetDirectory:可選的用于存儲Savepoint數(shù)據(jù)的目標(biāo)路徑,如果不指定,則使用配置的默認(rèn)路徑。

          • yarnAppId:YARN的應(yīng)用程序ID。

          5.2 觸發(fā)Savepoint并取消作業(yè)

          如果希望觸發(fā)ID為jobId的作業(yè)的Savepoint并取消作業(yè),代碼如下:

          $ bin/flink cancel -s [targetDirectory] jobId

          5.3 通過Savepoint恢復(fù)作業(yè)

          如果希望提交作業(yè),并從Savepoint中恢復(fù)作業(yè)的狀態(tài)數(shù)據(jù),則可以使用以下代碼:

          $ bin/flink run -s savepointPath

          savepointPath表示Savepoint數(shù)據(jù)存儲的路徑。Flink將從該路徑中讀取已經(jīng)備份的狀態(tài)數(shù)據(jù)。

          5.4 刪除Savepoint

          如果希望刪除存儲在savepointPath中的Savepoint數(shù)據(jù),則可以使用以下代碼:

          $ bin/flink savepoint -d savepointPath

          當(dāng)然,還可以通過常規(guī)文件系統(tǒng)手動刪除Savepoint數(shù)據(jù),而不會影響其他Savepoint或Checkpoint。



          關(guān)構(gòu)AI數(shù)據(jù)、云原生、物聯(lián)網(wǎng)等相關(guān)領(lǐng)域的技術(shù)知識分享。


          瀏覽 59
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产探花在线视频 | 国产九九精品视频 | 成人视频在线观看18 | 日韩资源在线观看 | 成人自拍在线视频 |