Apache Flink OLAP引擎性能優(yōu)化及應用
導讀:本次分享的主題為Apache Flink新場景——OLAP引擎,主要內(nèi)容包括:
背景介紹
Apache Flink OLAP引擎
案例介紹
未來計劃
1. OLAP及其分類

OLAP是一種讓用戶可以用從不同視角方便快捷的分析數(shù)據(jù)的計算方法。主流的OLAP可以分為3類:多維OLAP ( Multi-dimensional OLAP )、關系型OLAP ( Relational OLAP ) 和混合OLAP ( Hybrid OLAP ) 三大類。
多維OLAP ( MOLAP ):
傳統(tǒng)的OLAP分析方式
數(shù)據(jù)存儲在多維數(shù)據(jù)集中
關系型OLAP ( ROLAP ):
以關系數(shù)據(jù)庫為核心,以關系型結(jié)構(gòu)進行多維數(shù)據(jù)的表示
通過SQL的where條件以呈現(xiàn)傳統(tǒng)OLAP的切片、切塊功能
混合OLAP ( HOLAP ):
將MOLAP和ROLPA的優(yōu)勢結(jié)合起來,以獲得更快的性能
接下來為大家詳細介紹下:
① MOLAP

典型代表:
MOLAP的典型代表是Kylin和Druid。
處理流程:
對原始數(shù)據(jù)做數(shù)據(jù)預處理
預處理后的數(shù)據(jù)存至數(shù)據(jù)倉庫
用戶的請求通過OLAP server查詢數(shù)據(jù)倉庫中的數(shù)據(jù)
MOLAP的優(yōu)點和缺點:
MOLAP的優(yōu)點和缺點都來自于其數(shù)據(jù)預處理 ( pre-processing ) 環(huán)節(jié)。數(shù)據(jù)預處理,將原始數(shù)據(jù)按照指定的計算規(guī)則預先做聚合計算,這樣避免了查詢過程中出現(xiàn)大量的臨時計算,提升了查詢性能,同時也為很多復雜的計算提供了支持。
但是這樣的預聚合處理,需要預先定義維度,會限制后期數(shù)據(jù)查詢的靈活性;如果查詢工作涉及新的指標,需要重新增加預處理流程,損失了靈活度,存儲成本也很高;同時,這種方式不支持明細數(shù)據(jù)的查詢。
因此,MOLAP適用于對性能非常高的場景。
② ROLAP

典型代表:
ROLAP的典型代表是Presto和Impala。
處理流程:
用戶的請求直接發(fā)送給OLAP server
OLAP serve將用戶的請求轉(zhuǎn)換成關系型操作算子:
1. 通過SCAN掃描原始數(shù)據(jù)
2. 在原始數(shù)據(jù)基礎上做過濾、聚合、關聯(lián)等處理
將計算結(jié)果返回給用戶
ROLAP的優(yōu)點和缺點:
ROLAP不需要進行數(shù)據(jù)預處理 ( pre-processing ),因此查詢靈活,可擴展性好。這類引擎使用MPP架構(gòu) ( 與Hadoop相似的大型并行處理架構(gòu),可以通過擴大并發(fā)來增加計算資源 ),可以高效處理大量數(shù)據(jù)。但是當數(shù)據(jù)量較大或query較為復雜時,查詢性能也無法像MOLAP那樣穩(wěn)定。所有計算都是臨時發(fā)生 ( 沒有預處理 ),因此會耗費更多的計算資源。
因此,ROLAP適用于對查詢靈活性高的場景。
③ HOLAP
混合OLAP,是MOLAP和ROLAP的一種融合。當查詢聚合性數(shù)據(jù)的時候,使用MOLAP技術;當查詢明細數(shù)據(jù)時,使用ROLAP技術。在給定使用場景的前提下,以達到查詢性能的最優(yōu)化。
2. Apache Flink介紹
① 當前Apache Flink支持的應用場景

Apache Flink支持的3種典型應用場景:
01. 事件驅(qū)動的應用
反欺詐
基于規(guī)則的監(jiān)控報警
02. 流式Pipeline
數(shù)據(jù)ETL
實時搜索引擎的索引
03. 批處理&流處理分析
網(wǎng)絡質(zhì)量監(jiān)控
消費者實時數(shù)據(jù)分析
② Apache Flink 架構(gòu)

③ Apache Flink 優(yōu)勢

