2022年最新版 | Flink經(jīng)典線上問題小盤點(diǎn)

本文已經(jīng)加入「大數(shù)據(jù)成神之路PDF版」中提供下載。
你可以關(guān)注公眾號(hào),后臺(tái)回復(fù):「PDF」?即可獲取。
2020年和2021年分別寫了很多篇類似的文章,這篇文章是關(guān)于Flink生產(chǎn)環(huán)境中遇到的各種問題的匯總。
這個(gè)版本在Flink新版本的基礎(chǔ)上梳理了一個(gè)更加完整的版本。
新增了一些Flink CDC和大作業(yè)的啟停已經(jīng)數(shù)據(jù)缺失的問題。
如果你遇到過一些共性的問題,希望對(duì)你有幫助。本文參考了我在查問題中找到的網(wǎng)上的資源和一些博客。
如何規(guī)劃生產(chǎn)中的集群大???
第一步是仔細(xì)考慮應(yīng)用程序的運(yùn)維指標(biāo),以達(dá)到所需資源的基線。需要考慮的關(guān)鍵指標(biāo)是:
每秒記錄數(shù)和每條記錄的大小 已有的不同鍵(key)的數(shù)量和每個(gè)鍵對(duì)應(yīng)的狀態(tài)大小 狀態(tài)更新的次數(shù)和狀態(tài)后端的訪問模式
最后,一個(gè)更實(shí)際的問題是與客戶之間圍繞停機(jī)時(shí)間、延遲和最大吞吐量的服務(wù)級(jí)別協(xié)議(SLA),因?yàn)檫@些直接影響容量規(guī)劃。接下來,根據(jù)預(yù)算,看看有什么可用的資源。例如:
網(wǎng)絡(luò)容量,同時(shí)把使用網(wǎng)絡(luò)的外部服務(wù)也納入考慮,如 Kafka、HDFS 等。 磁盤帶寬,如果您依賴于基于磁盤的狀態(tài)后端,如 RocksDB(并考慮其他磁 盤使用,如 Kafka 或 HDFS) 可用的機(jī)器數(shù)量、CPU 和內(nèi)存
Flink CheckPoint問題如何排查?
Flink 的 Checkpoint 包括如下幾個(gè)部分:

JM trigger checkpoint Source 收到 trigger checkpoint 的 PRC,自己開始做 snapshot,并往下游發(fā)送 barrier 下游接收 barrier(需要 barrier 都到齊才會(huì)開始做 checkpoint) Task 開始同步階段 snapshot Task 開始異步階段 snapshot Task snapshot 完成,匯報(bào)給 JM
上面的任何一個(gè)步驟不成功,整個(gè) checkpoint 都會(huì)失敗。
Checkpoint問題可以分為下面幾個(gè)大類:
Checkpoint失敗
假如我們?cè)?Checkpoint 界面看到如下圖所示,下圖中 Checkpoint 10423 失敗了。點(diǎn)擊Checkpoint10423 的詳情,我們可以看到類系下圖所示的表格。


Checkpoint 失敗大致分為兩種情況:Checkpoint Decline 和 CheckpointExpire。
Checkpoint Decline發(fā)生時(shí)我們可以在日志匯總發(fā)現(xiàn)類似下面這樣的日志:
Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178. 其中 10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id,85d268e6fbc19411185f7e4868a44178 是 job id,我們可以在 jobmanager.log 中查找 execution id,找到被調(diào)度到哪個(gè) taskmanager 上,類似如下所示:

從上面的日志我們知道該 execution 被調(diào)度到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot 上,接下來我們就可以到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找Checkpoint 失敗的具體原因了。
Checkpoint Expire
如果 Checkpoint 做的非常慢,超過了 timeout 還沒有完成,則整個(gè) Checkpoint 也會(huì)失敗。當(dāng)一個(gè) Checkpoint 由于超時(shí)而失敗是,會(huì)在 jobmanager.log 中看到如下的日志:
Checkpoint?1?of?job?85d268e6fbc19411185f7e4868a44178?expired?before?completing.
表示 Chekpoint 1 由于超時(shí)而失敗,這個(gè)時(shí)候可以可以看這個(gè)日志后面是否有類似下面的日志:
Received?late?message?for?now?expired?checkpoint?attempt?1?from?0b60f08bf8984085b59f8d9bc74ce2e1?of?job?85d268e6fbc19411185f7e4868a44178.
我們就可以按照上面的方法找到對(duì)應(yīng)的 taskmanager.log 查看具體信息。
Checkpoint 慢
Checkpoint 慢的情況如下:比如 Checkpoint interval 1 分鐘,超時(shí) 10 分鐘,Checkpoint 經(jīng)常需要做 9 分鐘(我們希望 1 分鐘左右就能夠做完),而且我們預(yù)期 state size 不是非常大。對(duì)于 Checkpoint 慢的情況,我們可以按照下面的順序逐一檢查。
Source Trigger Checkpoint 慢 使用增量 Checkpoint 作業(yè)存在反壓或者數(shù)據(jù)傾斜 Barrier 對(duì)齊慢 主線程太忙,導(dǎo)致沒機(jī)會(huì)做 snapshot 同步階段做的慢 異步階段做的慢
反壓?jiǎn)栴}如何排查?
反壓(backpressure)是實(shí)時(shí)計(jì)算應(yīng)用開發(fā)中,特別是流式計(jì)算中,十分常見的問題。反壓意味著數(shù)據(jù)管道中某個(gè)節(jié)點(diǎn)成為瓶頸,處理速率跟不上上游發(fā)送數(shù)據(jù)的速率,而需要對(duì)上游進(jìn)行限速。由于實(shí)時(shí)計(jì)算應(yīng)用通常使用消息隊(duì)列來進(jìn)行生產(chǎn)端和消費(fèi)端的解耦,消費(fèi)端數(shù)據(jù)源是 pull-based 的,所以反壓通常是從某個(gè)節(jié)點(diǎn)傳導(dǎo)至數(shù)據(jù)源并降低數(shù)據(jù)源(比如 Kafka consumer)的攝入速率。
要解決反壓首先要做的是定位到造成反壓的節(jié)點(diǎn),這主要有兩種辦法 :
通過 Flink Web UI 自帶的反壓監(jiān)控面板 通過 Flink Task Metrics
Flink Web UI 的反壓監(jiān)控提供了 SubTask 級(jí)別的反壓監(jiān)控,原理是通過周期性對(duì) Task 線程的棧信息采樣,得到線程被阻塞在請(qǐng)求 Buffer(意味著被下游隊(duì)列阻塞)的頻率來判斷該節(jié)點(diǎn)是否處于反壓狀態(tài)。默認(rèn)配置下,這個(gè)頻率在 0.1 以下則為OK,0.1 至 0.5 為 LOW,而超過 0.5 則為 HIGH。

