Apache DolphinScheduler 2.X保姆級源碼解析

Hi,我是王知無,一個大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。? 放心關(guān)注我,獲取更多行業(yè)的一手消息。
在大數(shù)據(jù)領(lǐng)域,越來越多的企業(yè)擁抱開源軟件,在這個背景下,我們針對數(shù)據(jù)調(diào)度工具如何正確選型?
中國移動云能力中心軟件開發(fā)工程師徐海輝表示:古語云“工欲善其事必先利其器”,如果你正處于觀望/不知如何下手/即將參與開源項目的小伙伴,我建議可以先從一個優(yōu)秀的開源社區(qū)源代碼的入手,我在中國移動目前主要負責(zé)數(shù)據(jù)服務(wù),這次在 Apache DolphinScheduler 4月 Meetup 上為大家?guī)?strong style="box-sizing: border-box;">DolphinScheduler源碼2.X解析,希望你有所收獲。
“
本次演講主要包含四個部分:
開篇與源碼環(huán)境準備
服務(wù)啟動流程
任務(wù)執(zhí)行流程
個人思考與總結(jié)
Apache DolphinScheduler
徐海輝
中國移動云能力中心?軟件開發(fā)工程師。從事大數(shù)據(jù)基礎(chǔ)平臺開發(fā),主要負責(zé)中國移動Hadoop大數(shù)據(jù)平臺組件Ranger 、移動云Lake House 產(chǎn)品的研發(fā)
關(guān)鍵詞:Apache DolphinScheduler源碼2.X解析、源碼環(huán)境準備、服務(wù)啟動、任務(wù)執(zhí)行
P.S.
Apache DolphinScheduler 源碼下載鏈接:
https://dolphinscheduler.apache.org/zh-cn/download/download.html
01
開篇與源碼環(huán)境準備
Apache DolphinScheduler是一個基于java開發(fā)的開源分布式工作流調(diào)度系統(tǒng)。致力于可視化操作任務(wù)及工作流之間的依賴關(guān)系,并可視化整個數(shù)據(jù)流過程;解決數(shù)據(jù)處理流程中錯綜復(fù)雜的依賴關(guān)系。
01
入手DolphinScheduler需要考慮的問題
開源分布式的工作原理?
工具可視化是體現(xiàn)在哪里?
Master和Worker之間如何通信?
怎么體現(xiàn)工作流之間的依賴關(guān)系?
“Show me the code”
其實在這個過程中需要考慮的問題有很多,我們不妨從官方給的架構(gòu)圖先簡單梳理一下,然后通過實際的使用,再去研究源碼,一套組合拳下來就差不多了。
02
DolphinScheduler架構(gòu)圖
話不多說,開擼!
4個由源碼啟動的服務(wù)是:UI、MasterServer、WorkServer、AlertServer(還有LoggerServer圖里面沒有體現(xiàn))

MasterServer & WorkServer依賴于Zookeeper協(xié)調(diào)服務(wù)中心注冊
MasterServer & WorkServer 1-N組成集群,分別是單獨的服務(wù)和進程
執(zhí)行任務(wù)支持的插件:Flink、Shell、Subflow、SQL、Procedure、Python、MR、Spark、Dependent等等
03
預(yù)先必備知識
Netty
一個異步的、基于事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,用于快速開發(fā)可維護、高性能的網(wǎng)絡(luò)服務(wù)器和客戶端
Zookeeper
一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),它是一個為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護、域名服務(wù)、分布式同步、組服務(wù)等。
分布式鎖
為了解決單機部署情況下的并發(fā)控制鎖策略失效這個問題,需要一種跨機器的互斥機制來控制共享資源的訪問
Quartz
定時自動執(zhí)行任務(wù)
多線程
很多小伙伴不明白Master和Worker之間是怎么通信的,DolphinScheduler是基于Netty框架來實現(xiàn)的。這里可以拓展一下,服務(wù)器之間的通信還有很多方式:
HTTP
HTTP:http其實是一種網(wǎng)絡(luò)傳輸協(xié)議,基于TCP,規(guī)定了數(shù)據(jù)傳輸?shù)母袷健?/p>
REST API
REST API通信是通過JSON格式的字符串進行數(shù)據(jù)傳輸?shù)模址强梢栽诰W(wǎng)絡(luò)中穿透防火墻的。也就是說,REST API可以穿透防火墻。同時字符串也可以不用受開發(fā)語言的限制,可以同時完成后端與WEB,后端與APP(Android,IOS)之間的通信。
RPC
RPC通信又稱遠程過程調(diào)用,在內(nèi)網(wǎng)中速度非常快,效率高。
如下圖是準備相關(guān)環(huán)境的部署,并且我還提供了一個遠程Debug的方式,這個不難理解,比如你想調(diào)試API,在啟動過程中,它肯定會調(diào)用腳本,你只需要在腳本里面添加 Debug啟動的一些參數(shù),就可以進行一個遠程的Debug。

