Flink SQL Lookup Join: The Ultimate Solution, plus a Getting-Started Guide to Flink Rules

Flink Join
Regular Join
For example, the commonly used inner join:
SELECT * FROM Orders
JOIN Product
ON Orders.productId = Product.id
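This kind of JOIN requires the data from both sides to be kept in Flink state forever to guarantee correct results, which makes the state grow without bound.
You can configure a state TTL (time-to-live: table.exec.state.ttl) to keep it from growing indefinitely, but note that this may affect the correctness of the query results.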
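To see why the state keeps growing, the regular join can be modeled as two per-key buffers that are never cleared. The following is a minimal Java sketch under simplifying assumptions, not Flink's join operator: RegularJoinSketch and its methods are hypothetical, and its TTL expires rows by insertion time, which only approximates table.exec.state.ttl.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a regular inner join on an integer key.
// Both sides are buffered per key; without a TTL nothing is ever removed.
class RegularJoinSketch {
    static final class Row {
        final String value;
        final long insertTime;

        Row(String value, long insertTime) {
            this.value = value;
            this.insertTime = insertTime;
        }
    }

    private final Map<Integer, List<Row>> leftState = new HashMap<>();
    private final Map<Integer, List<Row>> rightState = new HashMap<>();
    private final long ttlMillis; // Long.MAX_VALUE means "no TTL"

    RegularJoinSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    List<String> onLeft(int key, String value, long now) {
        return process(key, value, now, leftState, rightState, true);
    }

    List<String> onRight(int key, String value, long now) {
        return process(key, value, now, rightState, leftState, false);
    }

    // Buffers the new row, then joins it with every buffered row of the other side.
    private List<String> process(int key, String value, long now, Map<Integer, List<Row>> own,
                                 Map<Integer, List<Row>> other, boolean isLeft) {
        expire(own, now);
        expire(other, now);
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(new Row(value, now));
        List<String> out = new ArrayList<>();
        for (Row r : other.getOrDefault(key, Collections.emptyList())) {
            out.add(isLeft ? value + "|" + r.value : r.value + "|" + value);
        }
        return out;
    }

    // Simplified TTL: drop rows whose insertion time is at least ttlMillis in the past.
    private void expire(Map<Integer, List<Row>> state, long now) {
        for (List<Row> rows : state.values()) {
            rows.removeIf(r -> now - r.insertTime >= ttlMillis);
        }
        state.values().removeIf(List::isEmpty);
    }

    // Total number of rows currently held in state on both sides.
    int bufferedRows() {
        int n = 0;
        for (List<Row> rows : leftState.values()) n += rows.size();
        for (List<Row> rows : rightState.values()) n += rows.size();
        return n;
    }
}
```

With no TTL both rows stay buffered forever; with a 5-second TTL, an order arriving 10 seconds after its product finds nothing to join with, which is the correctness trade-off described above.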
Interval Join
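A JOIN constrained by both the join condition and a time bound. It works on two KeyedStreams: following the join condition, each record of one stream is joined with the records of the other stream that fall within a time range relative to that record.
For example, to query orders together with their payments, where the payment happened within 1 hour before or after the order time: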
SELECT
...
FROM
Orders AS o JOIN Payment AS p ON
o.orderId = p.orderId AND
p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
orderTime + INTERVAL '1' HOUR
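The time bound is also what keeps an interval join's state finite: once the watermark passes an order's upper bound, no payment that is still on time can match it. A minimal Java sketch of the predicate and of that eviction check (IntervalJoinSketch is a hypothetical helper; the real operator's cleanup timers are not modeled):

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helpers for the interval-join predicate
//   p.payTime BETWEEN o.orderTime - INTERVAL '1' HOUR AND o.orderTime + INTERVAL '1' HOUR
class IntervalJoinSketch {
    // SQL BETWEEN is inclusive on both ends.
    static boolean withinInterval(LocalDateTime orderTime, LocalDateTime payTime, Duration bound) {
        LocalDateTime lower = orderTime.minus(bound);
        LocalDateTime upper = orderTime.plus(bound);
        return !payTime.isBefore(lower) && !payTime.isAfter(upper);
    }

    // Once the watermark is past orderTime + bound, any payment that could still
    // match would be late, so the buffered order can be dropped from state.
    static boolean canEvictOrder(LocalDateTime orderTime, LocalDateTime watermark, Duration bound) {
        return watermark.isAfter(orderTime.plus(bound));
    }
}
```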
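Temporal Join
First, the concept of a temporal table: a dynamic table that changes over time and may contain multiple snapshots of the table.
A temporal table whose records' historical versions can be tracked and accessed is called a versioned table, such as a database changelog;
one in which only the latest version can be tracked and accessed is called a regular table, such as a database table.
In Flink, a table that defines both a primary key constraint and an event-time attribute is a versioned table.
A Temporal Join joins against a versioned table: the main table is enriched with details from a continuously updated versioned table, based on a time attribute and an equality join condition. Both tables must use the same time notion, either event time or processing time.
When event time is used, the versioned table keeps all versions from the last watermark up to the current time, and both the left and right tables need properly configured watermarks. The right table must be a versioned table (here, CDC data) with a correctly declared primary key, and that primary key must appear in the equality condition of the JOIN. For example: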
-- The left table is a regular append-only table.
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- The right table is a versioned table of currency rates, backed by CDC data
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `value.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
-- the primary key must appear in the join condition
ON orders.currency = currency_rates.currency;
order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
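An event-time temporal join can be pictured as a per-key version history queried for "the latest version at or before order_time". Below is a minimal Java sketch using TreeMap.floorEntry. VersionedTableSketch is hypothetical, the EUR update times 11:50 and 12:05 in the usage note are invented inputs chosen to reproduce the two result rows above, and watermark-driven emission and state cleanup are not modeled.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned table keyed by its primary key.
// The versions of each key are ordered by update_time.
class VersionedTableSketch {
    private final Map<String, TreeMap<LocalTime, BigDecimal>> versions = new HashMap<>();

    void upsert(String currency, LocalTime updateTime, BigDecimal rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(updateTime, rate);
    }

    // FOR SYSTEM_TIME AS OF orderTime: the latest version with update_time <= orderTime.
    // Returns null when no version exists yet (a LEFT JOIN then pads with NULL).
    BigDecimal rateAsOf(String currency, LocalTime orderTime) {
        TreeMap<LocalTime, BigDecimal> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<LocalTime, BigDecimal> version = history.floorEntry(orderTime);
        return version == null ? null : version.getValue();
    }
}
```

With EUR versions 1.14 at 11:50 and 1.10 at 12:05, rateAsOf("EUR", 12:00) returns 1.14 and rateAsOf("EUR", 12:06) returns 1.10, matching o_001 and o_002.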
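When processing time is used, you can think of the lookup table (the right table) as a plain HashMap that stores the latest full data set. Flink can join a table in an external database system directly, without keeping the latest versions in state. For example: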
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;
-- or join a table function
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency;
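Note: the "FOR SYSTEM_TIME AS OF" syntax does not support joining against the latest version of an arbitrary table or VIEW. This is a semantic consideration: Flink's implementation would not quite match the semantics, because the join processing of the left stream does not wait for the right-side versioned table (VIEW/table function) to finish a complete snapshot. My understanding is that a left row could therefore be joined with right-side data that is not necessarily the latest.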
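In the processing-time case the right side behaves like the HashMap described above, and the note about FOR SYSTEM_TIME AS OF follows from the same model: each left row is joined immediately against whatever is present at that moment. A minimal Java sketch (ProcTimeJoinSketch is hypothetical, not a Flink class):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a processing-time join: the right side is only
// "the latest value per key at the moment the left row is processed".
class ProcTimeJoinSketch {
    private final Map<String, Double> latestRates = new HashMap<>();

    void updateRate(String currency, double rate) {
        latestRates.put(currency, rate);
    }

    // Joins one left row right away; returns null if the key is not present yet.
    // Nothing makes the left row wait for the right side to be fully loaded.
    Double join(String currency, double amount) {
        Double rate = latestRates.get(currency);
        if (rate == null) {
            return null;
        }
        return amount * rate;
    }
}
```

A EUR row processed before any rate arrives gets no match, and a result already emitted with rate 1.5 is not revised when the rate later changes to 2.0.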
Lookup Join
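Like the processing-time Temporal Join, it queries the right table as of the moment the JOIN operator processes each row and joins the result. It is typically used for dimension-table joins and only supports equi-joins. For example: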
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;
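Lookup Join Execution Flow
The walkthrough below is based on a Flink unit test case; newcomers can use it as a starting point for developing custom Rules.
Preparation
Build the Flink Table module
In the flink-table directory, run: mvn clean package -Pfast,hive-2.1.1,scala-2.12 -DskipTests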
Open the unit test files
The UTs for a Flink Rule consist of:
Logical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical
Physical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql, XXX/batch/sql
Integration tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql, XXX/batch/sql
These are also the UTs you need to complete when submitting a Rule-related PR to the community.
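Turn up the log level
Before the code under test, add: Configurator.setAllLevels("", Level.TRACE) (Configurator here is Log4j 2's org.apache.logging.log4j.core.config.Configurator, and "" addresses the root logger).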
Tracing the SQL execution
The analysis below is based on running flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala.
Run the unit test testJoinTemporalTable:
SELECT * FROM MyTable AS T JOIN LookupTable
FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
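SQL parsing
The parser (built on Calcite's SQL grammar) parses the "FOR SYSTEM_TIME AS OF" clause into a SqlSnapshot (a SqlNode); after validate(), the SQL-to-RelNode conversion turns it into a LogicalSnapshot (a RelNode). The resulting logical execution plan is: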
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    LogicalFilter(condition=[=($cor0.a, $0)])
      LogicalSnapshot(period=[$cor0.proctime])
        LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
Optimizer
FlinkStreamProgram/FlinkBatchProgram define a series of rules that transform and optimize the logical/physical plans.
In this case, the plan goes through the following important conversions:
LogicalCorrelateToJoinFromLookupTableRuleWithFilter:
// The class definition shows that the logical plan above matches this rule
class LogicalCorrelateToJoinFromLookupTableRuleWithFilter
  extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
    operand(classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalFilter],
        operand(classOf[LogicalSnapshot],
          operand(classOf[RelNode], any())))),
    "LogicalCorrelateToJoinFromLookupTableRuleWithFilter"
  ) {
  override def matches(call: RelOptRuleCall): Boolean = {
    val snapshot: LogicalSnapshot = call.rel(3)
    val snapshotInput: RelNode = trimHep(call.rel(4))
    isLookupJoin(snapshot, snapshotInput)
  }
  ……
}

// Once the operands match, check whether this is a lookup join
protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput: RelNode): Boolean = {
  ……
  // processing time AND the snapshotted table is a LookupTableSource
  isProcessingTime && snapshotOnLookupSource
}
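Calcite fires a rule only when its operand tree matches the plan, and call.rel(i) indexes the matched nodes in depth-first pre-order, which is why matches() reads the snapshot from call.rel(3) and its input from call.rel(4). The following Java sketch reproduces that mechanism on toy nodes; OperandMatchSketch, its Node/Operand classes and the string node types are hypothetical stand-ins for Calcite's classes, and the matching is simplified (children are matched positionally, extra children are ignored).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy plan nodes and operands; hypothetical classes, not Calcite's.
class OperandMatchSketch {
    static final class Node {
        final String type;
        final boolean procTime;     // only meaningful for "Snapshot"
        final boolean lookupSource; // only meaningful for "TableScan"
        final List<Node> inputs;

        Node(String type, boolean procTime, boolean lookupSource, Node... inputs) {
            this.type = type;
            this.procTime = procTime;
            this.lookupSource = lookupSource;
            this.inputs = Arrays.asList(inputs);
        }
    }

    static final class Operand {
        final String type;            // "*" plays the role of classOf[RelNode]
        final List<Operand> children; // empty plays the role of any()

        Operand(String type, Operand... children) {
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    // Same shape as the operand tree of LogicalCorrelateToJoinFromLookupTableRuleWithFilter.
    static final Operand PATTERN =
        new Operand("Correlate",
            new Operand("*"),
            new Operand("Filter",
                new Operand("Snapshot",
                    new Operand("*"))));

    // Depth-first match; matched nodes are collected in pre-order, like call.rel(i).
    static boolean match(Operand op, Node node, List<Node> rels) {
        if (!op.type.equals("*") && !op.type.equals(node.type)) return false;
        rels.add(node);
        if (node.inputs.size() < op.children.size()) return false;
        for (int i = 0; i < op.children.size(); i++) {
            if (!match(op.children.get(i), node.inputs.get(i), rels)) return false;
        }
        return true;
    }

    // Mirrors matches() + isLookupJoin: rels.get(3) is the snapshot, rels.get(4) its input.
    static boolean isLookupJoin(Node root) {
        List<Node> rels = new ArrayList<>();
        if (!match(PATTERN, root, rels)) return false;
        Node snapshot = rels.get(3);
        Node snapshotInput = rels.get(4);
        return snapshot.procTime && snapshotInput.lookupSource;
    }
}
```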
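Once the rule matches, LogicalCorrelate is converted into LogicalJoin, and the correlated filter =($cor0.a, $0) becomes the join condition =($0, $5):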
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
   +- LogicalSnapshot(period=[$cor0.proctime])
      +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
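The index change in the condition follows from the join's concatenated row type: MyTable contributes five fields (a, b, c, proctime, rowtime), so LookupTable's id moves from $0 to $5. A minimal Java sketch of that index arithmetic (CorrelateToJoinConditionSketch is a hypothetical helper that only produces the condition's string form):

```java
import java.util.List;

// Hypothetical helper: rewrites the correlated filter =($cor0.<field>, $<rightIndex>)
// into a join condition over the concatenated row type [left fields..., right fields...].
class CorrelateToJoinConditionSketch {
    static String toJoinCondition(List<String> leftFields, String correlatedField, int rightIndex) {
        int leftIndex = leftFields.indexOf(correlatedField);
        if (leftIndex < 0) {
            throw new IllegalArgumentException("unknown left field: " + correlatedField);
        }
        // References to the right input are shifted by the number of left fields.
        int shiftedRightIndex = leftFields.size() + rightIndex;
        return "=($" + leftIndex + ", $" + shiftedRightIndex + ")";
    }
}
```

For leftFields [a, b, c, proctime, rowtime], toJoinCondition(leftFields, "a", 0) yields "=($0, $5)", the condition in the plan above.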
FlinkProjectJoinTransposeRule + ProjectRemoveRule: push the Project down and prune it
// Swaps the Project with the Join below it, i.e. pushes the Project down
public FlinkProjectJoinTransposeRule(
        PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
    super(operand(Project.class, operand(Join.class, any())), relFactory, null);
    this.preserveExprCondition = preserveExprCondition;
}
After optimization:
LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
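The Project disappears because LogicalProject(a=[$0], ..., age=[$7]) simply returns all eight join fields in their original order, so it does no work. A simplified Java check of that "identity projection" condition (IdentityProjectSketch is hypothetical and ignores field names and types):

```java
// Hypothetical helper: a projection is an identity when it lists every input
// field exactly once, in input order, with no computed expressions.
class IdentityProjectSketch {
    static boolean isIdentity(int[] projectedInputRefs, int inputFieldCount) {
        if (projectedInputRefs.length != inputFieldCount) {
            return false;
        }
        for (int i = 0; i < projectedInputRefs.length; i++) {
            if (projectedInputRefs[i] != i) {
                return false;
            }
        }
        return true;
    }
}
```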
Next, the Volcano rules explore combinations of the logical plan and pick the cheapest one. After they run, the best plan is:
12129 [main] DEBUG org.apache.calcite.plan.RelOptPlanner [] - Cheapest plan:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner]): rowcount = 3.0E7, cumulative cost = {4.0E8 rows, 5.0E8 cpu, 1.37E10 io, 0.0 network, 0.0 memory}, id = 403
  FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}, id = 378
  FlinkLogicalSnapshot(period=[$cor0.proctime]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 2.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}, id = 402
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}, id = 381

Final result:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
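The cumulative costs in the DEBUG output already include the inputs: Calcite computes a node's cumulative cost as its own (non-cumulative) cost plus the cumulative costs of its inputs. For instance, FlinkLogicalSnapshot's 2.0E8 rows is its own 1.0E8 plus the scan's 1.0E8, and the join's own share can be recovered by subtraction. A minimal Java sketch of that arithmetic, assuming only rows, cpu and io matter (network and memory are 0.0 in this log); CumulativeCostSketch is hypothetical.

```java
// Hypothetical sketch of cost composition:
// cumulative(node) = own(node) + sum of the inputs' cumulative costs.
class CumulativeCostSketch {
    static final class Cost {
        final double rows;
        final double cpu;
        final double io;

        Cost(double rows, double cpu, double io) {
            this.rows = rows;
            this.cpu = cpu;
            this.io = io;
        }

        Cost plus(Cost o) {
            return new Cost(rows + o.rows, cpu + o.cpu, io + o.io);
        }

        Cost minus(Cost o) {
            return new Cost(rows - o.rows, cpu - o.cpu, io - o.io);
        }
    }

    static Cost cumulative(Cost own, Cost... inputsCumulative) {
        Cost total = own;
        for (Cost c : inputsCumulative) total = total.plus(c);
        return total;
    }

    // Works backwards from a logged cumulative cost to the node's own cost.
    static Cost ownCost(Cost cumulative, Cost... inputsCumulative) {
        Cost result = cumulative;
        for (Cost c : inputsCumulative) result = result.minus(c);
        return result;
    }
}
```

Plugging in the logged values, the join's own cost comes out as 1.0E8 rows, 2.0E8 cpu and 4.9E9 io, and adding the two inputs back reproduces {4.0E8 rows, 5.0E8 cpu, 1.37E10 io}.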
Rules attempted:
Rules                                                          Attempts  Time (us)
FlinkJoinPushExpressionsRule                                          2        553
JoinConditionPushRule                                                 2        152
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)             1     54,956
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)                        1      4,787
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)                    1      3,162
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)         1      1,403
SimplifyJoinConditionRule                                             1        249
* Total                                                               9     65,262
Of these, the Converter rules come from LOGICAL_CONVERTERS, a rule set that converts Calcite nodes into Flink logical nodes.
For example, FlinkLogicalSnapshotConverter:
// Converts a LogicalSnapshot into a FlinkLogicalSnapshot
class FlinkLogicalSnapshotConverter
  extends ConverterRule(
    // matches LogicalSnapshot nodes with Convention.NONE; the output has FlinkConventions.LOGICAL
    classOf[LogicalSnapshot],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalSnapshotConverter") {

  def convert(rel: RelNode): RelNode = {
    val snapshot = rel.asInstanceOf[LogicalSnapshot]
    val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
  }
}
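A ConverterRule only fires on nodes with its input convention and rebuilds them in its output convention, asking for the inputs to be converted as well. The Java sketch below models that eagerly on toy nodes; it is a simplification, since in the Volcano planner RelOptRule.convert roughly registers a requirement for the input in the target convention rather than converting it on the spot. ConverterRuleSketch and the "Flink" name prefix are hypothetical (the real scan node is named FlinkLogicalTableSourceScan).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy version of a ConverterRule(in = NONE, out = LOGICAL).
class ConverterRuleSketch {
    enum Convention { NONE, LOGICAL }

    static final class Rel {
        final String name;
        final Convention convention;
        final List<Rel> inputs;

        Rel(String name, Convention convention, Rel... inputs) {
            this.name = name;
            this.convention = convention;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Fires only on NONE nodes; converts the inputs first (the role of
    // RelOptRule.convert(input, LOGICAL)), then rebuilds the node in LOGICAL.
    static Rel convert(Rel rel) {
        if (rel.convention != Convention.NONE) {
            return rel;
        }
        List<Rel> newInputs = new ArrayList<>();
        for (Rel input : rel.inputs) {
            newInputs.add(convert(input));
        }
        return new Rel("Flink" + rel.name, Convention.LOGICAL, newInputs.toArray(new Rel[0]));
    }
}
```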
Adding the processing-time materialization operator
// convert time indicators
chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)
// for a processing-time attribute that needs to be materialized, a SqlFunction call is created here
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)
After the conversion:
FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
Physical rule optimization
After processing by the following physical Volcano rules:
FlinkJoinPushExpressionsRule
JoinConditionPushRule
StreamPhysicalTableSourceScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)
StreamPhysicalSnapshotOnTableScanRule
StreamPhysicalCalcRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)
StreamPhysicalDataStreamScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)
SimplifyJoinConditionRule
the best plan is:
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
StreamPhysicalCalcRule: converts FlinkLogicalCalc into StreamPhysicalCalc.
SnapshotOnTableScanRule: converts
FlinkLogicalJoin
:- FlinkLogicalDataStreamTableScan
+- FlinkLogicalSnapshot
   +- FlinkLogicalTableSourceScan
into
StreamPhysicalLookupJoin
+- StreamPhysicalDataStreamScan
This is the key conversion logic for LookupJoin:
// This rule reuses the match condition of its parent class
class SnapshotOnTableScanRule
  extends BaseSnapshotOnTableScanRule("StreamPhysicalSnapshotOnTableScanRule") {
}

// As shown, the operands match exactly the logical plan produced before physical optimization
abstract class BaseSnapshotOnTableScanRule(description: String)
  extends RelOptRule(
    operand(classOf[FlinkLogicalJoin],
      operand(classOf[FlinkLogicalRel], any()),
      operand(classOf[FlinkLogicalSnapshot],
        operand(classOf[TableScan], any()))),
    description)
  with CommonLookupJoinRule

private def doTransform(
    join: FlinkLogicalJoin,
    input: FlinkLogicalRel,
    temporalTable: RelOptTable,
    calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
  val joinInfo = join.analyzeCondition
  val cluster = join.getCluster
  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  // Convert the input from a logical node into a physical node. This triggers
  // StreamPhysicalDataStreamScanRule, which turns FlinkLogicalDataStreamTableScan
  // into StreamPhysicalDataStreamScan
  val convInput = RelOptRule.convert(input, requiredTrait)
  new StreamPhysicalLookupJoin(
    cluster,
    providedTrait,
    convInput,
    temporalTable,
    calcProgram,
    joinInfo,
    join.getJoinType)
}
This completes the conversion to the physical plan.
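Stripped of traits and conventions, the rewrite performed by SnapshotOnTableScanRule and doTransform is a small piece of tree surgery: the snapshot and the lookup table scan vanish from the inputs, and the lookup table becomes an attribute of the new join node, which is why the physical LookupJoin has only one input. A minimal Java sketch on toy nodes; SnapshotOnTableScanSketch is hypothetical, and it ignores the optional Calc between Snapshot and TableScan (the calcProgram argument) as well as all trait handling.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy plan node; not Flink's classes.
class SnapshotOnTableScanSketch {
    static final class Rel {
        final String kind;
        final String table;     // set for scans and for the lookup join
        final String condition; // set for joins
        final List<Rel> inputs;

        Rel(String kind, String table, String condition, Rel... inputs) {
            this.kind = kind;
            this.table = table;
            this.condition = condition;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Join(left, Snapshot(TableScan)) => LookupJoin(left); any other shape is returned unchanged.
    // The lookup table stops being a plan input and is carried as an attribute instead
    // (compare the temporalTable argument of StreamPhysicalLookupJoin).
    static Rel rewrite(Rel join) {
        if (!"Join".equals(join.kind) || join.inputs.size() != 2) {
            return join;
        }
        Rel left = join.inputs.get(0);
        Rel snapshot = join.inputs.get(1);
        if (!"Snapshot".equals(snapshot.kind) || snapshot.inputs.size() != 1) {
            return join;
        }
        Rel scan = snapshot.inputs.get(0);
        if (!"TableScan".equals(scan.kind)) {
            return join;
        }
        return new Rel("LookupJoin", scan.table, join.condition, left);
    }
}
```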
Translating the physical plan
planner.translate() includes:
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
In translateToExecNodeGraph, the translateToExecNode method of each node of the optimized physical plan is called to build the ExecNode graph; for example, StreamPhysicalLookupJoin is converted into StreamExecLookupJoin.
In translateToPlan, the translateToPlanInternal method of each ExecNode is called. Taking CommonExecLookupJoin as an example:
// Excerpt from CommonExecLookupJoin; the constructor CommonExecLookupJoin(……) is omitted,
// as are validation and the async LookupFunction logic
public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    // ----------- create the factory of the lookupFunction operator -----------
    RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
    UserDefinedFunction userDefinedFunction =
            LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
    UserDefinedFunctionHelper.prepareInstance(
            planner.getTableConfig().getConfiguration(), userDefinedFunction);

    boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
    StreamOperatorFactory<RowData> operatorFactory =
            createSyncLookupJoin(
                    temporalTable,
                    planner.getTableConfig(),
                    lookupKeys,
                    (TableFunction<Object>) userDefinedFunction,
                    planner.getRelBuilder(),
                    inputRowType,
                    tableSourceRowType,
                    resultRowType,
                    isLeftOuterJoin,
                    planner.getExecEnv().getConfig().isObjectReuseEnabled());
    // --------------------------------------------------------------------------

    // convert into a Transformation
    Transformation<RowData> inputTransformation =
            (Transformation<RowData>) inputEdge.translateToPlan(planner);
    return new OneInputTransformation<>(
            inputTransformation,
            getDescription(),
            operatorFactory,
            InternalTypeInfo.of(resultRowType),
            inputTransformation.getParallelism());
}
// Only the core logic is listed (parameters and arguments omitted); it has three main parts
private StreamOperatorFactory<RowData> createSyncLookupJoin(……) {
    // 1. Use the code generator to generate the lookup function, wrapped as a FlatMapFunction (the fetcher)
    GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
            LookupJoinCodeGenerator.generateSyncLookupFunction(……);
    // 2. Generate the Collector for the table function's output
    GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
            LookupJoinCodeGenerator.generateCollector(……);
    // 3. Finally build the LookupJoinRunner ProcessFunction.
    // If there is a Calc on the lookup side (the right table), the runner carries the Calc logic,
    // e.g. SELECT * FROM T JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.b + 1
    // The fetcher reads the raw rows from the LookupFunction; they pass through the Calc
    // and are then matched against the row from the main table (left stream)
    GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
            LookupJoinCodeGenerator.generateCalcMapFunction(
                    config,
                    JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                    filterOnTemporalTable,
                    temporalTableOutputType,
                    tableSourceRowType);
    ProcessFunction<RowData, RowData> processFunc =
            new LookupJoinWithCalcRunner(
                    generatedFetcher,
                    generatedCalc,
                    generatedCollector,
                    isLeftOuterJoin,
                    rightRowType.getFieldCount());
    // (omitted) processFunc is then wrapped into the StreamOperatorFactory returned by this method
}
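At runtime the three generated pieces cooperate once per left row: the fetcher queries the dimension table with the lookup key, the Calc filters the fetched rows, and the collector concatenates each surviving right row with the left row; for a LEFT JOIN with no surviving match, the left row is emitted padded with rightFieldCount NULLs. The Java sketch below models that flow on Object[] rows. LookupJoinRunnerSketch is hypothetical, not LookupJoinWithCalcRunner itself: the lookup function is modeled as a Function<Object, List<Object[]>>, the Calc as a filter only (projection omitted), and skipping the lookup for a null key is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of a lookup-join-with-calc runner's per-row behavior (not Flink's code).
class LookupJoinRunnerSketch {
    private final Function<Object, List<Object[]>> lookupFunction; // stands in for the fetcher
    private final int lookupKeyIndex;       // left field used as the lookup key, e.g. T.a
    private final Predicate<Object[]> calc; // stands in for the generated Calc (filter part only)
    private final boolean isLeftOuterJoin;
    private final int rightFieldCount;      // width of the all-NULL padding row

    LookupJoinRunnerSketch(Function<Object, List<Object[]>> lookupFunction, int lookupKeyIndex,
                           Predicate<Object[]> calc, boolean isLeftOuterJoin, int rightFieldCount) {
        this.lookupFunction = lookupFunction;
        this.lookupKeyIndex = lookupKeyIndex;
        this.calc = calc;
        this.isLeftOuterJoin = isLeftOuterJoin;
        this.rightFieldCount = rightFieldCount;
    }

    List<Object[]> processElement(Object[] left) {
        Object key = left[lookupKeyIndex];
        // 1. fetcher: query the dimension table with the lookup key
        List<Object[]> fetched = key == null ? Collections.<Object[]>emptyList() : lookupFunction.apply(key);
        // 2. calc filters the fetched rows; 3. collector concatenates left + right
        List<Object[]> out = new ArrayList<>();
        for (Object[] right : fetched) {
            if (calc.test(right)) {
                out.add(concat(left, right));
            }
        }
        // LEFT JOIN: nothing collected, so emit the left row padded with NULLs
        if (out.isEmpty() && isLeftOuterJoin) {
            out.add(concat(left, new Object[rightFieldCount]));
        }
        return out;
    }

    private static Object[] concat(Object[] left, Object[] right) {
        Object[] row = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, row, left.length, right.length);
        return row;
    }
}
```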
Finally, the Transformations are turned into a StreamGraph and then a JobGraph; from this point on the flow is the same as for the DataStream API.