如果處于反壓狀態(tài),那么有兩種可能性:
該節(jié)點(diǎn)的發(fā)送速率跟不上它的產(chǎn)生數(shù)據(jù)速率。這一般會(huì)發(fā)生在一條輸入多條 輸出的 Operator(比如 flatmap)。
下游的節(jié)點(diǎn)接受速率較慢,通過反壓機(jī)制限制了該節(jié)點(diǎn)的發(fā)送速率。
如果是第一種狀況,那么該節(jié)點(diǎn)則為反壓的根源節(jié)點(diǎn),它是從 Source Task 到Sink Task 的第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn)。如果是第二種情況,則需要繼續(xù)排查下游節(jié)點(diǎn)。
值得注意的是,反壓的根源節(jié)點(diǎn)并不一定會(huì)在反壓面板體現(xiàn)出高反壓,因?yàn)榉磯好姘灞O(jiān)控的是發(fā)送端,如果某個(gè)節(jié)點(diǎn)是性能瓶頸并不會(huì)導(dǎo)致它本身出現(xiàn)高反壓,而是導(dǎo)致它的上游出現(xiàn)高反壓。總體來看,如果我們找到第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn),那么反壓根源要么是就這個(gè)節(jié)點(diǎn),要么是它緊接著的下游節(jié)點(diǎn)。
此外,F(xiàn)link 提供的 Task Metrics 是更好的反壓監(jiān)控手段,我們?cè)诒O(jiān)控反壓時(shí)會(huì)用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有關(guān),最為有用的是以下幾個(gè) Metrics:

分析反壓的大致思路是:如果一個(gè) Subtask 的發(fā)送端 Buffer 占用率很高,則表明它被下游反壓限速了;如果一個(gè) Subtask 的接受端 Buffer 占用很高,則表明它將反壓傳導(dǎo)至上游。反壓情況可以根據(jù)以下表格進(jìn)行對(duì)號(hào)入座:

