Flink的處理背壓?原理及問題-面試必備
轉(zhuǎn)自:https://zhuanlan.zhihu.com/p/38157397
反壓機制(BackPressure)被廣泛應用到實時流處理系統(tǒng)中,流處理系統(tǒng)需要能優(yōu)雅地處理反壓(backpressure)問題。反壓通常產(chǎn)生于這樣的場景:短時負載高峰導致系統(tǒng)接收數(shù)據(jù)的速率遠高于它處理數(shù)據(jù)的速率。許多日常問題都會導致反壓,例如,垃圾回收停頓可能會導致流入的數(shù)據(jù)快速堆積,或者遇到大促或秒殺活動導致流量陡增。反壓如果不能得到正確的處理,可能會導致資源耗盡甚至系統(tǒng)崩潰。反壓機制就是指系統(tǒng)能夠自己檢測到被阻塞的Operator,然后系統(tǒng)自適應地降低源頭或者上游的發(fā)送速率。目前主流的流處理系統(tǒng) Apache Storm、JStorm、Spark Streaming、S4、Apache Flink、Twitter Heron都采用反壓機制解決這個問題,不過他們的實現(xiàn)各自不同。

不同的組件可以不同的速度執(zhí)行(并且每個組件中的處理速度隨時間改變)。例如,考慮一個工作流程,或由于數(shù)據(jù)傾斜或任務(wù)調(diào)度而導致數(shù)據(jù)被處理十分緩慢。在這種情況下,如果上游階段不減速,將導致緩沖區(qū)建立長隊列,或?qū)е孪到y(tǒng)丟棄元組。如果元組在中途丟棄,那么效率可能會有損失,因為已經(jīng)為這些元組產(chǎn)生的計算被浪費了。并且在一些流處理系統(tǒng)中比如Strom,會將這些丟失的元組重新發(fā)送,這樣會導致數(shù)據(jù)的一致性問題,并且還會導致某些Operator狀態(tài)疊加。進而整個程序輸出結(jié)果不準確。第二由于系統(tǒng)接收數(shù)據(jù)的速率是隨著時間改變的,短時負載高峰導致系統(tǒng)接收數(shù)據(jù)的速率遠高于它處理數(shù)據(jù)的速率的情況,也會導致Tuple在中途丟失。所以實時流處理系統(tǒng)必須能夠解決發(fā)送速率遠大于系統(tǒng)能處理速率這個問題,大多數(shù)實時流處理系統(tǒng)采用反壓(BackPressure)機制解決這個問題。下面我們就來介紹一下不同的實時流處理系統(tǒng)采用的反壓機制:
1.Strom 反壓機制
1.1 Storm 1.0 以前的反壓機制
對于開啟了acker機制的storm程序,可以通過設(shè)置conf.setMaxSpoutPending參數(shù)來實現(xiàn)反壓效果,如果下游組件(bolt)處理速度跟不上導致spout發(fā)送的tuple沒有及時確認的數(shù)超過了參數(shù)設(shè)定的值,spout會停止發(fā)送數(shù)據(jù),這種方式的缺點是很難調(diào)優(yōu)conf.setMaxSpoutPending參數(shù)的設(shè)置以達到最好的反壓效果,設(shè)小了會導致吞吐上不去,設(shè)大了會導致worker OOM;有震蕩,數(shù)據(jù)流會處于一個顛簸狀態(tài),效果不如逐級反壓;另外對于關(guān)閉acker機制的程序無效;
1.2 Storm Automatic Backpressure
新的storm自動反壓機制(Automatic Back Pressure)通過監(jiān)控bolt中的接收隊列的情況,當超過高水位值時專門的線程會將反壓信息寫到 Zookeeper ,Zookeeper上的watch會通知該拓撲的所有Worker都進入反壓狀態(tài),最后Spout降低tuple發(fā)送的速度。

