<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入解讀 Flink 窗口的應用與實現(xiàn)

          共 6014字,需瀏覽 13分鐘

           ·

          2021-09-21 20:26

          本文主要分享 Flink 的 CheckPoint 機制、反壓機制及 Flink 的內(nèi)存模型。熟悉這 3 部分內(nèi)容是調(diào)優(yōu)的前提,文章主要從以下幾個方面分享:

          1. 原理剖析

          2. 性能定位

          3. 經(jīng)典場景調(diào)優(yōu)

          4. 內(nèi)存調(diào)優(yōu)

          Checkpoint 機制

          1.什么是 checkpoint

          簡單地說就是 Flink 為了達到容錯和 exactly-once 語義的功能,定期把 state 持久化下來,而這一持久化的過程就叫做 checkpoint ,它是 Flink Job 在某一時刻全局狀態(tài)的快照。


          當我們要對分布式系統(tǒng)實現(xiàn)一個全局狀態(tài)保留的功能時,傳統(tǒng)方案會引入一個統(tǒng)一時鐘,通過分布式系統(tǒng)中的 master 節(jié)點廣播出去給每一個 slaves 節(jié)點,當節(jié)點接收到這個統(tǒng)一時鐘時,它們就記錄下自己當前的狀態(tài)即可。



          但是統(tǒng)一時鐘的方式也存在一定的問題,某一個 node 進行的 GC 時間比較長,或者 master 與 slaves 的網(wǎng)絡(luò)在當時存在波動而造成時鐘的發(fā)送延遲或者發(fā)送失敗,都會造成此 slave 和其它的機器出現(xiàn)數(shù)據(jù)不一致而最終導致腦裂的情況。如果我們想要解決這個問題,就需要對 master 和 slaves 做一個 HA(High Availability)。但是,一個系統(tǒng)越是復雜,就越不穩(wěn)定且維護成本越高。


          Flink 是將 checkpoint 都放進了一個名為 Barrier 的流。



          上圖中就是一個 Barrier 的例子,從上游的第一個 Task 到下游的最后一個 Task,每次當 Task 經(jīng)過圖中藍色的柵欄時,就會觸發(fā) save snapshot(快照)的功能。我們用一個例子來簡單說明。


          2.實例分析


          這是一個簡單的 ETL 過程,首先我們把數(shù)據(jù)從 Kafka 中拿過來進行一個 trans 的轉(zhuǎn)換操作,然后再發(fā)送到一個下游的 Kafka


          此時這個例子中沒有進行 chaining 的調(diào)優(yōu)。所以此時采用的是 forward strategy ,也就是 “一個 task 的輸出只發(fā)送給一個 task 作為輸入”,這樣的方式,這樣做也有一個好處就是如果兩個 task 都在一個 JVM 中的話,那么就可以避免不必要的網(wǎng)絡(luò)開銷


          設(shè)置 Parallism 為 2,此時的 DAG 圖如下:



          ■ CK 的分析過程


          每一個 Flink 作業(yè)都會有一個 JobManager ,JobManager 里面又會有一個 checkpoint coordinator 來管理整個 checkpoint 的過程,我們可以設(shè)置一個時間間隔讓 checkpoint coordinator 將一個 checkpoint 的事件發(fā)送給每一個 Container 中的 source task,也就是第一個任務(wù)(對應并行圖中的 task1,task2)。


          當某個 Source 算子收到一個 Barrier 時,它會暫停自身的數(shù)據(jù)處理,然后將自己的當前 state 制作成 snapshot(快照),并保存到指定的持久化存儲中,最后向 CheckpointCoordinator 異步發(fā)送一個 ack(Acknowledge character — 確認字符),同時向自身所有下游算子廣播該 Barrier 后恢復自身的數(shù)據(jù)處理。


          每個算子按照上面不斷制作 snapshot 并向下游廣播,直到最后 Barrier 傳遞到 sink 算子,此時快照便制作完成。這時候需要注意的是,上游算子可能是多個數(shù)據(jù)源,對應多個 Barrier 需要全部到齊才一次性觸發(fā) checkpoint ,所以在遇到 checkpoint 時間較長的情況時,有可能是因為數(shù)據(jù)對齊需要耗費的時間比較長所造成的。


          ■ Snapshot & Recover


          如圖,這是我們的 Container 容器初始化的階段,e1 和 e2 是剛從 Kafka 消費過來的數(shù)據(jù),與此同時,CheckpointCoordinator 也往它發(fā)送了 Barrier。



          此時 Task1 完成了它的 checkpoint 過程,效果就是記錄下 offset 為 2(e1,e2),然后把 Barrier 往下游的算子廣播,Task3 的輸入為 Task1 的輸出,現(xiàn)在假設(shè)我的這個程序的功能是統(tǒng)計數(shù)據(jù)的條數(shù),此時 Task3 的 checkpoint 效果就是就記錄數(shù)據(jù)數(shù)為 2(因為從 Task1 過來的數(shù)據(jù)就是 e1 和 e2 兩條),之后再將 Barrier 往下廣播,當此 Barrier 傳遞到 sink 算子,snapshot 就算是制作完成了。



          此時 source 中還會源源不斷的產(chǎn)生數(shù)據(jù),并產(chǎn)生新的 checkpoint ,但是此時如果 Container 宕機重啟就需要進行數(shù)據(jù)的恢復了。剛剛完成的 checkpoint 中 offset 為 2,count 為 2,那我們就按照這個 state 進行恢復。此時 Task1 會從 e3 開始消費,這就是 Recover 操作。



          ■ checkpoint 的注意事項

          下面列舉的 3 個注意要點都會影響到系統(tǒng)的吞吐,在實際開發(fā)過程中需要注意:



          3.背壓的產(chǎn)生及 Flink 的反壓處理

          在分布式系統(tǒng)中經(jīng)常會出現(xiàn)多個 Task 多個 JVM 之間可能需要做數(shù)據(jù)的交換,我們使用生產(chǎn)者和消費者來說明這個事情。



          假設(shè)我現(xiàn)在的 Producer 是使用了無界 buffer 來進行存儲,當我們的生產(chǎn)者生產(chǎn)速度遠大于消費者消費的速度時,生產(chǎn)端的數(shù)據(jù)會因為消費端的消費能力低下而導致數(shù)據(jù)積壓,最終導致 OOM 的產(chǎn)生。



          而就算使用了有界 buffer,同樣消費者端的消費能力低下,當 buffer 被積滿時生產(chǎn)者就會停止生產(chǎn),這樣還不能完全地解決我們的問題,所以就需要根據(jù)不同的情況進行調(diào)整。


          Flink 也是通過有界 buffer 來進行不同 TaskManager 的數(shù)據(jù)交換。而且做法分為了靜態(tài)控流和動態(tài)控流兩種方式。



          簡單來說就是當生產(chǎn)者比消費者的 TPS 多時,我們采用溢寫的方式,使用 batch 來封裝好我們的數(shù)據(jù),然后分批發(fā)送出去,每次發(fā)送完成后再 sleep 一段時間,這個時間的計算方式是 left(剩余的數(shù)據(jù))/ tps,但是這個做法是很難去預估系統(tǒng)的情況的。



          Flink 1.5 之前的流控是基于 TCP 的滑動窗口實現(xiàn)的,在之前的課程中已經(jīng)有提到過了。而 Flink 在 1.5 之后已經(jīng)棄用了該機制,所以這里不展開說明。在此網(wǎng)絡(luò)模型中,數(shù)據(jù)生成節(jié)點只能通過檢查當前的 channel 是否可寫來決定自己是否要向消費端發(fā)送數(shù)據(jù),它對下游數(shù)據(jù)消費端的真實容量情況一概不知。這就導致,當生成節(jié)點發(fā)現(xiàn) channel 已經(jīng)不可寫的時候,有可能下游消費節(jié)點已經(jīng)積壓了很多數(shù)據(jù)。


          Credit-Based 我們用下面的數(shù)據(jù)交換的例子說明:


          Flink 的數(shù)據(jù)交換大致分為三種,一種是同一個 Task 的數(shù)據(jù)交換,另一種是 不同 Task 同 JVM 下的數(shù)據(jù)交換。第三種就是不同 Task 且不同 JVM 之間的交換。



          同一個 Task 的數(shù)據(jù)交換就是我們剛剛提到的 forward strategy 方式,主要就是避免了序列化和網(wǎng)絡(luò)的開銷。



          第二種數(shù)據(jù)交換的方式就是數(shù)據(jù)會先通過一個 record Writer ,數(shù)據(jù)在里面進行序列化之后再傳遞給 Result Partition ,之后數(shù)據(jù)會通過 local channel 傳遞給另外一個 Task 的 Input Gate 里面,再進行反序列化,推送給 Record Reader 之后進行操作。



          因為第三種數(shù)據(jù)交換涉及到了不同的 JVM,所以會有一定的網(wǎng)絡(luò)開銷,和第二種的區(qū)別就在于它先推給了 Netty ,通過 netty 把數(shù)據(jù)推送到遠程端的 Task 上。


          ■ Credit-Based


          此時我們可以看到 event1 已經(jīng)連帶一個 backlog = 1 推送給了 TaskB,backlog 的作用其實只是為了讓消費端感知到我們生產(chǎn)端的情況



          此時 event1 被 TaskB 接收后,TaskB 會返回一個 ack 給 TaskA,同時返回一個 credit = 3,這個是告知 TaskA 它還能接收多少條數(shù)據(jù),F(xiàn)link 就是通過這種互相告知的方式,來讓生產(chǎn)者和消費者都能感知到對方的狀態(tài)。



          此時經(jīng)過一段時間之后,TaskB 中的有界 buffer 已經(jīng)滿了,此時 TaskB 回復 credit = 0 給 TaskA,此時 channel 通道將會停止工作,TaskA 不再將數(shù)據(jù)發(fā)往 TaskB。




          此時再經(jīng)過一段時間,TaskA 中的有界 Buffer 也已經(jīng)出現(xiàn)了數(shù)據(jù)積壓,所以我們平時遇到的吞吐下降,處理延遲的問題,就是因為此時整個系統(tǒng)相當于一個停滯的狀態(tài),如圖二示,所有的過程都被打上 “X”,表示這些過程都已經(jīng)停止工作。



          JVM 是一個非常復雜的系統(tǒng),當其內(nèi)存不足時會造成 OOM ,導致系統(tǒng)的崩潰。Flink 在拿到我們分配的內(nèi)存之后會先分配一個 cutoff 預留內(nèi)存,保證系統(tǒng)的安全性。Netword buffers 其實就是對應我們剛剛一直提到的有界 buffer,momery manager 是一個內(nèi)存池,這部分的內(nèi)存可以設(shè)置為堆內(nèi)或者堆外的內(nèi)存,當然在流式作業(yè)中我們一般設(shè)置其為堆外內(nèi)存,而 Free 部分就是提供給用戶使用的內(nèi)存塊。


          現(xiàn)在我們假設(shè)分配給此 TaskManager 的內(nèi)存是 8g。


          • 首先是要砍掉 cutoff 的部分,默認是 0.25,所以我們的可用內(nèi)存就是 8gx0.75

          • network buffers 占用可用內(nèi)存的 0.1 ,所以是 6144x0.1

          • 堆內(nèi)/堆外內(nèi)存為可用內(nèi)存減去 network buffers 的部分,再乘以 0.8

          • 給到用戶使用的內(nèi)存就是堆內(nèi)存剩下的 0.2 那部分

          其實真實情況是 Flink 是先知道了 heap 內(nèi)存的大小然后逆推出其它內(nèi)存的大小。


          Flink 作業(yè)的問題定位

          1.問題定位口訣

          一壓二查三指標,延遲吞吐是核心。時刻關(guān)注資源量 ,  排查首先看 GC。”


          一壓是指背壓,遇到問題先看背壓的情況,二查就是指 checkpoint ,對齊數(shù)據(jù)的時間是否很長,state 是否很大,這些都是和系統(tǒng)吞吐密切相關(guān)的,三指標就是指 Flink UI 那塊的一些展示,我們的主要關(guān)注點其實就是延遲和吞吐,系統(tǒng)資源,還有就是 GC logs。

          • 看反壓:通常最后一個被壓高的 subTask 的下游就是 job 的瓶頸之一。

          • 看 Checkpoint 時長:Checkpoint 時長能在一定程度影響 job 的整體吞吐。

          • 看核心指標:指標是對一個任務(wù)性能精準判斷的依據(jù),延遲指標和吞吐則是其中最為關(guān)鍵的指標。

          • 資源的使用率:提高資源的利用率是最終的目的。

          ■ 常見的性能問題


          簡單解釋一下:

          • 在關(guān)注背壓的時候大家往往忽略了數(shù)據(jù)的序列化和反序列化過程所造成的性能問題。

          • 一些數(shù)據(jù)結(jié)構(gòu),比如 HashMap 和 HashSet 這種 key 需要經(jīng)過 hash 計算的數(shù)據(jù)結(jié)構(gòu),在數(shù)據(jù)量大的時候使用 keyby 進行操作, 造成的性能影響是非常大的。

          • 數(shù)據(jù)傾斜是我們的經(jīng)典問題,后面再進行展開。

          • 如果我們的下游是 MySQL,HBase 這種,我們都會進行一個批處理的操作,就是讓數(shù)據(jù)存儲到一個 buffer 里面,在達到某些條件的時候再進行發(fā)送,這樣做的目的就是減少和外部系統(tǒng)的交互,降低網(wǎng)絡(luò)開銷的成本。

          • 頻繁 GC ,無論是 CMS 也好,G1 也好,在進行 GC 的時候,都會停止整個作業(yè)的運行,GC 時間較長還會導致 JobManager 和 TaskManager 沒有辦法準時發(fā)送心跳,此時 JobManager 就會認為此 TaskManager 失聯(lián),它就會另外開啟一個新的 TaskManager

          • 窗口是一種可以把無限數(shù)據(jù)切割為有限數(shù)據(jù)塊的手段。比如我們知道,使用滑動窗口的時候數(shù)據(jù)的重疊問題,size = 5min 雖然不屬于大窗口的范疇,可是 step = 1s 代表 1 秒就要進行一次數(shù)據(jù)的處理,這樣就會造成數(shù)據(jù)的重疊很高,數(shù)據(jù)量很大的問題。

          2.Flink 作業(yè)調(diào)優(yōu)



          我們可以通過一些數(shù)據(jù)結(jié)構(gòu),比如 Set 或者 Map 來結(jié)合 Flink state 進行去重。但是這些去重方案會隨著數(shù)據(jù)量不斷增大,從而導致性能的急劇下降,比如剛剛我們分析過的 hash 沖突帶來的寫入性能問題,內(nèi)存過大導致的 GC 問題,TaskManger 的失聯(lián)問題。






          方案二和方案三也都是通過一些數(shù)據(jù)結(jié)構(gòu)的手段去進行去重,有興趣的同學可以自行下去了解,在這里不再展開。


          ■ 數(shù)據(jù)傾斜


          數(shù)據(jù)傾斜是大家都會遇到的高頻問題,解決的方案也不少。



          第一種場景是當我們的并發(fā)度設(shè)置的比分區(qū)數(shù)要低時,就會造成上面所說的消費不均勻的情況。



          第二種提到的就是 key 分布不均勻的情況,可以通過添加隨機前綴打散它們的分布,使得數(shù)據(jù)不會集中在幾個 Task 中。



          在每個節(jié)點本地對相同的 key 進行一次聚合操作,類似于 MapReduce 中的本地 combiner。map-side 預聚合之后,每個節(jié)點本地就只會有一條相同的 key,因為多條相同的 key 都被聚合起來了。其他節(jié)點在拉取所有節(jié)點上的相同 key 時,就會大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤 IO 以及網(wǎng)絡(luò)傳輸開銷。


          ■ 內(nèi)存調(diào)優(yōu)

          Flink 的內(nèi)存結(jié)構(gòu)剛剛我們已經(jīng)提及到了,所以我們清楚,調(diào)優(yōu)的方面主要是針對 非堆內(nèi)存 Network buffer ,manager pool 和堆內(nèi)存的調(diào)優(yōu),這些基本都是通過參數(shù)來進行控制的。



          這些參數(shù)我們都需要結(jié)合自身的情況去進行調(diào)整,這里只給出一些建議。而且對于 ManagerBuffer 來說,F(xiàn)link 的流式作業(yè)現(xiàn)在并沒有過多使用到這部分的內(nèi)存,所以我們都會設(shè)置得比較小,不超過 0.3。



          堆內(nèi)存的調(diào)優(yōu)是關(guān)于 JVM 方面的,主要就是將默認使用的垃圾回收器改為 G1 ,因為默認使用的 Parallel Scavenge 對于老年代的 GC 存在一個串行化的問題,它的 Full GC 耗時較長,下面是關(guān)于 G1 的一些介紹,網(wǎng)上資料也非常多,這里就不展開說明了。





          總 結(jié)

          本文帶大家了解了 Flink 的 CheckPoint 機制、反壓機制及 Flink 的內(nèi)存模型和基于內(nèi)存模型分析了一些調(diào)優(yōu)的策略,希望能對大家有所幫助。

          瀏覽 58
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  自拍偷拍视频网站 | 粉嫩小泬BBBB免费观看 | 91精品视频xxx... | 伊人婷婷色五月色婷婷区 | 国产人伦子伦一级A片下载 |