outPoolUsage 和 inPoolUsage 同為低或同為高分別表明當(dāng)前 Subtask 正?;蛱幱诒幌掠畏磯海@應(yīng)該沒有太多疑問。而比較有趣的是當(dāng) outPoolUsage 和 inPoolUsage 表現(xiàn)不同時(shí),這可能是出于反壓傳導(dǎo)的中間狀態(tài)或者表明該 Subtask就是反壓的根源。
如果一個(gè) Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。如果一個(gè) Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。因?yàn)橥ǔ7磯簳?huì)傳導(dǎo)至其上游,導(dǎo)致上游某些 Subtask 的 outPoolUsage 為高,我們可以根據(jù)這點(diǎn)來進(jìn)一步判斷。值得注意的是,反壓有時(shí)是短暫的且影響不大,比如來自某個(gè) Channel 的短暫網(wǎng)絡(luò)延遲或者 TaskManager 的正常 GC,這種情況下我們可以不用處理。
在實(shí)踐中,很多情況下的反壓是由于數(shù)據(jù)傾斜造成的,這點(diǎn)我們可以通過 Web UI 各個(gè) SubTask 的 Records Sent 和 Record Received 來確認(rèn),另外 Checkpoint detail 里不同 SubTask 的 State size 也是一個(gè)分析數(shù)據(jù)傾斜的有用指標(biāo)。
此外,最常見的問題可能是用戶代碼的執(zhí)行效率問題(頻繁被阻塞或者性能問題)。最有用的辦法就是對(duì) TaskManager 進(jìn)行 CPU profile,從中我們可以分析到Task Thread 是否跑滿一個(gè) CPU 核:如果是的話要分析 CPU 主要花費(fèi)在哪些函數(shù)里面,比如我們生產(chǎn)環(huán)境中就偶爾遇到卡在 Regex 的用戶函數(shù)(ReDoS);如果不是的話要看 Task Thread 阻塞在哪里,可能是用戶函數(shù)本身有些同步的調(diào)用,可能是checkpoint 或者 GC 等系統(tǒng)活動(dòng)導(dǎo)致的暫時(shí)系統(tǒng)暫停。
另外 TaskManager 的內(nèi)存以及 GC 問題也可能會(huì)導(dǎo)致反壓,包括 TaskManager JVM 各區(qū)內(nèi)存不合理導(dǎo)致的頻繁 Full GC 甚至失聯(lián)。推薦可以通過給TaskManager 啟用 G1 垃圾回收器來優(yōu)化 GC,并加上 -XX:+PrintGCDetails 來打印 GC 日志的方式來觀察 GC 的問題。
客戶端常見問題
應(yīng)用提交控制臺(tái)異常信息:Could not build the program from JAR file.
這個(gè)問題的迷惑性較大,很多時(shí)候并非指定運(yùn)行的 JAR 文件問題,而是提交過程中發(fā)生了異常,需要根據(jù)日志信息進(jìn)一步排查。最常見原因是未將依賴的 Hadoop JAR 文件加到 CLASSPATH,找不到依賴類(例如:ClassNotFoundException:org.apache.hadoop.yarn.exceptions.YarnException)導(dǎo)致加載客戶端入口類(FlinkYarnSessionCli)失敗。
用戶應(yīng)用和框架 JAR 包版本沖突問題
該問題通常會(huì)拋出 NoSuchMethodError/ClassNotFoundException/IncompatibleClassChangeError 等異常,要解決此類問題:
首先需要根據(jù)異常類定位依賴庫,然后可以在項(xiàng)目中執(zhí)行 mvn dependency:tree 以樹形結(jié)構(gòu)展示全部依賴鏈,再從中定位沖突的依賴庫,也可以增加參數(shù) -Dincludes 指定要顯示的包,格式為 [groupId]:[artifactId]:[-type]:[version],支持匹配,多個(gè)用逗號(hào)分隔,例如:mvn dependency:tree-Dincludes=power,javaassist。 定位沖突包后就要考慮如何排包,簡(jiǎn)單的方案是用 exclusion 來排除掉其從他依賴項(xiàng)目中傳遞過來的依賴,不過有的應(yīng)用場(chǎng)景需要多版本共存,不同組件依賴不同版本,就要考慮用 Maven Shade 插件來解決,詳情請(qǐng)參考Maven Shade Plugin。
Flink 應(yīng)用資源分配問題排查思路
如果 Flink 應(yīng)用不能正常啟動(dòng)達(dá)到 RUNNING 狀態(tài),可以按以下步驟進(jìn)行排查:
步驟1. 需要先檢查應(yīng)用當(dāng)前狀態(tài),根據(jù)上述對(duì)啟動(dòng)流程的說明,我們知道:
處于 NEW_SAVING 狀態(tài)時(shí)正在進(jìn)行應(yīng)用信息持久化,如果持續(xù)處于這個(gè)狀態(tài)我們需要檢查 RM 狀態(tài)存儲(chǔ)服務(wù)(通常是 ZooKeeper 集群)是否正常; 如果處于 SUBMITTED 狀態(tài),可能是 RM 內(nèi)部發(fā)生一些 hold 讀寫鎖的耗時(shí)操作導(dǎo)致事件堆積,需要根據(jù) YARN 集群日志進(jìn)一步定位. 如果處于 ACCEPTED 狀態(tài),需要先檢查 AM 是否正常,跳轉(zhuǎn)到步驟 2; 如果已經(jīng)是 RUNNING 狀態(tài),但是資源沒有全部拿到導(dǎo)致 JOB 無法正常運(yùn)行,跳轉(zhuǎn)到步驟 3;
步驟2. 檢查 AM 是否正常,可以從 YARN 應(yīng)用展示界面(http:// /cluster/app/)或YARN 應(yīng)用 REST API(http:///ws/v1/cluster/apps/)查看 diagnostics 信 息,根據(jù)關(guān)鍵字信息明確問題原因與解決方案:
Queue’s AM resource limit exceeded. 原因是達(dá)到了隊(duì)列 AM 可用資源上限,即隊(duì)列的 AM 已使用資源和 AM 新申請(qǐng)資源之和超出了隊(duì)列的AM 資源上限,可以適當(dāng)調(diào)整隊(duì)列 AM 可用資源百分比的配置項(xiàng):yarn.scheduler.capacity..maximum-am-resource-percent。 User’s AM resource limit exceeded. 原因是達(dá)到了應(yīng)用所屬用戶在該隊(duì)列的 AM 可用資源上限,即應(yīng)用所屬用戶在該隊(duì)列的 AM 已使用資源和 AM新申請(qǐng)資源之和超出了應(yīng)用所屬用戶在該隊(duì)列的 AM 資源上限,可以適當(dāng)提高用戶可用 AM 資源比例來解決該問題,相關(guān)配置項(xiàng):yarn.scheduler.capacity..user-limit-factor 與 yarn.scheduler.capacity..minimum-user-limit-percent。 AM container is launched, waiting for AM container to Register with RM. 大致原因是 AM 已啟動(dòng),但內(nèi)部初始化未完成,可能有 ZK 連接超時(shí)等問題,具體原因需排查 AM 日志,根據(jù)具體問題來解決。 Application is Activated, waiting for resources to be assigned for AM.該信息表示應(yīng)用 AM 檢查已經(jīng)通過,正在等待調(diào)度器分配,此時(shí)需要進(jìn)行調(diào)度器層面的資源檢查,跳轉(zhuǎn)到步驟 4。
步驟3. 確認(rèn)應(yīng)用確實(shí)有 YARN 未能滿足的資源請(qǐng)求:從應(yīng)用列表頁點(diǎn)擊問題應(yīng)用ID 進(jìn)入應(yīng)用頁面,再點(diǎn)擊下方列表的應(yīng)用實(shí)例 ID 進(jìn)入應(yīng)用實(shí)例頁面,看 Total Outstanding Resource Requests 列表中是否有 Pending 資源,如果沒有,說明 YARN 已分配完畢,退出該檢查流程,轉(zhuǎn)去檢查 AM;如果有,說明調(diào)度器未能完成分配,跳轉(zhuǎn)到步驟 4。
步驟4.調(diào)度器分配問題排查YARN-9050 支持在 WebUI 上或通過 REST API 自動(dòng)診斷應(yīng)用問題,將在 Hadoop3.3.0 發(fā)布,之前的版本仍需進(jìn)行人工排查:
檢查集群或 queue 資源,scheduler 頁面樹狀圖葉子隊(duì)列展開查看資源信息:Effective Max Resource、Used Resources:(1)檢查集群資源或所在隊(duì)列資源或其父隊(duì)列資源是否已用完;(2)檢查葉子隊(duì)列某維度資源是否接近或達(dá)到上限;
檢查是否存在資源碎片:(1)檢查集群 Used 資源和 Reserved 資源之和占總資源的比例,當(dāng)集群資源接近用滿時(shí)(例如 90% 以上),可能存在資源碎片的情況,應(yīng)用的分配速度就會(huì)受影響變慢,因?yàn)榇蟛糠謾C(jī)器都沒有資源了,機(jī)器可用資源不足會(huì)被 reserve,reserved 資源達(dá)到一定規(guī)模后可能導(dǎo)致大部分機(jī)器資源被鎖定,后續(xù)分配可能就會(huì)變慢;(2)檢查 NM 可用資源分布情況,即使集群資源使用率不高,也有可能是因?yàn)楦骶S度資源分布不同造成,例如 1/2 節(jié)點(diǎn)上的內(nèi)存資源接近用滿 CPU 資源剩余較多,1/2 節(jié)點(diǎn)上的 CPU 資源接近用滿內(nèi)存資源剩余較多,申請(qǐng)資源中某一維度資源值配置過大也可能造成無法申請(qǐng)到資源;
檢查是否有高優(yōu)先級(jí)的問題應(yīng)用頻繁申請(qǐng)并立即釋放資源的問題,這種情況會(huì)造成調(diào)度器忙于滿足這一個(gè)應(yīng)用的資源請(qǐng)求而無暇顧及其他應(yīng)用;
檢查是否存在 Container 啟動(dòng)失敗或剛啟動(dòng)就自動(dòng)退出的情況,可以查看Container 日志 ( 包括 localize 日志、launch 日志等 )、YARN NM 日志或YARN RM 日志進(jìn)行排查。
TaskManager 啟動(dòng)異常
org.apache.hadoop.yarn.exceptions.YarnException:?Unauthorized?request?to?start?container.?This?token?is?expired.?current?time?is?...?found?...
該異常在 Flink AM 向 YARN NM 申請(qǐng)啟動(dòng) token 已超時(shí)的 Container 時(shí)拋出,通常原因是 Flink AM 從 YARN RM 收到這個(gè) Container 很久之后(超過了Container 有效時(shí)間,默認(rèn) 10 分鐘,該 Container 已經(jīng)被釋放)才去啟動(dòng)它,進(jìn)一步原因是 Flink 內(nèi)部在收到 YARN RM 返回的 Container 資源后串行啟動(dòng)。
當(dāng)待啟動(dòng)的 Container 數(shù)量較多且分布式文件存儲(chǔ)如 HDFS 性能較慢(啟動(dòng)前需上傳 TaskManager配置)時(shí) Container啟動(dòng)請(qǐng)求容易堆積在內(nèi)部,F(xiàn)LINK-13184 對(duì)這個(gè)問題進(jìn)行了優(yōu)化,一是在啟動(dòng)前增加了有效性檢查,避免了無意義的配置上傳流程,二是進(jìn)行了異步多線程優(yōu)化,加快啟動(dòng)速度。
PyFlink如何定義UDF
在 Apache Flink 1.10 中我們有多種方式進(jìn)行 UDF 的定義,比如:
Extend ScalarFunction, e.g.:
class?HashCodeMean(ScalarFunction):
?def?eval(self,?i,?j):
?return?(hash(i)?+?hash(j))?/?2
Lambda Functio
lambda?i,?j:?(hash(i)?+?hash(j))?/?2
Named Function
def?hash_code_mean(i,?j):
?return?(hash(i)?+?hash(j))?/?2
Callable Function
class?CallableHashCodeMean(object):
?def?__call__(self,?i,?j):
?return?(hash(i)?+?hash(j))?/?2
現(xiàn)上面定義函數(shù)除了第一個(gè)擴(kuò)展 ScalaFunction 的方式是 PyFlink 特有的,其他方式都是 Python 語言本身就支持的,也就是說,在 Apache Flink 1.10中PyFlink 允許以任何 Python 語言所支持的方式定義 UDF。
那么定義完 UDF 我們應(yīng)該怎樣使用呢?Apache Flink 1.10 中提供了2種Decorators,如下:
Decorators - udf(), e.g. :
udf(lambda?i,?j:?(hash(i)?+?hash(j))?/?2,
?[for?input?types],?[for?result?types])
Decorators?-?@udf,?e.g.?:
@udf(input_types=...,?result_type=...)
?def?hash_code_mean(…):
然后在使用之前進(jìn)行注冊(cè),如下:
st_env.register_function("hash_code",?hash_code_mean)
接下來就可以在 Table API/SQL 中進(jìn)行使用了,如下:
my_table.select("hash_code_mean(a,?b)").insert_into("Results")
數(shù)據(jù)傾斜導(dǎo)致子任務(wù)積壓
業(yè)務(wù)背景
一個(gè)流程中,有兩個(gè)重要子任務(wù):一是數(shù)據(jù)遷移,將kafka實(shí)時(shí)數(shù)據(jù)落Es,二是將kafka數(shù)據(jù)做窗口聚合落hbase,兩個(gè)子任務(wù)接的是同一個(gè)Topic GroupId。上游 Topic 的 tps 高峰達(dá)到5-6w。
問題描述
給 24個(gè) TaskManager(CPU) 都會(huì)出現(xiàn)來不及消費(fèi)的情況
問題原因
做窗口聚合的任務(wù)的分組字段,分組粒度太小,hash不能打散,數(shù)據(jù)傾斜嚴(yán)重,導(dǎo)致少數(shù) TaskManager 上壓力過大,從而影響落Es的效率,導(dǎo)致背壓。
解決方式
將兩個(gè)任務(wù)獨(dú)立開來,作為不同的流程。
結(jié)果
修改之前 24個(gè) TaskManager(CPU) 來不及消費(fèi),改完之后 20 個(gè) CPU 可完成任務(wù)。Kafka實(shí)時(shí)數(shù)據(jù)落Es的16個(gè)TaskManager,將kafka數(shù)據(jù)做窗口聚合落hbase的4個(gè)TaskManager。
另:
同樣的數(shù)據(jù)、同樣的Tps作為數(shù)據(jù)輸入,Hbase的輸出能力遠(yuǎn)超過Es,考慮實(shí)時(shí)任務(wù)落數(shù)據(jù)進(jìn)Es要慎重。
Flink任務(wù)落Es時(shí)要考慮設(shè)置微批落數(shù)據(jù),設(shè)置 bulk.flush.max.actions 和 bulk.flush.interval.ms至合適值,否則影響吞吐量。
Kafka 消息大小默認(rèn)配置太小,導(dǎo)致數(shù)據(jù)未處理
業(yè)務(wù)背景
正常的Flink任務(wù)消費(fèi) Topic 數(shù)據(jù),但是Topic中的數(shù)據(jù)為 XML 以及 JSON,單條數(shù)據(jù)較大
問題描述
Flink各項(xiàng)metrics指標(biāo)正常,但是沒處理到數(shù)據(jù)
問題原因
Topic中單條數(shù)據(jù) > 1M,超過 Kafka Consumer 處理單條數(shù)據(jù)的默認(rèn)最大值。
解決方式
有三種可選方式:擴(kuò)大kafka consumer 單條數(shù)據(jù)的數(shù)據(jù)大?。篺etch.message.max.bytes。對(duì)消息進(jìn)行壓縮:上游 kafka producer 設(shè)置 compression.codec 和 commpressed.topics。業(yè)務(wù)上對(duì)數(shù)據(jù)切片,在上游 kafka producer 端將數(shù)據(jù)切片為 10K,使用分區(qū)主鍵確保同一條數(shù)據(jù)發(fā)送到同一Partition,consumer對(duì)消息重組。
結(jié)果
方式一:按業(yè)務(wù)要求擴(kuò)大 Kafka Consumer 可處理的單條數(shù)據(jù)字節(jié)數(shù)即可正常處理業(yè)務(wù) 方式二:Kafka Consumer 需先解碼,再進(jìn)行業(yè)務(wù)處理。方式三:Kafka Consumer 需先重組數(shù)據(jù),再進(jìn)行業(yè)務(wù)處理。
Tps 很大,Kafka Ack 默認(rèn)配置 拖慢消息處理速度
業(yè)務(wù)背景
實(shí)時(shí)任務(wù),上游接流量頁面點(diǎn)擊事件的數(shù)據(jù),下游輸出Kafka,輸出tps很大。流量數(shù)據(jù)不重要,可接受丟失的情況
問題描述
CPU資源耗費(fèi)較多的情況下,才能正常消費(fèi),考慮如果縮減資源。
問題原因
Kafka Producer 默認(rèn) acks=1,即Partition Leader接收到消息而且寫入本地磁盤了,就認(rèn)為成功了
解決方式
Kafka Producer 設(shè)置 :props.put(“acks”, “0”); 將 acks=0,即KafkaProducer在客戶端,只要把消息發(fā)送出去,不管那條數(shù)據(jù)有沒有在哪怕Partition Leader上落到磁盤,直接就認(rèn)為這個(gè)消息發(fā)送成功了。
結(jié)果
資源降低三分之一。
The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed.
org.apache.flink.util.FlinkException: The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed. at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372) at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:803) at org.apache.flink.yarn.YarnResourceManager.lambda0(YarnResourceManager.java:340) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
程序內(nèi)存占用過大,導(dǎo)致TaskManager在yarn上kill了,分析原因應(yīng)該是資源不夠,可以將程序放在資源更大的集群上,再不行就設(shè)置減少Slot中共享的task的個(gè)數(shù),也可能是內(nèi)存泄露或內(nèi)存資源配置不合理造成,需要進(jìn)行合理分配。
The heartbeat of TaskManager with id container ....... timed out
此錯(cuò)誤是container心跳超時(shí),出現(xiàn)此種錯(cuò)誤一般有兩種可能:
1、分布式物理機(jī)網(wǎng)絡(luò)失聯(lián),這種原因一般情況下failover后作業(yè)能正?;謴?fù),如果出現(xiàn)的不頻繁可以不用關(guān)注;2、failover的節(jié)點(diǎn)對(duì)應(yīng)TM的內(nèi)存設(shè)置太小,GC嚴(yán)重導(dǎo)致心跳超時(shí),建議調(diào)大對(duì)應(yīng)節(jié)點(diǎn)的內(nèi)存值。
Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#6643546564]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
在flink-conf.yaml中添加或修改:akka.ask.timeout: 100s web.timeout: 100000
Checkpoint:Checkpoint expired before completing
checkpointConf.setCheckpointTimeout(5000L)這個(gè)值設(shè)置過小,默認(rèn)是10min,需要進(jìn)行調(diào)大測(cè)試。
Kafka partition leader切換導(dǎo)致Flink重啟
Flink重啟,查看日志,顯示:
java.lang.Exception:?Failed?to?send?data?to?Kafka:?This?server?is?not?the?leader?for?that?topic-partition.
??at?org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
??at?org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:280)
??at?org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
??at?org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
??at?org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
??at?org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
??at?org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
??at?java.lang.Thread.run(Thread.java:748)
Caused?by:?org.apache.kafka.common.errors.NotLeaderForPartitionException:?This?server?is?not?the?leader?for?that?topic-partition.
查看Kafka的Controller日志,顯示:
?INFO?[SessionExpirationListener?on?10],?ZK?expired;?shut?down?all?controller?components?and?
try?to?re-elect?(kafka.controller.KafkaController$SessionExpirationListener)
關(guān)于producer參數(shù)設(shè)置,設(shè)置retries參數(shù),可以在Kafka的Partition發(fā)生leader切換時(shí),F(xiàn)link不重啟,而是做3次嘗試:
kafkaProducerConfig
??{
????????"bootstrap.servers":?"192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093"
????????"retries":3
??}
注意 mapWithState & TTL 的重要性
在處理包含無限多鍵的數(shù)據(jù)時(shí),要考慮到 keyed 狀態(tài)保留策略(通過 TTL 定時(shí)器來在給定的時(shí)間之后清理未使用的數(shù)據(jù))是很重要的。術(shù)語『無限』在這里有點(diǎn)誤導(dǎo),因?yàn)槿绻阋幚淼?key 以 128 位編碼,則 key 的最大數(shù)量將會(huì)有個(gè)限制(等于 2 的 128 次方)。但這是一個(gè)巨大的數(shù)字!你可能無法在狀態(tài)中存儲(chǔ)那么多值,所以最好考慮你的鍵空間是無界的,同時(shí)新鍵會(huì)隨著時(shí)間不斷出現(xiàn)。
如果你的 keyed 狀態(tài)包含在某個(gè) Flink 的默認(rèn)窗口中,則將是安全的:即使未使用 ?TTL,在處理窗口的元素時(shí)也會(huì)注冊(cè)一個(gè)清除計(jì)時(shí)器,該計(jì)時(shí)器將調(diào)用 clearAllState 函數(shù),并刪除與該窗口關(guān)聯(lián)的狀態(tài)及其元數(shù)據(jù)。
如果要使用 Keyed State Descriptor 來管理狀態(tài),可以很方便地添加 TTL 配置,以確保在狀態(tài)中的鍵數(shù)量不會(huì)無限制地增加。
但是,你可能會(huì)想使用更簡(jiǎn)便的 mapWithState 方法,該方法可讓你訪問 valueState 并隱藏操作的復(fù)雜性。雖然這對(duì)于測(cè)試和少量鍵的數(shù)據(jù)來說是很好的選擇,但如果在生產(chǎn)環(huán)境中遇到無限多鍵值時(shí),會(huì)引發(fā)問題。由于狀態(tài)是對(duì)你隱藏的,因此你無法設(shè)置 TTL,并且默認(rèn)情況下未配置任何 TTL。這就是為什么值得考慮做一些額外工作的原因,如聲明諸如 RichMapFunction 之類的東西,這將使你能更好的控制狀態(tài)的生命周期。
部署和資源問題
(0) JDK版本過低
這不是個(gè)顯式錯(cuò)誤,但是JDK版本過低很有可能會(huì)導(dǎo)致Flink作業(yè)出現(xiàn)各種莫名其妙的問題,因此在生產(chǎn)環(huán)境中建議采用JDK 8的較高update(我們使用的是181)。
(1) Could not build the program from JAR file
該信息不甚準(zhǔn)確,因?yàn)榻^大多數(shù)情況下都不是JAR包本身有毛病,而是在作業(yè)提交過程中出現(xiàn)異常退出了。因此需要查看本次提交產(chǎn)生的客戶端日志(默認(rèn)位于$FLINK_HOME/logs目錄下),再根據(jù)其中的信息定位并解決問題。
(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...
一般都是因?yàn)橛脩粢蕾嚨谌桨陌姹九cFlink框架依賴的版本有沖突導(dǎo)致。
(3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
就是字面意思,YARN集群內(nèi)沒有足夠的資源啟動(dòng)Flink作業(yè)。檢查一下當(dāng)前YARN集群的狀態(tài)、正在運(yùn)行的YARN App以及Flink作業(yè)所處的隊(duì)列,釋放一些資源或者加入新的資源。
(4) java.util.concurrent.TimeoutException: Slot allocation request timed out
slot分配請(qǐng)求超時(shí),是因?yàn)門askManager申請(qǐng)資源時(shí)無法正常獲得,按照上一條的思路檢查即可。
(5) org.apache.flink.util.FlinkException: The assigned slot < container_id> was removed
TaskManager的Container因?yàn)槭褂觅Y源超限被kill掉了。首先需要保證每個(gè)slot分配到的內(nèi)存量足夠,特殊情況下可以手動(dòng)配置SlotSharingGroup來減少單個(gè)slot中共享Task的數(shù)量。如果資源沒問題,那么多半就是程序內(nèi)部發(fā)生了內(nèi)存泄露。建議仔細(xì)查看TaskManager日志,并按處理JVM OOM問題的常規(guī)操作來排查。
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id < tm_id>timed out
TaskManager心跳超時(shí)。有可能是TaskManager已經(jīng)失敗,如果沒有失敗,那么有可能是因?yàn)榫W(wǎng)絡(luò)不好導(dǎo)致JobManager沒能收到心跳信號(hào),或者TaskManager忙于GC,無法發(fā)送心跳信號(hào)。JobManager會(huì)重啟心跳超時(shí)的TaskManager,如果頻繁出現(xiàn)此異常,應(yīng)該通過日志進(jìn)一步定位問題所在。
在Flink中,資源的隔離是通過Slot進(jìn)行的,也就是說多個(gè)Slot會(huì)運(yùn)行在同一個(gè)JVM中,這種隔離很弱,尤其對(duì)于生產(chǎn)環(huán)境。Flink App上線之前要在一個(gè)單獨(dú)的Flink集群上進(jìn)行測(cè)試,否則一個(gè)不穩(wěn)定、存在問題的Flink App上線,很可能影響整個(gè)Flink集群上的App。
(7)資源不足導(dǎo)致 container 被 kill
The assigned slot container_container編號(hào) was removed.Flink App 拋出此類異常,通過查看日志,一般就是某一個(gè) Flink App 內(nèi)存占用大,導(dǎo)致 TaskManager(在 Yarn 上就是 Container )被Kill 掉。
但是并不是所有的情況都是這個(gè)原因,還需要進(jìn)一步看 yarn 的日志( 查看 yarn 任務(wù)日志:yarn logs -applicationId ?-appOwner),如果代碼寫的沒問題,就確實(shí)是資源不夠了,其實(shí) 1G Slot 跑多個(gè)Task( Slot Group Share )其實(shí)挺容易出現(xiàn)的。
因此有兩種選擇,可以根據(jù)具體情況,權(quán)衡選擇一個(gè)。
將該 Flink App 調(diào)度在 Per Slot 內(nèi)存更大的集群上。通過 slotSharingGroup("xxx") ,減少 Slot 中共享 Task 的個(gè)數(shù)
(8)啟動(dòng)報(bào)錯(cuò),提示找不到 jersey 的類
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties 解決辦法進(jìn)入 yarn中 把 lib 目中的一下兩個(gè)問價(jià)拷貝到flink的lib中 hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar /hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar
(9)Scala版本沖突
java.lang.NoSuchMethodError:scala.collection.immutable.HashSet$.empty()Lscala/collection/
解決辦法,添加: import org.apache.flink.api.scala._
(10)沒有使用回撤流報(bào)錯(cuò)
Table?is?not?an?append一only?table.?Use?the?toRetractStream()?in?order?to?handle?add?and?retract?messages.
這個(gè)是因?yàn)閯?dòng)態(tài)表不是 append-only 模式的,需要用 toRetractStream (回撤流) 處理就好了.
tableEnv.toRetractStream[Person](result).print()
(11)OOM 問題解決思路
java.lang.OutOfMemoryError:?GC?overhead?limit?exceeded
java.lang.OutOfMemoryError:?GC?overhead?limit?exceeded
????????at?java.util.Arrays.copyOfRange(Arrays.java:3664)
????????at?java.lang.String.(String.java:207)
????????at?com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
????????at?com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:177)
......
????????at?org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
解決方案:
檢查 slot 槽位夠不夠或者 slot 分配的數(shù)量有沒有生效。程序起的并行是否都正常分配了(會(huì)有這樣的情況出現(xiàn),假如 5 個(gè)并行,但是只有 2 個(gè)在幾點(diǎn)上生效了,另外 3 個(gè)沒有數(shù)據(jù)流動(dòng))。檢查flink程序有沒有數(shù)據(jù)傾斜,可以通過 flink 的 ui 界面查看每個(gè)分區(qū)子節(jié)點(diǎn)處理的數(shù)據(jù)量。
(12)解析返回值類型失敗報(bào)錯(cuò)
The?return?type?of?function?could?not?be?determined?automatically
Exception?in?thread?"main"?org.apache.flink.api.common.functions.InvalidTypesException:?The?return?type?of?function?'main(RemoteEnvironmentTest.java:27)'?could?not?be?determined?automatically,?due?to?type?erasure.?You?can?give?type?information?hints?by?using?the?returns(...)?method?on?the?result?of?the?transformation?call,?or?by?letting?your?function?implement?the?'ResultTypeQueryable'?interface.
?at?org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
?at?org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
?at?org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
解決方案:產(chǎn)生這種現(xiàn)象的原因一般是使用 lambda 表達(dá)式?jīng)]有明確返回值類型,或者使用特使的數(shù)據(jù)結(jié)構(gòu) flink 無法解析其類型,這時(shí)候我們需要在方法的后面添加返回值類型,比如字符串。
input.flatMap((Integer?number,?Collector?out)?->?{
?......
})
//?提供返回值類型
.returns(Types.STRING)
(13)Hadoop jar 包沖突
Caused?by:?java.io.IOException:?The?given?file?system?URI?(hdfs:///data/checkpoint-data/abtest)?did?not?describe?the?authority?(like?for?example?HDFS?NameNode?address/port?or?S3?host).?The?attempt?to?use?a?configured?default?authority?failed:?Hadoop?configuration?did?not?contain?an?entry?for?the?default?file?system?('fs.defaultFS').
????????at?org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:135)
????????at?org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
????????at?org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
????????at?org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
解決:pom 文件中去掉和 hadoop 相關(guān)的依賴就好了
作業(yè)問題
(1)org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
該異常幾乎都是由于程序業(yè)務(wù)邏輯有誤,或者數(shù)據(jù)流里存在未處理好的臟數(shù)據(jù)導(dǎo)致的,繼續(xù)向下追溯異常棧一般就可以看到具體的出錯(cuò)原因,比較常見的如POJO內(nèi)有空字段,或者抽取事件時(shí)間的時(shí)間戳為null等。
(2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down
很多童鞋拿著這兩條異常信息來求助,但實(shí)際上它們只是表示BufferPool、MemoryManager這些Flink運(yùn)行時(shí)組件被銷毀,亦即作業(yè)已經(jīng)失敗。具體的原因多種多樣,根據(jù)經(jīng)驗(yàn),一般是上一條描述的情況居多(即Could not forward element to next operator錯(cuò)誤會(huì)伴隨出現(xiàn)),其次是JDK版本問題。具體情況還是要根據(jù)TaskManager日志具體分析。
(3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]
Akka超時(shí)導(dǎo)致,一般有兩種原因:一是集群負(fù)載比較大或者網(wǎng)絡(luò)比較擁塞,二是業(yè)務(wù)邏輯同步調(diào)用耗時(shí)的外部服務(wù)。如果負(fù)載或網(wǎng)絡(luò)問題無法徹底緩解,需考慮調(diào)大akka.ask.timeout參數(shù)的值(默認(rèn)只有10秒);另外,調(diào)用外部服務(wù)時(shí)盡量異步操作(Async I/O)。
(4) java.io.IOException: Too many open files
這個(gè)異常我們應(yīng)該都不陌生,首先檢查系統(tǒng)ulimit -n的文件描述符限制,再注意檢查程序內(nèi)是否有資源(如各種連接池的連接)未及時(shí)釋放。值得注意的是,F(xiàn)link使用RocksDB狀態(tài)后端也有可能會(huì)拋出這個(gè)異常,此時(shí)需修改flink-conf.yaml中的state.backend.rocksdb.files.open參數(shù),如果不限制,可以改為-1。
(5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '< class>' are missing
在Flink內(nèi)使用Java Lambda表達(dá)式時(shí),由于類型擦除造成的副作用,注意調(diào)用returns()方法指定被擦除的類型。
(6)Checkpoint失?。篊heckpoint expired before completing
原因是因?yàn)閏heckpointConf.setCheckpointTimeout(8000L)。設(shè)置的太小了,默認(rèn)是10min,這里只設(shè)置了8sec。當(dāng)一個(gè)Flink App背壓的時(shí)候(例如由外部組件異常引起),Barrier會(huì)流動(dòng)的非常緩慢,導(dǎo)致Checkpoint時(shí)長(zhǎng)飆升。
檢查點(diǎn)和狀態(tài)問題
(1) Received checkpoint barrier for checkpoint < cp_id> before completing current checkpoint < cp_id>. Skipping current checkpoint
在當(dāng)前檢查點(diǎn)還未做完時(shí),收到了更新的檢查點(diǎn)的barrier,表示當(dāng)前檢查點(diǎn)不再需要而被取消掉,一般不需要特殊處理。
(2) Checkpoint < cp_id> expired before completing
首先應(yīng)檢查CheckpointConfig.setCheckpointTimeout()方法設(shè)定的檢查點(diǎn)超時(shí),如果設(shè)的太短,適當(dāng)改長(zhǎng)一點(diǎn)。另外就是考慮發(fā)生了反壓或數(shù)據(jù)傾斜,或者barrier對(duì)齊太慢。
(3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible
我們知道Flink的狀態(tài)是按key組織并保存的,如果程序邏輯內(nèi)改了keyBy()邏輯或者key的序列化邏輯,就會(huì)導(dǎo)致檢查點(diǎn)/保存點(diǎn)的數(shù)據(jù)無法正確恢復(fù)。所以如果必須要改key相關(guān)的東西,就棄用之前的狀態(tài)數(shù)據(jù)吧。
(4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported
在1.9之前的Flink版本中,如果我們使用RocksDB狀態(tài)后端,并且更改了自用MapState的schema,恢復(fù)作業(yè)時(shí)會(huì)拋出此異常,表示不支持更改schema。這個(gè)問題已經(jīng)在FLINK-11947解決,升級(jí)版本即可。
(5)時(shí)鐘不同步導(dǎo)致無法啟動(dòng)
啟動(dòng)Flink任務(wù)的時(shí)候報(bào)錯(cuò) Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster。
然后仔細(xì)看發(fā)現(xiàn):system times on machines may be out of sync。
意思說是機(jī)器上的系統(tǒng)時(shí)間可能不同步。同步集群機(jī)器時(shí)間即可。
不同的kafka版本依賴沖突
不同的kafka版本依賴沖突會(huì)造成cdc報(bào)錯(cuò),參考這個(gè)issue:
http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393
2020-11-04?16:39:10.972?[Source:?Custom?Source?->?Sink:?Print?to?Std.?Out?(1/1)]?WARN??org.apache.flink.runtime.taskmanager.Task??-?Source:?Custom?Source?->?Sink:?Print?to?Std.?Out?(1/1)?(7c3ccf7686ccfb33254e8cb785cd339d)?switched?from?RUNNING?to?FAILED.
java.lang.AbstractMethodError:?org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at?org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at?org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at?io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:583)
at?io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:80)
at?io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at?io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at?io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at?com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at?org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at?org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at?org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
源碼如下:
public?class?CdcTest?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????SourceFunction?sourceFunction?=?MySQLSource.builder()
????????????????.hostname("localhost")
????????????????.port(3306)
????????????????.databaseList("sohay")?//?monitor?all?tables?under?inventory?database
????????????????.username("root")
????????????????.password("123456")
????????????????.deserializer(new?StringDebeziumDeserializationSchema())?//?converts?SourceRecord?to?String
????????????????.build();
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????env.addSource(sourceFunction).print().setParallelism(1);?//?use?parallelism?1?for?sink?to?keep?message?ordering
????????env.execute();
????}
}
確實(shí)是pom中存在一個(gè)Kafka的依賴包,導(dǎo)致沖突,去掉后問題解決。
MySQL CDC源等待超時(shí)
在掃描表期間,由于沒有可恢復(fù)的位置,因此無法執(zhí)行checkpoints。為了不執(zhí)行檢查點(diǎn),MySQL CDC源將保持檢查點(diǎn)等待超時(shí)。超時(shí)檢查點(diǎn)將被識(shí)別為失敗的檢查點(diǎn),默認(rèn)情況下,這將觸發(fā)Flink作業(yè)的故障轉(zhuǎn)移。因此,如果數(shù)據(jù)庫表很大,則建議添加以下Flink配置,以避免由于超時(shí)檢查點(diǎn)而導(dǎo)致故障轉(zhuǎn)移:

