<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          CSharpFlink物聯(lián)網(wǎng)/工業(yè)互聯(lián)網(wǎng)實(shí)時計算引擎

          聯(lián)合創(chuàng)作 · 2023-09-26 12:04

          1 項目背景

                我們有一個全國性質(zhì)的面向工業(yè)的公有云平臺,通過專線或4G的鏈路方式實(shí)時向平臺傳輸數(shù)據(jù),每天處理1億條左右的數(shù)據(jù)量,為現(xiàn)場用戶提供實(shí)時的在線服務(wù)和離線數(shù)據(jù)分析服務(wù)?,F(xiàn)在已經(jīng)上線穩(wěn)定運(yùn)行有將近3年的時間。同時也為工業(yè)企業(yè)提供私有云建設(shè)服務(wù)。
                我們計劃使用Flink作為云平臺后臺的實(shí)時計算部分,基本實(shí)現(xiàn)數(shù)據(jù)點(diǎn)的聚合計算、表達(dá)式規(guī)則計算等業(yè)務(wù),進(jìn)一步實(shí)現(xiàn)機(jī)器學(xué)習(xí)或自定義復(fù)雜算法的需求。
                我們經(jīng)過將近一年左右時間的研究及開發(fā),已經(jīng)基本實(shí)現(xiàn)了聚合和邏輯等業(yè)務(wù),但是感覺Flink比較重,并且應(yīng)用和運(yùn)維的水平要求比較高。
                基于上述情況,我們自主使用NET 5.0開發(fā)一套CSharpFlink實(shí)時計算組件,支持自定義數(shù)據(jù)源、計算和存儲的基本要求。

          2 應(yīng)用場景

                主要面向物聯(lián)網(wǎng)、工業(yè)互聯(lián)網(wǎng)私有云或公有云平臺建設(shè)過程中的數(shù)據(jù)點(diǎn)實(shí)時聚合和表達(dá)式計算。應(yīng)用場景包括:

          • 數(shù)據(jù)點(diǎn)的實(shí)時時間窗口范圍內(nèi)聚合計算,例如:最大值、最小值、平均值、和值、眾數(shù)、方差、中位數(shù)等,可以自定義二次開發(fā)。
          • 數(shù)據(jù)點(diǎn)的歷史延遲窗口的一段時間范圍內(nèi)數(shù)據(jù)補(bǔ)充或更新的重新計算。
          • 數(shù)據(jù)點(diǎn)的表達(dá)式計算,支持自定義C#腳本進(jìn)行編輯,實(shí)時預(yù)警或數(shù)據(jù)深度加工處理。
          • 主從結(jié)構(gòu)的分布式部署,主節(jié)點(diǎn)負(fù)責(zé)計算任務(wù)分發(fā),工作節(jié)點(diǎn)負(fù)責(zé)任務(wù)計算及結(jié)果存儲。

          3 框架特點(diǎn)

                主要特點(diǎn)主要是根據(jù)我們多年的物聯(lián)網(wǎng)、工業(yè)項目經(jīng)驗的提煉和總結(jié),滿足實(shí)現(xiàn)應(yīng)用場景,特點(diǎn)包括:

          • 使用最新的NET 5.0進(jìn)行開發(fā),完全跨平臺。
          • 實(shí)時數(shù)據(jù)窗口范圍外的數(shù)據(jù)補(bǔ)發(fā)或更新的重新計算,例如:當(dāng)前5秒的實(shí)時數(shù)據(jù)窗口,支持5秒以前的數(shù)據(jù)補(bǔ)充和更新,并且進(jìn)行重新計算及更新到數(shù)據(jù)存儲單元。
          • 實(shí)時數(shù)據(jù)表達(dá)式計算支持定時計算或數(shù)據(jù)值改事件變觸發(fā)計算,滿足實(shí)時表達(dá)式或周期性計算。
          • C#語言的二次開發(fā),對接多種數(shù)據(jù)源,自定義算子和多種方式數(shù)據(jù)存儲等。
          • 單節(jié)點(diǎn)或分布式部署。

          4 框架結(jié)構(gòu)

                框架結(jié)構(gòu)組件的基本示意,如下圖: CSharpFlink框架圖示意圖

          5 代碼目錄說明

                使用VS2019進(jìn)行工程開發(fā),工程解決方案文件為:CSharpFlink.sln,代碼目錄說明如下:

          • Cache:主節(jié)點(diǎn)和工作節(jié)點(diǎn)計算任務(wù)本地緩存管理。
          • Calculate:計算任務(wù)輸入、過程、輸出操作及管理。
          • Channel:主節(jié)點(diǎn)和工作節(jié)點(diǎn)分布式部署模式的IO通訊操作。
          • Common:操作公眾類庫。
          • Config:全局配置文件操作。
          • Execution:全局工程的執(zhí)行環(huán)境入口。
          • Expression:表達(dá)式計算任務(wù)操作。
          • Log:日志操作及管理。
          • Model:數(shù)據(jù)點(diǎn)元數(shù)據(jù)信息。
          • Node:主節(jié)點(diǎn)和工作節(jié)點(diǎn)管理。
          • Protocol:主節(jié)點(diǎn)和工作節(jié)點(diǎn)之間分布式部署之間交互的協(xié)議。
          • Sink:計算任務(wù)計算結(jié)果存儲接口。
          • Source:對接多種數(shù)據(jù)源接口,例如:mqtt、kafka、rabbitmq、數(shù)據(jù)庫等。
          • Task:窗口或表達(dá)任務(wù)接口,主節(jié)點(diǎn)和工作節(jié)點(diǎn)任務(wù)操作及管理。
          • Window:數(shù)據(jù)窗口任務(wù)操作。
          • Worker:工作節(jié)點(diǎn)接口。

          6 配置文件說明

                配置文件默認(rèn)為:cfg\global.cfg,可以自定義指定配置文件,參見:命令行操作說明。配置文件說明,如下:

          • MaxDegreeOfParallelism:任務(wù)并行度,主節(jié)點(diǎn)生成任務(wù)、工作節(jié)點(diǎn)處理任務(wù)依賴這個參數(shù)。
          • MasterListenPort:主節(jié)點(diǎn)偵聽端口,用于工作節(jié)點(diǎn)主動連接。
          • MasterIp:主節(jié)點(diǎn)IP,用于工作節(jié)點(diǎn)主動連接。
          • NodeType:節(jié)點(diǎn)運(yùn)行模式,包括:Master、Slave和Both。
          • RemoteInvokeInterval:遠(yuǎn)程調(diào)用工作節(jié)點(diǎn)間隔時間,單位:毫秒。
          • RepeatRemoteInvokeInterval:調(diào)用工作節(jié)點(diǎn)失敗后,重新調(diào)用工作節(jié)點(diǎn)間隔時間,單位:毫秒。
          • SlaveExcuteCalculateInterval:工作節(jié)點(diǎn)執(zhí)行計算任務(wù)間隔時間,單位:毫秒。
          • MaxFrameLength:主節(jié)點(diǎn)和工作節(jié)點(diǎn)之間傳輸數(shù)據(jù)最大數(shù)據(jù)偵,單位:字節(jié)。
          • WorkerPower:工作節(jié)點(diǎn)能力系數(shù),大于1,會連續(xù)發(fā)送多個任務(wù)。

          7 任務(wù)部署說明

                二次開發(fā)參見:二次開發(fā)說明。開發(fā)好的任務(wù),測試通過后,把程序集(.dll)復(fù)制到“tasks”目錄下,例如工程TestTask項目測試、編譯通過后,可以部署到“tasks”目錄下,運(yùn)行“CSharpFlink”主程序會自動加載和調(diào)用。       可以自定義指定任務(wù)程序集,參見:命令行操作說明。

          8 命令行操作說明

                命令行運(yùn)行“CSharpFlink”程序,支持自定義指定配置文件或任務(wù)程序集,說明如下:         -h 顯示命令行幫助。         -c 加載指定配置文件。 例如:CSharpFlink -c c:/my.cfg         -t 加載任務(wù)程序集。 例如:CSharpFlink -t c:/mytask.dll         例如:

          dotnet CSharpFlink.dll -c c:/master.cfg -t c:/mytask.dll

          9 部署說明

                “release”目錄下是編譯好的程序,把“CSharpFlink v1.0”分別復(fù)制到不同的路徑下,分別修改“cfg\global.cfg”配置文件中“NodeType”參數(shù)為:Master和Slave,修改主節(jié)點(diǎn)程序“tasks\tasks.cfg”文件中的任務(wù)數(shù),分別運(yùn)行不同目錄下的“dotnet CSharpFlink.dll”。       “TestTask.dll”源代碼,參見:二次開發(fā)說明。

          10 二次開發(fā)說明

                二次開發(fā)主要針對數(shù)據(jù)源、計算過程和數(shù)據(jù)計算結(jié)果存儲,大致過程如下:

          • 數(shù)據(jù)源對接,可以自定義對接mqtt、kafka、rabbitmq、數(shù)據(jù)庫等,需要繼承SourceFunction接口,參見:RandomSourceFunction.cs類。
          • 數(shù)據(jù)計算過程,可以自定義數(shù)據(jù)處理或加工,需要繼承Calculate.Calculate接口,參見:聚合計算Avg.cs、表達(dá)式計算ExpressionCalculate.cs。通過AddWindowTask或AddExpressionTask函數(shù)參數(shù)進(jìn)行實(shí)例化。
          • 數(shù)據(jù)計算結(jié)果存儲,可以自定義存儲任何介質(zhì)上,需要繼承SinkFunction接口,參見:SinkFunction.cs類。

          11 應(yīng)用事例展示

                CPU:4核 I5-7400 2.7GHz,內(nèi)存:16G,隨機(jī)數(shù)據(jù)點(diǎn)時間窗口和計算算子,主節(jié)點(diǎn)CPU和內(nèi)存使用情況:15%-35%、1500MB-2048MB, 工作節(jié)點(diǎn)CPU和內(nèi)存使用情況:0.1%-2.5%、18MB-30MB。

                本機(jī)部署1個主節(jié)點(diǎn),10個計算節(jié)點(diǎn)。主節(jié)點(diǎn)生成10萬個數(shù)據(jù)點(diǎn)的計算任務(wù),每個數(shù)據(jù)點(diǎn)1秒鐘產(chǎn)生一個新數(shù)據(jù),計算時間窗口的最大值、最小值、平均值或和值。 如下圖:

          瀏覽 24
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲中文字幕在线播放视频 | www.激情五月天 | 五月激情偷拍 | 日韩免费高清 | 亚洲人xxxxc |