敖丙怎么連Flink的背壓都不懂,太菜了吧?
三歪第402篇原創(chuàng)文章
作者:三歪
本文已收錄至我的GitHub
最近一直在遷移Flink相關的工程,期間也踩了些坑,checkpoint和反壓是其中的一個。

敖丙太菜了,Flink都不會,只能我自己來了。
看敖丙只能圖一樂,學技術還是得看三歪
平時敖丙黑我都沒啥水平,拿點簡單的東西來就說我不會,麻煩專業(yè)點@敖丙。
今天來分享一下 Flink的checkpoint機制和背壓原理,我相信通過這篇文章,大家在玩Flink的時候可以更加深刻地了解Checkpoint是怎么實現(xiàn)的,并且在設置相關參數(shù)以及使用的時候可以更加地得心應手。
上一篇已經(jīng)寫過Flink的入門教程了,如果還不了解Flink的同學可以先去看看:《Flink入門教程》
前排提醒,本文基于Flink 1.7
《淺入淺出學習Flink的背壓知識》
開胃菜
在講解Flink的checkPoint和背壓機制之前,我們先來看下checkpoint和背壓的相關基礎,有助于后面的理解。
作為用戶,我們寫好Flink的程序,上管理平臺提交,Flink就跑起來了(只要程序代碼沒有問題),細節(jié)對用戶都是屏蔽的。

實際上大致的流程是這樣的:
Flink會根據(jù)我們所寫代碼,會生成一個StreamGraph的圖出來,來代表我們所寫程序的拓撲結構。然后在提交的之前會將 StreamGraph這個圖優(yōu)化一把(可以合并的任務進行合并),變成JobGraph將 JobGraph提交給JobManagerJobManager收到之后JobGraph之后會根據(jù)JobGraph生成ExecutionGraph(ExecutionGraph是JobGraph的并行化版本)TaskManager接收到任務之后會將ExecutionGraph生成為真正的物理執(zhí)行圖

可以看到物理執(zhí)行圖真正運行在TaskManager上Transform和Sink之間都會有ResultPartition和InputGate這倆個組件,ResultPartition用來發(fā)送數(shù)據(jù),而InputGate用來接收數(shù)據(jù)。

屏蔽掉這些Graph,可以發(fā)現(xiàn)Flink的架構是:Client->JobManager->TaskManager

從名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的?;氐轿覀兘裉斓闹黝},checkpoint就是由JobManager發(fā)出。

而Flink本身就是有狀態(tài)的,Flink可以讓你選擇執(zhí)行過程中的數(shù)據(jù)保存在哪里,目前有三個地方,在Flink的角度稱作State Backends:
MemoryStateBackend(內存) FsStateBackend(文件系統(tǒng),一般是HSFS) RocksDBStateBackend(RocksDB數(shù)據(jù)庫)
同樣的,checkpoint信息也是保存在State Backends上

耗子屎
最近在Storm遷移Flink的時候遇到個問題,我來簡單描述一下背景。
我們從各個數(shù)據(jù)源從清洗出數(shù)據(jù),借助Flink清洗,組裝成一個寬模型,最后交由kylin做近實時數(shù)據(jù)統(tǒng)計和展示,供運營實時查看。

遷移的過程中,發(fā)現(xiàn)訂單的topic消費延遲了好久,初步懷疑是因為訂單上游的并發(fā)度不夠所影響的,所以調整了兩端的并行度重新發(fā)布一把。
發(fā)布的過程中,系統(tǒng)起來以后,再去看topic 消費延遲的監(jiān)控,就懵逼了。什么?怎么這么久了???絲毫沒有降下去的意思。
這時候只能找組內的大神去尋求幫忙了,他排查一番后表示:這checkpoint一直沒做上,都堵住了,重新發(fā)布的時候只會在上一次checkpoint開始,由于checkpoint長時間沒完成掉,所以重新發(fā)布數(shù)據(jù)量會很大。這沒啥好辦法了,只能在這個堵住的環(huán)節(jié)下扔掉吧,估計是業(yè)務邏輯出了問題。
畫外音:接收到訂單的數(shù)據(jù),會去溯源點擊,判斷該訂單從哪個業(yè)務來,經(jīng)過了哪些的業(yè)務,最終是哪塊業(yè)務致使該訂單成交。