execution.checkpointing.interval:?10min
execution.checkpointing.tolerable-failed-checkpoints:?100
restart-strategy:?fixed-delay
restart-strategy.fixed-delay.attempts:?2147483647
數(shù)據(jù)庫切換,重新開啟binlog,Mysql全局鎖無法釋放

原因是因?yàn)榍袚Q了數(shù)據(jù)庫環(huán)境,重新開啟binlog,所有的作業(yè)都重新同步binlog的全量數(shù)據(jù),導(dǎo)致了全局鎖一直在等待,所有作業(yè)都無法執(zhí)行。解決方法:記錄checkpoint的地址,取消作業(yè),然后根據(jù)checkpoint重啟作業(yè)。
使用Flink SQL CDC模式創(chuàng)建維表異常
CREATE?TABLE?cdc_test
(
????id??STRING,
????ip??STRING,
????url?STRING,
????PRIMARY?KEY?(id)?NOT?ENFORCED
)?WITH?(
????'connector'?=?'mysql-cdc',?
????'hostname'?=?'127.0.0.1',
????'port'?=?'3306',
????'database-name'?=?'xx',
????'table-name'?=?'xx',
????'username'?=?'xx',
????'password'?=?'xx'
);
執(zhí)行查詢:
SELECT?*?FROM?cdc_test;
任務(wù)無法運(yùn)行,拋出異常
User?does?not?have?the?'LOCK?TABLES'?privilege?required?to?obtain?a?consistent?snapshot?by?preventing?concurrent?writes?to?tables.
原因是連接MySQL的用戶缺乏必要的CDC權(quán)限。
Flink SQL CDC基于Debezium實(shí)現(xiàn)。當(dāng)啟動(dòng)MySQL CDC源時(shí),它將獲取一個(gè)全局讀取鎖(FLUSH TABLES WITH READ LOCK),該鎖將阻止其他數(shù)據(jù)庫的寫入,然后讀取當(dāng)前binlog位置以及數(shù)據(jù)庫和表的schema,之后將釋放全局讀取鎖。然后它掃描數(shù)據(jù)庫表并從先前記錄的位置讀取binlog,F(xiàn)link將定期執(zhí)行checkpoints以記錄binlog位置。如果發(fā)生故障,作業(yè)將重新啟動(dòng)并從checkpoint完成的binlog位置恢復(fù),因此它保證了僅一次的語義。
解決辦法:創(chuàng)建一個(gè)新的MySQL用戶并授予其必要的權(quán)限。
mysql>?CREATE?USER?'user'@'localhost'?IDENTIFIED?BY?'password';
mysql>?GRANT?SELECT,?RELOAD,?SHOW?DATABASES,?REPLICATION?SLAVE,?REPLICATION?CLIENT?ON?*.*?TO?'user'?IDENTIFIED?BY?'password';
mysql>?FLUSH?PRIVILEGES;
Flink作業(yè)掃描MySQL全量數(shù)據(jù)出現(xiàn)fail-over
Flink 作業(yè)在掃描 MySQL 全量數(shù)據(jù)時(shí),checkpoint 超時(shí),出現(xiàn)作業(yè) failover,如下圖:

原因:Flink CDC 在 scan 全表數(shù)據(jù)(我們的實(shí)收表有千萬級(jí)數(shù)據(jù))需要小時(shí)級(jí)的時(shí)間(受下游聚合反壓影響),而在 scan 全表過程中是沒有 offset 可以記錄的(意味著沒法做 checkpoint),但是 Flink 框架任何時(shí)候都會(huì)按照固定間隔時(shí)間做 checkpoint,所以此處 mysql-cdc source 做了比較取巧的方式,即在 scan 全表的過程中,會(huì)讓執(zhí)行中的 checkpoint 一直等待甚至超時(shí)。超時(shí)的 checkpoint 會(huì)被仍未認(rèn)為是 failed checkpoint,默認(rèn)配置下,這會(huì)觸發(fā) Flink 的 failover 機(jī)制,而默認(rèn)的 failover 機(jī)制是不重啟。所以會(huì)造成上面的現(xiàn)象。
解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數(shù),以及失敗重啟策略,如下:
execution.checkpointing.interval:?10min???#?checkpoint間隔時(shí)間
execution.checkpointing.tolerable-failed-checkpoints:?100??#?checkpoint?失敗容忍次數(shù)
restart-strategy:?fixed-delay??#?重試策略
restart-strategy.fixed-delay.attempts:?2147483647???#?重試次數(shù)
作業(yè)在運(yùn)行時(shí) mysql cdc source 報(bào) no viable alternative at input 'alter table std'

