Apache Calcite 框架原理入門和生產(chǎn)應用
1. 簡介
Calcite 是什么?如果用一句話形容 Calcite,Calcite 是一個用于優(yōu)化異構(gòu)數(shù)據(jù)源的查詢處理的基礎框架。
最近十幾年來,出現(xiàn)了很多專門的數(shù)據(jù)處理引擎。例如列式存儲 (HBase)、流處理引擎 (Flink)、文檔搜索引擎 (Elasticsearch) 等等。這些引擎在各自針對的領域都有獨特的優(yōu)勢,在現(xiàn)有復雜的業(yè)務場景下,我們很難只采用當中的某一個而舍棄其他的數(shù)據(jù)引擎。當引擎發(fā)展到一定成熟階段,為了減少用戶的學習成本,大多引擎都會考慮引入 SQL 支持,但如何避免重復造輪子又成了一個大問題?;谶@個背景,Calcite 橫空出世,它提供了標準的 SQL 語言、多種查詢優(yōu)化和連接各種數(shù)據(jù)源的能力,將數(shù)據(jù)存儲以及數(shù)據(jù)管理的能力留給引擎自身實現(xiàn)。同時 Calcite 有著良好的可插拔的架構(gòu)設計,我們可以只使用其中一部分功能構(gòu)建自己的 SQL 引擎,而無需將整個引擎依托在 Calcite 上。因此 Calcite 成為了現(xiàn)在許多大數(shù)據(jù)框架 SQL 引擎的最佳方案。我們計算引擎組也基于 Calcite 實現(xiàn)了一個自用的 SQL 校驗層,當用戶提交 Flink SQL 作業(yè)時需要先進過一層語義校驗,通過后再利用校驗得到的元數(shù)據(jù)構(gòu)建模板任務提交給 Flink 引擎執(zhí)行。
2. 核心架構(gòu)

中間的方框總結(jié)了 Calcite 的核心結(jié)構(gòu),首先 Calcite 通過 SQL Parser 和 Validator 將一個 SQL 查詢解析得到一個抽象語法樹 (AST, Abstract Syntax Tree),由于 Calcite 不包含存儲層,因此它提供了另一種定義 table schema 和 view 的機制—— Catalog 作為元數(shù)據(jù)的存儲空間(另外 Calcite 提供了 Adaptor 機制連接外部的存儲引擎獲取元數(shù)據(jù),這部分內(nèi)容不在本文范圍內(nèi))。之后,Calcite 通過優(yōu)化器生成對應的關系表達式樹,根據(jù)特定的規(guī)則進行優(yōu)化。優(yōu)化器是 Calcite 最為重要的一部分邏輯,它包含了三個組件:Rule、MetadataProvider(Catalog)、Planner engine,這些組件在文章后續(xù)都會有具體的講解。
通過架構(gòu)圖我們可以看出,Calcite 最大的特點(優(yōu)勢)是它將 SQL 的處理、校驗和優(yōu)化等邏輯單獨剝離出來,省略了一些關鍵組件,例如,數(shù)據(jù)存儲,處理數(shù)據(jù)的算法以及用于存儲元數(shù)據(jù)的存儲庫。其次 Calcite 做得最出色的地方則是它的可插拔機制,每個大數(shù)據(jù)框架都可以選擇 Calcite 的整體或部分模塊建立自己的 SQL 處理引擎,如 Hive 自己實現(xiàn)了 SQL 解析,只使用了 Calcite 的優(yōu)化功能,Storm 以及 Flink 則是完全基于 Calcite 建立了 SQL 引擎,具體如下表所示:

2.1 四個階段