每個Executor都有一個接受隊列和發(fā)送隊列用來接收Tuple和發(fā)送Spout或者Bolt生成的Tuple元組。每個Worker進程都有一個單的的接收線程監(jiān)聽接收端口。它從每個網(wǎng)絡(luò)上進來的消息發(fā)送到Executor的接收隊列中。Executor接收隊列存放Worker或者Worker內(nèi)部其他Executor發(fā)過來的消息。Executor工作線程從接收隊列中拿出數(shù)據(jù),然后調(diào)用execute方法,發(fā)送Tuple到Executor的發(fā)送隊列。Executor的發(fā)送線程從發(fā)送隊列中獲取消息,按照消息目的地址選擇發(fā)送到Worker的傳輸隊列中或者其他Executor的接收隊列中。最后Worker的發(fā)送線程從傳輸隊列中讀取消息,然后將Tuple元組發(fā)送到網(wǎng)絡(luò)中。
1. 當Worker進程中的Executor線程發(fā)現(xiàn)自己的接收隊列滿了時,也就是接收隊列達到high watermark的閾值后,因此它會發(fā)送通知消息到背壓線程。
2. 背壓線程將當前worker進程的信息注冊到Zookeeper的Znode節(jié)點中。具體路徑就是 /Backpressure/topo1/wk1下
3. Zookeepre的Znode Watcher監(jiān)視/Backpreesure/topo1下的節(jié)點目錄變化情況,如果發(fā)現(xiàn)目錄增加了znode節(jié)點說明或者其他變化。這就說明該Topo1需要反壓控制,然后它會通知Topo1所有的Worker進入反壓狀態(tài)。
4.最終Spout降低tuple發(fā)送的速度。
2. JStorm 反壓機制
Jstorm做了兩級的反壓,第一級和Jstorm類似,通過執(zhí)行隊列來監(jiān)測,但是不會通過ZK來協(xié)調(diào),而是通過Topology Master來協(xié)調(diào)。在隊列中會標記high water mark和low water mark,當執(zhí)行隊列超過high water mark時,就認為bolt來不及處理,則向TM發(fā)一條控制消息,上游開始減慢發(fā)送速率,直到下游低于low water mark時解除反壓。
此外,在Netty層也做了一級反壓,由于每個Worker Task都有自己的發(fā)送和接收的緩沖區(qū),可以對緩沖區(qū)設(shè)定限額、控制大小,如果spout數(shù)據(jù)量特別大,緩沖區(qū)填滿會導致下游bolt的接收緩沖區(qū)填滿,造成了反壓。