原因:因?yàn)閿?shù)據(jù)庫中別的表做了字段修改,CDC source 同步到了 ALTER DDL 語句,但是解析失敗拋出的異常。
解決方法:在 flink-cdc-connectors 最新版本中已經(jīng)修復(fù)該問題(跳過了無法解析的 DDL)。升級(jí) connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。
多個(gè)作業(yè)共用同一張 source table 時(shí),沒有修改 server id 導(dǎo)致讀取出來的數(shù)據(jù)有丟失。
原因:MySQL binlog 數(shù)據(jù)同步的原理是,CDC source 會(huì)偽裝成 MySQL 集群的一個(gè) slave(使用指定的 server id 作為唯一 id),然后從 MySQL 拉取 binlog 數(shù)據(jù)。如果一個(gè) MySQL 集群中有多個(gè) slave 有同樣的 id,就會(huì)導(dǎo)致拉取數(shù)據(jù)錯(cuò)亂的問題。
解決方法:默認(rèn)會(huì)隨機(jī)生成一個(gè) server id,容易有碰撞的風(fēng)險(xiǎn)。所以建議使用動(dòng)態(tài)參數(shù)(table hint)在 query 中覆蓋 server id。如下所示:
FROM?bill_info?/*+?OPTIONS('server-id'='123456')?*/?;
CDC source 掃描 MySQL 表期間,發(fā)現(xiàn)無法往該表 insert 數(shù)據(jù)
原因:由于使用的 MySQL 用戶未授權(quán) RELOAD 權(quán)限,導(dǎo)致無法獲取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會(huì)退化成表級(jí)讀鎖,而使用表級(jí)讀鎖需要等到全表 scan 完,才能釋放鎖,所以會(huì)發(fā)現(xiàn)持鎖時(shí)間過長(zhǎng)的現(xiàn)象,影響其他業(yè)務(wù)寫入數(shù)據(jù)。
解決方法:給使用的 MySQL 用戶授予 RELOAD 權(quán)限即可。所需的權(quán)限列表詳見文檔:
https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server
如果出于某些原因無法授予 RELOAD 權(quán)限,也可以顯式配上 'debezium.snapshot.locking.mode' = 'none'來避免所有鎖的獲取,但要注意只有當(dāng)快照期間表的 schema 不會(huì)變更才安全。
下面這些問題來自騰訊云大數(shù)據(jù)官方博客,供大家參考。
Flink 作業(yè)自動(dòng)停止
現(xiàn)象:本應(yīng)長(zhǎng)期運(yùn)行的作業(yè),突然停止運(yùn)行,且再也不恢復(fù)。