畫外音:外部真正使用時,依賴「訂單結果HBase」數(shù)據(jù)

我們認為點擊的數(shù)據(jù)有可能會比訂單的數(shù)據(jù)處理要慢一會,所以找不到的數(shù)據(jù)會間隔一段時間輪詢,又因為Flink提供State「狀態(tài)」 和checkpoint機制,我們把找不到的數(shù)據(jù)放入ListState按一定的時間輪詢就好了(即便系統(tǒng)由于重啟或其他原因掛了,也不會把數(shù)據(jù)丟了)。
理論上只要沒問題,這套方案是可行的。但現(xiàn)在結果告訴我們:訂單數(shù)據(jù)報來了以后,一小批量數(shù)據(jù)一直在「訂單結果HBase」沒找到數(shù)據(jù),就放置到ListState上,然后來一條數(shù)據(jù)就去遍歷ListState。導致的后果就是:
數(shù)據(jù)消費不過來,形成反壓 checkpoint一直沒成功
當時處理的方式就是把ListState清空掉,暫時丟掉這一部分的數(shù)據(jù),讓數(shù)據(jù)追上進度。
后來排查后發(fā)現(xiàn)是上游在消息報字段上做了「手腳」,解析失敗導致點擊丟失,造成這一連鎖的后果。
排查問題的關鍵是理解Flink的反壓和checkpoint的原理是什么樣的,下面我來講述一下。
反壓
反壓backpressure是流式計算中很常見的問題。它意味著數(shù)據(jù)管道中某個節(jié)點成為瓶頸,處理速率跟不上「上游」發(fā)送數(shù)據(jù)的速率,上游需要進行限速

上面的圖代表了是反壓極簡的狀態(tài),說白了就是:下游處理不過來了,上游得慢點,要堵了!
最令人好奇的是:“下游是怎么通知上游要發(fā)慢點的呢?”
在前面Flink的基礎知識講解,我們可以看到ResultPartition用來發(fā)送數(shù)據(jù),InputGate用來接收數(shù)據(jù)。

而Flink在一個TaskManager內部讀寫數(shù)據(jù)的時候,會有一個BufferPool(緩沖池)供該TaskManager讀寫使用(一個TaskManager共用一個BufferPool),每個讀寫ResultPartition/InputGate都會去申請自己的LocalBuffer

以上圖為例,假設下游處理不過來,那InputGate的LocalBuffer是不是被填滿了?填滿了以后,ResultPartition是不是沒辦法往InputGate發(fā)了?而ResultPartition沒法發(fā)的話,它自己本身的LocalBuffer 也遲早被填滿,那是不是依照這個邏輯,一直到Source就不會拉數(shù)據(jù)了...

這個過程就猶如InputGate/ResultPartition都開了自己的有界阻塞隊列,反正“我”就只能處理這么多,往我這里發(fā),我滿了就堵住唄,形成連鎖反應一直堵到源頭上...
上面是只有一個TaskManager的情況下的反壓,那多個TaskManager呢?(畢竟我們很多時候都是有多個TaskManager在為我們工作的)
我們再看回Flink通信的總體數(shù)據(jù)流向架構圖:

從圖上可以清洗地發(fā)現(xiàn):遠程通信用的Netty,底層是TCP Socket來實現(xiàn)的。
所以,從宏觀的角度看,多個TaskManager只不過多了兩個Buffer(緩沖區(qū))。
按照上面的思路,只要InputGate的LocalBuffer被打滿,Netty Buffer也遲早被打滿,而Socket Buffer同樣遲早也會被打滿(TCP 本身就帶有流量控制),再反饋到ResultPartition上,數(shù)據(jù)又又又發(fā)不出去了...導致整條數(shù)據(jù)鏈路都存在反壓的現(xiàn)象。
現(xiàn)在問題又來了,一個TaskManager的task可是有很多的,它們都共用一個TCP Buffer/Buffer Pool,那只要其中一個task的鏈路存在問題,那不導致整個TaskManager跟著遭殃?

