我是Flink,現(xiàn)在"背"感壓力,通俗易懂的講解Flink背壓機(jī)制
為什么你的Flink運(yùn)行開始減慢了?
為什么你試遍Flink參數(shù)還是無法解決?
Flink背壓常常發(fā)生在生產(chǎn)事故中,切記不要掉以輕心。
不知為何,最近的我開始走下坡路了。。。
1 故事的開始
此刻,我抬頭看了一眼坐在對(duì)面的這個(gè)家伙: 格子衫、中等身材,略高的鼻梁下頂著一副黑框眼鏡,微瞇的目光透出絲絲倦意,正一眨不眨地盯著我看。

我心里直犯嘀咕: 我又有什么好看的呢?不過是A君你用來換取面包、汽車的工具罷了。雖然陪伴了五年的時(shí)光,想來也就是如此~
說到這,忘了自我介紹了。我叫Flink,當(dāng)然,我還是喜歡你們叫我的全名: Apache Flink,因?yàn)檫@樣聽起來很有科技感。我是目前最火的大數(shù)據(jù)實(shí)時(shí)計(jì)算引擎之一。
之所以敢這么說,是因?yàn)槟壳拔以?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(239, 112, 96);">實(shí)時(shí)領(lǐng)域確實(shí)處于獨(dú)領(lǐng)風(fēng)騷的地位,不信請(qǐng)看下面的統(tǒng)計(jì):

此處需要@一下我的老大哥:Apache Spark,我聽說一度出現(xiàn)過"Flink的出現(xiàn),Spark是否慢慢成為雞肋"的言論。咱也不敢說,也不敢問,對(duì)于前輩還是保持尊重和理性。
"咳"~ 一聲輕咳把我拉回了現(xiàn)實(shí),A君又開始調(diào)試代碼了~
2 我開始有壓力了
其實(shí)我是在上周和A君再次遇見的,之前聽說他在我的好朋友:Kafka那里呆了一周,好像是準(zhǔn)備搞一個(gè)大事情。
等到他找到我, 才知道公司準(zhǔn)備建設(shè)實(shí)時(shí)數(shù)倉(cāng)。需要我和Kafka兄弟一起加入,處理億級(jí)別實(shí)時(shí)數(shù)據(jù)。
對(duì)于實(shí)時(shí)數(shù)倉(cāng)我大抵是了解的。再看看A君的老大拿出的架構(gòu)方案,心中暗喜:這可是到了我的專業(yè)領(lǐng)域。

整體架構(gòu)并不難,很好理解。
程序?qū)崟r(shí)獲取源數(shù)據(jù),放置kafka ods層存儲(chǔ) 進(jìn)行ods->dwd->dws層實(shí)時(shí)加工計(jì)算,結(jié)果寫進(jìn)kafka 再加一條離線處理流程,作為備用
我看了一眼旁邊躍躍欲試的Kafka兄弟,互相點(diǎn)了點(diǎn)頭,我們開始吧~
作為老搭檔,我和Kafka兄弟配合的很默契,A君也是個(gè)老手,于是我們?cè)诙潭痰囊恢軆?nèi)就出色的完成了初步任務(wù)。
我可以給你看看我們的部分配合成果:
-?src.main.scala.com.xxproject.xx
??|--handler
????|---FlinkODSHandler.scala
????|---FlinkDWHandler.scala
????|---FlinkADSHandler.scala
????...
??|--service
????|---KafkaSchdulerService.scala
????|---SchdulerService.scala
????...
??|--config/util/model
????|---KafkaUtils.scala
????|---XXDataModel.scala
????...

春風(fēng)得意馬蹄疾~ 此刻的心情舒服極了,我們仨簡(jiǎn)直就是完美搭檔。。
可是好景不長(zhǎng)。來到第二周,我漸漸的發(fā)現(xiàn)自己開始變慢了~
具體的表現(xiàn)為 :
運(yùn)行開始時(shí)正常,到了后面就出現(xiàn)大量Task任務(wù) 等待少量Task任務(wù)開始報(bào) checkpoint超時(shí)問題Kafka數(shù)據(jù)堆積,無法消費(fèi)
我有點(diǎn)慌,去看了下自身的狀況,結(jié)果嚇了一大跳:

無論是輸入還是輸出,緩沖區(qū)內(nèi)存都被占滿了。數(shù)據(jù)處理不過來,barrier流動(dòng)極為緩慢,大量checkpoint生成時(shí)間變長(zhǎng)。
我發(fā)生了背壓問題?。?!
3 我的反壓機(jī)制
在默默的進(jìn)行一段時(shí)間的自我調(diào)節(jié)后,問題依然沒有解決。
同時(shí),我的周圍不斷拉響警報(bào),內(nèi)存頻繁告急。轉(zhuǎn)眼間我的Task執(zhí)行頁面已被紅色High標(biāo)識(shí)占滿~