像框框里面的數(shù)字實際上是服務(wù)的端口號,如果跟本地服務(wù)/其他組件端口之間有沖突的話,對它可以進行一個修改。
02
服務(wù)啟動流程
01
Master啟動流程
MasterServer采用分布式無中心設(shè)計理念,基于Netty提供監(jiān)聽服務(wù)。
MasterServer服務(wù)啟動時向Zookeeper注冊臨時節(jié)點,通過監(jiān)聽Zookeeper臨時節(jié)點變化來進行容錯處理。
MasterServer主要負責(zé) DAG 任務(wù)切分、任務(wù)提交監(jiān)控,并同時監(jiān)聽其它MasterServer和WorkerServer的健康狀態(tài)。
啟動步驟
啟動Netty 服務(wù)端服務(wù)
注冊到Zookeeper
啟動Event處理器
啟動scheduler定時任務(wù)
啟動StateWheel處理器
↓↓↓源碼一覽↓↓↓

對應(yīng)路徑里面存放了MasterServer.java的類,里面有對應(yīng)的main方法

執(zhí)行完構(gòu)造函數(shù)后,會啟動run方法及其各個組件

這里會提前構(gòu)造參數(shù)

調(diào)用start方法啟動

注冊元數(shù)據(jù)信息到Zookeeper上面,值得一提的是這里采用的是臨時路徑,比如說在過程中服務(wù)斷開了或者session過期,臨時路徑過一段時間會自己去Delete掉。

啟動調(diào)度服務(wù)

啟動Quartz框架,啟動完之后相當(dāng)于是一個獨立的服務(wù).
小結(jié):
第一步是注冊這個Netty
第二步是向Zookeeper注冊數(shù)據(jù)信息。
第三步是啟動相關(guān)組件和調(diào)度任務(wù)
第四步是啟動Quartz框架
02
Worker啟動流程
WorkerServer采用分布式無中心設(shè)計理念,WorkerServer基于Netty提供監(jiān)聽服務(wù)。
WorkerServer主要負責(zé)任務(wù)的執(zhí)行和提供日志服務(wù)。WorkerServer服務(wù)啟動時向Zookeeper注冊臨時節(jié)點,并維持心跳。
拓展:防止出現(xiàn)斷開、關(guān)閉等突發(fā)狀況,這里會選擇注冊臨時節(jié)點做出Delete動作。
啟動步驟
啟動Netty 服務(wù)端服務(wù)
注冊到Zookeeper
維護workserver節(jié)點狀態(tài)
啟動TaskExecuteThread
啟動RetryReportTaskStatusThread
↓↓↓源碼一覽↓↓↓

找到WorkServer.java中的main方法,進程代碼也位于DolphinScheduler-Server模塊下面

啟動Netty服務(wù),過程中也會提供一些端口,以便和其他進程交互

向Zookeeper注冊信息,其中WorkerregistryClient調(diào)用了Zookeeper的代碼

主要是處理和創(chuàng)建一些節(jié)點

啟動組件,WorkerManagerThread主要是管理Master發(fā)過來的任務(wù)

啟動組件,RetryReportTaskStatus主要是給Master反饋信息
03
任務(wù)執(zhí)行流程
首先我們可以從UI界面具體分析一些工作流是如何運行的?

DolphinScheduler-api :? ExecutorController.java → startProcessInstance() → createCommand() →寫入表 t_ds_command
然后通過代碼的跟蹤去看一下

位于DolphinScheduler-API模塊下的controller里面

Start-process-instance會調(diào)用ExecProcessInstance方法

通過一系列的執(zhí)行,最終會解析一些參數(shù)

解析完之后會生成一條記錄(Create Command)
接下來MasterServer會不定時掃描表里面的記錄,然后拉取出來,最后構(gòu)建流程的實例
定時任務(wù)輪詢獲取
Dolphinscheduler-server: MasterSchedulerService.java → scheduleProcess() → findOneCommand() → 創(chuàng)建ProcessInstance
構(gòu)建DAG并分發(fā)到worker
WorkflowExecuteThread.java → startProcess() → buildFlowDag() → submitPostNode() → submitStandByTask() → submitTaskExec()
TaskProcessor.java → dispatchTask() → 放入TaskPriorityQueue
TaskPriorityQueueConsumer.java: → dispatch()
ExecutorDispatcher.java → dispatch()
NettyExecutorManager.java → execute() →build command → doExecute() → ettyRemotingClient.send()

