<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖解 Flink Checkpoint 原理及在 1.11 版本的優(yōu)化

          共 5348字,需瀏覽 11分鐘

           ·

          2020-12-17 14:07

          一、什么是 checkpoint

          上次發(fā)文,提到了 Flink 可以非常高效的進(jìn)行有狀態(tài)流的計(jì)算,通過使用 Flink 內(nèi)置的 Keyed State 和 Operator State,保存每個(gè)算子的狀態(tài)。

          默認(rèn)情況下,狀態(tài)是存儲(chǔ)在 JVM 的堆內(nèi)存中,如果系統(tǒng)中某個(gè)環(huán)節(jié)發(fā)生了錯(cuò)誤,宕機(jī),這個(gè)時(shí)候所有的狀態(tài)都會(huì)丟失,并且無法恢復(fù),會(huì)導(dǎo)致整個(gè)系統(tǒng)的數(shù)據(jù)計(jì)算發(fā)生錯(cuò)誤。

          此時(shí)就需要 Checkpoint 來保障系統(tǒng)的容錯(cuò)。Checkpoint 過程,就是把算子的狀態(tài)周期性持久化的過程。

          在系統(tǒng)出錯(cuò)后恢復(fù)時(shí),就可以從 checkpoint 中恢復(fù)每個(gè)算子的狀態(tài),從上次消費(fèi)的地方重新開始消費(fèi)和計(jì)算。從而可以做到在高效進(jìn)行計(jì)算的同時(shí)還可以保證數(shù)據(jù)不丟失,只計(jì)算一次。

          二、Checkpoint 必要的兩個(gè)條件

          答案是否,需要滿足以下兩個(gè)條件才能做 Checkpoint:

          1. 需要支持重放一定時(shí)間范圍內(nèi)數(shù)據(jù)的數(shù)據(jù)源,比如:kafka 。因?yàn)槿蒎e(cuò)機(jī)制就是在任務(wù)失敗后自動(dòng)從最近一次成功的 checkpoint 處恢復(fù)任務(wù),此時(shí)需要把任務(wù)失敗前消費(fèi)的數(shù)據(jù)再消費(fèi)一遍。假設(shè)數(shù)據(jù)源不支持重放,那么數(shù)據(jù)還未寫到存儲(chǔ)中就丟了,任務(wù)恢復(fù)后,就再也無法重新消費(fèi)這部分丟了的數(shù)據(jù)了。

          2. 需要一個(gè)存儲(chǔ)來保存持久化的狀態(tài),如:Hdfs,本地文件??梢栽谌蝿?wù)失敗后,從存儲(chǔ)中恢復(fù) checkpoint 數(shù)據(jù)。

          三、Checkpoint 參數(shù)詳解

          StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();

          //?每?60s?做一次?checkpoint
          env.enableCheckpointing(60000);

          //?高級(jí)配置:

          //?checkpoint?語義設(shè)置為?EXACTLY_ONCE,這是默認(rèn)語義
          env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

          //?兩次?checkpoint?的間隔時(shí)間至少為?1?s,默認(rèn)是?0,立即進(jìn)行下一次?checkpoint
          env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

          //?checkpoint?必須在?60s?內(nèi)結(jié)束,否則被丟棄,默認(rèn)是?10?分鐘
          env.getCheckpointConfig().setCheckpointTimeout(60000);

          //?同一時(shí)間只能允許有一個(gè)?checkpoint
          env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

          //?最多允許?checkpoint?失敗?3?次
          env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

          //?當(dāng)?Flink?任務(wù)取消時(shí),保留外部保存的?checkpoint?信息
          env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

          //?當(dāng)有較新的?Savepoint?時(shí),作業(yè)也會(huì)從?Checkpoint?處恢復(fù)
          env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

          //?允許實(shí)驗(yàn)性的功能:非對(duì)齊的 checkpoint,以提升性能
          env.getCheckpointConfig().enableUnalignedCheckpoints();

          相關(guān)參數(shù)的文字描述:

          1. env.enableCheckpointing(60000),1 分鐘觸發(fā)一次 checkpoint;
          2. setCheckpointTimeout,checkpoint 超時(shí)時(shí)間,默認(rèn)是 10 分鐘超時(shí),超過了超時(shí)時(shí)間就會(huì)被丟棄;
          3. setCheckpointingMode,設(shè)置 checkpoint 語義,可以設(shè)置為 EXACTLY_ONCE,表示既不重復(fù)消費(fèi)也不丟數(shù)據(jù);AT_LEAST_ONCE,表示至少消費(fèi)一次,可能會(huì)重復(fù)消費(fèi);
          4. setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時(shí)間。假如設(shè)置每分鐘進(jìn)行一次 checkpoint,兩次 checkpoint 間隔時(shí)間為 30s。假設(shè)某一次 checkpoint 耗時(shí) 40s,那么理論上20s 后就要進(jìn)行一次 checkpoint,但是設(shè)置了兩次 checkpoint 之間的間隔時(shí)間為 30s,所以是 30s 之后才會(huì)進(jìn)行 checkpoint。另外,如果配置了該參數(shù),那么同時(shí)進(jìn)行的 checkpoint 數(shù)量只能為 1;
          5. enableExternalizedCheckpoints,F(xiàn)link 任務(wù)取消后,外部 checkpoint 信息是否被清理。
          • DELETE_ON_CANCELLATION,任務(wù)取消后,所有的 checkpoint 都將會(huì)被清理。只有在任務(wù)失敗后,才會(huì)被保留;
          • RETAIN_ON_CANCELLATION,任務(wù)取消后,所有的 checkpoint 都將會(huì)被保留,需要手工清理。
          1. setPreferCheckpointForRecovery,恢復(fù)任務(wù)時(shí),是否從最近一個(gè)比較新的 savepoint 處恢復(fù),默認(rèn)是 false;
          2. enableUnalignedCheckpoints,是否開啟試驗(yàn)性的非對(duì)齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數(shù);

          四、Checkpoint 如何實(shí)現(xiàn)的

          Flink 的 checkpoint 是基于 Chandy-Lamport 算法,實(shí)現(xiàn)了一個(gè)分布式一致性的存儲(chǔ)快照算法。

          這里我們假設(shè)一個(gè)簡(jiǎn)單的場(chǎng)景來描述 checkpoint 具體過程是怎樣的。

          場(chǎng)景是:假如現(xiàn)在 kafka 只有一個(gè)分區(qū),數(shù)據(jù)是每個(gè) app 發(fā)過來的日志,我們統(tǒng)計(jì)每個(gè) app 的 PV。

          Flink 的 checkpoint coordinator (JobManager 的一部分)會(huì)周期性的在流事件中插入一個(gè) barrier 事件(柵欄),用來隔離不同批次的事件,如下圖紅色的部分。

          下圖中有兩個(gè) barrier ,checkpoint barrier n-1 處的 barrier 是指 Job 從開始處理到 barrier n -1 所有的狀態(tài)數(shù)據(jù),checkpoint barrier n 處的 barrier 是指 Job 從開始處理到 barrier n 所有的狀態(tài)數(shù)據(jù)。

          回到剛剛計(jì)算 PV 的場(chǎng)景,當(dāng) Source Task 接受到 JobManager 的編號(hào)為 chk-100 的 Checkpoint 觸發(fā)請(qǐng)求后,發(fā)現(xiàn)自己恰好接收到了 offset 為(0,1000)【表示分區(qū)0,offset 為1000】處的數(shù)據(jù),所以會(huì)往 offset 為(0,1000)數(shù)據(jù)之后,(0,1001)數(shù)據(jù)之前安插一個(gè) barrier,然后自己開始做快照,把 offset (0,1000)保存到狀態(tài)后端中,向 CheckpointCoordinator報(bào)告自己快照制作情況,同時(shí)向自身所有下游算子廣播該barrier。如下圖:

          當(dāng)下游計(jì)算的算子收到 barrier 后,會(huì)看是否收到了所有輸入流的 barrier,我們現(xiàn)在只有一個(gè)分區(qū),Source 算子只有一個(gè)實(shí)例,barrier 到了就是收到了所有的輸入流的 barrier。

          開始把本次的計(jì)算結(jié)果(app1,1000),(app2,5000)寫到狀態(tài)存儲(chǔ)之中,向 CheckpointCoordinator 報(bào)告自己快照制作情況,同時(shí)向自身所有下游算子廣播該barrier。

          當(dāng) Operator 2 收到柵欄后,會(huì)觸發(fā)自身進(jìn)行快照,把自己當(dāng)時(shí)的狀態(tài)存儲(chǔ)下來,向 CheckpointCoordinator 報(bào)告 自己快照制作情況。因?yàn)檫@是一個(gè) sink ,狀態(tài)存儲(chǔ)成功后,意味著本次 checkpoint 也成功了。

          Barrier 對(duì)齊

          上面我們舉的例子是 Source Task 實(shí)例只有一個(gè)的情況,在輸入流的算子有多個(gè)實(shí)例的情況下,會(huì)有一個(gè)概念叫 Barrier 對(duì)齊。

          可以看上面的第一張圖,有兩個(gè)輸入流,一個(gè)是上面的數(shù)字流,一個(gè)是下面的字母流。

          數(shù)字流的 barrier 在 1 后面,字母流的 barrier 在 e 后面。當(dāng)上面的 barrier 到達(dá) operator 之后,必須要等待下面的數(shù)字流的 barrier 也到達(dá),此時(shí)數(shù)字流后面過來的數(shù)據(jù)會(huì)被緩存到緩沖區(qū)。這就是 barrier 對(duì)齊的過程。

          看上面的第二張圖,當(dāng)數(shù)字流的 barrier 到達(dá)后,意味著輸入流的所有實(shí)例的 barrier 都到達(dá)了,此時(shí)開始處理 到第三張圖的時(shí)候,處理完畢,自身做快照,然后把緩沖區(qū)的 pending 數(shù)據(jù)都發(fā)出去,把 checkpoint barrier n 繼續(xù)往下發(fā)送。

          五、Flink 1.11 對(duì) Checkpoint 的優(yōu)化

          從上圖的對(duì)齊過程,我們可以發(fā)現(xiàn),在進(jìn)行對(duì)齊的過程中,算子是不會(huì)再接著處理數(shù)據(jù)了,一定要等到對(duì)齊動(dòng)作完成之后,才能繼續(xù)對(duì)齊。也就是上圖中的數(shù)字流的 barrier 到達(dá)之后,需要去等待字母流的 barrier 事件。

          這其中會(huì)有一個(gè)阻塞的過程。在大多數(shù)情況下運(yùn)行良好,然而當(dāng)作業(yè)出現(xiàn)反壓時(shí),阻塞式的 Barrier 對(duì)齊反而會(huì)加劇作業(yè)的反壓,甚至導(dǎo)致作業(yè)不穩(wěn)定。

          首先, Chandy-Lamport 分布式快照的結(jié)束依賴于 Marker 的流動(dòng),而反壓則會(huì)限制 Marker 的流動(dòng),導(dǎo)致快照的完成時(shí)間變長(zhǎng)甚至超時(shí)。無論是哪種情況,都會(huì)導(dǎo)致 Checkpoint 的時(shí)間點(diǎn)落后于實(shí)際數(shù)據(jù)流較多。

          這時(shí)作業(yè)的計(jì)算進(jìn)度是沒有被持久化的,處于一個(gè)比較脆弱的狀態(tài),如果作業(yè)出于異常被動(dòng)重啟或者被用戶主動(dòng)重啟,作業(yè)會(huì)回滾丟失一定的進(jìn)度。如果 Checkpoint 連續(xù)超時(shí)且沒有很好的監(jiān)控,回滾丟失的進(jìn)度可能高達(dá)一天以上,對(duì)于實(shí)時(shí)業(yè)務(wù)這通常是不可接受的。更糟糕的是,回滾后的作業(yè)落后的 Lag 更大,通常帶來更大的反壓,形成一個(gè)惡性循環(huán)。

          所以在 Flink 1.11 版本中,引入了一個(gè) Unaligned Checkpointing 的模塊,主要功能是,在 barrier 到達(dá)之后,不必等待所有的輸入流的 barrier,而是繼續(xù)處理數(shù)據(jù)。

          然后把第一次到達(dá)的 barrier 之后的所有數(shù)據(jù)也放到 checkpoint 里面,在下一次計(jì)算的時(shí)候,會(huì)合并上次保存的數(shù)據(jù)以及流入的數(shù)據(jù)后再計(jì)算。這樣會(huì)大大加快 Barrier 流動(dòng)的速度,降低 checkpoint 整體的時(shí)長(zhǎng)。

          六、總結(jié) Checkpoint 的原理

          1. JobManager 端的 CheckPointCoordinator 會(huì)定期向所有 SourceTask 發(fā)送 CheckPointTrigger,Source Task 會(huì)在數(shù)據(jù)流中安插 Checkpoint barrier;

          2. 當(dāng) task 收到上游所有實(shí)例的 barrier 后,向自己的下游繼續(xù)傳遞 barrier,然后自身同步進(jìn)行快照,并將自己的狀態(tài)異步寫入到持久化存儲(chǔ)中

          • 如果是增量 Checkpoint,則只是把最新的一部分更新寫入到外部持久化存儲(chǔ)中
          • 為了下游盡快進(jìn)行 Checkpoint,所以 task 會(huì)先發(fā)送 barrier 到下游,自身再同步進(jìn)行快照;
          1. 當(dāng) task 將狀態(tài)信息完成備份后,會(huì)將備份數(shù)據(jù)的地址(state handle)通知給 JobManager 的CheckPointCoordinator,如果 Checkpoint 的持續(xù)時(shí)長(zhǎng)超過了 Checkpoint 設(shè)定的超時(shí)時(shí)間CheckPointCoordinator 還沒有收集完所有的 State Handle,CheckPointCoordinator 就會(huì)認(rèn)為本次 Checkpoint 失敗,會(huì)把這次 Checkpoint 產(chǎn)生的所有狀態(tài)數(shù)據(jù)全部刪除;

          2. 如果 CheckPointCoordinator 收集完所有算子的 State Handle,CheckPointCoordinator 會(huì)把整個(gè) StateHandle 封裝成 completed Checkpoint Meta,寫入到外部存儲(chǔ)中,Checkpoint 結(jié)束;


          --end--


          掃描下方二維碼
          添加好友,備注【交流
          可私聊交流,也可進(jìn)資源豐富學(xué)習(xí)群

          瀏覽 43
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产CD系列TS人妖视频 | 国产美女操逼视频 | 无套内射学生妹去看片 | 在线观看黄色视频网站 | 欧美激情一区二区 |