<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          三歪怎么連Flink的背壓都不懂,太菜了吧?

          共 7216字,需瀏覽 15分鐘

           ·

          2021-11-13 19:27


          最近一直在遷移Flink相關(guān)的工程,期間也踩了些坑,checkpoint反壓是其中的一個(gè)。

          三歪太菜了,Flink都不會(huì),只能我自己來了。

          看三歪只能圖一樂,學(xué)技術(shù)還是得看敖丙

          平時(shí)三歪黑我都沒啥水平,拿點(diǎn)簡單的東西來就說我不會(huì),麻煩專業(yè)點(diǎn)@三歪。

          今天來分享一下 Flinkcheckpoint機(jī)制和背壓原理,我相信通過這篇文章,大家在玩Flink的時(shí)候可以更加深刻地了解Checkpoint是怎么實(shí)現(xiàn)的,并且在設(shè)置相關(guān)參數(shù)以及使用的時(shí)候可以更加地得心應(yīng)手。

          上一篇已經(jīng)寫過Flink的入門教程了,如果還不了解Flink的同學(xué)可以先去看看:《Flink入門教程

          前排提醒,本文基于Flink 1.7

          淺入淺出學(xué)習(xí)Flink的背壓知識(shí)》

          開胃菜

          在講解FlinkcheckPoint背壓機(jī)制之前,我們先來看下checkpoint背壓的相關(guān)基礎(chǔ),有助于后面的理解。

          作為用戶,我們寫好Flink的程序,上管理平臺(tái)提交,Flink就跑起來了(只要程序代碼沒有問題),細(xì)節(jié)對(duì)用戶都是屏蔽的。

          實(shí)際上大致的流程是這樣的:

          1. Flink會(huì)根據(jù)我們所寫代碼,會(huì)生成一個(gè)StreamGraph的圖出來,來代表我們所寫程序的拓?fù)浣Y(jié)構(gòu)。
          2. 然后在提交的之前會(huì)將StreamGraph這個(gè)圖優(yōu)化一把(可以合并的任務(wù)進(jìn)行合并),變成JobGraph
          3. JobGraph提交給JobManager
          4. JobManager收到之后JobGraph之后會(huì)根據(jù)JobGraph生成ExecutionGraphExecutionGraphJobGraph 的并行化版本)
          5. TaskManager接收到任務(wù)之后會(huì)將ExecutionGraph生成為真正的物理執(zhí)行圖

          可以看到物理執(zhí)行圖真正運(yùn)行在TaskManagerTransformSink之間都會(huì)有ResultPartitionInputGate這倆個(gè)組件,ResultPartition用來發(fā)送數(shù)據(jù),而InputGate用來接收數(shù)據(jù)。

          屏蔽掉這些Graph,可以發(fā)現(xiàn)Flink的架構(gòu)是:Client->JobManager->TaskManager

          從名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的?;氐轿覀兘裉斓闹黝},checkpoint就是由JobManager發(fā)出。

          Flink本身就是有狀態(tài)的,Flink可以讓你選擇執(zhí)行過程中的數(shù)據(jù)保存在哪里,目前有三個(gè)地方,在Flink的角度稱作State Backends

          • MemoryStateBackend(內(nèi)存)
          • FsStateBackend(文件系統(tǒng),一般是HSFS)
          • RocksDBStateBackend(RocksDB數(shù)據(jù)庫)

          同樣的,checkpoint信息也是保存在State Backends

          耗子屎

          最近在Storm遷移Flink的時(shí)候遇到個(gè)問題,我來簡單描述一下背景。

          我們從各個(gè)數(shù)據(jù)源從清洗出數(shù)據(jù),借助Flink清洗,組裝成一個(gè)寬模型,最后交由kylin做近實(shí)時(shí)數(shù)據(jù)統(tǒng)計(jì)和展示,供運(yùn)營實(shí)時(shí)查看。

          遷移的過程中,發(fā)現(xiàn)訂單的topic消費(fèi)延遲了好久,初步懷疑是因?yàn)橛唵紊嫌蔚?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">并發(fā)度不夠所影響的,所以調(diào)整了兩端的并行度重新發(fā)布一把。

          發(fā)布的過程中,系統(tǒng)起來以后,再去看topic 消費(fèi)延遲的監(jiān)控,就懵逼了。什么?怎么這么久了???絲毫沒有降下去的意思。

          這時(shí)候只能找組內(nèi)的大神去尋求幫忙了,他排查一番后表示:這checkpoint一直沒做上,都堵住了,重新發(fā)布的時(shí)候只會(huì)在上一次checkpoint開始,由于checkpoint長時(shí)間沒完成掉,所以重新發(fā)布數(shù)據(jù)量會(huì)很大。這沒啥好辦法了,只能在這個(gè)堵住的環(huán)節(jié)下扔掉吧,估計(jì)是業(yè)務(wù)邏輯出了問題。

          畫外音:接收到訂單的數(shù)據(jù),會(huì)去溯源點(diǎn)擊,判斷該訂單從哪個(gè)業(yè)務(wù)來,經(jīng)過了哪些的業(yè)務(wù),最終是哪塊業(yè)務(wù)致使該訂單成交。

          畫外音:外部真正使用時(shí),依賴「訂單結(jié)果HBase」數(shù)據(jù)

          我們認(rèn)為點(diǎn)擊的數(shù)據(jù)有可能會(huì)比訂單的數(shù)據(jù)處理要慢一會(huì),所以找不到的數(shù)據(jù)會(huì)間隔一段時(shí)間輪詢,又因?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">Flink提供State「狀態(tài)」 和checkpoint機(jī)制,我們把找不到的數(shù)據(jù)放入ListState按一定的時(shí)間輪詢就好了(即便系統(tǒng)由于重啟或其他原因掛了,也不會(huì)把數(shù)據(jù)丟了)。

          理論上只要沒問題,這套方案是可行的。但現(xiàn)在結(jié)果告訴我們:訂單數(shù)據(jù)報(bào)來了以后,一小批量數(shù)據(jù)一直在「訂單結(jié)果HBase」沒找到數(shù)據(jù),就放置到ListState上,然后來一條數(shù)據(jù)就去遍歷ListState。導(dǎo)致的后果就是:

          • 數(shù)據(jù)消費(fèi)不過來,形成反壓
          • checkpoint一直沒成功

          當(dāng)時(shí)處理的方式就是把ListState清空掉,暫時(shí)丟掉這一部分的數(shù)據(jù),讓數(shù)據(jù)追上進(jìn)度。

          后來排查后發(fā)現(xiàn)是上游在消息報(bào)字段上做了「手腳」,解析失敗導(dǎo)致點(diǎn)擊丟失,造成這一連鎖的后果。

          排查問題的關(guān)鍵是理解Flink反壓checkpoint的原理是什么樣的,下面我來講述一下。

          反壓

          反壓backpressure是流式計(jì)算中很常見的問題。它意味著數(shù)據(jù)管道中某個(gè)節(jié)點(diǎn)成為瓶頸,處理速率跟不上「上游」發(fā)送數(shù)據(jù)的速率,上游需要進(jìn)行限速

          上面的圖代表了是反壓極簡的狀態(tài),說白了就是:下游處理不過來了,上游得慢點(diǎn),要堵了!

          最令人好奇的是:“下游是怎么通知上游要發(fā)慢點(diǎn)的呢?

          在前面Flink的基礎(chǔ)知識(shí)講解,我們可以看到ResultPartition用來發(fā)送數(shù)據(jù),InputGate用來接收數(shù)據(jù)。

          Flink在一個(gè)TaskManager內(nèi)部讀寫數(shù)據(jù)的時(shí)候,會(huì)有一個(gè)BufferPool(緩沖池)供該TaskManager讀寫使用(一個(gè)TaskManager共用一個(gè)BufferPool),每個(gè)讀寫ResultPartition/InputGate都會(huì)去申請自己的LocalBuffer

          以上圖為例,假設(shè)下游處理不過來,那InputGateLocalBuffer是不是被填滿了?填滿了以后,ResultPartition是不是沒辦法往InputGate發(fā)了?而ResultPartition沒法發(fā)的話,它自己本身的LocalBuffer 也遲早被填滿,那是不是依照這個(gè)邏輯,一直到Source就不會(huì)拉數(shù)據(jù)了...

          這個(gè)過程就猶如InputGate/ResultPartition都開了自己的有界阻塞隊(duì)列,反正“我”就只能處理這么多,往我這里發(fā),我滿了就堵住唄,形成連鎖反應(yīng)一直堵到源頭上...

          上面是只有一個(gè)TaskManager的情況下的反壓,那多個(gè)TaskManager呢?(畢竟我們很多時(shí)候都是有多個(gè)TaskManager在為我們工作的)

          我們再看回Flink通信的總體數(shù)據(jù)流向架構(gòu)圖:

          從圖上可以清洗地發(fā)現(xiàn):遠(yuǎn)程通信用的Netty,底層是TCP Socket來實(shí)現(xiàn)的。

          所以,從宏觀的角度看,多個(gè)TaskManager只不過多了兩個(gè)Buffer(緩沖區(qū))。

          按照上面的思路,只要InputGateLocalBuffer被打滿,Netty Buffer也遲早被打滿,而Socket Buffer同樣遲早也會(huì)被打滿(TCP 本身就帶有流量控制),再反饋到ResultPartition上,數(shù)據(jù)又又又發(fā)不出去了...導(dǎo)致整條數(shù)據(jù)鏈路都存在反壓的現(xiàn)象。

          現(xiàn)在問題又來了,一個(gè)TaskManagertask可是有很多的,它們都共用一個(gè)TCP Buffer/Buffer Pool,那只要其中一個(gè)task的鏈路存在問題,那不導(dǎo)致整個(gè)TaskManager跟著遭殃?

          Flink 1.5版本之前,確實(shí)會(huì)有這個(gè)問題。而在Flink 1.5版本之后則引入了credit機(jī)制。

          從上面我們看到的Flink所實(shí)現(xiàn)的反壓,宏觀上就是直接依賴各個(gè)Buffer是否滿了,如果滿了則無法寫入/讀取導(dǎo)致連鎖反應(yīng),直至Source端。

          credit機(jī)制,實(shí)際上可以簡單理解為以「更細(xì)粒度」去做流量控制:每次InputGate會(huì)告訴ResultPartition自己還有多少的空閑量可以接收,讓ResultPartition看著發(fā)。如果InputGate告訴ResultPartition已經(jīng)沒有空閑量了,那ResultPartition就不發(fā)了。

          那實(shí)際上是怎么實(shí)現(xiàn)的呢?擼源碼!

          在擼源碼之前,我們再來看看下面物理執(zhí)行圖:實(shí)際上InPutGate下是InputChannel,ResultPartition下是ResultSubpartition(這些在源碼中都有體現(xiàn))。

          InputGate(接收端處理反壓)

          我們先從接收端看起吧。Flink接收數(shù)據(jù)的方法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

          隨后定位到處理反壓的邏輯:

          final?BufferOrEvent?bufferOrEvent?=?barrierHandler.getNextNonBlocked();

          進(jìn)去getNextNonBlocked()方法看(選擇的是BarrierBuffer實(shí)現(xiàn)):

          我們就直接看null的情況,看下從初始化階段開始是怎么搞的,進(jìn)去getNextBufferOrEvent()

          進(jìn)去方法里面看到兩個(gè)比較重要的調(diào)用:

          requestPartitions();

          result?=?currentChannel.getNextBuffer();

          先從requestPartitions()看起吧,發(fā)現(xiàn)里邊套了一層(從InputChannel下獲取到subPartition):

          于是再進(jìn)requestSubpartition()(看RemoteInputChannel的實(shí)現(xiàn)吧)

          在這里看起來就是創(chuàng)建Client端,然后接收上游發(fā)送過來的數(shù)據(jù):

          先看看client端的創(chuàng)建姿勢吧,進(jìn)createPartitionRequestClient()方法看看(我們看Netty的實(shí)現(xiàn))。

          點(diǎn)了兩層,我們會(huì)進(jìn)到createPartitionRequestClient()方法,看源碼注釋就可以清晰發(fā)現(xiàn),這會(huì)創(chuàng)建TCP連接并且創(chuàng)建出Client供我們使用

          我們還是看null的情況,于是定位到這里:

          進(jìn)去connect()方法看看:

          我們就看看具體生成邏輯的實(shí)現(xiàn)吧,所以進(jìn)到getClientChannelHandlers

          意外發(fā)現(xiàn)源碼還有個(gè)通信簡要流程圖給我們看(哈哈哈):

          好了,來看看getClientChannelHandlers方法吧,這個(gè)方法不長,主要判斷了下要生成的client是否開啟creditBased機(jī)制:

          public?ChannelHandler[]?getClientChannelHandlers()?{
          ??NetworkClientHandler?networkClientHandler?=
          ???creditBasedEnabled???new?CreditBasedPartitionRequestClientHandler()?:
          ????new?PartitionRequestClientHandler();
          ??return?new?ChannelHandler[]?{
          ???messageEncoder,
          ???new?NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
          ???networkClientHandler};
          ?}

          于是我們的networkClientHandler實(shí)例是CreditBasedPartitionRequestClientHandler

          到這里,我們暫且就認(rèn)為Client端已經(jīng)生成完了,再退回去getNextBufferOrEvent()這個(gè)方法,requestPartitions()方法是生成接收數(shù)據(jù)的Client端,具體的實(shí)例是CreditBasedPartitionRequestClientHandler

          下面我們進(jìn)getNextBuffer()看看接收數(shù)據(jù)具體是怎么處理的:

          拿到數(shù)據(jù)后,就會(huì)開始執(zhí)行我們用戶的代碼了調(diào)用process方法了(這里我們先不看了)。還是回到反壓的邏輯上,我們好像還沒看到反壓的邏輯在哪里。重點(diǎn)就是receivedBuffers這里,是誰塞進(jìn)去的呢?

          于是我們回看到Client具體的實(shí)例CreditBasedPartitionRequestClientHandler,打開方法列表一看,感覺就是ChannelRead()沒錯(cuò)了:

          ?@Override
          ?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
          ??try?{
          ???decodeMsg(msg);
          ??}?catch?(Throwable?t)?{
          ???notifyAllChannelsOfErrorAndClose(t);
          ??}
          ?}

          跟著decodeMsg繼續(xù)往下走吧:

          繼續(xù)下到decodeBufferOrEvent()

          繼續(xù)下到onBuffer

          所以我們往onSenderBacklog上看看:

          最后調(diào)用notifyCreditAvailableCredit往上游發(fā)送:

          public?void?notifyCreditAvailable(final?RemoteInputChannel?inputChannel)?{
          ??ctx.executor().execute(()?->?ctx.pipeline().fireUserEventTriggered(inputChannel));
          ?}

          最后再畫張圖來理解一把(關(guān)鍵鏈路):

          ResultPartition(發(fā)送端處理反壓)

          發(fā)送端我們從org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager開始看起

          于是我們進(jìn)去看fromConfiguration()

          進(jìn)去start()去看,隨后進(jìn)入connectionManager.start()(還是看Netty的實(shí)例):


          進(jìn)去看service.init()方法做了什么(又看到熟悉的身影):

          好了,我們再進(jìn)去getServerChannelHandlers()看看吧:

          有了上面經(jīng)驗(yàn)的我們,直接進(jìn)去看看它的方法,沒錯(cuò),又是channnelRead,只是這次是channelRead0。

          ok,我們進(jìn)去addCredit()看看:

          reader.addCredit(credit)只是更新了下數(shù)量

          public?void?addCredit(int?creditDeltas)?{
          ??numCreditsAvailable?+=?creditDeltas;
          ?}

          重點(diǎn)我們看下enqueueAvailableReader() 方法,而enqueueAvailableReader()的重點(diǎn)就是判斷Credit是否足夠發(fā)送

          isAvailable的實(shí)現(xiàn)也很簡單,就是判斷Credit是否大于0且有真實(shí)數(shù)據(jù)可發(fā)

          writeAndFlushNextMessageIfPossible實(shí)際上就是往下游發(fā)送數(shù)據(jù):

          拿數(shù)據(jù)的時(shí)候會(huì)判斷Credit是否足夠,不足夠拋異常:

          再畫張圖來簡單理解一下:

          image-20201206161741462

          背壓總結(jié)

          「下游」的處理速度跟不上「上游」的發(fā)送速度,從而降低了處理速度,看似是很美好的(畢竟看起來就是幫助我們限流了)。

          但在Flink里,背壓再加上Checkponit機(jī)制,很有可能導(dǎo)致State狀態(tài)一直變大,拖慢完成checkpoint速度甚至超時(shí)失敗。

          當(dāng)checkpoint處理速度延遲時(shí),會(huì)加劇背壓的情況(很可能大多數(shù)時(shí)間都在處理checkpoint了)。

          當(dāng)checkpoint做不上時(shí),意味著重啟Flink應(yīng)用就會(huì)從上一次完成checkpoint重新執(zhí)行(...

          舉個(gè)我真實(shí)遇到的例子:

          我有一個(gè)Flink任務(wù),我只給了它一臺(tái)TaskManager去執(zhí)行任務(wù),在更新DB的時(shí)候發(fā)現(xiàn)會(huì)有并發(fā)的問題。

          只有一臺(tái)TaskManager定位問題很簡單,稍微定位了下判斷:我更新DB的Sink 并行度調(diào)高了。

          如果Sink的并行度設(shè)置為1,那肯定沒有并發(fā)的問題,但這樣處理起來太慢了。

          于是我就在Sink之前根據(jù)userId進(jìn)行keyBy(相同的userId都由同一個(gè)Thread處理,那這樣就沒并發(fā)的問題了)

          看似很美好,但userId存在熱點(diǎn)數(shù)據(jù)的問題,導(dǎo)致下游數(shù)據(jù)處理形成反壓。原本一次checkpoint執(zhí)行只需要30~40ms反壓后一次checkpoint需要2min+

          checkpoint執(zhí)行間隔相對(duì)頻繁(6s/次),執(zhí)行時(shí)間2min+,最終導(dǎo)致數(shù)據(jù)一直處理不過來,整條鏈路的消費(fèi)速度從原來的3000qps到背壓后的300qps,一直堵住(程序沒問題,就是處理速度大大下降,影響到數(shù)據(jù)的最終產(chǎn)出)。

          最后

          本來想著這篇文章把反壓和Checkpoint都一起寫了,但寫著寫著發(fā)現(xiàn)有點(diǎn)長了,那checkpoint開下一篇吧。

          相信我,只要你用到Flink,遲早會(huì)遇到這種問題的,現(xiàn)在可能有的同學(xué)還沒看懂,沒關(guān)系,先點(diǎn)個(gè)贊??,收藏起來,后面就用得上了。

          參考資料:

          • https://www.cnblogs.com/ljygz/tag/flink/
          • https://ci.apache.org/projects/flink/flink-docs-release-1.11/

          瀏覽 67
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产成人免费做爰视频 | 婷婷久久综合激情综合 | 在线观看国产免费视频 | 日本无码中文字幕 | www.五月花 |