<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于 Flink 打造的伴魚實時計算平臺 Palink 的設(shè)計與實現(xiàn)

          共 6746字,需瀏覽 14分鐘

           ·

          2021-06-15 21:24

          在伴魚發(fā)展早期,出現(xiàn)了一系列實時性相關(guān)的需求,比如算法工程師期望可以拿到用戶的實時特征數(shù)據(jù)做實時推薦,產(chǎn)品經(jīng)理希望數(shù)據(jù)方可以提供實時指標(biāo)看板做實時運營分析。


          這個階段中臺數(shù)據(jù)開發(fā)工程師主要是基于 Spark 實時計算引擎開發(fā)作業(yè)來滿足業(yè)務(wù)方提出的需求。然而這類作業(yè)并沒有統(tǒng)一的平臺進(jìn)行管理,任務(wù)的開發(fā)形式、提交方式、可用性保障等也完全因人而異。

          伴隨著業(yè)務(wù)的加速發(fā)展,越來越多的實時場景涌現(xiàn)出來,對實時作業(yè)的開發(fā)效率和質(zhì)量保障提出了更高的要求。為此,我們從去年開始著手打造伴魚公司級的實時計算平臺,平臺代號 Palink,由 Palfish + Flink 組合而來。

          之所以選擇 Flink 作為平臺唯一的實時計算引擎,是因為近些年來其在實時領(lǐng)域的優(yōu)秀表現(xiàn)和主導(dǎo)地位,同時活躍的社區(qū)氛圍也提供了非常多不錯的實踐經(jīng)驗可供借鑒。目前 Palink 項目已經(jīng)落地并投入使用,很好地滿足了伴魚業(yè)務(wù)在實時場景的需求。

          一、核心原則



          通過調(diào)研阿里云、網(wǎng)易等各大廠商提供的實時計算服務(wù),我們基本確定了 Palink 的整個產(chǎn)品形態(tài)。同時,在系統(tǒng)設(shè)計過程中緊緊圍繞以下幾個核心原則:


          • 極簡性:保持簡易設(shè)計,快速落地,不過度追求功能的完整性,滿足核心需求為主;


          • 高質(zhì)量:保持項目質(zhì)量嚴(yán)要求,核心模塊思慮周全;


          • 可擴(kuò)展:保持較高的可擴(kuò)展性,便于后續(xù)方案的迭代升級。



          二、系統(tǒng)設(shè)計



          平臺整體架構(gòu)



          以下是平臺整體的架構(gòu)示意圖:




          整個平臺由四部分組成:


          • Web UI:前端操作頁面;


          • Palink (GO) 服務(wù):實時作業(yè)管理服務(wù),負(fù)責(zé)作業(yè)元信息及作業(yè)生命周期內(nèi)全部狀態(tài)的管理,承接全部的前端流量。包括作業(yè)調(diào)度、作業(yè)提交、作業(yè)狀態(tài)同步及作業(yè) HA 管理幾個核心模塊;


          • PalinkProxy(JAVA) 服務(wù):SQL 化服務(wù),F(xiàn)link SQL 作業(yè)將由此模塊編譯、提交至遠(yuǎn)端集群。包括 SQL 語法校驗、SQL 作業(yè)調(diào)試及 SQL 作業(yè)編譯和提交幾個核心模塊;


          • Flink On Yarn:基于 Hadoop Yarn 做集群的資源管理。



          這里之所以將后臺服務(wù)拆分成兩塊,并且分別使用 GO 和 JAVA 語言實現(xiàn),原因主要有三個方面:


          • 一是伴魚擁有一套非常完善的基于 GO 語言實現(xiàn)的微服務(wù)基礎(chǔ)框架,基于它可以快速構(gòu)建服務(wù)并擁有包括服務(wù)監(jiān)控在內(nèi)的一系列周邊配套,公司目前 95% 以上的服務(wù)是基于此服務(wù)框架構(gòu)建的;


          • 二是 SQL 化模塊是基于開源項目二次開發(fā)實現(xiàn)的(這個在后文會做詳細(xì)介紹),而該開源項目使用的是 JAVA 語言;


          • 三是內(nèi)部服務(wù)增加一次遠(yuǎn)程調(diào)用的成本是可以接受的。



          這里也體現(xiàn)了我們極簡性原則中對快速落地的要求。事實上,以 GO 為核心開發(fā)語言是非常具有 Palfish 特色的,在接下來伴魚大數(shù)據(jù)系列的相關(guān)文章中也會有所體現(xiàn)。
          接下來本文將著重介紹 Palink 幾個核心模塊的設(shè)計。


          作業(yè)調(diào)度&執(zhí)行



          后端服務(wù)接收到前端創(chuàng)建作業(yè)的請求后,將生成一條 PalinkJob 記錄和一條 PalinkJobCommand 記錄并持久化到 DB,PalinkJobCommand 為作業(yè)提交執(zhí)行階段抽象出的一個實體,整個作業(yè)調(diào)度過程將圍繞該實體的狀態(tài)變更向前推進(jìn)。其結(jié)構(gòu)如下:


          type PalinkJobCommand struct { ID            uint64 `json:"id"`                        PalinkJobID   uint64 `json:"palink_job_id"`   CommandParams string `json:"command_params"`  CommandState  int8   `json:"command_state"`   Log           string `json:"log"`                       CreatedAt     int64  `json:"created_at"`         UpdatedAt     int64  `json:"updated_at"`      }


          這里并沒有直接基于 PalinkJob 實體來串聯(lián)整個調(diào)度過程,是因為作業(yè)的狀態(tài)同步會直接作用于這個實體,如果調(diào)度過程也基于該實體,兩部分的邏輯就緊耦合了。


          調(diào)度流程



          下圖為作業(yè)調(diào)度的流程圖:




          palink pod 異步執(zhí)行競爭分布式鎖操作,保證同一時刻有且僅有一個實例獲取周期性監(jiān)測權(quán)限,滿足條件的 Command 將直接被發(fā)送到 Kafka 待執(zhí)行隊列,同時變更其狀態(tài),保證之后不再被調(diào)度。此外,所有的 palink pod 將充當(dāng)待執(zhí)行隊列消費者的角色,并歸屬于同一個消費者組,消費到消息的實例將獲取到最終的執(zhí)行權(quán)。


          執(zhí)行流程



          作業(yè)的執(zhí)行實則是作業(yè)提交的過程,根據(jù)作業(yè)類型的不同提交工作流有所區(qū)別,可細(xì)分為三類:



          • Flink JAR 作業(yè):我們摒棄了用戶直接上傳 JAR 文件的交互方式。用戶只需提供作業(yè) gitlab 倉庫地址即可,打包構(gòu)建全流程平臺直接完成。由于每一個服務(wù)實例都內(nèi)嵌 Flink 客戶端,任務(wù)是直接通過 Flink run 方式提交的。


          • PyFlink 作業(yè):與 Flink JAR 方式類似,少了編譯的過程,提交命令也有所不同。


          • Flink SQL 作業(yè):與上兩種方式區(qū)別較大。對于 Flink SQL 作業(yè)而言,用戶只需提交相對簡單的 SQL 文本信息,這個內(nèi)容我們是直接維護(hù)在平臺的元信息中,故沒有和 gitlab 倉庫交互的地方。SQL 文本將進(jìn)一步提交給 PalinkProxy 服務(wù)進(jìn)行后續(xù)的編譯,然后使用 Yarn Client 方式提交。


          Command 狀態(tài)機(jī)



          PalinkJobCommand 的狀態(tài)流轉(zhuǎn)如下圖所示:



          • UNDO:初始狀態(tài),將被調(diào)度實例監(jiān)測。


          • DOING:執(zhí)行中狀態(tài),同樣會調(diào)度實例監(jiān)測,防止長期處于進(jìn)行中的臟狀態(tài)產(chǎn)生。


          • SUCCESSED:執(zhí)行成功狀態(tài)。隨著用戶的后續(xù)行為,如重新提交、重新啟動操作,狀態(tài)會再次回到 UNDO 態(tài)。


          • FAILED:執(zhí)行失敗狀態(tài)。同上,狀態(tài)可能會再次回到 UNDO 態(tài)。


          作業(yè)狀態(tài)同步



          作業(yè)成功提交至集群后,由于集群狀態(tài)的不確定性或者其他的一些因素最終導(dǎo)致任務(wù)異常終止了,平臺該如何及時感知到?這就涉及到我們即將要闡述的另一個話題 “狀態(tài)同步“。


          狀態(tài)同步流程



          這里首先要回答的一個問題是:同步誰的狀態(tài)?

          有過離線或者 Flink on yarn 開發(fā)經(jīng)驗的同學(xué)一定知道,作業(yè)在部署到 yarn 上之后會有一個 application 與之對應(yīng),每一個 application 都有其對應(yīng)的狀態(tài)和操作動作,比如我們可以執(zhí)行 Yarn UI 上 Kill Application 操作來殺掉整個任務(wù)。

          同樣的,當(dāng)我們翻閱 Flink 官方文檔或者進(jìn)入 Flink UI 頁面也都可以看到每一個任務(wù)都有其對應(yīng)的狀態(tài)和一系列操作行為。最直接的想法肯定是以 Flink 任務(wù)狀態(tài)為準(zhǔn),畢竟這是我們最想拿到的。

          但仔細(xì)分析,其實二者的狀態(tài)對于平臺而言沒有太大區(qū)別,只是狀態(tài)的粒度有所不同而已,yarn application 的狀態(tài)已經(jīng)是對 Flink 狀態(tài)做了一次 state mapping。可是考慮到,F(xiàn)link 在 HA 的時候,作業(yè)對外暴露的 URL 會發(fā)生變更,這種情況下只能通過獲取作業(yè)對應(yīng)的 application 信息才能拿到最新的地址。

          與此同時,一次狀態(tài)同步的過程不僅僅只是希望拿到最新的狀態(tài),對于任務(wù)的 checkpoint 等相關(guān)信息同樣是有同步的訴求。看來二者的信息在一次同步的過程中都需要獲取,最終的狀態(tài)同步設(shè)計如下:




          前置流程和作業(yè)調(diào)度流程類似,有且僅有一個實例負(fù)責(zé)周期性監(jiān)測工作,符合條件的 Job ID(注,并非所有的作業(yè)都用同步的必要,比如一些處于終態(tài)的作業(yè))將發(fā)送到內(nèi)部延遲隊列。之所以采用延遲隊列而非 Kafka 隊列,主要是為了將同一時間點批量同步的需求在一定時間間隔內(nèi)隨機(jī)打散,降低同步的壓力。最后,在獲取到作業(yè)的完整信息后,再做一次 state mapping 將狀態(tài)映射為平臺抽象的狀態(tài)類型。

          由于狀態(tài)同步是周期性進(jìn)行的,存在一定的延遲。因此在平臺獲取作業(yè)詳情時,也會同步觸發(fā)一次狀態(tài)同步,保證獲取最新數(shù)據(jù)。


          Job 狀態(tài)機(jī)



          PalinkJob 的狀態(tài)流轉(zhuǎn)如下圖所示:



          • DEPLOYING:作業(yè)初始狀態(tài),將隨著 PalinkJobCommand 的狀態(tài)驅(qū)動向 DEPLOY_SUCCESSED 和 DEPLOY_FAILED 流轉(zhuǎn)。

          • DEPLOY_SUCCESSED:部署成功狀態(tài),依賴作業(yè)「狀態(tài)同步」驅(qū)動向 RUNNING 狀態(tài)或者其他終態(tài)流轉(zhuǎn)。

          • DEPLOY_FAILED:部署失敗狀態(tài),依賴用戶重新提交向 DEPLOYING 狀態(tài)流轉(zhuǎn)。

          • RUNNING:運行中狀態(tài)。可通過用戶執(zhí)行暫停操作向 FINISHED 狀態(tài)流轉(zhuǎn),或執(zhí)行終止操作向 KILLED 狀態(tài)流轉(zhuǎn),或因為內(nèi)部異常向 FAILED 狀態(tài)流轉(zhuǎn)。

          • FINISHED:完成狀態(tài),作業(yè)終態(tài)之一。通過用戶執(zhí)行暫停操作,作業(yè)將回到此狀態(tài)。

          • KILLED:終止?fàn)顟B(tài),作業(yè)終態(tài)之一。通過用戶執(zhí)行終止操作,作業(yè)將回到此狀態(tài)。

          • FAILED:失敗狀態(tài),作業(yè)終態(tài)之一。作業(yè)異常會轉(zhuǎn)為此狀態(tài)。

          作業(yè) HA 管理



          解決了上述問題之后,另一個待討論的話題便是 “作業(yè) HA 管理”。我們需要回答用戶以下的兩個問題:


          • 作業(yè)是有狀態(tài)的,但是作業(yè)需要代碼升級,如何處理?

          • 作業(yè)異常失敗了,怎么做到從失敗的時間點恢復(fù)?


          Flink 提供了兩種機(jī)制用于恢復(fù)作業(yè):Checkpoint 和 Savepoint,本文統(tǒng)稱為保存點。Savepoint 可以看作是一種特殊的 Checkpoint ,只不過不像 Checkpoint 定期的從系統(tǒng)中生成,它是用戶通過命令觸發(fā)的,用戶可以控制保存點產(chǎn)生的時間點。

          任務(wù)啟動時,通過指定 Checkpoint 或 Savepoint 外部路徑,就可以達(dá)到從保存點恢復(fù)的效果。我們對于平臺作業(yè) HA 的管理也是基于這兩者展開的。下圖為管理的流程圖:




          用戶有兩種方式來手動停止一個作業(yè):暫停和終止。


          • 暫停操作通過調(diào)用 Flink cancel api 實現(xiàn),將觸發(fā)作業(yè)生成 Savepoint。

          • 終止操作則是通過調(diào)用 yarn kill application api 實現(xiàn),用于快速結(jié)束一個任務(wù)。


          被暫停的作業(yè)重啟時,系統(tǒng)將比較 Savepoint 和 Checkpoint 的生成時間點,按照最近的一個保存點啟動,而當(dāng)作業(yè)被重新提交時,由于用戶可能變更了代碼邏輯,將直接由用戶決定是否按照保存點恢復(fù)。對于被終止的作業(yè),無論是重啟或者是重新提交,都直接采取由用戶決定的方式,因為終止操作本身就帶有丟棄作業(yè)狀態(tài)的色彩。

          失敗狀態(tài)的作業(yè)是由于異常錯誤被迫停止的。對于這類作業(yè),有三重保障:


          • 一是任務(wù)自身可以設(shè)置重啟策略自動恢復(fù),外部平臺無感知;

          • 二是,對于內(nèi)部重啟依舊失敗的任務(wù)在平臺側(cè)可再次設(shè)置上層重啟策略;

          • 三是,手動重啟或重新提交。僅在重新提交時,由用戶決定按照那種方式啟動,其余場景皆按照最近的保存點啟動。

          任務(wù) SQL 化



          Flink JAR 和 PyFlink 都是采用 Flink API 的形式開發(fā)作業(yè),這樣的形式必然極大地增加用戶的學(xué)習(xí)成本,影響開發(fā)的效率。需要不斷輸入和培養(yǎng)具有該領(lǐng)域開發(fā)技能的工程師,才能滿足源源不斷的業(yè)務(wù)需求。

          而產(chǎn)品定位不僅僅是面向數(shù)據(jù)中臺的開發(fā)工程師們,我們期望可以和離線目標(biāo)用戶保持一致,將目標(biāo)群體滲透至分析人員乃至業(yè)務(wù)研發(fā)和部分的產(chǎn)品經(jīng)理,簡單的需求完全可以自己動手實現(xiàn)。要達(dá)到這個目的,必然開發(fā)的形式也要向離線看齊,作業(yè) SQL 化是勢在必行的。

          我們期望 Flink 可以提供一種類似于 Hive Cli 或者 Hive JDBC 的作業(yè)提交方式,用戶無需寫一行 Java 或 Scala 代碼。查閱官方文檔,F(xiàn)link 確實提供了一個 SQL 客戶端以支持以一種簡單的方式來編寫、調(diào)試和提交表程序到 Flink 集群,不過截止到目前最新的 release 1.13 版本,SQL 客戶端僅支持嵌入式模式,相關(guān)的功能還不夠健全,另外對于 connector 支持也是有限的。因此,需要尋求一種更穩(wěn)定、更高可擴(kuò)展性的實現(xiàn)方案。

          經(jīng)過一番調(diào)研后,我們發(fā)現(xiàn)袋鼠云開源的「FlinkStreamSQL」基本可以滿足我們目前的要求。此項目是基于開源的 Flink 打造的,并對其實時 SQL 進(jìn)行了擴(kuò)展,支持原生 Flink SQL 所有的語法。


          實現(xiàn)機(jī)制



          下圖為 Flink 官方提供的作業(yè)角色流程圖,由圖可知,用戶提交的代碼將在 Client 端進(jìn)行加工、轉(zhuǎn)換(最終生成 Jobgraph )然后提交至遠(yuǎn)程集群。




          那么要實現(xiàn)用戶層面的作業(yè) SQL 化,底層的實現(xiàn)同樣是繞不開這個流程。實際上 FlinkStreamSQL 項目就是通過定制化的手段實現(xiàn)了 Client 端的邏輯,可以將整個過程簡要地描述為:


          構(gòu)建 PackagedProgram



          利用 PackagedProgramUtils 生成 JobGraph。

          通過 YarnClusterDescriptor 提交作業(yè)。

          其中,第一步是最關(guān)鍵的,PackagedProgram 的構(gòu)造方法如下:


          PackagedProgram.newBuilder()                .setJarFile(coreJarFile)                .setArguments(execArgs)                .setSavepointRestoreSettings(savepointRestoreSettings)                .build();


          execArgs 為外部輸入?yún)?shù),這里就包含了用戶提交的 SQL。而 coreJarFile 對應(yīng)的就是 API 開發(fā)方式時用戶提交的 JAR 文件,只不過這里系統(tǒng)幫我們實現(xiàn)了。coreJarFile 的代碼對應(yīng)項目中的 core module,該 module 本質(zhì)上就是 API 開發(fā)方式的一個 template 模板。module 內(nèi)實現(xiàn)了自定義 SQL 解析以及各類 connector plugin 注入。更多細(xì)節(jié)可通過開源項目進(jìn)一步了解。


          定制開發(fā)



          我們基于 FlinkStreamSQL 進(jìn)行了二次開發(fā),以滿足內(nèi)部更多樣化的需求。主要分為以下幾點:


          • 服務(wù)化:整個 SQL 化模塊作為 proxy 獨立部署和管理,以 HTTP 形式暴露服務(wù);

          • 支持語法校驗特性;

          • 支持調(diào)試特性:通過解析 SQL 結(jié)構(gòu)可直接獲取到 source 表和 sink 表的結(jié)構(gòu)信息。平臺可通過人工構(gòu)造或線上抓取源表數(shù)據(jù)的方式得到測試數(shù)據(jù)集,sink 算子被 localTest connector 算子直接替換,以截取結(jié)果數(shù)據(jù)輸出;

          • 支持更多的 connector plugin,如 pulsar connector;

          • 其他特性。


          除了上文提到的一些功能特性,平臺還支持了:


          • DDL 語句注入

          • UDF 管理

          • 租戶管理

          • 版本管理

          • 作業(yè)監(jiān)控

          • 日志收集


          這些點就不在本文詳細(xì)闡述,但作為一個實時計算平臺這些點又是必不可少的。

          三、線上效果



          ■ 作業(yè)總覽




          ■ 作業(yè)詳情




          ■ 作業(yè)監(jiān)控




          四、未來工作



          隨著業(yè)務(wù)的繼續(xù)推進(jìn),平臺將在以下幾方面繼續(xù)迭代優(yōu)化:


          • 穩(wěn)定性建設(shè):實時任務(wù)的穩(wěn)定性建設(shè)必然是未來工作中的首要事項。作業(yè)參數(shù)如何設(shè)置,作業(yè)如何自動調(diào)優(yōu),作業(yè)在流量高峰如何保持穩(wěn)定的性能,這些問題需要不斷探索并沉淀更多的最佳實踐;




          • 提升開發(fā)效率:SQL 化建設(shè)。盡管 SQL 化已初具雛形,但開發(fā)起來依舊具備一定的學(xué)習(xí)成本,其中最明顯的就是 DDL 的構(gòu)建,用戶對于 source、sink 的 schema 并不清楚,最好的方式是平臺可以和我們的元數(shù)據(jù)中心打通將構(gòu)建 DDL 的過程自動化,這一點也是我們目前正在做的;




          • 優(yōu)化使用體驗:體驗上的問題在一定程度上也直接影響到了開發(fā)的效率。通過不斷收集用戶反饋,持續(xù)改進(jìn);




          • 探索更多業(yè)務(wù)場景:目前伴魚內(nèi)部已開始基于 Flink 開展 AI 、實時數(shù)倉等場景的建設(shè)。未來我們將繼續(xù)推進(jìn) Flink 在更多場景上的實踐。

          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久久久三级 | 99看片 | 人人操天天摸 | 天天撸一撸视频 | 亚洲日韩欧美一级片 |