Calcite 框架的運行主要分四個階段
Parse:使用 JavaCC 生成的解析器進行詞法、語法分析,得到 AST; Validate:結(jié)合元數(shù)據(jù)進行校驗; Optimize:將 AST 轉(zhuǎn)化為邏輯執(zhí)行計劃(tree of relational expression),并根據(jù)特定的規(guī)則(heuristic 或 cost-baesd)進行優(yōu)化; Execute:將邏輯執(zhí)行計劃 轉(zhuǎn)化成引擎特有的執(zhí)行邏輯,比如 Flink 的 DataStream。
考慮到第 4 步是一個和引擎耦合的流程,下面的內(nèi)容我們主要聚焦于前三個階段。
2.2 四大組件
圍繞著這個運行流程,Apache Calcite 最核心的框架可以拆分為四個組件
SQL Parser:將符合語法規(guī)則的 SQL 轉(zhuǎn)化成 AST(Sql text → SqlNode),Calcite 提供了默認的 parser,但也可以基于 JavaCC 生成自定義的 parser; Catalog:定義記錄了 SQL 的 metadata 和 namespace,方便后續(xù)的訪問和校驗; SQL Validator:結(jié)合 Catalog 提供的元數(shù)據(jù)校驗 AST,具體的實現(xiàn)都在 SqlValidatorImpl 中; Query Optimizer:這塊概念較多,首先需要將 AST 轉(zhuǎn)化成邏輯執(zhí)行計劃(即 SqlNode → RelNode),其次使用 Rules 優(yōu)化邏輯執(zhí)行計劃。
3. SQL parser
上文提到,SQL Parser 的作用是將 SQL 文本切割成一個個 token 并生成 AST,每個 token 在 Calcite 中由 SqlNode 表示(即代表 AST 的一個個結(jié)點),SqlNode 也可以通過 unparse 方法重新生成 SQL 文本。為了方便說明,我們引入一個 SQL 文本,通過觀察它在 Calcite 中的變化來摸清 Calcite 的原理,后續(xù)的校驗、優(yōu)化我們也會根據(jù)具體場景引入不同的 SQL 文本進行分析。
INSERT INTO sink_table SELECT s.id, name, age FROM source_table s JOIN dim_table d ON s.id=d.id WHERE s.id>1;
3.1 SqlNode
SqlNode 是 AST 所有結(jié)點的抽象,它可能具體代表某個運算符、常量或標識符等等,以 SqlNode 基類衍生出許多實現(xiàn)類如下:

INSERT 被 Parser 解析后會轉(zhuǎn)化成一個 SqlInsert,而 SELECT 則轉(zhuǎn)化成 SqlSelect,以上述的 SQL 文本為例,解析后會得到如下結(jié)構(gòu):