01. 統(tǒng)一框架 ( 不區(qū)分流處理和批處理 )
用戶API統(tǒng)一
執(zhí)行引擎統(tǒng)一
02. 多層次API
標準SQL APL
Table API
DataStream API ( 靈活,無schema限制 )
03. 高性能
支持內(nèi)存計算
支持代價模型優(yōu)化
支持代碼動態(tài)生成
04. 方便集成
支持豐富的Connectors
方便對接現(xiàn)有catalog
05. 靈活的Failover策略
在Pipeline下支持快速failover
類似MapReduce、Spark一樣支持shuffle數(shù)據(jù)落盤
06. 易部署維護
靈活部署方案
支持高可用

1. Apache Flink OLAP引擎
① 為什么Apache Flink 可以做ROLAP引擎?

Flink的核心和基礎是流計算,支持高性能、低延遲的大規(guī)模計算
Blink將批看作有限流,批處理是針對有限數(shù)據(jù)集的優(yōu)化,因此批處理引擎也是構(gòu)建在流引擎上 ( 已開源 )
OLAP是響應時間要求更短的批處理,因此OLAP可以看作是一種特殊的批。OLAP引擎也可以構(gòu)建在現(xiàn)有的批引擎上
注:Flink OLAP引擎目前不帶存儲,只是一個計算框架
② Apache Flink 做OLAP引擎的優(yōu)勢

統(tǒng)一引擎:流處理、批處理、OLAP統(tǒng)一使用Flink引擎
降低學習成本,僅需要學習一個引擎
提高開發(fā)效率,很多SQL是流批通用
提高維護效率,可以更集中維護好一個引擎
既有優(yōu)勢:利用Flink已有的很多特性,使OLAP使用場景更為廣泛
使用流處理的內(nèi)存計算、Pipeline
支持代碼動態(tài)生成
也可以支持批處理數(shù)據(jù)落盤能力
相互增強:OLAP能享有現(xiàn)有引擎的優(yōu)勢,同時也能增強引擎能力
無統(tǒng)計信息場景的優(yōu)化
開發(fā)更高效的算子
使Flink同時兼?zhèn)淞鳌⑴?、OLAP處理的能力,成為更通用的框架
2. 性能優(yōu)化
OLAP 對查詢時間非常敏感,當前很多組件的性能不滿足要求,因此我們對Flink做了很多相關優(yōu)化。
① 服務架構(gòu)的優(yōu)化
客戶端服務化:
下圖介紹了一條SQL怎么在客戶端一步一步變?yōu)镴obGraph,最終提交給JM:

在改動之前,每次接受一個query時會啟動一個新的JVM進程來進行作業(yè)的編譯。其中JVM的啟動、Class的加載、代碼的動態(tài)編譯 ( 如Optimizer模塊由于需要通過Janino動態(tài)編譯進行cost計算 ) 等操作都非常耗時 ( 需要約3~5s )。因此,我們將客戶端進行服務化,將整個Client做成Service,當接收到用戶的query時,無需重復各項加載工作,可將延時降低至100ms 左右。
自定義CollectionTableSink:

這部分優(yōu)化,源于OLAP的一個特性:OLAP會將最終計算結(jié)果發(fā)給客戶端,通過JobManager轉(zhuǎn)發(fā)給Client。假如某個query的結(jié)果數(shù)據(jù)量很大,會讓JobManager OOM ( OutOfMemory );如果同時執(zhí)行多個query,也會相互影響。因此,我們從新實現(xiàn)了一個CollectionTableSink,限制數(shù)據(jù)的條數(shù)和數(shù)據(jù)大小,避免出現(xiàn)OOM,保證多個Query同時運行時的穩(wěn)定性。
調(diào)度優(yōu)化:

在Batch模式下的調(diào)度存在以下問題:
使用Lazy_from_sources模式調(diào)度,會導致整體運行時間較長,也可能造成死鎖。
注:調(diào)度死鎖是指在資源有限的情況下,多個Job同時運行時,如果多個Job都只申請到了部分資源并沒有剩余資源可以申請,導致Job沒法繼續(xù)執(zhí)行,新的Job也沒法提交
RM ( Resource Manager ) 按OnDemand方式分配Slot需求,也會造成死鎖
RM以單線程同步模式向TM ( Transaction Manager ) 分配Slot請求,會造成等待時間更長。
針對上述問題,我們提出了以下幾點改動:
采用Eager調(diào)度模式 ( 確保所有的資源都申請到后才開始運行 )
使用FIFO ( 先進先出隊 ) 模式申請資源 ( 確保當前Job的資源分配結(jié)束后才開始下一個Job的資源分配 )
將單線程同步模式改為多線程異步模式,減少任務啟動時間和執(zhí)行時間
② 針對source的優(yōu)化
在ROLAP的執(zhí)行場景中,所有數(shù)據(jù)都是通過掃描原始數(shù)據(jù)表后進行處理;因此,基于Source的讀取性能非常關鍵,直接影響Job的執(zhí)行效率。
Project&Filter下堆:

像Parquet這類的列存文件格式,支持按需讀取相所需列,同時支持RowGroup級別的過濾。利用該特性,可以將Project和Filter下推到TableSource,從而只需要掃描Query中涉及的字段和滿足條件的RowGroup,大大提升讀取效率。
Aggregate下堆:

這個優(yōu)化也是充分利用了TableSource的特性:例如Parquet文件的metadata中已經(jīng)存儲了每個RowGroup的統(tǒng)計信息 ( 如 max、min等 ),因此在做max、min這類聚合統(tǒng)計時,可直接讀取metadata信息,而不需要先讀取所有原始數(shù)據(jù)再計算。
③ 在沒有統(tǒng)計信息場景下做的優(yōu)化
消除CrossJoin:

CrossJoin是沒有任何Join條件,將Join的兩張表的數(shù)據(jù)做笛卡爾積,導致Join的結(jié)果膨脹非常厲害,這類Join應該盡量避免。我們對含有CrossJoin的Plan進行改寫:將有join條件的表格先做join ( 通常會因為一些數(shù)據(jù)Join不上而減少數(shù)據(jù) ),從而提高執(zhí)行效率。這是一個確定性的改寫,即使在沒有統(tǒng)計信息的情況下,也可以使用該優(yōu)化。
自適應的Local Aggregate:

通常情況下,兩階段的Aggregate是非常高效的,因為LocalAggregate能聚合大量數(shù)據(jù),導致Shuffle的數(shù)據(jù)量會變少。但是當LocalAggregate的聚合度很低的時候, Local聚合操作的意義不大,反而會浪費CPU。在沒有任何統(tǒng)計信息的情況下,優(yōu)化器沒法決定是否要產(chǎn)生LocalAggregate算子;因此,我們采用運行時采樣的方式來判斷聚合度,如果聚合度低于設定的閾值,我們將關閉聚合操作,改為僅做數(shù)據(jù)轉(zhuǎn)發(fā);經(jīng)我們測試,部分場景有30% 的性能提升。
3. 測試結(jié)果

上圖是Flink和Presto基于1T數(shù)據(jù)做的SSB ( Star Schema Benchmark ) 測試,從圖中可以看出 Flink和Presto整體上不相上下,甚至有些Query Flink性能優(yōu)于Presto。注:Flink OLAP從開始到嘉賓分享時,只有3個月時間。

1. Apache Flink OLAP在數(shù)據(jù)探查上的應用

上圖描述了一個數(shù)據(jù)湖應用的完整架構(gòu),Flink OLAP主要用于"數(shù)據(jù)探查"。數(shù)據(jù)探查是對數(shù)據(jù)結(jié)構(gòu)做智能判斷,給出數(shù)據(jù)的探查結(jié)果,快速了解數(shù)據(jù)的信息和質(zhì)量情況。即用戶可以在管控平臺上了解數(shù)據(jù)湖中任意一份數(shù)據(jù)的數(shù)據(jù)特性。用戶通過Web交互操作選擇相應的表和指標后立即展示相關結(jié)果指標,因此要求低延遲、實時反饋。而且數(shù)據(jù)湖中很多數(shù)據(jù)沒有任何統(tǒng)計信息;前述的各種查詢、聚合層面的優(yōu)化,主要為這類場景服務。
2. 整體架構(gòu)

上圖是這類應用的整體架構(gòu)。整套服務托管到Kubernetes上,最終訪問的數(shù)據(jù)是OSS;目前這套架構(gòu)正在阿里云上做公測,邀請廣大用戶試用。


推回社區(qū):目前所有工作都是基于內(nèi)部Flink,希望推回社區(qū);
資源隔離:后期很多功能的開發(fā)和優(yōu)化會圍繞多Query運行時的"資源隔離";
優(yōu)化&性能:圍繞OLAP的特性,在此場景下會進一步做優(yōu)化和性能提升等方面的工作。
