<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          2022年最新版 | Flink經(jīng)典線上問題小盤點(diǎn)

          共 28433字,需瀏覽 57分鐘

           ·

          2022-01-14 12:11

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
          回復(fù)”面試“獲取更多驚喜
          本文已經(jīng)加入「大數(shù)據(jù)成神之路PDF版」中提供下載。
          你可以關(guān)注公眾號(hào),后臺(tái)回復(fù):
          PDF?即可獲取。

          2020年和2021年分別寫了很多篇類似的文章,這篇文章是關(guān)于Flink生產(chǎn)環(huán)境中遇到的各種問題的匯總。

          這個(gè)版本在Flink新版本的基礎(chǔ)上梳理了一個(gè)更加完整的版本。

          新增了一些Flink CDC和大作業(yè)的啟停已經(jīng)數(shù)據(jù)缺失的問題。

          如果你遇到過一些共性的問題,希望對(duì)你有幫助。本文參考了我在查問題中找到的網(wǎng)上的資源和一些博客。

          如何規(guī)劃生產(chǎn)中的集群大???

          第一步是仔細(xì)考慮應(yīng)用程序的運(yùn)維指標(biāo),以達(dá)到所需資源的基線。需要考慮的關(guān)鍵指標(biāo)是:

          • 每秒記錄數(shù)和每條記錄的大小
          • 已有的不同鍵(key)的數(shù)量和每個(gè)鍵對(duì)應(yīng)的狀態(tài)大小
          • 狀態(tài)更新的次數(shù)和狀態(tài)后端的訪問模式

          最后,一個(gè)更實(shí)際的問題是與客戶之間圍繞停機(jī)時(shí)間、延遲和最大吞吐量的服務(wù)級(jí)別協(xié)議(SLA),因?yàn)檫@些直接影響容量規(guī)劃。接下來,根據(jù)預(yù)算,看看有什么可用的資源。例如:

          • 網(wǎng)絡(luò)容量,同時(shí)把使用網(wǎng)絡(luò)的外部服務(wù)也納入考慮,如 Kafka、HDFS 等。
          • 磁盤帶寬,如果您依賴于基于磁盤的狀態(tài)后端,如 RocksDB(并考慮其他磁 盤使用,如 Kafka 或 HDFS)
          • 可用的機(jī)器數(shù)量、CPU 和內(nèi)存

          Flink CheckPoint問題如何排查?

          Flink 的 Checkpoint 包括如下幾個(gè)部分:

          • JM trigger checkpoint
          • Source 收到 trigger checkpoint 的 PRC,自己開始做 snapshot,并往下游發(fā)送 barrier
          • 下游接收 barrier(需要 barrier 都到齊才會(huì)開始做 checkpoint)
          • Task 開始同步階段 snapshot
          • Task 開始異步階段 snapshot
          • Task snapshot 完成,匯報(bào)給 JM

          上面的任何一個(gè)步驟不成功,整個(gè) checkpoint 都會(huì)失敗。

          Checkpoint問題可以分為下面幾個(gè)大類:

          Checkpoint失敗

          假如我們?cè)?Checkpoint 界面看到如下圖所示,下圖中 Checkpoint 10423 失敗了。點(diǎn)擊Checkpoint10423 的詳情,我們可以看到類系下圖所示的表格。

          Checkpoint 失敗大致分為兩種情況:Checkpoint Decline 和 CheckpointExpire。

          Checkpoint Decline發(fā)生時(shí)我們可以在日志匯總發(fā)現(xiàn)類似下面這樣的日志:

          Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178. 其中 10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id,85d268e6fbc19411185f7e4868a44178 是 job id,我們可以在 jobmanager.log 中查找 execution id,找到被調(diào)度到哪個(gè) taskmanager 上,類似如下所示:

          從上面的日志我們知道該 execution 被調(diào)度到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot 上,接下來我們就可以到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找Checkpoint 失敗的具體原因了。

          Checkpoint Expire

          如果 Checkpoint 做的非常慢,超過了 timeout 還沒有完成,則整個(gè) Checkpoint 也會(huì)失敗。當(dāng)一個(gè) Checkpoint 由于超時(shí)而失敗是,會(huì)在 jobmanager.log 中看到如下的日志:

          Checkpoint?1?of?job?85d268e6fbc19411185f7e4868a44178?expired?before?completing.

          表示 Chekpoint 1 由于超時(shí)而失敗,這個(gè)時(shí)候可以可以看這個(gè)日志后面是否有類似下面的日志:

          Received?late?message?for?now?expired?checkpoint?attempt?1?from?0b60f08bf8984085b59f8d9bc74ce2e1?of?job?85d268e6fbc19411185f7e4868a44178.

          我們就可以按照上面的方法找到對(duì)應(yīng)的 taskmanager.log 查看具體信息。

          Checkpoint 慢

          Checkpoint 慢的情況如下:比如 Checkpoint interval 1 分鐘,超時(shí) 10 分鐘,Checkpoint 經(jīng)常需要做 9 分鐘(我們希望 1 分鐘左右就能夠做完),而且我們預(yù)期 state size 不是非常大。對(duì)于 Checkpoint 慢的情況,我們可以按照下面的順序逐一檢查。

          • Source Trigger Checkpoint 慢
          • 使用增量 Checkpoint
          • 作業(yè)存在反壓或者數(shù)據(jù)傾斜
          • Barrier 對(duì)齊慢
          • 主線程太忙,導(dǎo)致沒機(jī)會(huì)做 snapshot
          • 同步階段做的慢
          • 異步階段做的慢

          反壓?jiǎn)栴}如何排查?

          反壓(backpressure)是實(shí)時(shí)計(jì)算應(yīng)用開發(fā)中,特別是流式計(jì)算中,十分常見的問題。反壓意味著數(shù)據(jù)管道中某個(gè)節(jié)點(diǎn)成為瓶頸,處理速率跟不上上游發(fā)送數(shù)據(jù)的速率,而需要對(duì)上游進(jìn)行限速。由于實(shí)時(shí)計(jì)算應(yīng)用通常使用消息隊(duì)列來進(jìn)行生產(chǎn)端和消費(fèi)端的解耦,消費(fèi)端數(shù)據(jù)源是 pull-based 的,所以反壓通常是從某個(gè)節(jié)點(diǎn)傳導(dǎo)至數(shù)據(jù)源并降低數(shù)據(jù)源(比如 Kafka consumer)的攝入速率。

          要解決反壓首先要做的是定位到造成反壓的節(jié)點(diǎn),這主要有兩種辦法 :

          1. 通過 Flink Web UI 自帶的反壓監(jiān)控面板
          2. 通過 Flink Task Metrics

          Flink Web UI 的反壓監(jiān)控提供了 SubTask 級(jí)別的反壓監(jiān)控,原理是通過周期性對(duì) Task 線程的棧信息采樣,得到線程被阻塞在請(qǐng)求 Buffer(意味著被下游隊(duì)列阻塞)的頻率來判斷該節(jié)點(diǎn)是否處于反壓狀態(tài)。默認(rèn)配置下,這個(gè)頻率在 0.1 以下則為OK,0.1 至 0.5 為 LOW,而超過 0.5 則為 HIGH。

          如果處于反壓狀態(tài),那么有兩種可能性:

          1. 該節(jié)點(diǎn)的發(fā)送速率跟不上它的產(chǎn)生數(shù)據(jù)速率。這一般會(huì)發(fā)生在一條輸入多條 輸出的 Operator(比如 flatmap)。

          2. 下游的節(jié)點(diǎn)接受速率較慢,通過反壓機(jī)制限制了該節(jié)點(diǎn)的發(fā)送速率。

          如果是第一種狀況,那么該節(jié)點(diǎn)則為反壓的根源節(jié)點(diǎn),它是從 Source Task 到Sink Task 的第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn)。如果是第二種情況,則需要繼續(xù)排查下游節(jié)點(diǎn)。

          值得注意的是,反壓的根源節(jié)點(diǎn)并不一定會(huì)在反壓面板體現(xiàn)出高反壓,因?yàn)榉磯好姘灞O(jiān)控的是發(fā)送端,如果某個(gè)節(jié)點(diǎn)是性能瓶頸并不會(huì)導(dǎo)致它本身出現(xiàn)高反壓,而是導(dǎo)致它的上游出現(xiàn)高反壓。總體來看,如果我們找到第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn),那么反壓根源要么是就這個(gè)節(jié)點(diǎn),要么是它緊接著的下游節(jié)點(diǎn)。

          此外,F(xiàn)link 提供的 Task Metrics 是更好的反壓監(jiān)控手段,我們?cè)诒O(jiān)控反壓時(shí)會(huì)用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有關(guān),最為有用的是以下幾個(gè) Metrics:

          分析反壓的大致思路是:如果一個(gè) Subtask 的發(fā)送端 Buffer 占用率很高,則表明它被下游反壓限速了;如果一個(gè) Subtask 的接受端 Buffer 占用很高,則表明它將反壓傳導(dǎo)至上游。反壓情況可以根據(jù)以下表格進(jìn)行對(duì)號(hào)入座:

          outPoolUsage 和 inPoolUsage 同為低或同為高分別表明當(dāng)前 Subtask 正?;蛱幱诒幌掠畏磯海@應(yīng)該沒有太多疑問。而比較有趣的是當(dāng) outPoolUsage 和 inPoolUsage 表現(xiàn)不同時(shí),這可能是出于反壓傳導(dǎo)的中間狀態(tài)或者表明該 Subtask就是反壓的根源。

          如果一個(gè) Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。如果一個(gè) Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。因?yàn)橥ǔ7磯簳?huì)傳導(dǎo)至其上游,導(dǎo)致上游某些 Subtask 的 outPoolUsage 為高,我們可以根據(jù)這點(diǎn)來進(jìn)一步判斷。值得注意的是,反壓有時(shí)是短暫的且影響不大,比如來自某個(gè) Channel 的短暫網(wǎng)絡(luò)延遲或者 TaskManager 的正常 GC,這種情況下我們可以不用處理。

          在實(shí)踐中,很多情況下的反壓是由于數(shù)據(jù)傾斜造成的,這點(diǎn)我們可以通過 Web UI 各個(gè) SubTask 的 Records Sent 和 Record Received 來確認(rèn),另外 Checkpoint detail 里不同 SubTask 的 State size 也是一個(gè)分析數(shù)據(jù)傾斜的有用指標(biāo)。

          此外,最常見的問題可能是用戶代碼的執(zhí)行效率問題(頻繁被阻塞或者性能問題)。最有用的辦法就是對(duì) TaskManager 進(jìn)行 CPU profile,從中我們可以分析到Task Thread 是否跑滿一個(gè) CPU 核:如果是的話要分析 CPU 主要花費(fèi)在哪些函數(shù)里面,比如我們生產(chǎn)環(huán)境中就偶爾遇到卡在 Regex 的用戶函數(shù)(ReDoS);如果不是的話要看 Task Thread 阻塞在哪里,可能是用戶函數(shù)本身有些同步的調(diào)用,可能是checkpoint 或者 GC 等系統(tǒng)活動(dòng)導(dǎo)致的暫時(shí)系統(tǒng)暫停。

          另外 TaskManager 的內(nèi)存以及 GC 問題也可能會(huì)導(dǎo)致反壓,包括 TaskManager JVM 各區(qū)內(nèi)存不合理導(dǎo)致的頻繁 Full GC 甚至失聯(lián)。推薦可以通過給TaskManager 啟用 G1 垃圾回收器來優(yōu)化 GC,并加上 -XX:+PrintGCDetails 來打印 GC 日志的方式來觀察 GC 的問題。

          客戶端常見問題

          應(yīng)用提交控制臺(tái)異常信息:Could not build the program from JAR file.

          這個(gè)問題的迷惑性較大,很多時(shí)候并非指定運(yùn)行的 JAR 文件問題,而是提交過程中發(fā)生了異常,需要根據(jù)日志信息進(jìn)一步排查。最常見原因是未將依賴的 Hadoop JAR 文件加到 CLASSPATH,找不到依賴類(例如:ClassNotFoundException:org.apache.hadoop.yarn.exceptions.YarnException)導(dǎo)致加載客戶端入口類(FlinkYarnSessionCli)失敗。

          用戶應(yīng)用和框架 JAR 包版本沖突問題

          該問題通常會(huì)拋出 NoSuchMethodError/ClassNotFoundException/IncompatibleClassChangeError 等異常,要解決此類問題:

          1. 首先需要根據(jù)異常類定位依賴庫,然后可以在項(xiàng)目中執(zhí)行 mvn dependency:tree 以樹形結(jié)構(gòu)展示全部依賴鏈,再從中定位沖突的依賴庫,也可以增加參數(shù) -Dincludes 指定要顯示的包,格式為 [groupId]:[artifactId]:[-type]:[version],支持匹配,多個(gè)用逗號(hào)分隔,例如:mvn dependency:tree-Dincludes=power,javaassist。
          2. 定位沖突包后就要考慮如何排包,簡(jiǎn)單的方案是用 exclusion 來排除掉其從他依賴項(xiàng)目中傳遞過來的依賴,不過有的應(yīng)用場(chǎng)景需要多版本共存,不同組件依賴不同版本,就要考慮用 Maven Shade 插件來解決,詳情請(qǐng)參考Maven Shade Plugin。

          Flink 應(yīng)用資源分配問題排查思路

          如果 Flink 應(yīng)用不能正常啟動(dòng)達(dá)到 RUNNING 狀態(tài),可以按以下步驟進(jìn)行排查:

          步驟1. 需要先檢查應(yīng)用當(dāng)前狀態(tài),根據(jù)上述對(duì)啟動(dòng)流程的說明,我們知道:

          • 處于 NEW_SAVING 狀態(tài)時(shí)正在進(jìn)行應(yīng)用信息持久化,如果持續(xù)處于這個(gè)狀態(tài)我們需要檢查 RM 狀態(tài)存儲(chǔ)服務(wù)(通常是 ZooKeeper 集群)是否正常;
          • 如果處于 SUBMITTED 狀態(tài),可能是 RM 內(nèi)部發(fā)生一些 hold 讀寫鎖的耗時(shí)操作導(dǎo)致事件堆積,需要根據(jù) YARN 集群日志進(jìn)一步定位.
          • 如果處于 ACCEPTED 狀態(tài),需要先檢查 AM 是否正常,跳轉(zhuǎn)到步驟 2;
          • 如果已經(jīng)是 RUNNING 狀態(tài),但是資源沒有全部拿到導(dǎo)致 JOB 無法正常運(yùn)行,跳轉(zhuǎn)到步驟 3;

          步驟2. 檢查 AM 是否正常,可以從 YARN 應(yīng)用展示界面(http:// /cluster/app/)或YARN 應(yīng)用 REST API(http:///ws/v1/cluster/apps/)查看 diagnostics 信 息,根據(jù)關(guān)鍵字信息明確問題原因與解決方案:

          • Queue’s AM resource limit exceeded. 原因是達(dá)到了隊(duì)列 AM 可用資源上限,即隊(duì)列的 AM 已使用資源和 AM 新申請(qǐng)資源之和超出了隊(duì)列的AM 資源上限,可以適當(dāng)調(diào)整隊(duì)列 AM 可用資源百分比的配置項(xiàng):yarn.scheduler.capacity..maximum-am-resource-percent。
          • User’s AM resource limit exceeded. 原因是達(dá)到了應(yīng)用所屬用戶在該隊(duì)列的 AM 可用資源上限,即應(yīng)用所屬用戶在該隊(duì)列的 AM 已使用資源和 AM新申請(qǐng)資源之和超出了應(yīng)用所屬用戶在該隊(duì)列的 AM 資源上限,可以適當(dāng)提高用戶可用 AM 資源比例來解決該問題,相關(guān)配置項(xiàng):yarn.scheduler.capacity..user-limit-factor 與 yarn.scheduler.capacity..minimum-user-limit-percent。
          • AM container is launched, waiting for AM container to Register with RM. 大致原因是 AM 已啟動(dòng),但內(nèi)部初始化未完成,可能有 ZK 連接超時(shí)等問題,具體原因需排查 AM 日志,根據(jù)具體問題來解決。
          • Application is Activated, waiting for resources to be assigned for AM.該信息表示應(yīng)用 AM 檢查已經(jīng)通過,正在等待調(diào)度器分配,此時(shí)需要進(jìn)行調(diào)度器層面的資源檢查,跳轉(zhuǎn)到步驟 4。

          步驟3. 確認(rèn)應(yīng)用確實(shí)有 YARN 未能滿足的資源請(qǐng)求:從應(yīng)用列表頁點(diǎn)擊問題應(yīng)用ID 進(jìn)入應(yīng)用頁面,再點(diǎn)擊下方列表的應(yīng)用實(shí)例 ID 進(jìn)入應(yīng)用實(shí)例頁面,看 Total Outstanding Resource Requests 列表中是否有 Pending 資源,如果沒有,說明 YARN 已分配完畢,退出該檢查流程,轉(zhuǎn)去檢查 AM;如果有,說明調(diào)度器未能完成分配,跳轉(zhuǎn)到步驟 4。

          步驟4.調(diào)度器分配問題排查YARN-9050 支持在 WebUI 上或通過 REST API 自動(dòng)診斷應(yīng)用問題,將在 Hadoop3.3.0 發(fā)布,之前的版本仍需進(jìn)行人工排查:

          • 檢查集群或 queue 資源,scheduler 頁面樹狀圖葉子隊(duì)列展開查看資源信息:Effective Max Resource、Used Resources:(1)檢查集群資源或所在隊(duì)列資源或其父隊(duì)列資源是否已用完;(2)檢查葉子隊(duì)列某維度資源是否接近或達(dá)到上限;

          • 檢查是否存在資源碎片:(1)檢查集群 Used 資源和 Reserved 資源之和占總資源的比例,當(dāng)集群資源接近用滿時(shí)(例如 90% 以上),可能存在資源碎片的情況,應(yīng)用的分配速度就會(huì)受影響變慢,因?yàn)榇蟛糠謾C(jī)器都沒有資源了,機(jī)器可用資源不足會(huì)被 reserve,reserved 資源達(dá)到一定規(guī)模后可能導(dǎo)致大部分機(jī)器資源被鎖定,后續(xù)分配可能就會(huì)變慢;(2)檢查 NM 可用資源分布情況,即使集群資源使用率不高,也有可能是因?yàn)楦骶S度資源分布不同造成,例如 1/2 節(jié)點(diǎn)上的內(nèi)存資源接近用滿 CPU 資源剩余較多,1/2 節(jié)點(diǎn)上的 CPU 資源接近用滿內(nèi)存資源剩余較多,申請(qǐng)資源中某一維度資源值配置過大也可能造成無法申請(qǐng)到資源;

          • 檢查是否有高優(yōu)先級(jí)的問題應(yīng)用頻繁申請(qǐng)并立即釋放資源的問題,這種情況會(huì)造成調(diào)度器忙于滿足這一個(gè)應(yīng)用的資源請(qǐng)求而無暇顧及其他應(yīng)用;

          • 檢查是否存在 Container 啟動(dòng)失敗或剛啟動(dòng)就自動(dòng)退出的情況,可以查看Container 日志 ( 包括 localize 日志、launch 日志等 )、YARN NM 日志或YARN RM 日志進(jìn)行排查。

          TaskManager 啟動(dòng)異常

          org.apache.hadoop.yarn.exceptions.YarnException:?Unauthorized?request?to?start?container.?This?token?is?expired.?current?time?is?...?found?...

          該異常在 Flink AM 向 YARN NM 申請(qǐng)啟動(dòng) token 已超時(shí)的 Container 時(shí)拋出,通常原因是 Flink AM 從 YARN RM 收到這個(gè) Container 很久之后(超過了Container 有效時(shí)間,默認(rèn) 10 分鐘,該 Container 已經(jīng)被釋放)才去啟動(dòng)它,進(jìn)一步原因是 Flink 內(nèi)部在收到 YARN RM 返回的 Container 資源后串行啟動(dòng)。

          當(dāng)待啟動(dòng)的 Container 數(shù)量較多且分布式文件存儲(chǔ)如 HDFS 性能較慢(啟動(dòng)前需上傳 TaskManager配置)時(shí) Container啟動(dòng)請(qǐng)求容易堆積在內(nèi)部,F(xiàn)LINK-13184 對(duì)這個(gè)問題進(jìn)行了優(yōu)化,一是在啟動(dòng)前增加了有效性檢查,避免了無意義的配置上傳流程,二是進(jìn)行了異步多線程優(yōu)化,加快啟動(dòng)速度。

          PyFlink如何定義UDF

          在 Apache Flink 1.10 中我們有多種方式進(jìn)行 UDF 的定義,比如:

          • Extend ScalarFunction, e.g.:
          class?HashCodeMean(ScalarFunction):
          ?def?eval(self,?i,?j):
          ?return?(hash(i)?+?hash(j))?/?2
          • Lambda Functio
          lambda?i,?j:?(hash(i)?+?hash(j))?/?2
          • Named Function
          def?hash_code_mean(i,?j):
          ?return?(hash(i)?+?hash(j))?/?2
          • Callable Function
          class?CallableHashCodeMean(object):
          ?def?__call__(self,?i,?j):
          ?return?(hash(i)?+?hash(j))?/?2

          現(xiàn)上面定義函數(shù)除了第一個(gè)擴(kuò)展 ScalaFunction 的方式是 PyFlink 特有的,其他方式都是 Python 語言本身就支持的,也就是說,在 Apache Flink 1.10中PyFlink 允許以任何 Python 語言所支持的方式定義 UDF。

          那么定義完 UDF 我們應(yīng)該怎樣使用呢?Apache Flink 1.10 中提供了2種Decorators,如下:

          • Decorators - udf(), e.g. :
          udf(lambda?i,?j:?(hash(i)?+?hash(j))?/?2,
          ?[for?input?types],?[for?result?types])
          Decorators?-?@udf,?e.g.?:
          @udf(input_types=...,?result_type=...)
          ?def?hash_code_mean(…):

          然后在使用之前進(jìn)行注冊(cè),如下:

          st_env.register_function("hash_code",?hash_code_mean)

          接下來就可以在 Table API/SQL 中進(jìn)行使用了,如下:

          my_table.select("hash_code_mean(a,?b)").insert_into("Results")

          數(shù)據(jù)傾斜導(dǎo)致子任務(wù)積壓

          業(yè)務(wù)背景

          一個(gè)流程中,有兩個(gè)重要子任務(wù):一是數(shù)據(jù)遷移,將kafka實(shí)時(shí)數(shù)據(jù)落Es,二是將kafka數(shù)據(jù)做窗口聚合落hbase,兩個(gè)子任務(wù)接的是同一個(gè)Topic GroupId。上游 Topic 的 tps 高峰達(dá)到5-6w。

          問題描述

          給 24個(gè) TaskManager(CPU) 都會(huì)出現(xiàn)來不及消費(fèi)的情況

          問題原因

          做窗口聚合的任務(wù)的分組字段,分組粒度太小,hash不能打散,數(shù)據(jù)傾斜嚴(yán)重,導(dǎo)致少數(shù) TaskManager 上壓力過大,從而影響落Es的效率,導(dǎo)致背壓。

          解決方式

          將兩個(gè)任務(wù)獨(dú)立開來,作為不同的流程。

          結(jié)果

          修改之前 24個(gè) TaskManager(CPU) 來不及消費(fèi),改完之后 20 個(gè) CPU 可完成任務(wù)。Kafka實(shí)時(shí)數(shù)據(jù)落Es的16個(gè)TaskManager,將kafka數(shù)據(jù)做窗口聚合落hbase的4個(gè)TaskManager。

          另:

          同樣的數(shù)據(jù)、同樣的Tps作為數(shù)據(jù)輸入,Hbase的輸出能力遠(yuǎn)超過Es,考慮實(shí)時(shí)任務(wù)落數(shù)據(jù)進(jìn)Es要慎重。

          Flink任務(wù)落Es時(shí)要考慮設(shè)置微批落數(shù)據(jù),設(shè)置 bulk.flush.max.actions 和 bulk.flush.interval.ms至合適值,否則影響吞吐量。

          Kafka 消息大小默認(rèn)配置太小,導(dǎo)致數(shù)據(jù)未處理

          業(yè)務(wù)背景

          正常的Flink任務(wù)消費(fèi) Topic 數(shù)據(jù),但是Topic中的數(shù)據(jù)為 XML 以及 JSON,單條數(shù)據(jù)較大

          問題描述

          Flink各項(xiàng)metrics指標(biāo)正常,但是沒處理到數(shù)據(jù)

          問題原因

          Topic中單條數(shù)據(jù) > 1M,超過 Kafka Consumer 處理單條數(shù)據(jù)的默認(rèn)最大值。

          解決方式

          有三種可選方式:擴(kuò)大kafka consumer 單條數(shù)據(jù)的數(shù)據(jù)大?。篺etch.message.max.bytes。對(duì)消息進(jìn)行壓縮:上游 kafka producer 設(shè)置 compression.codec 和 commpressed.topics。業(yè)務(wù)上對(duì)數(shù)據(jù)切片,在上游 kafka producer 端將數(shù)據(jù)切片為 10K,使用分區(qū)主鍵確保同一條數(shù)據(jù)發(fā)送到同一Partition,consumer對(duì)消息重組。

          結(jié)果

          方式一:按業(yè)務(wù)要求擴(kuò)大 Kafka Consumer 可處理的單條數(shù)據(jù)字節(jié)數(shù)即可正常處理業(yè)務(wù) 方式二:Kafka Consumer 需先解碼,再進(jìn)行業(yè)務(wù)處理。方式三:Kafka Consumer 需先重組數(shù)據(jù),再進(jìn)行業(yè)務(wù)處理。

          Tps 很大,Kafka Ack 默認(rèn)配置 拖慢消息處理速度

          業(yè)務(wù)背景

          實(shí)時(shí)任務(wù),上游接流量頁面點(diǎn)擊事件的數(shù)據(jù),下游輸出Kafka,輸出tps很大。流量數(shù)據(jù)不重要,可接受丟失的情況

          問題描述

          CPU資源耗費(fèi)較多的情況下,才能正常消費(fèi),考慮如果縮減資源。

          問題原因

          Kafka Producer 默認(rèn) acks=1,即Partition Leader接收到消息而且寫入本地磁盤了,就認(rèn)為成功了

          解決方式

          Kafka Producer 設(shè)置 :props.put(“acks”, “0”); 將 acks=0,即KafkaProducer在客戶端,只要把消息發(fā)送出去,不管那條數(shù)據(jù)有沒有在哪怕Partition Leader上落到磁盤,直接就認(rèn)為這個(gè)消息發(fā)送成功了。

          結(jié)果

          資源降低三分之一。

          The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed.

          org.apache.flink.util.FlinkException: The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed. at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372) at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:803) at org.apache.flink.yarn.YarnResourceManager.lambda0(YarnResourceManager.java:340) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

          程序內(nèi)存占用過大,導(dǎo)致TaskManager在yarn上kill了,分析原因應(yīng)該是資源不夠,可以將程序放在資源更大的集群上,再不行就設(shè)置減少Slot中共享的task的個(gè)數(shù),也可能是內(nèi)存泄露或內(nèi)存資源配置不合理造成,需要進(jìn)行合理分配。

          The heartbeat of TaskManager with id container ....... timed out

          此錯(cuò)誤是container心跳超時(shí),出現(xiàn)此種錯(cuò)誤一般有兩種可能:

          1、分布式物理機(jī)網(wǎng)絡(luò)失聯(lián),這種原因一般情況下failover后作業(yè)能正?;謴?fù),如果出現(xiàn)的不頻繁可以不用關(guān)注;2、failover的節(jié)點(diǎn)對(duì)應(yīng)TM的內(nèi)存設(shè)置太小,GC嚴(yán)重導(dǎo)致心跳超時(shí),建議調(diào)大對(duì)應(yīng)節(jié)點(diǎn)的內(nèi)存值。

          Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#6643546564]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".

          在flink-conf.yaml中添加或修改:akka.ask.timeout: 100s web.timeout: 100000

          Checkpoint:Checkpoint expired before completing

          checkpointConf.setCheckpointTimeout(5000L)這個(gè)值設(shè)置過小,默認(rèn)是10min,需要進(jìn)行調(diào)大測(cè)試。

          Kafka partition leader切換導(dǎo)致Flink重啟

          Flink重啟,查看日志,顯示:

          java.lang.Exception:?Failed?to?send?data?to?Kafka:?This?server?is?not?the?leader?for?that?topic-partition.
          ??at?org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
          ??at?org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:280)
          ??at?org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
          ??at?org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
          ??at?org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
          ??at?org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
          ??at?org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
          ??at?java.lang.Thread.run(Thread.java:748)
          Caused?by:?org.apache.kafka.common.errors.NotLeaderForPartitionException:?This?server?is?not?the?leader?for?that?topic-partition.

          查看Kafka的Controller日志,顯示:

          ?INFO?[SessionExpirationListener?on?10],?ZK?expired;?shut?down?all?controller?components?and?
          try?to?re-elect?(kafka.controller.KafkaController$SessionExpirationListener)

          關(guān)于producer參數(shù)設(shè)置,設(shè)置retries參數(shù),可以在Kafka的Partition發(fā)生leader切換時(shí),F(xiàn)link不重啟,而是做3次嘗試:

          kafkaProducerConfig
          ??{
          ????????"bootstrap.servers":?"192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093"
          ????????"retries":3
          ??}

          注意 mapWithState & TTL 的重要性

          在處理包含無限多鍵的數(shù)據(jù)時(shí),要考慮到 keyed 狀態(tài)保留策略(通過 TTL 定時(shí)器來在給定的時(shí)間之后清理未使用的數(shù)據(jù))是很重要的。術(shù)語『無限』在這里有點(diǎn)誤導(dǎo),因?yàn)槿绻阋幚淼?key 以 128 位編碼,則 key 的最大數(shù)量將會(huì)有個(gè)限制(等于 2 的 128 次方)。但這是一個(gè)巨大的數(shù)字!你可能無法在狀態(tài)中存儲(chǔ)那么多值,所以最好考慮你的鍵空間是無界的,同時(shí)新鍵會(huì)隨著時(shí)間不斷出現(xiàn)。

          如果你的 keyed 狀態(tài)包含在某個(gè) Flink 的默認(rèn)窗口中,則將是安全的:即使未使用 ?TTL,在處理窗口的元素時(shí)也會(huì)注冊(cè)一個(gè)清除計(jì)時(shí)器,該計(jì)時(shí)器將調(diào)用 clearAllState 函數(shù),并刪除與該窗口關(guān)聯(lián)的狀態(tài)及其元數(shù)據(jù)。

          如果要使用 Keyed State Descriptor 來管理狀態(tài),可以很方便地添加 TTL 配置,以確保在狀態(tài)中的鍵數(shù)量不會(huì)無限制地增加。

          但是,你可能會(huì)想使用更簡(jiǎn)便的 mapWithState 方法,該方法可讓你訪問 valueState 并隱藏操作的復(fù)雜性。雖然這對(duì)于測(cè)試和少量鍵的數(shù)據(jù)來說是很好的選擇,但如果在生產(chǎn)環(huán)境中遇到無限多鍵值時(shí),會(huì)引發(fā)問題。由于狀態(tài)是對(duì)你隱藏的,因此你無法設(shè)置 TTL,并且默認(rèn)情況下未配置任何 TTL。這就是為什么值得考慮做一些額外工作的原因,如聲明諸如 RichMapFunction 之類的東西,這將使你能更好的控制狀態(tài)的生命周期。

          部署和資源問題

          (0) JDK版本過低

          這不是個(gè)顯式錯(cuò)誤,但是JDK版本過低很有可能會(huì)導(dǎo)致Flink作業(yè)出現(xiàn)各種莫名其妙的問題,因此在生產(chǎn)環(huán)境中建議采用JDK 8的較高update(我們使用的是181)。

          (1) Could not build the program from JAR file

          該信息不甚準(zhǔn)確,因?yàn)榻^大多數(shù)情況下都不是JAR包本身有毛病,而是在作業(yè)提交過程中出現(xiàn)異常退出了。因此需要查看本次提交產(chǎn)生的客戶端日志(默認(rèn)位于$FLINK_HOME/logs目錄下),再根據(jù)其中的信息定位并解決問題。

          (2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...

          一般都是因?yàn)橛脩粢蕾嚨谌桨陌姹九cFlink框架依賴的版本有沖突導(dǎo)致。

          (3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

          就是字面意思,YARN集群內(nèi)沒有足夠的資源啟動(dòng)Flink作業(yè)。檢查一下當(dāng)前YARN集群的狀態(tài)、正在運(yùn)行的YARN App以及Flink作業(yè)所處的隊(duì)列,釋放一些資源或者加入新的資源。

          (4) java.util.concurrent.TimeoutException: Slot allocation request timed out

          slot分配請(qǐng)求超時(shí),是因?yàn)門askManager申請(qǐng)資源時(shí)無法正常獲得,按照上一條的思路檢查即可。

          (5) org.apache.flink.util.FlinkException: The assigned slot < container_id> was removed

          TaskManager的Container因?yàn)槭褂觅Y源超限被kill掉了。首先需要保證每個(gè)slot分配到的內(nèi)存量足夠,特殊情況下可以手動(dòng)配置SlotSharingGroup來減少單個(gè)slot中共享Task的數(shù)量。如果資源沒問題,那么多半就是程序內(nèi)部發(fā)生了內(nèi)存泄露。建議仔細(xì)查看TaskManager日志,并按處理JVM OOM問題的常規(guī)操作來排查。

          (6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id < tm_id>timed out

          TaskManager心跳超時(shí)。有可能是TaskManager已經(jīng)失敗,如果沒有失敗,那么有可能是因?yàn)榫W(wǎng)絡(luò)不好導(dǎo)致JobManager沒能收到心跳信號(hào),或者TaskManager忙于GC,無法發(fā)送心跳信號(hào)。JobManager會(huì)重啟心跳超時(shí)的TaskManager,如果頻繁出現(xiàn)此異常,應(yīng)該通過日志進(jìn)一步定位問題所在。

          在Flink中,資源的隔離是通過Slot進(jìn)行的,也就是說多個(gè)Slot會(huì)運(yùn)行在同一個(gè)JVM中,這種隔離很弱,尤其對(duì)于生產(chǎn)環(huán)境。Flink App上線之前要在一個(gè)單獨(dú)的Flink集群上進(jìn)行測(cè)試,否則一個(gè)不穩(wěn)定、存在問題的Flink App上線,很可能影響整個(gè)Flink集群上的App。

          (7)資源不足導(dǎo)致 container 被 kill

          The assigned slot container_container編號(hào) was removed.Flink App 拋出此類異常,通過查看日志,一般就是某一個(gè) Flink App 內(nèi)存占用大,導(dǎo)致 TaskManager(在 Yarn 上就是 Container )被Kill 掉。

          但是并不是所有的情況都是這個(gè)原因,還需要進(jìn)一步看 yarn 的日志( 查看 yarn 任務(wù)日志:yarn logs -applicationId ?-appOwner),如果代碼寫的沒問題,就確實(shí)是資源不夠了,其實(shí) 1G Slot 跑多個(gè)Task( Slot Group Share )其實(shí)挺容易出現(xiàn)的。

          因此有兩種選擇,可以根據(jù)具體情況,權(quán)衡選擇一個(gè)。

          將該 Flink App 調(diào)度在 Per Slot 內(nèi)存更大的集群上。通過 slotSharingGroup("xxx") ,減少 Slot 中共享 Task 的個(gè)數(shù)

          (8)啟動(dòng)報(bào)錯(cuò),提示找不到 jersey 的類

          java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties 解決辦法進(jìn)入 yarn中 把 lib 目中的一下兩個(gè)問價(jià)拷貝到flink的lib中 hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar /hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar

          (9)Scala版本沖突

          java.lang.NoSuchMethodError:scala.collection.immutable.HashSet$.empty()Lscala/collection/

          解決辦法,添加: import org.apache.flink.api.scala._

          (10)沒有使用回撤流報(bào)錯(cuò)

          Table?is?not?an?append一only?table.?Use?the?toRetractStream()?in?order?to?handle?add?and?retract?messages.

          這個(gè)是因?yàn)閯?dòng)態(tài)表不是 append-only 模式的,需要用 toRetractStream (回撤流) 處理就好了.

          tableEnv.toRetractStream[Person](result).print()

          (11)OOM 問題解決思路

          java.lang.OutOfMemoryError:?GC?overhead?limit?exceeded
          java.lang.OutOfMemoryError:?GC?overhead?limit?exceeded
          ????????at?java.util.Arrays.copyOfRange(Arrays.java:3664)
          ????????at?java.lang.String.(String.java:207)
          ????????at?com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
          ????????at?com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:177)
          ......
          ????????at?org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)

          解決方案:

          檢查 slot 槽位夠不夠或者 slot 分配的數(shù)量有沒有生效。程序起的并行是否都正常分配了(會(huì)有這樣的情況出現(xiàn),假如 5 個(gè)并行,但是只有 2 個(gè)在幾點(diǎn)上生效了,另外 3 個(gè)沒有數(shù)據(jù)流動(dòng))。檢查flink程序有沒有數(shù)據(jù)傾斜,可以通過 flink 的 ui 界面查看每個(gè)分區(qū)子節(jié)點(diǎn)處理的數(shù)據(jù)量。

          (12)解析返回值類型失敗報(bào)錯(cuò)

          The?return?type?of?function?could?not?be?determined?automatically
          Exception?in?thread?"main"?org.apache.flink.api.common.functions.InvalidTypesException:?The?return?type?of?function?'main(RemoteEnvironmentTest.java:27)'?could?not?be?determined?automatically,?due?to?type?erasure.?You?can?give?type?information?hints?by?using?the?returns(...)?method?on?the?result?of?the?transformation?call,?or?by?letting?your?function?implement?the?'ResultTypeQueryable'?interface.
          ?at?org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
          ?at?org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
          ?at?org.apache.flink.api.java.DataSet.print(DataSet.java:1652)

          解決方案:產(chǎn)生這種現(xiàn)象的原因一般是使用 lambda 表達(dá)式?jīng)]有明確返回值類型,或者使用特使的數(shù)據(jù)結(jié)構(gòu) flink 無法解析其類型,這時(shí)候我們需要在方法的后面添加返回值類型,比如字符串。

          input.flatMap((Integer?number,?Collector?out)?->?{
          ?......
          })
          //?提供返回值類型
          .returns(Types.STRING)

          (13)Hadoop jar 包沖突

          Caused?by:?java.io.IOException:?The?given?file?system?URI?(hdfs:///data/checkpoint-data/abtest)?did?not?describe?the?authority?(like?for?example?HDFS?NameNode?address/port?or?S3?host).?The?attempt?to?use?a?configured?default?authority?failed:?Hadoop?configuration?did?not?contain?an?entry?for?the?default?file?system?('fs.defaultFS').
          ????????at?org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:135)
          ????????at?org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
          ????????at?org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
          ????????at?org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)

          解決:pom 文件中去掉和 hadoop 相關(guān)的依賴就好了

          作業(yè)問題

          (1)org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

          該異常幾乎都是由于程序業(yè)務(wù)邏輯有誤,或者數(shù)據(jù)流里存在未處理好的臟數(shù)據(jù)導(dǎo)致的,繼續(xù)向下追溯異常棧一般就可以看到具體的出錯(cuò)原因,比較常見的如POJO內(nèi)有空字段,或者抽取事件時(shí)間的時(shí)間戳為null等。

          (2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down

          很多童鞋拿著這兩條異常信息來求助,但實(shí)際上它們只是表示BufferPool、MemoryManager這些Flink運(yùn)行時(shí)組件被銷毀,亦即作業(yè)已經(jīng)失敗。具體的原因多種多樣,根據(jù)經(jīng)驗(yàn),一般是上一條描述的情況居多(即Could not forward element to next operator錯(cuò)誤會(huì)伴隨出現(xiàn)),其次是JDK版本問題。具體情況還是要根據(jù)TaskManager日志具體分析。

          (3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]

          Akka超時(shí)導(dǎo)致,一般有兩種原因:一是集群負(fù)載比較大或者網(wǎng)絡(luò)比較擁塞,二是業(yè)務(wù)邏輯同步調(diào)用耗時(shí)的外部服務(wù)。如果負(fù)載或網(wǎng)絡(luò)問題無法徹底緩解,需考慮調(diào)大akka.ask.timeout參數(shù)的值(默認(rèn)只有10秒);另外,調(diào)用外部服務(wù)時(shí)盡量異步操作(Async I/O)。

          (4) java.io.IOException: Too many open files

          這個(gè)異常我們應(yīng)該都不陌生,首先檢查系統(tǒng)ulimit -n的文件描述符限制,再注意檢查程序內(nèi)是否有資源(如各種連接池的連接)未及時(shí)釋放。值得注意的是,F(xiàn)link使用RocksDB狀態(tài)后端也有可能會(huì)拋出這個(gè)異常,此時(shí)需修改flink-conf.yaml中的state.backend.rocksdb.files.open參數(shù),如果不限制,可以改為-1。

          (5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '< class>' are missing

          在Flink內(nèi)使用Java Lambda表達(dá)式時(shí),由于類型擦除造成的副作用,注意調(diào)用returns()方法指定被擦除的類型。

          (6)Checkpoint失?。篊heckpoint expired before completing

          原因是因?yàn)閏heckpointConf.setCheckpointTimeout(8000L)。設(shè)置的太小了,默認(rèn)是10min,這里只設(shè)置了8sec。當(dāng)一個(gè)Flink App背壓的時(shí)候(例如由外部組件異常引起),Barrier會(huì)流動(dòng)的非常緩慢,導(dǎo)致Checkpoint時(shí)長(zhǎng)飆升。

          檢查點(diǎn)和狀態(tài)問題

          (1) Received checkpoint barrier for checkpoint < cp_id> before completing current checkpoint < cp_id>. Skipping current checkpoint

          在當(dāng)前檢查點(diǎn)還未做完時(shí),收到了更新的檢查點(diǎn)的barrier,表示當(dāng)前檢查點(diǎn)不再需要而被取消掉,一般不需要特殊處理。

          (2) Checkpoint < cp_id> expired before completing

          首先應(yīng)檢查CheckpointConfig.setCheckpointTimeout()方法設(shè)定的檢查點(diǎn)超時(shí),如果設(shè)的太短,適當(dāng)改長(zhǎng)一點(diǎn)。另外就是考慮發(fā)生了反壓或數(shù)據(jù)傾斜,或者barrier對(duì)齊太慢。

          (3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible

          我們知道Flink的狀態(tài)是按key組織并保存的,如果程序邏輯內(nèi)改了keyBy()邏輯或者key的序列化邏輯,就會(huì)導(dǎo)致檢查點(diǎn)/保存點(diǎn)的數(shù)據(jù)無法正確恢復(fù)。所以如果必須要改key相關(guān)的東西,就棄用之前的狀態(tài)數(shù)據(jù)吧。

          (4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported

          在1.9之前的Flink版本中,如果我們使用RocksDB狀態(tài)后端,并且更改了自用MapState的schema,恢復(fù)作業(yè)時(shí)會(huì)拋出此異常,表示不支持更改schema。這個(gè)問題已經(jīng)在FLINK-11947解決,升級(jí)版本即可。

          (5)時(shí)鐘不同步導(dǎo)致無法啟動(dòng)

          啟動(dòng)Flink任務(wù)的時(shí)候報(bào)錯(cuò) Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster。

          然后仔細(xì)看發(fā)現(xiàn):system times on machines may be out of sync。

          意思說是機(jī)器上的系統(tǒng)時(shí)間可能不同步。同步集群機(jī)器時(shí)間即可。

          不同的kafka版本依賴沖突

          不同的kafka版本依賴沖突會(huì)造成cdc報(bào)錯(cuò),參考這個(gè)issue:

          http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393

          2020-11-04?16:39:10.972?[Source:?Custom?Source?->?Sink:?Print?to?Std.?Out?(1/1)]?WARN??org.apache.flink.runtime.taskmanager.Task??-?Source:?Custom?Source?->?Sink:?Print?to?Std.?Out?(1/1)?(7c3ccf7686ccfb33254e8cb785cd339d)?switched?from?RUNNING?to?FAILED.
          java.lang.AbstractMethodError:?org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
          at?org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
          at?org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
          at?io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:583)
          at?io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:80)
          at?io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
          at?io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
          at?io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
          at?com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
          at?org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
          at?org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
          at?org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

          源碼如下:

          public?class?CdcTest?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????SourceFunction?sourceFunction?=?MySQLSource.builder()
          ????????????????.hostname("localhost")
          ????????????????.port(3306)
          ????????????????.databaseList("sohay")?//?monitor?all?tables?under?inventory?database
          ????????????????.username("root")
          ????????????????.password("123456")
          ????????????????.deserializer(new?StringDebeziumDeserializationSchema())?//?converts?SourceRecord?to?String
          ????????????????.build();

          ????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();

          ????????env.addSource(sourceFunction).print().setParallelism(1);?//?use?parallelism?1?for?sink?to?keep?message?ordering

          ????????env.execute();
          ????}
          }

          確實(shí)是pom中存在一個(gè)Kafka的依賴包,導(dǎo)致沖突,去掉后問題解決。

          MySQL CDC源等待超時(shí)

          在掃描表期間,由于沒有可恢復(fù)的位置,因此無法執(zhí)行checkpoints。為了不執(zhí)行檢查點(diǎn),MySQL CDC源將保持檢查點(diǎn)等待超時(shí)。超時(shí)檢查點(diǎn)將被識(shí)別為失敗的檢查點(diǎn),默認(rèn)情況下,這將觸發(fā)Flink作業(yè)的故障轉(zhuǎn)移。因此,如果數(shù)據(jù)庫表很大,則建議添加以下Flink配置,以避免由于超時(shí)檢查點(diǎn)而導(dǎo)致故障轉(zhuǎn)移:

          execution.checkpointing.interval:?10min
          execution.checkpointing.tolerable-failed-checkpoints:?100
          restart-strategy:?fixed-delay
          restart-strategy.fixed-delay.attempts:?2147483647

          數(shù)據(jù)庫切換,重新開啟binlog,Mysql全局鎖無法釋放

          原因是因?yàn)榍袚Q了數(shù)據(jù)庫環(huán)境,重新開啟binlog,所有的作業(yè)都重新同步binlog的全量數(shù)據(jù),導(dǎo)致了全局鎖一直在等待,所有作業(yè)都無法執(zhí)行。解決方法:記錄checkpoint的地址,取消作業(yè),然后根據(jù)checkpoint重啟作業(yè)。

          使用Flink SQL CDC模式創(chuàng)建維表異常

          CREATE?TABLE?cdc_test
          (
          ????id??STRING,
          ????ip??STRING,
          ????url?STRING,
          ????PRIMARY?KEY?(id)?NOT?ENFORCED
          )?WITH?(
          ????'connector'?=?'mysql-cdc',?
          ????'hostname'?=?'127.0.0.1',
          ????'port'?=?'3306',
          ????'database-name'?=?'xx',
          ????'table-name'?=?'xx',
          ????'username'?=?'xx',
          ????'password'?=?'xx'
          );

          執(zhí)行查詢:

          SELECT?*?FROM?cdc_test;

          任務(wù)無法運(yùn)行,拋出異常

          User?does?not?have?the?'LOCK?TABLES'?privilege?required?to?obtain?a?consistent?snapshot?by?preventing?concurrent?writes?to?tables.

          原因是連接MySQL的用戶缺乏必要的CDC權(quán)限。

          Flink SQL CDC基于Debezium實(shí)現(xiàn)。當(dāng)啟動(dòng)MySQL CDC源時(shí),它將獲取一個(gè)全局讀取鎖(FLUSH TABLES WITH READ LOCK),該鎖將阻止其他數(shù)據(jù)庫的寫入,然后讀取當(dāng)前binlog位置以及數(shù)據(jù)庫和表的schema,之后將釋放全局讀取鎖。然后它掃描數(shù)據(jù)庫表并從先前記錄的位置讀取binlog,F(xiàn)link將定期執(zhí)行checkpoints以記錄binlog位置。如果發(fā)生故障,作業(yè)將重新啟動(dòng)并從checkpoint完成的binlog位置恢復(fù),因此它保證了僅一次的語義。

          解決辦法:創(chuàng)建一個(gè)新的MySQL用戶并授予其必要的權(quán)限。

          mysql>?CREATE?USER?'user'@'localhost'?IDENTIFIED?BY?'password';
          mysql>?GRANT?SELECT,?RELOAD,?SHOW?DATABASES,?REPLICATION?SLAVE,?REPLICATION?CLIENT?ON?*.*?TO?'user'?IDENTIFIED?BY?'password';
          mysql>?FLUSH?PRIVILEGES;

          Flink作業(yè)掃描MySQL全量數(shù)據(jù)出現(xiàn)fail-over

          Flink 作業(yè)在掃描 MySQL 全量數(shù)據(jù)時(shí),checkpoint 超時(shí),出現(xiàn)作業(yè) failover,如下圖:

          原因:Flink CDC 在 scan 全表數(shù)據(jù)(我們的實(shí)收表有千萬級(jí)數(shù)據(jù))需要小時(shí)級(jí)的時(shí)間(受下游聚合反壓影響),而在 scan 全表過程中是沒有 offset 可以記錄的(意味著沒法做 checkpoint),但是 Flink 框架任何時(shí)候都會(huì)按照固定間隔時(shí)間做 checkpoint,所以此處 mysql-cdc source 做了比較取巧的方式,即在 scan 全表的過程中,會(huì)讓執(zhí)行中的 checkpoint 一直等待甚至超時(shí)。超時(shí)的 checkpoint 會(huì)被仍未認(rèn)為是 failed checkpoint,默認(rèn)配置下,這會(huì)觸發(fā) Flink 的 failover 機(jī)制,而默認(rèn)的 failover 機(jī)制是不重啟。所以會(huì)造成上面的現(xiàn)象。

          解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數(shù),以及失敗重啟策略,如下:

          execution.checkpointing.interval:?10min???#?checkpoint間隔時(shí)間
          execution.checkpointing.tolerable-failed-checkpoints:?100??#?checkpoint?失敗容忍次數(shù)
          restart-strategy:?fixed-delay??#?重試策略
          restart-strategy.fixed-delay.attempts:?2147483647???#?重試次數(shù)

          作業(yè)在運(yùn)行時(shí) mysql cdc source 報(bào) no viable alternative at input 'alter table std'

          原因:因?yàn)閿?shù)據(jù)庫中別的表做了字段修改,CDC source 同步到了 ALTER DDL 語句,但是解析失敗拋出的異常。

          解決方法:在 flink-cdc-connectors 最新版本中已經(jīng)修復(fù)該問題(跳過了無法解析的 DDL)。升級(jí) connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。

          多個(gè)作業(yè)共用同一張 source table 時(shí),沒有修改 server id 導(dǎo)致讀取出來的數(shù)據(jù)有丟失。

          原因:MySQL binlog 數(shù)據(jù)同步的原理是,CDC source 會(huì)偽裝成 MySQL 集群的一個(gè) slave(使用指定的 server id 作為唯一 id),然后從 MySQL 拉取 binlog 數(shù)據(jù)。如果一個(gè) MySQL 集群中有多個(gè) slave 有同樣的 id,就會(huì)導(dǎo)致拉取數(shù)據(jù)錯(cuò)亂的問題。

          解決方法:默認(rèn)會(huì)隨機(jī)生成一個(gè) server id,容易有碰撞的風(fēng)險(xiǎn)。所以建議使用動(dòng)態(tài)參數(shù)(table hint)在 query 中覆蓋 server id。如下所示:

          FROM?bill_info?/*+?OPTIONS('server-id'='123456')?*/?;

          CDC source 掃描 MySQL 表期間,發(fā)現(xiàn)無法往該表 insert 數(shù)據(jù)

          原因:由于使用的 MySQL 用戶未授權(quán) RELOAD 權(quán)限,導(dǎo)致無法獲取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會(huì)退化成表級(jí)讀鎖,而使用表級(jí)讀鎖需要等到全表 scan 完,才能釋放鎖,所以會(huì)發(fā)現(xiàn)持鎖時(shí)間過長(zhǎng)的現(xiàn)象,影響其他業(yè)務(wù)寫入數(shù)據(jù)。

          解決方法:給使用的 MySQL 用戶授予 RELOAD 權(quán)限即可。所需的權(quán)限列表詳見文檔:

          https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server

          如果出于某些原因無法授予 RELOAD 權(quán)限,也可以顯式配上 'debezium.snapshot.locking.mode' = 'none'來避免所有鎖的獲取,但要注意只有當(dāng)快照期間表的 schema 不會(huì)變更才安全。

          下面這些問題來自騰訊云大數(shù)據(jù)官方博客,供大家參考。

          Flink 作業(yè)自動(dòng)停止

          現(xiàn)象:本應(yīng)長(zhǎng)期運(yùn)行的作業(yè),突然停止運(yùn)行,且再也不恢復(fù)。

          如果 Flink 作業(yè)在編程時(shí),源算子實(shí)現(xiàn)不當(dāng),則可能造成源算子處理完數(shù)據(jù)以后進(jìn)入 FINISHED 狀態(tài)。如果所有源算子都進(jìn)入了 FINISHED 狀態(tài),那整個(gè) Flink 作業(yè)也會(huì)跟著結(jié)束。

          Flink 作業(yè)默認(rèn)的容錯(cuò)次數(shù)是 2,即發(fā)生兩次崩潰后,作業(yè)就自動(dòng)退出了,不再進(jìn)行重試。當(dāng)出現(xiàn)此種場(chǎng)景時(shí),TaskManager 的日志中會(huì)有“restart strategy prevented it”字樣。我們首先要找到作業(yè)崩潰的原因,其次可以適當(dāng)調(diào)大 RestartStrategy 中容錯(cuò)的最大次數(shù),畢竟節(jié)點(diǎn)異常等外部風(fēng)險(xiǎn)始終存在,作業(yè)不會(huì)在理想的環(huán)境中運(yùn)行。

          此外,舊版Flink(低于 1.11.0)的 RocksDB 內(nèi)存使用不受管控,造成很容易由于超量使用而被外界(YARN、Kubernetes 等)KILL 掉。如果經(jīng)常受此困擾,可以考慮升級(jí) Flink 版本到最新,其默認(rèn)開啟自動(dòng)內(nèi)存管理功能。

          Flink作業(yè)頻繁重啟

          現(xiàn)象:作業(yè)頻繁重啟又自行恢復(fù),陷入無盡循環(huán),無法正常處理數(shù)據(jù)。

          作業(yè)頻繁重啟的成因非常多,例如異常數(shù)據(jù)造成的作業(yè)崩潰,可以在 TaskManager 的日志中找到報(bào)錯(cuò)。數(shù)據(jù)源或者數(shù)據(jù)目的等上下游系統(tǒng)超時(shí)也會(huì)造成作業(yè)無法啟動(dòng)而一直在重啟。此外 TaskManager Full GC 太久造成心跳包超時(shí)而被 JobManager 踢掉也是常見的作業(yè)重啟原因。如果系統(tǒng)內(nèi)存嚴(yán)重匱乏,那么 Linux 自帶的 OOM Killer 也可能把 TaskManager 所在的 JVM 進(jìn)程 kill 了。

          當(dāng)一個(gè)正常運(yùn)行的作業(yè)失敗時(shí),日志里會(huì)有 from RUNNING to FAILED 的關(guān)鍵字,我們以此為著手點(diǎn),查看它后面的 Exception 原因,通常最下面的 caused by 即是直接原因。當(dāng)然,直接原因不一定等于根本原因,后者需要借助下文提到的多項(xiàng)技術(shù)進(jìn)行分析。

          如果 JVM 的內(nèi)存容量超出了平臺(tái)方(例如 YARN 或 Kubernetes 等)的容器限制,則可能被 KILL。問題的確認(rèn)方式也是查看作業(yè)日志以及平臺(tái)組件的運(yùn)行狀態(tài)。值得一提的是,在最新的 Flink 版本中,只要設(shè)置 taskmanager.memory.process.size 參數(shù),基本可以保證內(nèi)存用量不會(huì)超過該值(前提是用戶沒有使用 JNI 等方式申請(qǐng) native 內(nèi)存)。

          作業(yè)的崩潰重啟還有一些原因,例如使用了不成熟的第三方 so 庫,或者連接數(shù)過多等,都可以從日志中找到端倪。

          作業(yè)輸出整體穩(wěn)定,但是個(gè)別數(shù)據(jù)缺失

          現(xiàn)象:作業(yè)輸出整體穩(wěn)定,但是個(gè)別數(shù)據(jù)缺失,造成結(jié)果的精度下降,甚至結(jié)果完全錯(cuò)亂。

          當(dāng)遇到懷疑數(shù)據(jù)缺失造成的計(jì)算結(jié)果不正確時(shí),首先需要檢查作業(yè)邏輯是否不小心過濾了一些正常數(shù)據(jù)。檢查方法可以在本地運(yùn)行一個(gè) Mini Cluster,也可以在遠(yuǎn)端的調(diào)試環(huán)境進(jìn)行遠(yuǎn)程調(diào)試或者采樣等。具體技巧后文也會(huì)提到。

          另外還有一種情況是,如果用戶定義了批量存取的算子(通常用于與外部系統(tǒng)進(jìn)行交互),則有可能出現(xiàn)一批數(shù)據(jù)中有一條異常數(shù)據(jù),導(dǎo)致整批次都失敗而被丟棄的情況。

          對(duì)于數(shù)據(jù)源 Source 和數(shù)據(jù)目的Sink,請(qǐng)務(wù)必保證 Flink 作業(yè)運(yùn)行期間不要對(duì)其進(jìn)行任何改動(dòng)(例如新增 Kafka 分區(qū)、調(diào)整 MySQL 表結(jié)構(gòu)等),否則可能造成正在運(yùn)行的作業(yè)無法感知新增的分區(qū)或者讀寫失敗。盡管 Flink 可以開啟 Kafka 分區(qū)自動(dòng)發(fā)現(xiàn)機(jī)制(在 Configuration 里設(shè)置 flink.partition-discovery.interval-millis 值),但分區(qū)發(fā)現(xiàn)仍然需要一定時(shí)間,數(shù)據(jù)的精度可能會(huì)稍有影響。



          《大數(shù)據(jù)成神之路》正在全面PDF化。
          你只需要關(guān)注并在后臺(tái)回復(fù)「PDF」就可以看到阿里云盤下載鏈接了!
          另外我把發(fā)表過的文章按照體系全部整理好了?,F(xiàn)在你可以在后臺(tái)方便的進(jìn)行查找:
          電子版把他們分類做成了下面這個(gè)樣子,并且放在了阿里云盤提供下載。
          我們點(diǎn)開一個(gè)文件夾后:
          如果這個(gè)文章對(duì)你有幫助,不要忘記?「在看」?「點(diǎn)贊」?「收藏」?三連啊喂!
          Hi,我是王知無,一個(gè)大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。?
          放心關(guān)注我,獲取更多行業(yè)的一手消息。


          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時(shí)代可能真的來了
          我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
          193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)
          我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
          在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實(shí)踐小百科全書
          標(biāo)簽體系下的用戶畫像建設(shè)小指南
          4萬字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
          【面試&個(gè)人成長(zhǎng)】2021年過半,社招和校招的經(jīng)驗(yàn)之談
          大數(shù)據(jù)方向另一個(gè)十年開啟 |《硬剛系列》第一版完結(jié)
          我寫過的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章
          當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 323
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美怡红院视频一区二区三区 | 黄片免费视频 | 亚洲无吗免费在线观看 | 影音先锋噜噜资源 | 久久午夜无码人妻精品蜜桃冫 |