限流機制:jstorm的限流機制, 當下游bolt發(fā)生阻塞時, 并且阻塞task的比例超過某個比例時(現(xiàn)在默認設(shè)置為0.1),觸發(fā)反壓
限流方式:計算阻塞Task的地方執(zhí)行線程執(zhí)行時間,Spout每發(fā)送一個tuple等待相應時間,然后講這個時間發(fā)送給Spout, 于是, spout每發(fā)送一個tuple,就會等待這個執(zhí)行時間。
Task阻塞判斷方式:在jstorm 連續(xù)4次采樣周期中采樣,隊列情況,當隊列超過80%(可以設(shè)置)時,即可認為該task處在阻塞狀態(tài)。
3. SparkStreaming 反壓機制
3.1 為什么引入反壓機制Backpressure
默認情況下,Spark Streaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計算過程中會出現(xiàn)batch processing time > batch interval的情況,其中batch processing time 為實際計算一個批次花費時間, batch interval為Streaming應用設(shè)置的批處理間隔。這意味著Spark Streaming的數(shù)據(jù)接收速率高于Spark從隊列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長的時間,會造成數(shù)據(jù)在內(nèi)存中堆積,導致Receiver所在Executor內(nèi)存溢出等問題(如果設(shè)置StorageLevel包含disk, 則內(nèi)存存放不下的數(shù)據(jù)會溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數(shù)據(jù)接收速率,可以通過設(shè)置靜態(tài)配制參數(shù)“spark.streaming.receiver.maxRate”的值來實現(xiàn),此舉雖然可以通過限制接收速率,來適配當前的處理能力,防止內(nèi)存溢出,但也會引入其它問題。比如:producer數(shù)據(jù)生產(chǎn)高于maxRate,當前集群處理能力也高于maxRate,這就會造成資源利用率下降等問題。為了更好的協(xié)調(diào)數(shù)據(jù)接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機制(back-pressure),通過動態(tài)控制數(shù)據(jù)接收速率來適配集群數(shù)據(jù)處理能力。
3.2 反壓機制Backpressure
Spark Streaming Backpressure: 根據(jù)JobScheduler反饋作業(yè)的執(zhí)行信息來動態(tài)調(diào)整Receiver數(shù)據(jù)接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認值false,即不啟用。
SparkStreaming 架構(gòu)圖如下所示:

SparkStreaming 反壓過程執(zhí)行如下圖所示:
在原架構(gòu)的基礎(chǔ)上加上一個新的組件RateController,這個組件負責監(jiān)聽“OnBatchCompleted”事件,然后從中抽取processingDelay 及schedulingDelay信息. Estimator依據(jù)這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉(zhuǎn)發(fā)給BlockGenerator(繼承自RateLimiter).

4. Heron 反壓機制

當下游處理速度跟不上上游發(fā)送速度時,一旦StreamManager 發(fā)現(xiàn)一個或多個Heron Instance 速度變慢,立刻對本地spout進行降級,降低本地Spout發(fā)送速度, 停止從這些spout讀取數(shù)據(jù)。并且受影響的StreamManager 會發(fā)送一個特殊的start backpressure message 給其他的StreamManager ,要求他們對spout進行本地降級。當其他StreamManager 接收到這個特殊消息時,他們通過不讀取當?shù)豐pout中的Tuple來進行降級。一旦出問題的Heron Instance 恢復速度后,本地的SM 會發(fā)送stop backpressure message 解除降級。
很多Socket Channel與應用程序級別的Buffer相關(guān)聯(lián),該緩沖區(qū)由high watermark 和low watermark組成。當緩沖區(qū)大小達到high watermark時觸發(fā)反壓,并保持有效,直到緩沖區(qū)大小低于low watermark。此設(shè)計的基本原理是防止拓撲在進入和退出背壓緩解模式之間快速振蕩。
5. Flink 反壓機制
Flink 沒有使用任何復雜的機制來解決反壓問題,因為根本不需要那樣的方案!它利用自身作為純數(shù)據(jù)流引擎的優(yōu)勢來優(yōu)雅地響應反壓問題。下面我們會深入分析 Flink 是如何在 Task 之間傳輸數(shù)據(jù)的,以及數(shù)據(jù)流如何實現(xiàn)自然降速的。
Flink 在運行時主要由operators和streams兩大組件構(gòu)成。每個 operator 會消費中間態(tài)的流,并在流上進行轉(zhuǎn)換,然后生成新的流。對于 Flink 的網(wǎng)絡(luò)機制一種形象的類比是,F(xiàn)link 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。還記得經(jīng)典的線程間通信案例:生產(chǎn)者消費者模型嗎?使用 BlockingQueue 的話,一個較慢的接受者會降低發(fā)送者的發(fā)送速率,因為一旦隊列滿了(有界隊列)發(fā)送者會被阻塞。Flink 解決反壓的方案就是這種感覺。
在 Flink 中,這些分布式阻塞隊列就是這些邏輯流,而隊列容量是通過緩沖池來(LocalBufferPool)實現(xiàn)的。每個被生產(chǎn)和被消費的流都會被分配一個緩沖池。緩沖池管理著一組緩沖(Buffer),緩沖在被消費后可以被回收循環(huán)利用。這很好理解:你從池子中拿走一個緩沖,填上數(shù)據(jù),在數(shù)據(jù)消費完之后,又把緩沖還給池子,之后你可以再次使用它。
5.1 Flink 網(wǎng)絡(luò)傳輸中的內(nèi)存管理
如下圖所示展示了 Flink 在網(wǎng)絡(luò)傳輸場景下的內(nèi)存管理。網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)會寫到 Task 的 InputGate(IG) 中,經(jīng)過 Task 的處理后,再由 Task 寫到 ResultPartition(RS) 中。每個 Task 都包括了輸入和輸入,輸入和輸出的數(shù)據(jù)存在 Buffer 中(都是字節(jié)數(shù)據(jù))。Buffer 是 MemorySegment 的包裝類。