在Flink 1.5版本之前,確實會有這個問題。而在Flink 1.5版本之后則引入了credit機制。
從上面我們看到的Flink所實現(xiàn)的反壓,宏觀上就是直接依賴各個Buffer是否滿了,如果滿了則無法寫入/讀取導致連鎖反應,直至Source端。
而credit機制,實際上可以簡單理解為以「更細粒度」去做流量控制:每次InputGate會告訴ResultPartition自己還有多少的空閑量可以接收,讓ResultPartition看著發(fā)。如果InputGate告訴ResultPartition已經(jīng)沒有空閑量了,那ResultPartition就不發(fā)了。

那實際上是怎么實現(xiàn)的呢?擼源碼!
在擼源碼之前,我們再來看看下面物理執(zhí)行圖:實際上InPutGate下是InputChannel,ResultPartition下是ResultSubpartition(這些在源碼中都有體現(xiàn))。

InputGate(接收端處理反壓)
我們先從接收端看起吧。Flink接收數(shù)據(jù)的方法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput
隨后定位到處理反壓的邏輯:
final?BufferOrEvent?bufferOrEvent?=?barrierHandler.getNextNonBlocked();
進去getNextNonBlocked()方法看(選擇的是BarrierBuffer實現(xiàn)):

我們就直接看null的情況,看下從初始化階段開始是怎么搞的,進去getNextBufferOrEvent()
進去方法里面看到兩個比較重要的調用:

requestPartitions();
result?=?currentChannel.getNextBuffer();
先從requestPartitions()看起吧,發(fā)現(xiàn)里邊套了一層(從InputChannel下獲取到subPartition):

于是再進requestSubpartition()(看RemoteInputChannel的實現(xiàn)吧)
在這里看起來就是創(chuàng)建Client端,然后接收上游發(fā)送過來的數(shù)據(jù):

先看看client端的創(chuàng)建姿勢吧,進createPartitionRequestClient()方法看看(我們看Netty的實現(xiàn))。
點了兩層,我們會進到createPartitionRequestClient()方法,看源碼注釋就可以清晰發(fā)現(xiàn),這會創(chuàng)建TCP連接并且創(chuàng)建出Client供我們使用

我們還是看null的情況,于是定位到這里:

進去connect()方法看看:

我們就看看具體生成邏輯的實現(xiàn)吧,所以進到getClientChannelHandlers上
意外發(fā)現(xiàn)源碼還有個通信簡要流程圖給我們看(哈哈哈):

好了,來看看getClientChannelHandlers方法吧,這個方法不長,主要判斷了下要生成的client是否開啟creditBased機制:
public?ChannelHandler[]?getClientChannelHandlers()?{
??NetworkClientHandler?networkClientHandler?=
???creditBasedEnabled???new?CreditBasedPartitionRequestClientHandler()?:
????new?PartitionRequestClientHandler();
??return?new?ChannelHandler[]?{
???messageEncoder,
???new?NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
???networkClientHandler};
?}
于是我們的networkClientHandler實例是CreditBasedPartitionRequestClientHandler
到這里,我們暫且就認為Client端已經(jīng)生成完了,再退回去getNextBufferOrEvent()這個方法,requestPartitions()方法是生成接收數(shù)據(jù)的Client端,具體的實例是CreditBasedPartitionRequestClientHandler

下面我們進getNextBuffer()看看接收數(shù)據(jù)具體是怎么處理的:

拿到數(shù)據(jù)后,就會開始執(zhí)行我們用戶的代碼了調用process方法了(這里我們先不看了)。還是回到反壓的邏輯上,我們好像還沒看到反壓的邏輯在哪里。重點就是receivedBuffers這里,是誰塞進去的呢?
于是我們回看到Client具體的實例CreditBasedPartitionRequestClientHandler,打開方法列表一看,感覺就是ChannelRead()沒錯了:

?@Override
?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
??try?{
???decodeMsg(msg);
??}?catch?(Throwable?t)?{
???notifyAllChannelsOfErrorAndClose(t);
??}
?}
跟著decodeMsg繼續(xù)往下走吧:

繼續(xù)下到decodeBufferOrEvent()

繼續(xù)下到onBuffer:

所以我們往onSenderBacklog上看看:

最后調用notifyCreditAvailable將Credit往上游發(fā)送:
public?void?notifyCreditAvailable(final?RemoteInputChannel?inputChannel)?{
??ctx.executor().execute(()?->?ctx.pipeline().fireUserEventTriggered(inputChannel));
?}

最后再畫張圖來理解一把(關鍵鏈路):

ResultPartition(發(fā)送端處理反壓)
發(fā)送端我們從org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager開始看起

于是我們進去看fromConfiguration()

進去start()去看,隨后進入connectionManager.start()(還是看Netty的實例):

進去看service.init()方法做了什么(又看到熟悉的身影):

好了,我們再進去getServerChannelHandlers()看看吧:

有了上面經(jīng)驗的我們,直接進去看看它的方法,沒錯,又是channnelRead,只是這次是channelRead0。

ok,我們進去addCredit()看看:

reader.addCredit(credit)只是更新了下數(shù)量
public?void?addCredit(int?creditDeltas)?{
??numCreditsAvailable?+=?creditDeltas;
?}
重點我們看下enqueueAvailableReader() 方法,而enqueueAvailableReader()的重點就是判斷Credit是否足夠發(fā)送

isAvailable的實現(xiàn)也很簡單,就是判斷Credit是否大于0且有真實數(shù)據(jù)可發(fā)

而writeAndFlushNextMessageIfPossible實際上就是往下游發(fā)送數(shù)據(jù):

拿數(shù)據(jù)的時候會判斷Credit是否足夠,不足夠拋異常:

再畫張圖來簡單理解一下:

背壓總結
「下游」的處理速度跟不上「上游」的發(fā)送速度,從而降低了處理速度,看似是很美好的(畢竟看起來就是幫助我們限流了)。
但在Flink里,背壓再加上Checkponit機制,很有可能導致State狀態(tài)一直變大,拖慢完成checkpoint速度甚至超時失敗。
當checkpoint處理速度延遲時,會加劇背壓的情況(很可能大多數(shù)時間都在處理checkpoint了)。
當checkpoint做不上時,意味著重啟Flink應用就會從上一次完成checkpoint重新執(zhí)行(...
舉個我真實遇到的例子:
我有一個
Flink任務,我只給了它一臺TaskManager去執(zhí)行任務,在更新DB的時候發(fā)現(xiàn)會有并發(fā)的問題。只有一臺
TaskManager定位問題很簡單,稍微定位了下判斷:我更新DB的Sink 并行度調高了。如果Sink的并行度設置為1,那肯定沒有并發(fā)的問題,但這樣處理起來太慢了。
于是我就在Sink之前根據(jù)
userId進行keyBy(相同的userId都由同一個Thread處理,那這樣就沒并發(fā)的問題了)

看似很美好,但userId存在熱點數(shù)據(jù)的問題,導致下游數(shù)據(jù)處理形成反壓。原本一次checkpoint執(zhí)行只需要30~40ms,反壓后一次checkpoint需要2min+。
checkpoint執(zhí)行間隔相對頻繁(6s/次),執(zhí)行時間2min+,最終導致數(shù)據(jù)一直處理不過來,整條鏈路的消費速度從原來的3000qps到背壓后的300qps,一直堵?。ǔ绦驔]問題,就是處理速度大大下降,影響到數(shù)據(jù)的最終產(chǎn)出)。
最后
本來想著這篇文章把反壓和Checkpoint都一起寫了,但寫著寫著發(fā)現(xiàn)有點長了,那checkpoint開下一篇吧。
相信我,只要你用到Flink,遲早會遇到這種問題的,現(xiàn)在可能有的同學還沒看懂,沒關系,先點個贊?,收藏起來,后面就用得上了。
參考資料:
https://www.cnblogs.com/ljygz/tag/flink/ https://ci.apache.org/projects/flink/flink-docs-release-1.11/

原創(chuàng)電子書原創(chuàng)思維導圖
已經(jīng)有8756個初學者都下載了!
?三歪把【大廠面試知識點】、【簡歷模板】、【原創(chuàng)文章】
全部整理成電子書,共有1263頁!掃碼或微信搜 Java3y
回復「888」領取
![]() |
|





