JStorm企業(yè)級(jí)流式計(jì)算引擎
JStorm 是參考 Apache Storm 實(shí)現(xiàn)的實(shí)時(shí)流式計(jì)算框架,在網(wǎng)絡(luò)IO、線程模型、資源調(diào)度、可用性及穩(wěn)定性上做了持續(xù)改進(jìn),已被越來(lái)越多企業(yè)使用。JStorm 可以看作是 storm 的java增強(qiáng)版本,除了內(nèi)核用純java實(shí)現(xiàn)外,還包括了thrift、python、facet ui。從架構(gòu)上看,其本質(zhì)是一個(gè)基于zk的分布式調(diào)度系統(tǒng)
JStorm 的性能是Apache Storm 的4倍, 可以自由切換行模式或 mini-batch 模式:
Jstorm主要應(yīng)用場(chǎng)景有:
-
信息流處理,如聚合、分析等
-
持續(xù)計(jì)算,如實(shí)時(shí)數(shù)據(jù)統(tǒng)計(jì)、監(jiān)控
-
分布式rpc調(diào)用
Jstorm在內(nèi)核上對(duì)storm的改進(jìn)有:
-
模型簡(jiǎn)化
-
多維度資源調(diào)度
-
網(wǎng)絡(luò)通信層改造
-
采樣重構(gòu)
-
worker/task內(nèi)部異步化處理
-
classload、HA
模型簡(jiǎn)化將storm的三層管理模型簡(jiǎn)化為兩層
jstorm中task直接對(duì)應(yīng)了線程概念,而在storm中是task只是線程executor的一個(gè)執(zhí)行邏輯單元
多維度資源調(diào)度 分為cpu、memory、net、disk四個(gè)維度,默認(rèn)情況下:
cpu slots = 機(jī)器核數(shù) * 2 -1
memory slots = 機(jī)器物理內(nèi)存 / 1024M
net slots = min(cpu slots, memory slots)
網(wǎng)絡(luò)通信層 采用了netty + disruptor 替換 zmq + blockingQueue
采樣重構(gòu)
-
定義了滾動(dòng)時(shí)間窗口
-
優(yōu)化緩存map性能
-
增量采樣時(shí)間以及減少無(wú)謂數(shù)據(jù)
Worker/Task內(nèi)部異步化
異步化和回調(diào)是流式框架最基本的兩大特征,Jstorm在task的計(jì)算中將nextTuple和ack/fail的邏輯分離開(kāi)來(lái),并在worker中采用單獨(dú)線程負(fù)責(zé)流入、流出數(shù)據(jù)的反序列化及序列化工作
有關(guān)jstorm實(shí)現(xiàn)的幾個(gè)關(guān)鍵流程,有興趣的可以參考源碼
1.Nimbus的啟動(dòng)
2.supervisor的啟動(dòng)
3. worker內(nèi)部結(jié)構(gòu)
worker的啟動(dòng)需要完成以下幾件事:
-
讀取配置文件,啟動(dòng)進(jìn)程
-
初始化tuple接收隊(duì)列和發(fā)送隊(duì)列
-
打開(kāi)端口,啟動(dòng)rpc服務(wù)
-
創(chuàng)建context結(jié)構(gòu),<component, <stream, output_field>>
-
觸發(fā)各種timer,refresh/reconnection/heartbeat...
task的工作包括:
-
創(chuàng)建內(nèi)部隊(duì)列,bind connection
-
反射component拿到taskObj,創(chuàng)建具體的spout/bolt executor
-
反序列化tuple數(shù)據(jù),執(zhí)行處理邏輯
-
做stats,heartbeat等
jstorm在數(shù)據(jù)的完整性和準(zhǔn)確性上分別依賴了acker和事務(wù)機(jī)制
acker本質(zhì)是獨(dú)立的bolt,input是fieldGrouping,output是directGrouping;
每個(gè)bolt有兩個(gè)output stream(ACKER_ACK_STREAM_ID/ACKER_ACK_FAIL_STREAM_ID)
每個(gè)spout有一個(gè)output stream(ACKER_INIT_STREAM_ID),以及兩個(gè)input stream(ACKER_ACK_STREAM_ID/ACKER_ACK_FAIL_STREAM_ID)
Spout
發(fā)送給acker 的value <rootid, xor(target_task_list)>
發(fā)送下一級(jí)bolt 的value <rootid, 目標(biāo)taskid>
Bolt
下一級(jí)bolt需要ack發(fā)送給下一級(jí)bolt 為<rootid, 新uuid)>發(fā)送給acker的value為<rootid, xor(新uuid, $(接收值))>
下一級(jí)bolt不需要ack發(fā)送給下一級(jí)bolt 為空發(fā)送給acker為<rootid, $(接收值)>
事務(wù):批處理+全局唯一遞增id+兩階段提交
在發(fā)送tuple的時(shí)候帶上tid來(lái)保證“只有一次”的原語(yǔ),下游邏輯根據(jù)tid是否next tid來(lái)判斷是否需要處理。為了提高效率,會(huì)將多個(gè)tuple組裝成一批賦予一個(gè)tid,并用pipeline方式執(zhí)行processing和commit階段,其中processing可以并發(fā)執(zhí)行,而commit具有嚴(yán)格的強(qiáng)順序性。接口coordinator,commitor中做了狀態(tài)管理、事務(wù)協(xié)調(diào)、錯(cuò)誤檢查等工作
另外一個(gè)用得最多的高級(jí)特性就是trident,它對(duì)bolt進(jìn)行了封裝,提供了如joins、aggregations、grouping、filters、function等多種高級(jí)數(shù)據(jù)處理能力
最后,談?wù)動(dòng)嘘P(guān)jstorm的運(yùn)維開(kāi)發(fā)
(1)配置優(yōu)先級(jí):代碼 > jstorm.yaml > default.yaml
(2)stream流對(duì)比:
a.fieldsGrouping
b.globalGrouping - target componet的第一個(gè)task
c.shuffleGrouping - 自定義random,更平均
d.noneGrouping - 調(diào)用random
e.allGrouping - target component所有task
f.directGrouping - 指定目標(biāo)task
g.customGrouping - 接口customStreamGrouping
(3)jvm調(diào)優(yōu),優(yōu)先考慮新生代,開(kāi)啟碎片整理
(4)同一worker內(nèi)的task,開(kāi)啟定向調(diào)度避免網(wǎng)絡(luò)開(kāi)銷
(5)優(yōu)雅關(guān)閉,reblance或kill前先deactive,等待msg_timeout進(jìn)行數(shù)據(jù)清理
(6)其它,hooks、queue-size、topology.max.spout.pending等