TaskManager(TM)在啟動時,會先初始化NetworkEnvironment對象,TM 中所有與網(wǎng)絡(luò)相關(guān)的東西都由該類來管理(如 Netty 連接),其中就包括NetworkBufferPool。根據(jù)配置,F(xiàn)link 會在 NetworkBufferPool 中生成一定數(shù)量(默認2048個)的內(nèi)存塊 MemorySegment(關(guān)于 Flink 的內(nèi)存管理,后續(xù)文章會詳細談到),內(nèi)存塊的總數(shù)量就代表了網(wǎng)絡(luò)傳輸中所有可用的內(nèi)存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個 TM 只會實例化一個。 Task 線程啟動時,會向 NetworkEnvironment 注冊,NetworkEnvironment 會為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創(chuàng)建一個 LocalBufferPool(緩沖池)并設(shè)置可申請的 MemorySegment(內(nèi)存塊)數(shù)量。IG 對應的緩沖池初始的內(nèi)存塊數(shù)量與 IG 中 InputChannel 數(shù)量一致,RP 對應的緩沖池初始的內(nèi)存塊數(shù)量與 RP 中的 ResultSubpartition 數(shù)量一致。不過,每當創(chuàng)建或銷毀緩沖池時,NetworkBufferPool 會計算剩余空閑的內(nèi)存塊數(shù)量,并平均分配給已創(chuàng)建的緩沖池。注意,這個過程只是指定了緩沖池所能使用的內(nèi)存塊數(shù)量,并沒有真正分配內(nèi)存塊,只有當需要時才分配。為什么要動態(tài)地為緩沖池擴容呢?因為內(nèi)存越多,意味著系統(tǒng)可以更輕松地應對瞬時壓力(如GC),不會頻繁地進入反壓狀態(tài),所以我們要利用起那部分閑置的內(nèi)存塊。 在 Task 線程執(zhí)行過程中,當 Netty 接收端收到數(shù)據(jù)時,為了將 Netty 中的數(shù)據(jù)拷貝到 Task 中,InputChannel(實際是 RemoteInputChannel)會向其對應的緩沖池申請內(nèi)存塊(上圖中的①)。如果緩沖池中也沒有可用的內(nèi)存塊且已申請的數(shù)量還沒到池子上限,則會向 NetworkBufferPool 申請內(nèi)存塊(上圖中的②)并交給 InputChannel 填上數(shù)據(jù)(上圖中的③和④)。如果緩沖池已申請的數(shù)量達到上限了呢?或者 NetworkBufferPool 也沒有可用內(nèi)存塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上游的發(fā)送端會立即響應停止發(fā)送,拓撲會進入反壓狀態(tài)。當 Task 線程寫數(shù)據(jù)到 ResultPartition 時,也會向緩沖池請求內(nèi)存塊,如果沒有可用內(nèi)存塊時,會阻塞在請求內(nèi)存塊的地方,達到暫停寫入的目的。 當一個內(nèi)存塊被消費完成之后(在輸入端是指內(nèi)存塊中的字節(jié)被反序列化成對象了,在輸出端是指內(nèi)存塊中的字節(jié)寫入到 Netty Channel 了),會調(diào)用 Buffer.recycle() 方法,會將內(nèi)存塊還給 LocalBufferPool (上圖中的⑤)。如果LocalBufferPool中當前申請的數(shù)量超過了池子容量(由于上文提到的動態(tài)容量,由于新注冊的 Task 導致該池子容量變小),則LocalBufferPool會將該內(nèi)存塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量,則會繼續(xù)留在池子中,減少反復申請的開銷。
5.2 Flink 反壓機制
下面這張圖簡單展示了兩個 Task 之間的數(shù)據(jù)傳輸以及 Flink 如何感知到反壓的:

記錄“A”進入了 Flink 并且被 Task 1 處理。(這里省略了 Netty 接收、反序列化等過程) 記錄被序列化到 buffer 中。 該 buffer 被發(fā)送到 Task 2,然后 Task 2 從這個 buffer 中讀出記錄。
不要忘了:記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。
結(jié)合上面兩張圖看:Task 1 在輸出端有一個相關(guān)聯(lián)的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有一個相關(guān)聯(lián)的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化并發(fā)送該 buffer。
這里我們需要注意兩個場景:
本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節(jié)點(TaskManager),該 buffer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。 遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節(jié)點上,那么 buffer 會在發(fā)送到網(wǎng)絡(luò)(TCP Channel)后被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然后拷貝網(wǎng)絡(luò)中的數(shù)據(jù)到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接中讀取數(shù)據(jù)。在輸出端,通過 Netty 的水位值機制來保證不往網(wǎng)絡(luò)中寫入太多數(shù)據(jù)(后面會說)。如果網(wǎng)絡(luò)中的數(shù)據(jù)(Netty輸出緩沖中的字節(jié)數(shù))超過了高水位值,我們會等到其降到低水位值以下才繼續(xù)寫入數(shù)據(jù)。這保證了網(wǎng)絡(luò)中不會有太多的數(shù)據(jù)。如果接收端停止消費網(wǎng)絡(luò)中的數(shù)據(jù)(由于接收端緩沖池沒有可用 buffer),網(wǎng)絡(luò)中的緩沖數(shù)據(jù)就會堆積,那么發(fā)送端也會暫停發(fā)送。另外,這會使得發(fā)送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫數(shù)據(jù)。
這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產(chǎn)數(shù)據(jù)的速度不會快于消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數(shù)據(jù)傳輸自然地擴展到更復雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。
5.3 反壓實驗
另外,官方博客中為了展示反壓的效果,給出了一個簡單的實驗。下面這張圖顯示了:隨著時間的改變,生產(chǎn)者(黃色線)和消費者(綠色線)每5秒的平均吞吐與最大吞吐(在單一JVM中每秒達到8百萬條記錄)的百分比。我們通過衡量task每5秒鐘處理的記錄數(shù)來衡量平均吞吐。該實驗運行在單 JVM 中,不過使用了完整的 Flink 功能棧。

首先,我們運行生產(chǎn)task到它最大生產(chǎn)速度的60%(我們通過Thread.sleep()來模擬降速)。消費者以同樣的速度處理數(shù)據(jù)。然后,我們將消費task的速度降至其最高速度的30%。你就會看到背壓問題產(chǎn)生了,正如我們所見,生產(chǎn)者的速度也自然降至其最高速度的30%。接著,停止消費task的人為降速,之后生產(chǎn)者和消費者task都達到了其最大的吞吐。接下來,我們再次將消費者的速度降至30%,pipeline給出了立即響應:生產(chǎn)者的速度也被自動降至30%。最后,我們再次停止限速,兩個task也再次恢復100%的速度??偠灾?,我們可以看到:生產(chǎn)者和消費者在 pipeline 中的處理都在跟隨彼此的吞吐而進行適當?shù)恼{(diào)整,這就是我們希望看到的反壓的效果。
5.4 Flink 反壓監(jiān)控
在 Storm/JStorm 中,只要監(jiān)控到隊列滿了,就可以記錄下拓撲進入反壓了。但是 Flink 的反壓太過于天然了,導致我們無法簡單地通過監(jiān)控隊列來監(jiān)控反壓狀態(tài)。Flink 在這里使用了一個 trick 來實現(xiàn)對反壓的監(jiān)控。如果一個 Task 因為反壓而降速了,那么它會卡在向 LocalBufferPool 申請內(nèi)存塊上。那么這時候,該 Task 的 stack trace 就會長下面這樣:
java.lang.Object.wait(Native Method)o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request[...]
那么事情就簡單了。通過不斷地采樣每個 task 的 stack trace 就可以實現(xiàn)反壓監(jiān)控。

Flink 的實現(xiàn)中,只有當 Web 頁面切換到某個 Job 的 Backpressure 頁面,才會對這個 Job 觸發(fā)反壓檢測,因為反壓檢測還是挺昂貴的。JobManager 會通過 Akka 給每個 TaskManager 發(fā)送TriggerStackTraceSample消息。默認情況下,TaskManager 會觸發(fā)100次 stack trace 采樣,每次間隔 50ms(也就是說一次反壓檢測至少要等待5秒鐘)。并將這 100 次采樣的結(jié)果返回給 JobManager,由 JobManager 來計算反壓比率(反壓出現(xiàn)的次數(shù)/采樣的次數(shù)),最終展現(xiàn)在 UI 上。UI 刷新的默認周期是一分鐘,目的是不對 TaskManager 造成太大的負擔。
總結(jié)
Flink 不需要一種特殊的機制來處理反壓,因為 Flink 中的數(shù)據(jù)傳輸相當于已經(jīng)提供了應對反壓的機制。因此,F(xiàn)link 所能獲得的最大吞吐量由其 pipeline 中最慢的組件決定。相對于 Storm/JStorm 的實現(xiàn),F(xiàn)link 的實現(xiàn)更為簡潔優(yōu)雅,源碼中也看不見與反壓相關(guān)的代碼,無需 Zookeeper/TopologyMaster 的參與也降低了系統(tǒng)的負載,也利于對反壓更迅速的響應。