如果 Flink 作業(yè)在編程時(shí),源算子實(shí)現(xiàn)不當(dāng),則可能造成源算子處理完數(shù)據(jù)以后進(jìn)入 FINISHED 狀態(tài)。如果所有源算子都進(jìn)入了 FINISHED 狀態(tài),那整個(gè) Flink 作業(yè)也會(huì)跟著結(jié)束。
Flink 作業(yè)默認(rèn)的容錯(cuò)次數(shù)是 2,即發(fā)生兩次崩潰后,作業(yè)就自動(dòng)退出了,不再進(jìn)行重試。當(dāng)出現(xiàn)此種場(chǎng)景時(shí),TaskManager 的日志中會(huì)有“restart strategy prevented it”字樣。我們首先要找到作業(yè)崩潰的原因,其次可以適當(dāng)調(diào)大 RestartStrategy 中容錯(cuò)的最大次數(shù),畢竟節(jié)點(diǎn)異常等外部風(fēng)險(xiǎn)始終存在,作業(yè)不會(huì)在理想的環(huán)境中運(yùn)行。
此外,舊版Flink(低于 1.11.0)的 RocksDB 內(nèi)存使用不受管控,造成很容易由于超量使用而被外界(YARN、Kubernetes 等)KILL 掉。如果經(jīng)常受此困擾,可以考慮升級(jí) Flink 版本到最新,其默認(rèn)開啟自動(dòng)內(nèi)存管理功能。
Flink作業(yè)頻繁重啟
現(xiàn)象:作業(yè)頻繁重啟又自行恢復(fù),陷入無盡循環(huán),無法正常處理數(shù)據(jù)。

