Apache Calcite 框架 50 倍性能優(yōu)化實(shí)踐
某天臨時(shí)被當(dāng)成壯丁拉去參加一個(gè)非常牛逼的應(yīng)用監(jiān)控平臺(tái)(后續(xù)會(huì)開源),然后大佬就給我派了一個(gè)任務(wù),要將項(xiàng)目中的查詢性能優(yōu)化 50 倍以上,大佬對(duì)我如此地寄予厚望,我怎么能讓大佬失望呢(雖然我內(nèi)心瑟瑟發(fā)抖)?于是我就開始了這段性能優(yōu)化之旅。
初步認(rèn)識(shí) Calcite
項(xiàng)目使用 Calcite 框架作為查詢引擎,之前一直沒停過這玩意是啥,而且網(wǎng)上資料特別少,又是體現(xiàn)我學(xué)習(xí)能力的時(shí)候了,在著手排查性能問題前,我花了非常多時(shí)間在了解 Calcite 實(shí)現(xiàn)原理上面。
1、Calcite 簡介
Apache Calcite是一款開源的動(dòng)態(tài)數(shù)據(jù)管理框架,它提供了標(biāo)準(zhǔn)的 SQL 語言、多種查詢優(yōu)化和連接各種數(shù)據(jù)源的能力,但不包括數(shù)據(jù)存儲(chǔ)、處理數(shù)據(jù)的算法和存儲(chǔ)元數(shù)據(jù)的存儲(chǔ)庫。
Calcite 之前的名稱叫做optiq,optiq 起初在 Hive 項(xiàng)目中,為 Hive 提供基于成本模型的優(yōu)化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 獨(dú)立出來,成為 Apache 社區(qū)的孵化項(xiàng)目,2014 年 9 月正式更名為 Calcite。
Calcite 的目標(biāo)是“one size fits all(一種方案適應(yīng)所有需求場景)”,希望能為不同計(jì)算平臺(tái)和數(shù)據(jù)源提供統(tǒng)一的查詢引擎。
2、Calcite 執(zhí)行流程

1)解析 SQL,目的是為了將 SQL 轉(zhuǎn)換成 AST 抽象語法數(shù),Calcite 有一個(gè)專門的對(duì)象 SqlNode 表示;
2)語法檢查,用數(shù)據(jù)庫的元數(shù)據(jù)信息進(jìn)行語法驗(yàn)證;
3)邏輯優(yōu)化,根據(jù)前面生成的邏輯計(jì)劃按照相應(yīng)的規(guī)則(Rule)進(jìn)行優(yōu)化;
4)SQL 執(zhí)行,按照?qǐng)?zhí)行計(jì)劃執(zhí)行。
3、Calcite 相關(guān)對(duì)象
RelNode:
關(guān)系表達(dá)式, 主要有 TableScan, Project, Sort, Join 等。如果 SQL 為查詢的話,所有關(guān)系達(dá)式都可以在 SqlSelect中找到, 如 where 和 having 對(duì)應(yīng)的 Filter, selectList 對(duì)應(yīng) Project, orderBy、offset、fetch 對(duì)應(yīng)著 Sort, From 對(duì)應(yīng)著 TableScan/Join 等等, 示便 Sql 最后會(huì)生成如下 RelNode 樹。
Debug 源碼得到的 RelNode 對(duì)象長這樣:

RexNode:
行表達(dá)式, 如 RexLiteral(常量), RexCall(函數(shù)), RexInputRef (輸入引用) 等,舉個(gè)例子:
SELECT?LOCATION?as?LOCATION,MERGE2(VALUE2)?as?VALUE2?
FROM?transaction?
WHERE?REPORTTIME?>=1594887720000?AND?REPORTTIME?<=1594891320000?AND?APPID?=?'test-api'??AND?GROUP2?IN?('DubboService','URL')?AND?METRICKEY?IN?('$$TOTAL')?GROUP?BY?LOCATION
RexCall
<=($1,?1595496539000)
RexInputRef
$1
RexLiteral
1595496539000:BIGINT
下面根據(jù)官方資料的描述,總結(jié) Calcite 的三種查詢模式:
1)ScannableTable
這種方式基本不會(huì)用,原因是查詢數(shù)據(jù)庫的時(shí)候沒有任何條件限制,默認(rèn)會(huì)先把全部數(shù)據(jù)拉到內(nèi)存,然后再根據(jù)filter條件在內(nèi)存中過濾。
使用方式:實(shí)現(xiàn)
Enumerable scan(DataContext root);,該函數(shù)返回Enumerable對(duì)象,通過該對(duì)象可以一行行的獲取這個(gè)Table的全部數(shù)據(jù)。
2)FilterableTable
初級(jí)用法,我們能拿到filter條件,即能再查詢底層DB時(shí)進(jìn)行一部分的數(shù)據(jù)過濾,一般開始介入calcite可以用這種方式(translatable方式學(xué)習(xí)成本較高)。
使用方式:實(shí)現(xiàn)
Enumerable scan(DataContext root, List filters )。如果當(dāng)前類型的“表”能夠支持我們自己寫代碼優(yōu)化這個(gè)過濾器,那么執(zhí)行完自定義優(yōu)化器,可以把該過濾條件從集合中移除,否則,就讓calcite來過濾,簡言之就是,如果我們不處理
List filters,Calcite也會(huì)根據(jù)自己的規(guī)則在內(nèi)存中過濾,無非就是對(duì)于查詢引擎來說查的數(shù)據(jù)多了,但如果我們可以寫查詢引擎支持的過濾器(比如寫一些hbase、es的filter),這樣在查的時(shí)候引擎本身就能先過濾掉多余數(shù)據(jù),更加優(yōu)化。提示,即使走了我們的查詢過濾條件,可以再讓calcite幫我們過濾一次,比較靈活。
3)TranslatableTable
高階用法,有些查詢用上面的方式都支持不了或支持的不好,比如join、聚合、或?qū)τ趕elect的字段篩選等,需要用這種方式來支持,好處是可以支持更全的功能,代價(jià)是所有的解析都要自己寫,“承上啟下”,上面解析sql的各個(gè)部件,下面要根據(jù)不同的DB(esmysqldrudi..)來寫不同的語法查詢。
當(dāng)使用ScannableTable的時(shí)候,我們只需要實(shí)現(xiàn)函數(shù)
Enumerable scan(DataContext root);,該函數(shù)返回Enumerable對(duì)象,通過該對(duì)象可以一行行的獲取這個(gè)Table的全部數(shù)據(jù)(也就意味著每次的查詢都是掃描這個(gè)表的數(shù)據(jù),我們干涉不了任何執(zhí)行過程);當(dāng)使用FilterableTable的時(shí)候,我們需要實(shí)現(xiàn)函數(shù)Enumerable scan(DataContext root, List filters );參數(shù)中多了filters數(shù)組,這個(gè)數(shù)據(jù)包含了針對(duì)這個(gè)表的過濾條件,這樣我們根據(jù)過濾條件只返回過濾之后的行,減少上層進(jìn)行其它運(yùn)算的數(shù)據(jù)集;當(dāng)使用TranslatableTable的時(shí)候,我們需要實(shí)現(xiàn)RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable);,該函數(shù)可以讓我們根據(jù)上下文自己定義表掃描的物理執(zhí)行計(jì)劃,至于為什么不在返回一個(gè)Enumerable對(duì)象了,因?yàn)樯厦鎯煞N其實(shí)使用的是默認(rèn)的執(zhí)行計(jì)劃,轉(zhuǎn)換成EnumerableTableAccessRel算子,通過TranslatableTable我們可以實(shí)現(xiàn)自定義的算子,以及執(zhí)行一些其他的rule,Kylin就是使用這個(gè)類型的Table實(shí)現(xiàn)查詢。
由于我對(duì) Calcite 還沒有一個(gè)更加深入的了解(但是并不阻礙我排查問題,而且 Calcite 這玩意對(duì)我來說太復(fù)雜了),因此 Calcite 更加復(fù)雜的概念我在這里就不繼續(xù)啰嗦了,比如關(guān)系代數(shù)的基本知識(shí)、查詢優(yōu)化器等等,排查問題并不需要了解那么深入,而且項(xiàng)目中只是使用了 Calcite 一小部分功能。
使用 Calcite 實(shí)現(xiàn)一個(gè)簡單的數(shù)據(jù)庫
需要做如下幾步:
編寫 model.json 自定義 SchemaFactory 自定義 Schema(像一個(gè)“沒有存儲(chǔ)層的databse”一樣,Calcite不會(huì)去了解任何文件格式) 自定義Table 自定義 Enumerator
demo url: https://github.com/objcoding/calcite-demo
耗時(shí)排查
我在項(xiàng)目中使用了 FilterableTable 模式,Cacite 在解析 Sql 時(shí)耗時(shí)非常大,然后通過調(diào)試,我發(fā)現(xiàn)每個(gè)請(qǐng)求都占據(jù)了兩個(gè)位置:
org.apache.calcite.adapter.enumerable.EnumerableInterpretable#getBindable