下面根據(jù)該圖講解 SqlNode 中一些較常見的核心結(jié)構(gòu)。
3.1.1 SqlInsert
首先這是個動作為 INSERT 的 DDL 語句,因此整個 AST root 是一個 SqlInsert,SqlInsert 中有個有許多成員變量分別記錄了這個 INSERT 語句的不同組成部分:
targetTable:記錄要插入的表,即 sink_table,在 AST 中表示為 SqlIdentifier source:標識了數(shù)據(jù)源,該 INSERT 語句的數(shù)據(jù)源是一個 SELECT 子句,在 AST 中表示為 SqlSelect; columnList:要插入的列,由于該 Insert 語句未顯式指定所以是 null,會在校驗階段動態(tài)計算得到。
3.1.2 SqlSelect
SqlSelect 是該 INSERT 語句的數(shù)據(jù)源部分被 Parser 解析生成的部分,它的核心結(jié)構(gòu)如下:
selectList:指 SELECT 關鍵字后緊跟的查詢的列,是一個 SqlNodeList,在該例中由于顯式指定了列且無任何函數(shù)調(diào)用,因此 SqlNodeList 中是三個 SqlIdentifier; from:指 SELECT 語句的數(shù)據(jù)源,該例中的數(shù)據(jù)源是表 source_table 和 dim_table 的連接,因此這里是一個 SqlJoin; where:指 WHERE 子句,是一個關于條件判斷的函數(shù)調(diào)用,SqlBasicCall,它的操作符是一個二元運算符 >,被解析為 SqlBinaryOperator,兩個操作數(shù)分別是 s.id(SqlIdentifier) 和 1(SqlNumberLiteral)。
3.1.3 SqlJoin
SqlJoin 是該 SqlSelect 語句的 JOIN 部分被 Parser 解析生成的部分:
left:代表 JOIN 的左表,由于我們用了別名,因此這里是一個 SqlBasicCall,它的操作符是 AS,被解析為 SqlAsOperator,兩個操作數(shù)分別是 source_table(SqlIdentifier) 和 s(SqlIdentifier); joinType:代表連接的類型,所有支持解析的 JOIN 類型都定義在 org.apache.calcite.sql.JoinType 中,joinType 被解析為 SqlLiteral,它的值即是 JoinType.INNER; right:代表 JOIN 的右表,由于我們用了別名,因此這里是一個 SqlBasicCall,它的操作符是 AS,被解析為 SqlAsOperator,兩個操作數(shù)分別是 dim_table(SqlIdentifier) 和 d(SqlIdentifier); conditionType:代表 ON 關鍵字,是一個 SqlLiteral; condition:與 3.1.2 的 where 相似,是一個關于條件判斷的函數(shù)調(diào)用,SqlBasicCall,它的操作符是一個二元運算符 =,被解析為 SqlBinaryOperator,兩個操作數(shù)分別是 s.id(SqlIdentifier) 和 d.id(SqlNumberLiteral)。
3.1.4 SqlIdentifier
SqlIdentifier 翻譯為標識符,標識 SQL 語句中所有的表名、字段名、視圖名(* 也會識別為一個 SqlIdentifier),基本所有與 SQL 相關的解析校驗,最后解析都到 SqlIdentifier 這一層結(jié)束,因此也可以認為 SqlIdentifier 是 SqlNode 中最基本的結(jié)構(gòu)單元。SqlIdentifier 有一個字符串列表 names 存儲實際的值,用列表示因為考慮到全限定名,如 s.id,在 names 會占用兩個元素格子,names[0] 存 s,names[1] 存 id。
3.1.5 SqlBasicCall
SqlBasicCall 包含所有的函數(shù)調(diào)用或運算,如 AS、CAST 等關鍵字和一些運算符,它有兩個核心成員變量:operator 和 operands,分別記錄這次函數(shù)調(diào)用/運算的操作符和操作數(shù),operator 通過 SqlKind 標識其類型。
3.2 JavaCC
Calcite 沒有自己造詞法、語法分析的輪子,而是采用了主流框架 JavaCC,并結(jié)合了 Freemarker 模板引擎來生成 LL(k)parser,JavaCC(Java Compiler Compiler)是一個用Java語言寫的一個Java語法分析生成器,它所產(chǎn)生的文件都是純Java代碼文件。用戶只要按照 JavaCC 的語法規(guī)范編寫 JavaCC 的源文件,然后使用 JavaCC 插件進行 codegen,就能夠生成基于Java語言的某種特定語言的分析器。
Freemarker 是一個模板渲染引擎,通過它建立內(nèi)置模板,結(jié)合自定義的拓展語法可以快速生成我們想要的語法描述文件。
在 Calcite 中,Parser.jj 是 Calcite 內(nèi)置的模板文件,.ftl 為自定義拓展模板,config.fmpp 用于聲明數(shù)據(jù)模型,首先 Calcite 通過 fmpp-maven-plugin 插件生成最終的 Parser.jj 文件,再利用 javacc-maven-plugin 插件生成對應的 Java 實現(xiàn)代碼,具體的流程圖如下:

4. Catalog
Catalog 保存著整個 SQL 的元數(shù)據(jù)和命名空間,元數(shù)據(jù)的校驗都需要通過 Catalog 組件進行,Catalog 中最關鍵的幾個結(jié)構(gòu)如下:

這些結(jié)構(gòu)大致可以分為三類:
元數(shù)據(jù)管理模式和命名空間; 表元數(shù)據(jù)信息; 類型系統(tǒng)。
Calcite 的 Catalog 結(jié)構(gòu)復雜,但我們可以從這個角度來理解 Catalog,它是 Calcite 在不同粒度上對元數(shù)據(jù)所做的不同級別的抽象。首先最細粒度的是 RelDataTypeField,代表某個字段的名字和類型信息,多個 RelDataTypeField 組成了一個 RelDataType,表示某行或某個標量表達式的結(jié)果的類型信息。再之后是一個完整表的元數(shù)據(jù)信息,即 Table。最后我們需要把這些元數(shù)據(jù)組織存儲起來進行管理,于是就有了 Schema。
5. SQL validator
Calcite 提供的 validator 流程極為復雜,但概括下來主要做了這么一件事,對每個 SqlNode 結(jié)合元數(shù)據(jù)校驗是否正確,包括:
驗證表名是否存在; select 的列在對應表中是否存在,且該匹配到的列名是否唯一,比如 join 多表,兩個表有相同名字的字段,如果此時 select 的列不指定表名就會報錯; 如果是 insert,需要插入列和數(shù)據(jù)源進行校驗,如列數(shù)、類型、權限等; ……