Master啟動完之后會不停拉取命令,然后解析完之后分發(fā)給Worker

可以看一下它的run方法

Scheduleprocess方法去找一條命令→Findonecommand,找到之后對它進行解析,創(chuàng)建工作流實例

交給WorkflowExecuteThread線程執(zhí)行

WorkflowExecuteThread run方法啟動之后提交一個節(jié)點

放到優(yōu)先級隊列里面,構(gòu)造處理的一些參數(shù),通過Netty分發(fā)
Worker接收任務(wù)并執(zhí)行
NettyClientHandler.java → channelRead() → processReceived() → TaskExecuteProcessor.java → process() → workerExecuteQueue.offer()
WorkerManagerThread.java → workerExecuteQueue.take()?
TaskExecuteThread.java → build taskRequest → taskChannel.create(taskRequest) →AbstractTask init() → handle()

通過taskExcutethread執(zhí)行

通過代碼跟蹤,我們發(fā)現(xiàn)是調(diào)用隊列里面的offer方法

通過channelRead方法接收Master發(fā)過來的信息,然后放在隊列里面

通過while循環(huán)不停的處理隊列里的任務(wù)

任務(wù)執(zhí)行的參數(shù)都放在taskExcutiontext里面

TaskChannel主要識別腳本任務(wù)類型(Flink、Shell、Python等腳本)

任務(wù)執(zhí)行完之后,通過ResponseCommand收集完信息反饋給Master
04
思考與總結(jié)
01
優(yōu)化返回方式
我們看源碼的過程當(dāng)中發(fā)現(xiàn),比如說在DolphinSchedulerr-api跟前端交互接口的時候,請求參數(shù)返回的時候,它沒有用bean來進行處理,什么意思呢?就是說它參數(shù)比較混亂,比如說自己的一些惡應(yīng)用要接入 DolphinScheduler,或者說后續(xù)社區(qū)要提供一些SDK,有bean的方式進行參數(shù)傳遞是比較好的。

02
表格式統(tǒng)一
還有一個在調(diào)研的過程當(dāng)中,這個表的命名還有優(yōu)化空間,表的命名格式有一些不同,比如說一些關(guān)系表,它后面加了 relation,然后有一些是加在前面的。

03
命名不友好

在看源碼的過程當(dāng)中,發(fā)現(xiàn)它的一些命名是不太友好的,如上圖
05
擁抱開源
你如果從事軟件開發(fā),你有可能會想花時間去打造一下屬于你自己的IP,比如說你負責(zé)具體某個組件,成為核心開發(fā)者。
01
我們?yōu)槭裁匆W(xué)習(xí)開源?
1、提升技術(shù)功底:學(xué)習(xí)源碼里的優(yōu)秀設(shè)計思想,比如疑難問題的解決思路,一些優(yōu)秀的設(shè)計模式,整體提升自己的技術(shù)功底,比如說在DS里面,用了大量的線程去處理一些Event,然后使用了去中心化,還有使用隊列,優(yōu)先級隊列等等
2、深度掌握技術(shù)框架:源碼看多了,對于一個新技術(shù)或框架的掌握速度會有大幅提升,比如說你要去學(xué)習(xí) DS或者對它進行一個二次開發(fā),你肯定要去了解一下其他知識,比如說Netty,還有Zookeeper
3、快速定位線上問題:遇到線上問題,特別是框架源碼里的問題(比如bug),能夠快速定位
4、擁抱開源社區(qū):參與到開源項目的研發(fā),結(jié)識更多大牛,積累更多優(yōu)質(zhì)人脈看源碼
02
在開源社區(qū)的學(xué)習(xí)方法
先使用:先看官方文檔快速掌握框架的基本使用
抓主線:找一個demo入手,順藤摸瓜快速看一遍框架的主線源碼,畫出源碼主流程圖,切勿一開始就陷入源碼的細枝末節(jié),否則會把自己繞暈,憑經(jīng)驗猜
畫圖做筆記:總結(jié)框架的一些核心功能點,從這些功能點入手深入到源碼的細節(jié),邊看源碼邊畫源碼走向圖,并對關(guān)鍵源碼的理解做筆記,把源碼里的閃光點都記錄下來,后續(xù)借鑒到工作項目中,理解能力強的可以直接看靜態(tài)源碼,也可以邊看源碼邊debug源碼執(zhí)行過程,觀察一些關(guān)鍵變量的值
整合總結(jié):所有功能點的源碼都分析完后,回到主流程圖再梳理一遍,爭取把自己畫的所有圖都在腦袋里做一個整合。
如果這個文章對你有幫助,不要忘記?「在看」?「點贊」?「收藏」?三連啊喂!