作業(yè)頻繁重啟的成因非常多,例如異常數(shù)據(jù)造成的作業(yè)崩潰,可以在 TaskManager 的日志中找到報(bào)錯(cuò)。數(shù)據(jù)源或者數(shù)據(jù)目的等上下游系統(tǒng)超時(shí)也會(huì)造成作業(yè)無法啟動(dòng)而一直在重啟。此外 TaskManager Full GC 太久造成心跳包超時(shí)而被 JobManager 踢掉也是常見的作業(yè)重啟原因。如果系統(tǒng)內(nèi)存嚴(yán)重匱乏,那么 Linux 自帶的 OOM Killer 也可能把 TaskManager 所在的 JVM 進(jìn)程 kill 了。
當(dāng)一個(gè)正常運(yùn)行的作業(yè)失敗時(shí),日志里會(huì)有 from RUNNING to FAILED 的關(guān)鍵字,我們以此為著手點(diǎn),查看它后面的 Exception 原因,通常最下面的 caused by 即是直接原因。當(dāng)然,直接原因不一定等于根本原因,后者需要借助下文提到的多項(xiàng)技術(shù)進(jìn)行分析。

如果 JVM 的內(nèi)存容量超出了平臺(tái)方(例如 YARN 或 Kubernetes 等)的容器限制,則可能被 KILL。問題的確認(rèn)方式也是查看作業(yè)日志以及平臺(tái)組件的運(yùn)行狀態(tài)。值得一提的是,在最新的 Flink 版本中,只要設(shè)置 taskmanager.memory.process.size 參數(shù),基本可以保證內(nèi)存用量不會(huì)超過該值(前提是用戶沒有使用 JNI 等方式申請(qǐng) native 內(nèi)存)。
作業(yè)的崩潰重啟還有一些原因,例如使用了不成熟的第三方 so 庫,或者連接數(shù)過多等,都可以從日志中找到端倪。
作業(yè)輸出整體穩(wěn)定,但是個(gè)別數(shù)據(jù)缺失
現(xiàn)象:作業(yè)輸出整體穩(wěn)定,但是個(gè)別數(shù)據(jù)缺失,造成結(jié)果的精度下降,甚至結(jié)果完全錯(cuò)亂。

當(dāng)遇到懷疑數(shù)據(jù)缺失造成的計(jì)算結(jié)果不正確時(shí),首先需要檢查作業(yè)邏輯是否不小心過濾了一些正常數(shù)據(jù)。檢查方法可以在本地運(yùn)行一個(gè) Mini Cluster,也可以在遠(yuǎn)端的調(diào)試環(huán)境進(jìn)行遠(yuǎn)程調(diào)試或者采樣等。具體技巧后文也會(huì)提到。
另外還有一種情況是,如果用戶定義了批量存取的算子(通常用于與外部系統(tǒng)進(jìn)行交互),則有可能出現(xiàn)一批數(shù)據(jù)中有一條異常數(shù)據(jù),導(dǎo)致整批次都失敗而被丟棄的情況。
對(duì)于數(shù)據(jù)源 Source 和數(shù)據(jù)目的Sink,請(qǐng)務(wù)必保證 Flink 作業(yè)運(yùn)行期間不要對(duì)其進(jìn)行任何改動(dòng)(例如新增 Kafka 分區(qū)、調(diào)整 MySQL 表結(jié)構(gòu)等),否則可能造成正在運(yùn)行的作業(yè)無法感知新增的分區(qū)或者讀寫失敗。盡管 Flink 可以開啟 Kafka 分區(qū)自動(dòng)發(fā)現(xiàn)機(jī)制(在 Configuration 里設(shè)置 flink.partition-discovery.interval-millis 值),但分區(qū)發(fā)現(xiàn)仍然需要一定時(shí)間,數(shù)據(jù)的精度可能會(huì)稍有影響。
「PDF」就可以看到阿里云盤下載鏈接了!



Hi,我是王知無,一個(gè)大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。? 放心關(guān)注我,獲取更多行業(yè)的一手消息。

