<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          JStorm企業(yè)級(jí)流式計(jì)算引擎

          聯(lián)合創(chuàng)作 · 2023-10-01 06:39

          JStorm 是參考 Apache Storm 實(shí)現(xiàn)的實(shí)時(shí)流式計(jì)算框架,在網(wǎng)絡(luò)IO、線程模型、資源調(diào)度、可用性及穩(wěn)定性上做了持續(xù)改進(jìn),已被越來(lái)越多企業(yè)使用。JStorm 可以看作是 storm 的java增強(qiáng)版本,除了內(nèi)核用純java實(shí)現(xiàn)外,還包括了thrift、python、facet ui。從架構(gòu)上看,其本質(zhì)是一個(gè)基于zk的分布式調(diào)度系統(tǒng)

          JStorm 的性能是Apache Storm 的4倍, 可以自由切換行模式或 mini-batch 模式:

          JStorm Performance



           

          Jstorm主要應(yīng)用場(chǎng)景有:

          1. 信息流處理,如聚合、分析等

          2. 持續(xù)計(jì)算,如實(shí)時(shí)數(shù)據(jù)統(tǒng)計(jì)、監(jiān)控

          3. 分布式rpc調(diào)用

          Jstorm在內(nèi)核上對(duì)storm的改進(jìn)有:

          1. 模型簡(jiǎn)化

          2. 多維度資源調(diào)度

          3. 網(wǎng)絡(luò)通信層改造

          4. 采樣重構(gòu)

          5. worker/task內(nèi)部異步化處理

          6. classload、HA

          模型簡(jiǎn)化將storm的三層管理模型簡(jiǎn)化為兩層


           jstorm中task直接對(duì)應(yīng)了線程概念,而在storm中是task只是線程executor的一個(gè)執(zhí)行邏輯單元

          多維度資源調(diào)度 分為cpu、memory、net、disk四個(gè)維度,默認(rèn)情況下:

          cpu slots = 機(jī)器核數(shù) * 2 -1

          memory slots = 機(jī)器物理內(nèi)存 / 1024M

          net slots = min(cpu slots, memory slots)

          網(wǎng)絡(luò)通信層 采用了netty + disruptor 替換 zmq + blockingQueue

          采樣重構(gòu) 

          1. 定義了滾動(dòng)時(shí)間窗口

          2. 優(yōu)化緩存map性能

          3. 增量采樣時(shí)間以及減少無(wú)謂數(shù)據(jù)

          Worker/Task內(nèi)部異步化

          異步化和回調(diào)是流式框架最基本的兩大特征,Jstorm在task的計(jì)算中將nextTuple和ack/fail的邏輯分離開(kāi)來(lái),并在worker中采用單獨(dú)線程負(fù)責(zé)流入、流出數(shù)據(jù)的反序列化及序列化工作

          有關(guān)jstorm實(shí)現(xiàn)的幾個(gè)關(guān)鍵流程,有興趣的可以參考源碼

          1.Nimbus的啟動(dòng)


           2.supervisor的啟動(dòng)


          3. worker內(nèi)部結(jié)構(gòu)


          worker的啟動(dòng)需要完成以下幾件事:

          1. 讀取配置文件,啟動(dòng)進(jìn)程

          2. 初始化tuple接收隊(duì)列和發(fā)送隊(duì)列

          3. 打開(kāi)端口,啟動(dòng)rpc服務(wù)

          4. 創(chuàng)建context結(jié)構(gòu),<component, <stream, output_field>>

          5. 觸發(fā)各種timer,refresh/reconnection/heartbeat...

          task的工作包括:

          1. 創(chuàng)建內(nèi)部隊(duì)列,bind connection

          2. 反射component拿到taskObj,創(chuàng)建具體的spout/bolt executor

          3. 反序列化tuple數(shù)據(jù),執(zhí)行處理邏輯

          4. 做stats,heartbeat等

          jstorm在數(shù)據(jù)的完整性和準(zhǔn)確性上分別依賴了acker和事務(wù)機(jī)制

          acker本質(zhì)是獨(dú)立的bolt,input是fieldGrouping,output是directGrouping;

          每個(gè)bolt有兩個(gè)output stream(ACKER_ACK_STREAM_ID/ACKER_ACK_FAIL_STREAM_ID)

          每個(gè)spout有一個(gè)output stream(ACKER_INIT_STREAM_ID),以及兩個(gè)input stream(ACKER_ACK_STREAM_ID/ACKER_ACK_FAIL_STREAM_ID)
              Spout
                  發(fā)送給acker 的value <rootid, xor(target_task_list)>
                  發(fā)送下一級(jí)bolt 的value <rootid, 目標(biāo)taskid>
              Bolt
                   下一級(jí)bolt需要ack發(fā)送給下一級(jí)bolt 為<rootid, 新uuid)>發(fā)送給acker的value為<rootid, xor(新uuid, $(接收值))>
                   下一級(jí)bolt不需要ack發(fā)送給下一級(jí)bolt 為空發(fā)送給acker為<rootid, $(接收值)>

          事務(wù):批處理+全局唯一遞增id+兩階段提交

          在發(fā)送tuple的時(shí)候帶上tid來(lái)保證“只有一次”的原語(yǔ),下游邏輯根據(jù)tid是否next tid來(lái)判斷是否需要處理。為了提高效率,會(huì)將多個(gè)tuple組裝成一批賦予一個(gè)tid,并用pipeline方式執(zhí)行processing和commit階段,其中processing可以并發(fā)執(zhí)行,而commit具有嚴(yán)格的強(qiáng)順序性。接口coordinator,commitor中做了狀態(tài)管理、事務(wù)協(xié)調(diào)、錯(cuò)誤檢查等工作

          另外一個(gè)用得最多的高級(jí)特性就是trident,它對(duì)bolt進(jìn)行了封裝,提供了如joins、aggregations、grouping、filters、function等多種高級(jí)數(shù)據(jù)處理能力

          最后,談?wù)動(dòng)嘘P(guān)jstorm的運(yùn)維開(kāi)發(fā)

          (1)配置優(yōu)先級(jí):代碼 > jstorm.yaml > default.yaml

          (2)stream流對(duì)比:

                a.fieldsGrouping

                b.globalGrouping - target componet的第一個(gè)task

                c.shuffleGrouping - 自定義random,更平均

                d.noneGrouping - 調(diào)用random

                e.allGrouping - target component所有task

                f.directGrouping - 指定目標(biāo)task

                g.customGrouping - 接口customStreamGrouping

          (3)jvm調(diào)優(yōu),優(yōu)先考慮新生代,開(kāi)啟碎片整理

          (4)同一worker內(nèi)的task,開(kāi)啟定向調(diào)度避免網(wǎng)絡(luò)開(kāi)銷

          (5)優(yōu)雅關(guān)閉,reblance或kill前先deactive,等待msg_timeout進(jìn)行數(shù)據(jù)清理

          (6)其它,hooks、queue-size、topology.max.spout.pending等

          瀏覽 24
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产免费久久 | 一级特黄特色的免费大片 | 手机黄色在线 | 国产精品久久综合 | 黄色一级看片 |