沒有辦法,最終我還是向A君發(fā)出了告警~
A君收到消息,盯著我看了好一會(huì),嘆了口氣。我覺得有點(diǎn)不好意思,感覺把事情搞砸了。。
他沒有多說什么,只是問起了我的反壓機(jī)制,說要從源頭解決問題。
下面是A君和我的對(duì)話
1. 反壓一般有哪些情況?
按照我以往的經(jīng)驗(yàn),一般出現(xiàn)反壓就是下游數(shù)據(jù)的處理速度跟不上上游數(shù)據(jù)的產(chǎn)生速度。

可以細(xì)分兩種情況:
當(dāng)前Task任務(wù)處理速度慢,比如task任務(wù)中調(diào)用算法處理等復(fù)雜邏輯,導(dǎo)致上游申請(qǐng)不到足夠內(nèi)存。下游Task任務(wù)處理速度慢,比如多次collect()輸出到下游,導(dǎo)致當(dāng)前節(jié)點(diǎn)無法申請(qǐng)足夠的內(nèi)存。
2. 頻繁反壓的影響是什么?
頻繁反壓會(huì)導(dǎo)致流處理作業(yè)數(shù)據(jù)延遲增加,同時(shí)還會(huì)影響到Checkpoint。

Checkpoint時(shí)需要進(jìn)行Barrier對(duì)齊,此時(shí)若某個(gè)Task出現(xiàn)反壓,Barrier流動(dòng)速度會(huì)下降,導(dǎo)致Checkpoint變慢甚至超時(shí),任務(wù)整體也變慢。
長(zhǎng)期或頻繁出現(xiàn)反壓才需要處理,如果由于
網(wǎng)絡(luò)波動(dòng)或者GC出現(xiàn)的偶爾反壓可以不必處理。
3. 你是怎么發(fā)現(xiàn)反壓的?
在我的Web界面,我會(huì)從Sink到Source逆向Task排查。逐個(gè)查看BackPressure詳情,找到第一個(gè)出現(xiàn)反壓的Task。
下面這是正常的狀況~

我的內(nèi)部檢測(cè)原理
BackPressure界面定期采樣Task線程棧信息,統(tǒng)計(jì)線程請(qǐng)求內(nèi)存Buffer的阻塞頻率,判斷節(jié)點(diǎn)是否處于反壓狀態(tài)。
默認(rèn)情況下,頻率小于 0.1顯示正常(0.1,0.5)為L(zhǎng)OW,背壓輕微超過 0.5為 HIGH,需要注意反壓

此時(shí),我指給A君看了目前項(xiàng)目的BackPressure頁面,這明顯是不正常的狀況。
4. 反壓機(jī)制原理是什么?
A君頓了頓嗓子,提示我此處講的仔細(xì)一點(diǎn)。我整理了下思路,決定先從限流開始說起:
數(shù)據(jù)流程

整體流程可類比為生產(chǎn)者->消費(fèi)者體系。上游生產(chǎn)者發(fā)送數(shù)據(jù)(2M/s)至Send Buffer,途徑網(wǎng)絡(luò)傳輸(5M/s)到Receive Buffer, 最終下游Consumer消費(fèi)(<1M/s)。
這明顯是不行的,下游速度慢于上游速度,數(shù)據(jù)久積成疾~ 需要做限流。
限流

這很好理解。既然上游處理較快,那么我添加一個(gè)限流機(jī)制將其速度降下來,讓上下游速度基本一致,這樣不就解決了嗎。。
其實(shí)不然,這里有幾個(gè)問題:
我無法提前預(yù)估下游實(shí)際速度(流速限制設(shè)置多少) 常碰到網(wǎng)絡(luò)波動(dòng)等情況,上下游的流速是 動(dòng)態(tài)變化的
考慮到這些原因,我的內(nèi)部提供一種強(qiáng)大的反壓機(jī)制:

上下游動(dòng)態(tài)反饋,如果下游速度慢,則上游限速;否則上游提速。實(shí)現(xiàn)動(dòng)態(tài)自動(dòng)反壓的效果。
反壓機(jī)制示意

上游發(fā)送網(wǎng)絡(luò)數(shù)據(jù)前經(jīng)過自身的Network Buffer層,之后往下傳輸?shù)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(239, 112, 96);">Channel Buffer層(Netty通道)。最終通過網(wǎng)絡(luò)傳輸,層層傳遞達(dá)到下游。
Network Buffer、Channel Buffer和Socket Buffer通俗理解就是
用戶態(tài)和內(nèi)核態(tài)的區(qū)別,處于不同的交換空間和操作系統(tǒng)。
有關(guān)內(nèi)核態(tài)和用戶態(tài)原理,有興趣的小伙伴歡迎添加個(gè)人微信: youlong525?一起討論~
反壓機(jī)制原理
前面做了一些鋪墊,這里我給A君總結(jié)了我的反壓機(jī)制的運(yùn)行流程:

每個(gè)
TaskManager維護(hù)共享Network BufferPool(Task共享內(nèi)存池),初始化時(shí)向Off-heap Memory中申請(qǐng)內(nèi)存。每個(gè)Task創(chuàng)建自身的
Local BufferPool(Task本地內(nèi)存池),并和Network BufferPool交換內(nèi)存。上游
Record Writer向 Local BufferPool申請(qǐng)buffer(內(nèi)存)寫數(shù)據(jù)。如果Local BufferPool沒有足夠內(nèi)存則向Network BufferPool申請(qǐng),使用完之后將申請(qǐng)的內(nèi)存返回Pool。Netty Buffer拷貝buffer并經(jīng)過Socket Buffer發(fā)送到網(wǎng)絡(luò),后續(xù)下游端按照相似機(jī)制處理。當(dāng)下游申請(qǐng)buffer失敗時(shí),表示當(dāng)前節(jié)點(diǎn)
內(nèi)存不夠,則逐層發(fā)送反壓信號(hào)給上游,上游慢慢停止數(shù)據(jù)發(fā)送,直到下游再次恢復(fù)。

所以,我的反壓機(jī)制類似于Java中的阻塞隊(duì)列,如下圖我的內(nèi)存級(jí)的反壓工作原理示意。

Task任務(wù)通過與Local BufferPool和Network BufferPool協(xié)作進(jìn)行內(nèi)存申請(qǐng)和釋放,同時(shí)下游內(nèi)存使用情況實(shí)時(shí)反饋給上游,實(shí)現(xiàn)動(dòng)態(tài)反壓。
A君聽完我的回答,陷入了沉思~
4 我要減壓
其實(shí)我心里也很迷惑。我對(duì)自己的反壓機(jī)制很有信心,會(huì)不會(huì)是其他原因影響到了反壓處理?
這時(shí),一旁的A君打開了我的WEB UI,口中喃喃的吐出幾個(gè)詞: ?數(shù)據(jù)傾斜和并發(fā)。
4.1 ?第一次嘗試
我瞬間明白了過來,轉(zhuǎn)眼去看屏幕。
我分別查看了各個(gè)SubTask情況,發(fā)現(xiàn)在某個(gè)Checkpoint中對(duì)應(yīng)的state size值存在個(gè)別異常,竟達(dá)到了10G左右大小?。?/p>
再看下分區(qū)內(nèi)的其他值(如圖):

發(fā)生數(shù)據(jù)傾斜了~
我心里有了底,立馬和A君一起找出了這些特殊的Key,進(jìn)行預(yù)聚合打散和數(shù)據(jù)拆分,再次運(yùn)行。

感覺有那么一點(diǎn)效果,但是還是有蠻多的高峰值。。
4.2 ?第二次嘗試
此刻又陷入了僵局。
沒辦法,我加大了自身的一點(diǎn)內(nèi)存。想了想,又加大了算子的并發(fā)度,畢竟增加線程數(shù)總歸會(huì)緩解一些計(jì)算壓力。
不甘心的調(diào)整了參數(shù)之后,結(jié)果依然沒有太多提升。

4.3 ?第三次嘗試
A君開始重新梳理我的整體計(jì)算流程,然后改動(dòng)了一個(gè)參數(shù)。
我看了下,還是修改并發(fā)度。心中不以為然,我剛才可就試過了這個(gè)。。
好像有點(diǎn)不對(duì)勁。。

這就是我要的結(jié)果?。∥也唤傲顺鰜怼?/p>
他笑了笑,告訴我這是用到了我的算子鏈機(jī)制。
算子鏈
通過將下游算子和上游算子設(shè)置相同并發(fā)度,可自動(dòng)形成算子鏈
這樣做的好處是:
有效減少線程間切換和數(shù)據(jù)緩存開銷 提高吞吐量且降低延遲

整個(gè)流程中形成多個(gè)算子鏈,導(dǎo)致線程開銷和內(nèi)存使用率下降。我的反壓情況自然也變得緩和了。
我不禁大受震撼~~
5 ?一日看盡長(zhǎng)安花
最終在A君的協(xié)助下,我的速度回來了。幾天高壓的日子徹底結(jié)束,此刻盡絲滑~
我緩緩?fù)鲁鲆豢跉猓悬c(diǎn)欣慰的看著最后的結(jié)果:

不自覺地抬頭看了眼A君,他也露出了久違的微笑。
我是Flink,現(xiàn)在沒有壓力~
本文完。
數(shù)據(jù)倉(cāng)庫系列文章(持續(xù)更新)
數(shù)倉(cāng)架構(gòu)發(fā)展史 數(shù)倉(cāng)建模方法論 數(shù)倉(cāng)建模分層理論 數(shù)倉(cāng)建?!獙挶淼脑O(shè)計(jì) 數(shù)倉(cāng)建模—指標(biāo)體系 數(shù)據(jù)倉(cāng)庫之拉鏈表 數(shù)倉(cāng)—數(shù)據(jù)集成 數(shù)倉(cāng)—數(shù)據(jù)集市 數(shù)倉(cāng)—商業(yè)智能系統(tǒng) 數(shù)倉(cāng)—埋點(diǎn)設(shè)計(jì)與管理 數(shù)倉(cāng)—ID Mapping 數(shù)倉(cāng)—OneID