Cacite 在這個(gè)地方通過設(shè)置緩存大小來優(yōu)化緩存設(shè)置。
org.apache.calcite.interpreter.JaninoRexCompiler#baz

但是不會(huì)緩存該位置,并且每次都會(huì)使用新的表達(dá)式字符串通過反射創(chuàng)建它。
我使用 JProfile 工具對(duì)線程耗時(shí)的地方進(jìn)行定位:

Calcite 會(huì)在這個(gè)地方會(huì)調(diào)用反射根據(jù)不同的 Sql 動(dòng)態(tài)生成不同的表達(dá)式,Debug 獲取的表達(dá)式如下:

Calcite 為什么會(huì)有這種機(jī)制呢?我們先從 Bindable 對(duì)象講起:
在 EnumerableRel(RelNode,我們可以通過 TranslatableTable 自定義 FilterRel、JoinRel、AggregateRel)的每個(gè)算子的 implement 方法中會(huì)將一些算子(Group、join、sort、function)要實(shí)現(xiàn)的算法寫成 Linq4j 的表達(dá)式,然后通過這些 Linq4j 表達(dá)式生成 Java Class。通過 JavaRowFormat 格式化)
calcite 會(huì)將 sql 生成的 linq4j 表達(dá)式生成可執(zhí)行的 Java 代碼( Bindable 類):org.apache.calcite.adapter.enumerable.EnumerableInterpretable#getBindable
Calcite 會(huì)調(diào)用 Janino 編譯器動(dòng)態(tài)編譯這個(gè) java 類,并且實(shí)例化這個(gè)類的一個(gè)對(duì)象,然后將其封裝到 CalciteSignature 對(duì)象中。
調(diào)用 executorQuery 查詢方法并創(chuàng)建 CalciteResultSet 的時(shí)候會(huì)調(diào)用 Bindable 對(duì)象的 bind 方法,這個(gè)方法返回一個(gè)Eumerable對(duì)象:
org.apache.calcite.avatica.AvaticaResultSet#execute

org.apache.calcite.jdbc.CalcitePrepare.CalciteSignature#enumerable

將 Enumerable 賦值給 CalciteResultSet 的 cursor 成員變量。
在執(zhí)行真正的數(shù)據(jù)庫查詢時(shí),獲得實(shí)際的 CalciteResultSet,最終會(huì)調(diào)用:
org.apache.calcite.avatica.AvaticaResultSet#next

以下是根據(jù) SQL 動(dòng)態(tài)生成的 linq4j 表達(dá)式:
public?static?class?Record2_0?implements?java.io.Serializable?{
??public?Object?f0;
??public?boolean?f1;
??public?Record2_0()?{}
??public?boolean?equals(Object?o)?{
????if?(this?==?o)?{
??????return?true;
????}
????if?(!(o?instanceof?Record2_0))?{
??????return?false;
????}
????return?java.util.Objects.equals(this.f0,?((Record2_0)?o).f0)?&&?this.f1?==?((Record2_0)?o).f1;
??}
??public?int?hashCode()?{
????int?h?=?0;
????h?=?org.apache.calcite.runtime.Utilities.hash(h,?this.f0);
????h?=?org.apache.calcite.runtime.Utilities.hash(h,?this.f1);
????return?h;
??}
??public?int?compareTo(Record2_0?that)?{
????int?c;
????c?=?org.apache.calcite.runtime.Utilities.compare(this.f1,?that.f1);
????if?(c?!=?0)?{
??????return?c;
????}
????return?0;
??}
??public?String?toString()?{
????return?"{f0="?+?this.f0?+?",?f1="?+?this.f1?+?"}";
??}
}
public?org.apache.calcite.linq4j.Enumerable?bind(final?org.apache.calcite.DataContext?root)?{
??final?org.apache.calcite.rel.RelNode?v1stashed?=?(org.apache.calcite.rel.RelNode)?root.get("v1stashed");
??final?org.apache.calcite.interpreter.Interpreter?interpreter?=?new?org.apache.calcite.interpreter.Interpreter(
????root,
????v1stashed);
??java.util.List?accumulatorAdders?=?new?java.util.LinkedList();
??accumulatorAdders.add(new?org.apache.calcite.linq4j.function.Function2()?{
????public?Record2_0?apply(Record2_0?acc,?Object[]?in)?{
??????final?Object?inp9_?=?in[9];
??????if?(inp9_?!=?null)?{
????????acc.f1?=?true;
????????acc.f0?=?com.zto.zcat.store.api.query.Merge2Fun.add(acc.f0,?inp9_);
??????}
??????return?acc;
????}
????public?Record2_0?apply(Object?acc,?Object?in)?{
??????return?apply(
????????(Record2_0)?acc,
????????(Object[])?in);
????}
??}
??);
??org.apache.calcite.adapter.enumerable.AggregateLambdaFactory?lambdaFactory?=?new?org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory(
????new?org.apache.calcite.linq4j.function.Function0()?{
??????public?Object?apply()?{
????????Object?a0s0;
????????boolean?a0s1;
????????a0s1?=?false;
????????a0s0?=?com.zto.zcat.store.api.query.Merge2Fun.init();
????????Record2_0?record0;
????????record0?=?new?Record2_0();
????????record0.f0?=?a0s0;
????????record0.f1?=?a0s1;
????????return?record0;
??????}
????}
,
????accumulatorAdders);
??return?org.apache.calcite.linq4j.Linq4j.singletonEnumerable(interpreter.aggregate(lambdaFactory.accumulatorInitializer().apply(),?lambdaFactory.accumulatorAdder(),?lambdaFactory.singleGroupResultSelector(new?org.apache.calcite.linq4j.function.Function1()?{
??????public?Object?apply(Record2_0?acc)?{
????????return?acc.f1???com.zto.zcat.store.api.query.Merge2Fun.result(acc.f0)?:?(Object)?null;
??????}
??????public?Object?apply(Object?acc)?{
????????return?apply(
??????????(Record2_0)?acc);
??????}
????}
????)));
}
public?Class?getElementType()?{
??return?java.lang.Object.class;
}
Enumerator是Linq風(fēng)格的迭代器,它有4個(gè)方法:
current moveNext reset close
current 返回游標(biāo)所指的當(dāng)前記錄,需要注意的是current并不會(huì)改變游標(biāo)的位置,這一點(diǎn)和iterator是不同的,在iterator相對(duì)應(yīng)的是next方法,每一次調(diào)用都會(huì)將游標(biāo)移動(dòng)到下一條記錄,current則不會(huì),Enumerator是在調(diào)用moveNext方法時(shí)才會(huì)移動(dòng)游標(biāo)。moveNext方法將游標(biāo)指向下一條記錄,并獲取當(dāng)前記錄供current方法調(diào)用,如果沒有下一條記錄則返回false。
CsvEnumerator是讀取 csv 文件的迭代器,它還得需要一個(gè)RowConverter,因?yàn)閏sv中都是String類型,使用RowConverter轉(zhuǎn)化成相應(yīng)的類型。在moreNext方法中,有Stream和謂詞下推filter部分的實(shí)現(xiàn),在本文只關(guān)注如下幾行代碼:
總結(jié)執(zhí)行順序:
1、executeQuery 方法:
1)根據(jù)算子 linq4j 表達(dá)式子生成 Bindable 執(zhí)行對(duì)象,如果有設(shè)置緩存,則會(huì)將對(duì)像存儲(chǔ)到緩存中;
2)生成 CalciteResultSet 時(shí)會(huì)調(diào)用 Bindable#bind 方法返回一個(gè) Enumerable 對(duì)象;
2、getData 方法:調(diào)用 ResultSet#next 方法最終會(huì)嗲用 Enumerable#moveNext
一圖理解 Bindable 在 calcite 中的作用:

發(fā)現(xiàn) Bindable 緩存會(huì)持續(xù)增加,說明 Bindable 類內(nèi)容不一致:

也說明了 calcite 會(huì)根據(jù)不同的 SQL 動(dòng)態(tài)生成 linq4j 表達(dá)式。
性能優(yōu)化
以上排查結(jié)果可知,在 Calcite 內(nèi)容會(huì)頻繁使用 JaninoRexCompiler 使用反射動(dòng)態(tài)生成表達(dá)式,由于項(xiàng)目中的 sql 格式相對(duì)固定,因此我們是否可以自定義一個(gè) ?Compiler,然后替換 JaninoRexCompiler ?
我將使用 JaninoRexCompiler 的相關(guān)類復(fù)制出來,實(shí)現(xiàn)一個(gè)自定義的 Interpreter.ScalarCompiler,然后在這個(gè)地方 org.apache.calcite.interpreter.Interpreter.CompilerImpl#CompilerImpl替換 JaninoRexCompiler。
關(guān)于自定義 Interpreter.ScalarCompiler 的具體思路過程,我記錄在這里了:
https://issues.apache.org/jira/browse/CALCITE-4144
經(jīng)過反復(fù)調(diào)試,發(fā)現(xiàn)性能提上了 50 倍以上!

再次使用 JProfiler 查看,發(fā)現(xiàn) Calcite 查詢過程耗時(shí)已經(jīng)大大降低了。