Calcite 提供的 validator 和前面提到的 Catalog 關系緊密,Calcite 定義了一個 CatalogReader 用于在校驗過程中訪問元數(shù)據(jù) (Table schema),并對元數(shù)據(jù)做了運行時的一些封裝,最核心的兩部分是 SqlValidatorNamespace 和 SqlValidatorScope。
SqlValidatorNamespace:描述了 SQL 查詢返回的關系,一個 SQL 查詢可以拆分為多個部分,查詢的列組合,表名等等,當中每個部分都有一個對應的 SqlValidatorNamespace。 SqlValidatorScope:可以認為是校驗流程中每個 SqlNode 的工作上下文,當校驗表達式時,通過 SqlValidatorScope 的 resolve 方法進行解析,如果成功的話會返回對應的 SqlValidatorNamespace 描述結(jié)果類型。
在此基礎上,Calcite 提供了 SqlValidator 接口,該接口提供了所有與校驗相關的核心邏輯,并提供了內(nèi)置的默認實現(xiàn)類 SqlValidatorImpl 定義如下:
public class SqlValidatorImpl implements SqlValidatorWithHints {
// ...
final SqlValidatorCatalogReader catalogReader;
/**
* Maps {@link SqlNode query node} objects to the {@link SqlValidatorScope}
* scope created from them.
*/
protected final Map<SqlNode, SqlValidatorScope> scopes =
new IdentityHashMap<>();
/**
* Maps a {@link SqlSelect} node to the scope used by its WHERE and HAVING
* clauses.
*/
private final Map<SqlSelect, SqlValidatorScope> whereScopes =
new IdentityHashMap<>();
/**
* Maps a {@link SqlSelect} node to the scope used by its GROUP BY clause.
*/
private final Map<SqlSelect, SqlValidatorScope> groupByScopes =
new IdentityHashMap<>();
/**
* Maps a {@link SqlSelect} node to the scope used by its SELECT and HAVING
* clauses.
*/
private final Map<SqlSelect, SqlValidatorScope> selectScopes =
new IdentityHashMap<>();
/**
* Maps a {@link SqlSelect} node to the scope used by its ORDER BY clause.
*/
private final Map<SqlSelect, SqlValidatorScope> orderScopes =
new IdentityHashMap<>();
/**
* Maps a {@link SqlSelect} node that is the argument to a CURSOR
* constructor to the scope of the result of that select node
*/
private final Map<SqlSelect, SqlValidatorScope> cursorScopes =
new IdentityHashMap<>();
/**
* The name-resolution scope of a LATERAL TABLE clause.
*/
private TableScope tableScope = null;
/**
* Maps a {@link SqlNode node} to the
* {@link SqlValidatorNamespace namespace} which describes what columns they
* contain.
*/
protected final Map<SqlNode, SqlValidatorNamespace> namespaces =
new IdentityHashMap<>();
// ...
}
可以看到 SqlValidatorImpl 當中有許多 scopes 映射 (SqlNode -> SqlValidatorScope) 和 namespaces (SqlNode -> SqlValidatorNamespace),校驗其實就是在一個個 SqlValidatorScope 中校驗 SqlValidatorNamespace 的過程,另外 SqlValidatorImpl 有一個成員 catalogReader,也就是上面說到的 SqlValidatorCatalogReader,為 SqlValidatorImpl 提供了訪問元數(shù)據(jù)的入口。
6. Query optimizer
query optimizer 是最為龐雜的一個組件,涉及到的概念多,首先,query optimizer 需要將 SqlNode 轉(zhuǎn)成 RelNode(SqlToRelConverter),并使用一系列關系代數(shù)的優(yōu)化規(guī)則(RelOptRule)對其進行優(yōu)化,最后將其轉(zhuǎn)化成對應引擎可執(zhí)行的物理計劃。
6.1 SqlNode 到 RelNode
SQL 是基于關系代數(shù)的一種 DSL,而 RelNode 接口就是 Calcite 對關系代數(shù)的一個抽象表示,所有關系代數(shù)的代碼結(jié)構(gòu)都需要實現(xiàn) RelNode 接口。
SqlNode 是從 sql 語法角度解析出來的一個個節(jié)點,而 RelNode 則是一個關系表達式的抽象結(jié)構(gòu),從關系代數(shù)這一角度去表示其邏輯結(jié)構(gòu),并用于之后的優(yōu)化過程中決定如何執(zhí)行查詢。當 SqlNode 第一次被轉(zhuǎn)化成 RelNode 時,由一系列邏輯節(jié)點(LogicalProject、LogicalJoin 等)組成,后續(xù)優(yōu)化器會將這些邏輯節(jié)點轉(zhuǎn)化成物理節(jié)點,根據(jù)不同的計算存儲引擎有不同的實現(xiàn),如 JdbcJoin、SparkJoin 等。下表是一個關于 SQL 、關系代數(shù)以及 Calcite 結(jié)構(gòu)的映射關系:

注:Calcite 列只列舉了較常見的情況,而非和前面兩列嚴格的映射標準。
一個簡單的 SQL 例子經(jīng)過 query optimizer 處理后得到的結(jié)果如下:

6.2 RelNode 優(yōu)化

在將 SqlNode 轉(zhuǎn)為 RelNode 后,我們就可以通過關系代數(shù)的一些規(guī)則對 RelNode 進行優(yōu)化,這些“規(guī)則”在 Calcite 表現(xiàn)為 RelOptRule。
RelNode 有一些物理特征,這部分特征就由 RelTrait 來表示,其中最重要的一個是 Convention(Calling Convention),可以理解是一個特定數(shù)據(jù)引擎協(xié)議或數(shù)據(jù)源約定,同數(shù)據(jù)引擎的 RelNode 可以直接相互連接,而非同引擎的則需要通過 Converter 進行轉(zhuǎn)換(通過 ConverterRule 匹配)。
比較常見的優(yōu)化規(guī)則如:
剪除不用的 fields; 合并 projections; 將子查詢轉(zhuǎn)為 join; 對 joins 重排序; 下推 projections; 下推 filters; ……
7. 應用場景
基于 Calcite 良好的可插拔特性,目前有許多基于 Calcite 二次開發(fā)的 SQL 解析引擎,如 Flink,該節(jié)列舉了一些可以基于 Calcite 拓展的工作和思路。
7.1 拓展 SQL 語法解析
基于 JavaCC 實現(xiàn)詞法分析器(Lexer)和語法分析器(Parser),比如 Flink 在Calcite 原生的 Parser.jj 模板之上自定義拓展了 SqlCreateTable 和 SqlCreateView 兩種 Parser,支持解析 CREATE TABLE ... 和 CREATE VIEW ... 的 DDL,同時需要拓展對應的 Java 類。。
7.2 拓展元數(shù)據(jù)校驗的自定義數(shù)據(jù)結(jié)構(gòu)
通過拓展 Schema 和 Table 等接口可以自定義注入元數(shù)據(jù)的時機以及格式,比如 Flink 通過命令式編程建立嵌套 VIEW 的數(shù)據(jù)依賴(假設 viewA 依賴 viewB 的數(shù)據(jù),則需要先手動調(diào)用 API 解析 viewB),有的框架則批量讀取,自己建立拓撲圖來解決數(shù)據(jù)依賴問題。關于元數(shù)據(jù)格式 Flink 基于 Table 接口實現(xiàn)了 QueryOperationCatalogViewTable 來表示表、視圖,并為其設計了統(tǒng)一的 TableSchema 收集 RelDataType 信息。
7.3 拓展類型解析
當碰著一些 Calcite 原生不支持的復雜類型,可以通過拓展 RelDataTypeFactory 等相關類型接口拓展類型解析。
7.4 拓展具體的規(guī)則優(yōu)化
可以自定義一些特殊的 Rule,調(diào)用 HepProgramBuilder 的 addRuleInstance 方法注冊到 planner 里,這就可以在 RelNode 的優(yōu)化過程中匹配到我們的自定義 Rule,并在成功匹配的情況下進行優(yōu)化。
這幾種場景從流程復雜度的角度來看是 SQL 語法解析>元數(shù)據(jù)校驗>類型解析>規(guī)則優(yōu)化,但在實際拓展過程中,個人認為難度順序反而是相反的,因為 SQL 語法解析和元數(shù)據(jù)校驗的流程雖然很復雜,但封裝完善,類型解析需要考慮的適配點較多是一個難點,而規(guī)則優(yōu)化則需要深厚的 SQL 基礎和一些理論知識,實際拓展過程反而his最為困難的。


