Flink 實(shí)踐 | 伴魚實(shí)時(shí)計(jì)算平臺(tái) Palink 的設(shè)計(jì)與實(shí)現(xiàn)
在伴魚發(fā)展早期,出現(xiàn)了一系列實(shí)時(shí)性相關(guān)的需求,比如算法工程師期望可以拿到用戶的實(shí)時(shí)特征數(shù)據(jù)做實(shí)時(shí)推薦,產(chǎn)品經(jīng)理希望數(shù)據(jù)方可以提供實(shí)時(shí)指標(biāo)看板做實(shí)時(shí)運(yùn)營(yíng)分析。這個(gè)階段中臺(tái)數(shù)據(jù)開發(fā)工程師主要是基于「Spark」實(shí)時(shí)計(jì)算引擎開發(fā)作業(yè)來(lái)滿足業(yè)務(wù)方提出的需求。然而這類作業(yè)并沒有統(tǒng)一的平臺(tái)進(jìn)行管理,任務(wù)的開發(fā)形式、提交方式、可用性保障等也完全因人而異。
伴隨著業(yè)務(wù)的加速發(fā)展,越來(lái)越多的實(shí)時(shí)場(chǎng)景涌現(xiàn)出來(lái),對(duì)實(shí)時(shí)作業(yè)的開發(fā)效率和質(zhì)量保障提出了更高的要求。為此,我們從去年開始著手打造伴魚公司級(jí)的實(shí)時(shí)計(jì)算平臺(tái),平臺(tái)代號(hào)「Palink」,由「Palfish」 + 「Flink」組合而來(lái)。之所以選擇「Flink」作為平臺(tái)唯一的實(shí)時(shí)計(jì)算引擎,是因?yàn)榻┠陙?lái)其在實(shí)時(shí)領(lǐng)域的優(yōu)秀表現(xiàn)和主導(dǎo)地位,同時(shí)活躍的社區(qū)氛圍也提供了非常多不錯(cuò)的實(shí)踐經(jīng)驗(yàn)可供借鑒。目前「Palink」項(xiàng)目已經(jīng)落地并投入使用,很好地滿足了伴魚業(yè)務(wù)在實(shí)時(shí)場(chǎng)景的需求。
核心原則
通過調(diào)研阿里云、網(wǎng)易等各大廠商提供的實(shí)時(shí)計(jì)算服務(wù),我們基本確定了「Palink」的整個(gè)產(chǎn)品形態(tài)。同時(shí),在系統(tǒng)設(shè)計(jì)過程中緊緊圍繞以下幾個(gè)核心原則:
極簡(jiǎn)性:保持簡(jiǎn)易設(shè)計(jì),快速落地,不過度追求功能的完整性,滿足核心需求為主。 高質(zhì)量:保持項(xiàng)目質(zhì)量嚴(yán)要求,核心模塊思慮周全。 可擴(kuò)展:保持較高的可擴(kuò)展性,便于后續(xù)方案的迭代升級(jí)。
系統(tǒng)設(shè)計(jì)
平臺(tái)整體架構(gòu)
以下是平臺(tái)整體的架構(gòu)示意圖:

整個(gè)平臺(tái)由四部分組成:
Web UI:前端操作頁(yè)面。 Palink(GO) 服務(wù):實(shí)時(shí)作業(yè)管理服務(wù),負(fù)責(zé)作業(yè)元信息及作業(yè)生命周期內(nèi)全部狀態(tài)的管理,承接全部的前端流量。包括作業(yè)調(diào)度、作業(yè)提交、作業(yè)狀態(tài)同步及作業(yè) HA 管理幾個(gè)核心模塊。 PalinkProxy(JAVA) 服務(wù):SQL 化服務(wù),F(xiàn)link SQL 作業(yè)將由此模塊編譯、提交至遠(yuǎn)端集群。包括 SQL 語(yǔ)法校驗(yàn)、SQL 作業(yè)調(diào)試及 SQL 作業(yè)編譯和提交幾個(gè)核心模塊。 Flink On Yarn:基于 Hadoop Yarn 做集群的資源管理。
這里之所以將后臺(tái)服務(wù)拆分成兩塊,并且分別使用 GO 和 JAVA 語(yǔ)言實(shí)現(xiàn),原因主要有三個(gè)方面:一是伴魚擁有一套非常完善的基于 GO 語(yǔ)言實(shí)現(xiàn)的微服務(wù)基礎(chǔ)框架,基于它可以快速構(gòu)建服務(wù)并擁有包括服務(wù)監(jiān)控在內(nèi)的一系列周邊配套,公司目前 95% 以上的服務(wù)是基于此服務(wù)框架構(gòu)建的;二是 SQL 化模塊是基于開源項(xiàng)目二次開發(fā)實(shí)現(xiàn)的(這個(gè)在后文會(huì)做詳細(xì)介紹),而該開源項(xiàng)目使用的是 JAVA 語(yǔ)言;三是內(nèi)部服務(wù)增加一次遠(yuǎn)程調(diào)用的成本是可以接受的。這里也體現(xiàn)了我們極簡(jiǎn)性原則中對(duì)快速落地的要求。事實(shí)上,以 GO 為核心開發(fā)語(yǔ)言是非常具有「Palfish」特色的,在接下來(lái)伴魚大數(shù)據(jù)系列的相關(guān)文章中也會(huì)有所體現(xiàn)。
接下來(lái)本文將著重介紹「Palink」幾個(gè)核心模塊的設(shè)計(jì)。
作業(yè)調(diào)度&執(zhí)行
后端服務(wù)接收到前端創(chuàng)建作業(yè)的請(qǐng)求后,將生成一條 PalinkJob 記錄和 一條 PalinkJobCommand 記錄并持久化到 DB,PalinkJobCommand 為作業(yè)提交執(zhí)行階段抽象出的一個(gè)實(shí)體,整個(gè)作業(yè)調(diào)度過程將圍繞該實(shí)體的狀態(tài)變更向前推進(jìn)。其結(jié)構(gòu)如下:
type PalinkJobCommand struct {
ID uint64 `json:"id"`
PalinkJobID uint64 `json:"palink_job_id"`
CommandParams string `json:"command_params"`
CommandState int8 `json:"command_state"`
Log string `json:"log"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
這里并沒有直接基于 PalinkJob 實(shí)體來(lái)串聯(lián)整個(gè)調(diào)度過程,是因?yàn)樽鳂I(yè)的狀態(tài)同步會(huì)直接作用于這個(gè)實(shí)體,如果調(diào)度過程也基于該實(shí)體,兩部分的邏輯就緊耦合了。
調(diào)度流程
下圖為作業(yè)調(diào)度的流程圖:

Palink pod 異步執(zhí)行競(jìng)爭(zhēng)分布式鎖操作,保證同一時(shí)刻有且僅有一個(gè)實(shí)例獲取周期性監(jiān)測(cè)權(quán)限,滿足條件的 Command 將直接被發(fā)送到 Kafka 待執(zhí)行隊(duì)列,同時(shí)變更其狀態(tài),保證之后不再被調(diào)度。此外,所有的 palink pod 將充當(dāng)待執(zhí)行隊(duì)列消費(fèi)者的角色,并歸屬于同一個(gè)消費(fèi)者組,消費(fèi)到消息的實(shí)例將獲取到最終的執(zhí)行權(quán)。
執(zhí)行流程
作業(yè)的執(zhí)行實(shí)則是作業(yè)提交的過程,根據(jù)作業(yè)類型的不同提交工作流有所區(qū)別,可細(xì)分為三類:

Flink JAR 作業(yè):我們摒棄了用戶直接上傳 JAR 文件的交互方式。用戶只需提供作業(yè) gitlab 倉(cāng)庫(kù)地址即可,打包構(gòu)建全流程平臺(tái)直接完成。由于每一個(gè)服務(wù)實(shí)例都內(nèi)嵌 Flink 客戶端,任務(wù)是直接通過 flink run 方式提交的。 PyFlink 作業(yè):與 Flink JAR 方式類似,少了編譯的過程,提交命令也有所不同。 Flink SQL 作業(yè):與上兩種方式區(qū)別較大。對(duì)于 Flink SQL 作業(yè)而言,用戶只需提交相對(duì)簡(jiǎn)單的 SQL 文本信息,這個(gè)內(nèi)容我們是直接維護(hù)在平臺(tái)的元信息中,故沒有和 gitlab 倉(cāng)庫(kù)交互的地方。SQL 文本將進(jìn)一步提交給 PalinkProxy 服務(wù)進(jìn)行后續(xù)的編譯,然后使用 Yarn Client 方式提交。
Command 狀態(tài)機(jī)
PalinkJobCommand 的狀態(tài)流轉(zhuǎn)如下圖所示:

UNDO:初始狀態(tài),將被調(diào)度實(shí)例監(jiān)測(cè)。 DOING:執(zhí)行中狀態(tài),同樣會(huì)調(diào)度實(shí)例監(jiān)測(cè),防止長(zhǎng)期處于進(jìn)行中的臟狀態(tài)產(chǎn)生。 SUCCESSED:執(zhí)行成功狀態(tài)。隨著用戶的后續(xù)行為,如重新提交、重新啟動(dòng)操作,狀態(tài)會(huì)再次回到 UNDO 態(tài)。 FAILED:執(zhí)行失敗狀態(tài)。同上,狀態(tài)可能會(huì)再次回到 UNDO 態(tài)。
作業(yè)狀態(tài)同步
作業(yè)成功提交至集群后,由于集群狀態(tài)的不確定性或者其他的一些因素最終導(dǎo)致任務(wù)異常終止了,平臺(tái)該如何及時(shí)感知到?這就涉及到我們即將要闡述的另一個(gè)話題「狀態(tài)同步」。
狀態(tài)同步流程
這里首先要回答的一個(gè)問題是同步誰(shuí)的狀態(tài)?有過離線或者 flink on yarn 開發(fā)經(jīng)驗(yàn)的同學(xué)一定知道,作業(yè)在部署到 yarn 上之后會(huì)有一個(gè) application 與之對(duì)應(yīng),每一個(gè) application 都有其對(duì)應(yīng)的狀態(tài)和操作動(dòng)作,比如我們可以執(zhí)行 Yarn UI 上 Kill Application 操作來(lái)殺掉整個(gè)任務(wù)。同樣的,當(dāng)我們翻閱 Flink 官方文檔或者進(jìn)入 Flink UI 頁(yè)面也都可以看到每一個(gè)任務(wù)都有其對(duì)應(yīng)的狀態(tài)和一系列操作行為。最直接的想法肯定是以 flink 任務(wù)狀態(tài)為準(zhǔn),畢竟這是我們最想拿到的,但仔細(xì)分析,其實(shí)二者的狀態(tài)對(duì)于平臺(tái)而言沒有太大區(qū)別,只是狀態(tài)的粒度有所不同而已,yarn application 的狀態(tài)已經(jīng)是對(duì) flink 狀態(tài)做了一次 state mapping。可是考慮到,F(xiàn)link 在 HA 的時(shí)候,作業(yè)對(duì)外暴露的 URL 會(huì)發(fā)生變更,這種情況下只能通過獲取作業(yè)對(duì)應(yīng)的 application 信息才能拿到最新的地址。與此同時(shí),一次狀態(tài)同步的過程不僅僅只是希望拿到最新的狀態(tài),對(duì)于任務(wù)的「checkpoint」等相關(guān)信息同樣是有同步的訴求??磥?lái)二者的信息在一次同步的過程中都需要獲取,最終的狀態(tài)同步設(shè)計(jì)如下:

前置流程和作業(yè)調(diào)度流程類似,有且僅有一個(gè)實(shí)例負(fù)責(zé)周期性監(jiān)測(cè)工作,符合條件的 Job ID (注,并非所有的作業(yè)都用同步的必要,比如一些處于終態(tài)的作業(yè))將發(fā)送到內(nèi)部延遲隊(duì)列。之所以采用延遲隊(duì)列而非 Kafka 隊(duì)列,主要是為了將同一時(shí)間點(diǎn)批量同步的需求在一定時(shí)間間隔內(nèi)隨機(jī)打散,降低同步的壓力。最后,在獲取到作業(yè)的完整信息后,再做一次 state mapping 將狀態(tài)映射為平臺(tái)抽象的狀態(tài)類型。
由于狀態(tài)同步是周期性進(jìn)行的,存在一定的延遲。因此在平臺(tái)獲取作業(yè)詳情時(shí),也會(huì)同步觸發(fā)一次狀態(tài)同步,保證獲取最新數(shù)據(jù)。
Job 狀態(tài)機(jī)
PalinkJob 的狀態(tài)流轉(zhuǎn)如下圖所示:

DEPLOYING:作業(yè)初始狀態(tài),將隨著 PalinkJobCommand 的狀態(tài)驅(qū)動(dòng)向 DEPLOY_SUCCESSED 和 DEPLOY_FAILED 流轉(zhuǎn)。 DEPLOY_SUCCESSED:部署成功狀態(tài),依賴作業(yè)「狀態(tài)同步」驅(qū)動(dòng)向 RUNNING 狀態(tài)或者其他終態(tài)流轉(zhuǎn)。 DEPLOY_FAILED:部署失敗狀態(tài),依賴用戶重新提交向 DEPLOYING 狀態(tài)流轉(zhuǎn)。 RUNNING:運(yùn)行中狀態(tài)??赏ㄟ^用戶執(zhí)行暫停操作向 FINISHED 狀態(tài)流轉(zhuǎn),或執(zhí)行終止操作向 KILLED 狀態(tài)流轉(zhuǎn),或因?yàn)閮?nèi)部異常向 FAILED 狀態(tài)流轉(zhuǎn)。 FINISHED:完成狀態(tài),作業(yè)終態(tài)之一。通過用戶執(zhí)行暫停操作,作業(yè)將回到此狀態(tài)。 KILLED:終止?fàn)顟B(tài),作業(yè)終態(tài)之一。通過用戶執(zhí)行終止操作,作業(yè)將回到此狀態(tài)。 FAILED:失敗狀態(tài),作業(yè)終態(tài)之一。作業(yè)異常會(huì)轉(zhuǎn)為此狀態(tài)。
作業(yè) HA 管理
解決了上述問題之后,另一個(gè)待討論的話題便是「作業(yè) HA 管理」。我們需要回答用戶以下的兩個(gè)問題:
作業(yè)是有狀態(tài)的,但是作業(yè)需要代碼升級(jí),如何處理? 作業(yè)異常失敗了,怎么做到從失敗的時(shí)間點(diǎn)恢復(fù)?
Flink 提供了兩種機(jī)制用于恢復(fù)作業(yè):「Checkpoint」和「Savepoint」,本文統(tǒng)稱為保存點(diǎn)。「Savepoint」可以看作是一種特殊的「Checkpoint」,只不過不像「Checkpoint」定期的從系統(tǒng)中生成,它是用戶通過命令觸發(fā)的,用戶可以控制保存點(diǎn)產(chǎn)生的時(shí)間點(diǎn)。任務(wù)啟動(dòng)時(shí),通過指定「Checkpoint」或「Savepoint」外部路徑,就可以達(dá)到從保存點(diǎn)恢復(fù)的效果。我們對(duì)于平臺(tái)作業(yè) HA 的管理也是基于這兩者展開的。下圖為管理的流程圖:

用戶有兩種方式來(lái)手動(dòng)停止一個(gè)作業(yè):暫停和終止。暫停操作通過調(diào)用 flink cancel api 實(shí)現(xiàn),將觸發(fā)作業(yè)生成「Savepoint」。終止操作則是通過調(diào)用 yarn kill application api 實(shí)現(xiàn),用于快速結(jié)束一個(gè)任務(wù)。被暫停的作業(yè)重啟時(shí),系統(tǒng)將比較「Savepoint」和「Checkpoint」的生成時(shí)間點(diǎn),按照最近的一個(gè)保存點(diǎn)啟動(dòng),而當(dāng)作業(yè)被重新提交時(shí),由于用戶可能變更了代碼邏輯,將直接由用戶決定是否按照保存點(diǎn)恢復(fù)。對(duì)于被終止的作業(yè),無(wú)論是重啟或者是重新提交,都直接采取由用戶決定的方式,因?yàn)榻K止操作本身就帶有丟棄作業(yè)狀態(tài)的色彩。
失敗狀態(tài)的作業(yè)是由于異常錯(cuò)誤被迫停止的。對(duì)于這類作業(yè),有三重保障。一是任務(wù)自身可以設(shè)置重啟策略自動(dòng)恢復(fù),外部平臺(tái)無(wú)感知。二是,對(duì)于內(nèi)部重啟依舊失敗的任務(wù)在平臺(tái)側(cè)可再次設(shè)置上層重啟策略。三是,手動(dòng)重啟或重新提交。僅在重新提交時(shí),由用戶決定按照那種方式啟動(dòng),其余場(chǎng)景皆按照最近的保存點(diǎn)啟動(dòng)。
任務(wù) SQL 化
Flink JAR 和 PyFlink 都是采用 Flink API 的形式開發(fā)作業(yè),這樣的形式必然極大地增加用戶的學(xué)習(xí)成本,影響開發(fā)的效率。需要不斷輸入和培養(yǎng)具有該領(lǐng)域開發(fā)技能的工程師,才能滿足源源不斷的業(yè)務(wù)需求。而產(chǎn)品定位不僅僅是面向數(shù)據(jù)中臺(tái)的開發(fā)工程師們,我們期望可以和離線目標(biāo)用戶保持一致,將目標(biāo)群體滲透至分析人員乃至業(yè)務(wù)研發(fā)和部分的產(chǎn)品經(jīng)理,簡(jiǎn)單的需求完全可以自己動(dòng)手實(shí)現(xiàn)。要達(dá)到這個(gè)目的,必然開發(fā)的形式也要向離線看齊,作業(yè) SQL 化是勢(shì)在必行的。
我們期望 Flink 可以提供一種類似于 Hive Cli 或者 Hive JDBC 的作業(yè)提交方式,用戶無(wú)需寫一行 Java 或 Scala 代碼。查閱官方文檔,F(xiàn)link 確實(shí)提供了一個(gè) SQL 客戶端以支持以一種簡(jiǎn)單的方式來(lái)編寫、調(diào)試和提交表程序到 Flink 集群,不過截止到目前最新的 release 1.13 版本,SQL 客戶端僅支持嵌入式模式,相關(guān)的功能還不夠健全,另外對(duì)于 connector 支持也是有限的。因此,需要尋求一種更穩(wěn)定、更高可擴(kuò)展性的實(shí)現(xiàn)方案。
經(jīng)過一番調(diào)研后,我們發(fā)現(xiàn)袋鼠云開源的「flinkStreamSQL」基本可以滿足我們目前的要求。此項(xiàng)目是基于開源的 Flink 打造的,并對(duì)其實(shí)時(shí) SQL 進(jìn)行了擴(kuò)展,支持原生 Flink SQL 所有的語(yǔ)法。
實(shí)現(xiàn)機(jī)制
下圖為 Flink 官方提供的作業(yè)角色流程圖,由圖可知,用戶提交的代碼將在 Client 端進(jìn)行加工、轉(zhuǎn)換(最終生成 Jobgraph )然后提交至遠(yuǎn)程集群。

那么要實(shí)現(xiàn)用戶層面的作業(yè) SQL 化,底層的實(shí)現(xiàn)同樣是繞不開這個(gè)流程。實(shí)際上「flinkStreamSQL」項(xiàng)目就是通過定制化的手段實(shí)現(xiàn)了 Client 端的邏輯,可以將整個(gè)過程簡(jiǎn)要地描述為:
構(gòu)建 PackagedProgram
利用 PackagedProgramUtils 生成 JobGraph。
通過 YarnClusterDescriptor 提交作業(yè)。
其中,第一步是最關(guān)鍵的,PackagedProgram 的構(gòu)造方法如下:
PackagedProgram.newBuilder()
.setJarFile(coreJarFile)
.setArguments(execArgs)
.setSavepointRestoreSettings(savepointRestoreSettings)
.build();
execArgs 為外部輸入?yún)?shù),這里就包含了用戶提交的 SQL。而 coreJarFile 對(duì)應(yīng)的就是 API 開發(fā)方式時(shí)用戶提交的 JAR 文件,只不過這里系統(tǒng)幫我們實(shí)現(xiàn)了。coreJarFile 的代碼對(duì)應(yīng)項(xiàng)目中的 core module,該 module 本質(zhì)上就是 API 開發(fā)方式的一個(gè) template 模板。module 內(nèi)實(shí)現(xiàn)了自定義 SQL 解析以及各類 connector plugin 注入。更多細(xì)節(jié)可通過開源項(xiàng)目進(jìn)一步了解。
定制開發(fā)
我們基于「flinkStreamSQL」進(jìn)行了二次開發(fā),以滿足內(nèi)部更多樣化的需求。主要分為以下幾點(diǎn):
服務(wù)化:整個(gè) SQL 化模塊作為 proxy 獨(dú)立部署和管理,以 HTTP 形式暴露服務(wù)。 支持語(yǔ)法校驗(yàn)特性。 支持調(diào)試特性:通過解析 SQL 結(jié)構(gòu)可直接獲取到 source 表和 sink 表的結(jié)構(gòu)信息。平臺(tái)可通過人工構(gòu)造或線上抓取源表數(shù)據(jù)的方式得到測(cè)試數(shù)據(jù)集,sink 算子被 localTest connector 算子直接替換,以截取結(jié)果數(shù)據(jù)輸出。 支持更多的 connector plugin,如 pulsar connector。 其他特性
除了上文提到的一些功能特性,平臺(tái)還支持了:
DDL 語(yǔ)句注入 UDF 管理 租戶管理 版本管理 作業(yè)監(jiān)控 日志收集
這些點(diǎn)就不在本文詳細(xì)闡述,但作為一個(gè)實(shí)時(shí)計(jì)算平臺(tái)這些點(diǎn)又是必不可少的。
線上效果
作業(yè)總覽

作業(yè)詳情

作業(yè)監(jiān)控

未來(lái)工作
隨著業(yè)務(wù)的繼續(xù)推進(jìn),平臺(tái)將在以下幾方面繼續(xù)迭代優(yōu)化:
穩(wěn)定性建設(shè):實(shí)時(shí)任務(wù)的穩(wěn)定性建設(shè)必然是未來(lái)工作中的首要事項(xiàng)。作業(yè)參數(shù)如何設(shè)置,作業(yè)如何自動(dòng)調(diào)優(yōu),作業(yè)在流量高峰如何保持穩(wěn)定的性能,這些問題需要不斷探索并沉淀更多的最佳實(shí)踐。 提升開發(fā)效率:SQL 化建設(shè)。盡管 SQL 化已初具雛形,但開發(fā)起來(lái)依舊具備一定的學(xué)習(xí)成本,其中最明顯的就是 DDL 的構(gòu)建,用戶對(duì)于 source、sink 的 schema 并不清楚,最好的方式是平臺(tái)可以和我們的元數(shù)據(jù)中心打通將構(gòu)建 DDL 的過程自動(dòng)化,這一點(diǎn)也是我們目前正在做的。 優(yōu)化使用體驗(yàn):體驗(yàn)上的問題在一定程度上也直接影響到了開發(fā)的效率。通過不斷收集用戶反饋,持續(xù)改進(jìn)。 探索更多業(yè)務(wù)場(chǎng)景:目前伴魚內(nèi)部已開始基于 Flink 開展 AI 、實(shí)時(shí)數(shù)倉(cāng)等場(chǎng)景的建設(shè)。未來(lái)我們將繼續(xù)推進(jìn) Flink 在更多場(chǎng)景上的實(shí)踐。
