CSharpFlink物聯(lián)網(wǎng)/工業(yè)互聯(lián)網(wǎng)實(shí)時計算引擎
1 項目背景
我們有一個全國性質(zhì)的面向工業(yè)的公有云平臺,通過專線或4G的鏈路方式實(shí)時向平臺傳輸數(shù)據(jù),每天處理1億條左右的數(shù)據(jù)量,為現(xiàn)場用戶提供實(shí)時的在線服務(wù)和離線數(shù)據(jù)分析服務(wù)?,F(xiàn)在已經(jīng)上線穩(wěn)定運(yùn)行有將近3年的時間。同時也為工業(yè)企業(yè)提供私有云建設(shè)服務(wù)。
我們計劃使用Flink作為云平臺后臺的實(shí)時計算部分,基本實(shí)現(xiàn)數(shù)據(jù)點(diǎn)的聚合計算、表達(dá)式規(guī)則計算等業(yè)務(wù),進(jìn)一步實(shí)現(xiàn)機(jī)器學(xué)習(xí)或自定義復(fù)雜算法的需求。
我們經(jīng)過將近一年左右時間的研究及開發(fā),已經(jīng)基本實(shí)現(xiàn)了聚合和邏輯等業(yè)務(wù),但是感覺Flink比較重,并且應(yīng)用和運(yùn)維的水平要求比較高。
基于上述情況,我們自主使用NET 5.0開發(fā)一套CSharpFlink實(shí)時計算組件,支持自定義數(shù)據(jù)源、計算和存儲的基本要求。
2 應(yīng)用場景
主要面向物聯(lián)網(wǎng)、工業(yè)互聯(lián)網(wǎng)私有云或公有云平臺建設(shè)過程中的數(shù)據(jù)點(diǎn)實(shí)時聚合和表達(dá)式計算。應(yīng)用場景包括:
- 數(shù)據(jù)點(diǎn)的實(shí)時時間窗口范圍內(nèi)聚合計算,例如:最大值、最小值、平均值、和值、眾數(shù)、方差、中位數(shù)等,可以自定義二次開發(fā)。
- 數(shù)據(jù)點(diǎn)的歷史延遲窗口的一段時間范圍內(nèi)數(shù)據(jù)補(bǔ)充或更新的重新計算。
- 數(shù)據(jù)點(diǎn)的表達(dá)式計算,支持自定義C#腳本進(jìn)行編輯,實(shí)時預(yù)警或數(shù)據(jù)深度加工處理。
- 主從結(jié)構(gòu)的分布式部署,主節(jié)點(diǎn)負(fù)責(zé)計算任務(wù)分發(fā),工作節(jié)點(diǎn)負(fù)責(zé)任務(wù)計算及結(jié)果存儲。
3 框架特點(diǎn)
主要特點(diǎn)主要是根據(jù)我們多年的物聯(lián)網(wǎng)、工業(yè)項目經(jīng)驗的提煉和總結(jié),滿足實(shí)現(xiàn)應(yīng)用場景,特點(diǎn)包括:
- 使用最新的NET 5.0進(jìn)行開發(fā),完全跨平臺。
- 實(shí)時數(shù)據(jù)窗口范圍外的數(shù)據(jù)補(bǔ)發(fā)或更新的重新計算,例如:當(dāng)前5秒的實(shí)時數(shù)據(jù)窗口,支持5秒以前的數(shù)據(jù)補(bǔ)充和更新,并且進(jìn)行重新計算及更新到數(shù)據(jù)存儲單元。
- 實(shí)時數(shù)據(jù)表達(dá)式計算支持定時計算或數(shù)據(jù)值改事件變觸發(fā)計算,滿足實(shí)時表達(dá)式或周期性計算。
- C#語言的二次開發(fā),對接多種數(shù)據(jù)源,自定義算子和多種方式數(shù)據(jù)存儲等。
- 單節(jié)點(diǎn)或分布式部署。
4 框架結(jié)構(gòu)
框架結(jié)構(gòu)組件的基本示意,如下圖:
5 代碼目錄說明
使用VS2019進(jìn)行工程開發(fā),工程解決方案文件為:CSharpFlink.sln,代碼目錄說明如下:
- Cache:主節(jié)點(diǎn)和工作節(jié)點(diǎn)計算任務(wù)本地緩存管理。
- Calculate:計算任務(wù)輸入、過程、輸出操作及管理。
- Channel:主節(jié)點(diǎn)和工作節(jié)點(diǎn)分布式部署模式的IO通訊操作。
- Common:操作公眾類庫。
- Config:全局配置文件操作。
- Execution:全局工程的執(zhí)行環(huán)境入口。
- Expression:表達(dá)式計算任務(wù)操作。
- Log:日志操作及管理。
- Model:數(shù)據(jù)點(diǎn)元數(shù)據(jù)信息。
- Node:主節(jié)點(diǎn)和工作節(jié)點(diǎn)管理。
- Protocol:主節(jié)點(diǎn)和工作節(jié)點(diǎn)之間分布式部署之間交互的協(xié)議。
- Sink:計算任務(wù)計算結(jié)果存儲接口。
- Source:對接多種數(shù)據(jù)源接口,例如:mqtt、kafka、rabbitmq、數(shù)據(jù)庫等。
- Task:窗口或表達(dá)任務(wù)接口,主節(jié)點(diǎn)和工作節(jié)點(diǎn)任務(wù)操作及管理。
- Window:數(shù)據(jù)窗口任務(wù)操作。
- Worker:工作節(jié)點(diǎn)接口。
6 配置文件說明
配置文件默認(rèn)為:cfg\global.cfg,可以自定義指定配置文件,參見:命令行操作說明。配置文件說明,如下:
- MaxDegreeOfParallelism:任務(wù)并行度,主節(jié)點(diǎn)生成任務(wù)、工作節(jié)點(diǎn)處理任務(wù)依賴這個參數(shù)。
- MasterListenPort:主節(jié)點(diǎn)偵聽端口,用于工作節(jié)點(diǎn)主動連接。
- MasterIp:主節(jié)點(diǎn)IP,用于工作節(jié)點(diǎn)主動連接。
- NodeType:節(jié)點(diǎn)運(yùn)行模式,包括:Master、Slave和Both。
- RemoteInvokeInterval:遠(yuǎn)程調(diào)用工作節(jié)點(diǎn)間隔時間,單位:毫秒。
- RepeatRemoteInvokeInterval:調(diào)用工作節(jié)點(diǎn)失敗后,重新調(diào)用工作節(jié)點(diǎn)間隔時間,單位:毫秒。
- SlaveExcuteCalculateInterval:工作節(jié)點(diǎn)執(zhí)行計算任務(wù)間隔時間,單位:毫秒。
- MaxFrameLength:主節(jié)點(diǎn)和工作節(jié)點(diǎn)之間傳輸數(shù)據(jù)最大數(shù)據(jù)偵,單位:字節(jié)。
- WorkerPower:工作節(jié)點(diǎn)能力系數(shù),大于1,會連續(xù)發(fā)送多個任務(wù)。
7 任務(wù)部署說明
二次開發(fā)參見:二次開發(fā)說明。開發(fā)好的任務(wù),測試通過后,把程序集(.dll)復(fù)制到“tasks”目錄下,例如工程TestTask項目測試、編譯通過后,可以部署到“tasks”目錄下,運(yùn)行“CSharpFlink”主程序會自動加載和調(diào)用。 可以自定義指定任務(wù)程序集,參見:命令行操作說明。
8 命令行操作說明
命令行運(yùn)行“CSharpFlink”程序,支持自定義指定配置文件或任務(wù)程序集,說明如下: -h 顯示命令行幫助。 -c 加載指定配置文件。 例如:CSharpFlink -c c:/my.cfg -t 加載任務(wù)程序集。 例如:CSharpFlink -t c:/mytask.dll 例如:
dotnet CSharpFlink.dll -c c:/master.cfg -t c:/mytask.dll
9 部署說明
“release”目錄下是編譯好的程序,把“CSharpFlink v1.0”分別復(fù)制到不同的路徑下,分別修改“cfg\global.cfg”配置文件中“NodeType”參數(shù)為:Master和Slave,修改主節(jié)點(diǎn)程序“tasks\tasks.cfg”文件中的任務(wù)數(shù),分別運(yùn)行不同目錄下的“dotnet CSharpFlink.dll”。 “TestTask.dll”源代碼,參見:二次開發(fā)說明。
10 二次開發(fā)說明
二次開發(fā)主要針對數(shù)據(jù)源、計算過程和數(shù)據(jù)計算結(jié)果存儲,大致過程如下:
- 數(shù)據(jù)源對接,可以自定義對接mqtt、kafka、rabbitmq、數(shù)據(jù)庫等,需要繼承SourceFunction接口,參見:RandomSourceFunction.cs類。
- 數(shù)據(jù)計算過程,可以自定義數(shù)據(jù)處理或加工,需要繼承Calculate.Calculate接口,參見:聚合計算Avg.cs、表達(dá)式計算ExpressionCalculate.cs。通過AddWindowTask或AddExpressionTask函數(shù)參數(shù)進(jìn)行實(shí)例化。
- 數(shù)據(jù)計算結(jié)果存儲,可以自定義存儲任何介質(zhì)上,需要繼承SinkFunction接口,參見:SinkFunction.cs類。
11 應(yīng)用事例展示
CPU:4核 I5-7400 2.7GHz,內(nèi)存:16G,隨機(jī)數(shù)據(jù)點(diǎn)時間窗口和計算算子,主節(jié)點(diǎn)CPU和內(nèi)存使用情況:15%-35%、1500MB-2048MB, 工作節(jié)點(diǎn)CPU和內(nèi)存使用情況:0.1%-2.5%、18MB-30MB。
本機(jī)部署1個主節(jié)點(diǎn),10個計算節(jié)點(diǎn)。主節(jié)點(diǎn)生成10萬個數(shù)據(jù)點(diǎn)的計算任務(wù),每個數(shù)據(jù)點(diǎn)1秒鐘產(chǎn)生一個新數(shù)據(jù),計算時間窗口的最大值、最小值、平均值或和值。 如下圖:
