<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink SQL LookupJoin終極解決方案及Flink Rule入門

          共 28201字,需瀏覽 57分鐘

           ·

          2022-07-26 12:47

          點擊上方藍色字體,選擇“設(shè)為星標(biāo)”
          回復(fù)"面試"獲取更多驚喜

          全網(wǎng)最全大數(shù)據(jù)面試提升手冊!

          Flink Join

          常規(guī)Join

          例如常用的內(nèi)聯(lián)接:

          SELECT * FROM Orders
          JOIN Product
          ON Orders.productId = Product.id

          這種 JOIN 要求 JOIN 兩邊數(shù)據(jù)都永久保留在 Flink state 中,才能保證輸出結(jié)果的準(zhǔn)確性,這將導(dǎo)致 State 的無限膨脹。

          可以配置 state 的TTL(time-to-live:table.exec.state.ttl)來避免其無限增長,但請注意這可能會影響查詢結(jié)果的準(zhǔn)備性。

          Interval Join

          根據(jù) JOIN 條件和時間限制進行的 JOIN。它基于兩個 KeyStream,按照 JOIN 條件將一條流上的每條數(shù)據(jù)與另一條流上不同時間窗口的數(shù)據(jù)進行連接。

          例如,查詢訂單及關(guān)聯(lián)的支付信息,其中支付是在下單時間前后各1小時內(nèi):

          SELECT
            ...
          FROM
            Orders AS o JOIN Payment AS p ON
            o.orderId = p.orderId AND
            p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
            orderTime + INTERVAL '1' HOUR

          Temporal join

          首先介紹一個時態(tài)表的概念,這是一個隨時間不斷變化的動態(tài)表,它可能包含表的多個快照。

          對于時態(tài)表中的記錄,可以追蹤、訪問其歷史版本的表稱為版本表,如數(shù)據(jù)庫的 changeLog;

          只能追蹤、訪問最新版本的表稱為普通表,如數(shù)據(jù)庫的表。

          在Flink中,定義了主鍵約束和事件時間屬性的表就是版本表。

          Temporal Join 允許 JOIN 版本表,即主表可以用一個不斷更新的版本表,根據(jù)時間和等值關(guān)聯(lián)條件來擴充其詳細信息。兩個表必須同時為事件時間或處理時間。

          • 當(dāng)使用事件時間時,版本表保留從上一個 watermark 到當(dāng)前時刻的所有版本數(shù)據(jù),左右表都需要配置好 watermark;右表必須為 CDC 數(shù)據(jù),正確配置主鍵,且主鍵必須在 JOIN 的等值關(guān)聯(lián)條件中。例如:
          -- 左表為普通的 append-only 表.
          CREATE TABLE orders (
              order_id    STRING,
              price       DECIMAL(32,2),
              currency    STRING,
              order_time  TIMESTAMP(3),
              WATERMARK FOR order_time AS order_time
          ) WITH (/* ... */);

          -- 右表為匯率的版本表,CDC 數(shù)據(jù)
          CREATE TABLE currency_rates (
              currency STRING,
              conversion_rate DECIMAL(32, 2),
              update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
              WATERMARK FOR update_time AS update_time,
              PRIMARY KEY(currency) NOT ENFORCED
          ) WITH (
             'connector' = 'kafka',
             'value.format' = 'debezium-json',
             /* ... */
          );

          SELECT 
               order_id,
               price,
               currency,
               conversion_rate,
               order_time
          FROM orders
          LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
          -- 主鍵必須在關(guān)聯(lián)條件中
          ON orders.currency = currency_rates.currency;

          order_id  price  currency  conversion_rate  order_time
          ========  =====  ========  ===============  =========
          o_001     11.11  EUR       1.14             12:00:00
          o_002     12.51  EUR       1.10             12:06:00
          • 當(dāng)使用處理時間時,用戶可以將 Lookup 表(右表)看成一個普通的HashMap,它存儲了最新的全量數(shù)據(jù)。Flink 可直接 JOIN 一個外部數(shù)據(jù)庫系統(tǒng)的表,而無須存儲最新版本的狀態(tài)。例如:
          SELECT
            o.amout, o.currency, r.rate, o.amount * r.rate
          FROM
            Orders AS o
            JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
            ON r.currency = o.currency;

          -- 或 Join 一個表函數(shù)
          SELECT
            o_amount, r_rate
          FROM
            Orders,
            LATERAL TABLE (Rates(o_proctime))
          WHERE
            r_currency = o_currency

          注意:"FOR SYSTEM_TIME AS OF"語法不支持 VIEW/任意最新表是因為考慮到Flink的實現(xiàn)與其語義不大相符,左流的 JOIN 處理不會等待右邊的版本表(VIEW/表函數(shù))完成快照后才進行。個人理解可能會導(dǎo)致左表 JOIN 上的右表并不一定是當(dāng)前最新的數(shù)據(jù)。

          Lookup Join

          同基于事件時間的 Temporal Join,以 JOIN 算子執(zhí)行時的時間點查詢右表的數(shù)據(jù)進行關(guān)聯(lián)。一般用于維表關(guān)聯(lián),只支持等值 JOIN。例如:

          SELECT
            o.amout, o.currency, r.rate, o.amount * r.rate
          FROM
            Orders AS o
            JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
            ON r.currency = o.currency;

          Lookup Join 執(zhí)行流程

          以 Flink 單測用例為例進行講解,新手可以基于此上手開發(fā)自定義的 Rule。

          準(zhǔn)備工作

          編譯 Flink Table 模塊

          flink-table 目錄下執(zhí)行:mvn clean package -Pfast,hive-2.1.1,scala-2.12 -DskipTests

          打開單測文件

          Flink Rule 的 UT 包含:

          • 邏輯計劃測試:flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical
          • 物理計劃測試:flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql、XXX/batch/sql
          • 集成測試:flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql、XXX/batch/sql

          這也是向社區(qū)提交 Rule 相關(guān) PR 需要完成的 UT

          打開日志級別

          在需要單測的代碼前,加上:Configurator.setAllLevels("", Level.TRACE)

          跟蹤sql執(zhí)行

          • 下文基于文件:flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala 的執(zhí)行進行分析。
          • 執(zhí)行單測:testJoinTemporalTable    SELECT * FROM MyTable AS T JOIN LookupTable
            FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id

          sql解析

          parser(calcite語法支持)會將SQL語句 "FOR SYSTEM_TIME AS OF " 解析成 SqlSnapshot ( SqlNode),validate() 將其轉(zhuǎn)換成 LogicalSnapshot(RelNode),可以看到邏輯 執(zhí)行計劃:

          LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
            LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
              LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
              LogicalFilter(condition=[=($cor0.a, $0)])
                LogicalSnapshot(period=[$cor0.proctime])
                  LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])

          優(yōu)化器優(yōu)化

          FlinkStreamProgram/FlinkBatchProgram中定義了一系列規(guī)則,對邏輯/物理計劃進行轉(zhuǎn)換和優(yōu)化。

          該案例中會經(jīng)歷下邊的幾個重要的轉(zhuǎn)換過程:

          1. LogicalCorrelateToJoinFromLookupTableRuleWithFilter:
          // 從類的定義可以看出,上方的邏輯計劃能匹配上該規(guī)則
          class LogicalCorrelateToJoinFromLookupTableRuleWithFilter
            extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
              operand(classOf[LogicalCorrelate],
                operand(classOf[RelNode], any()),
                operand(classOf[LogicalFilter],
                  operand(classOf[LogicalSnapshot],
                    operand(classOf[RelNode], any())))),
              "LogicalCorrelateToJoinFromLookupTableRuleWithFilter"
            ) {
              override def matches(call: RelOptRuleCall): Boolean = {
                val snapshot: LogicalSnapshot = call.rel(3)
                val snapshotInput: RelNode = trimHep(call.rel(4))
                isLookupJoin(snapshot, snapshotInput)
              }
              ……
          }
          // 匹配到規(guī)則后判斷是否為 lookupJoin
          protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput: RelNode): Boolean = {
            ……
            // 是處理時間 且 快照的表為LookupTableSource
            isProcessingTime && snapshotOnLookupSource
          }

          匹配到后,會將LogicalCorrelate轉(zhuǎn)換成LogicalJoin

          LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
          +- LogicalJoin(condition=[=($0$5)], joinType=[inner])
             :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
             +- LogicalSnapshot(period=[$cor0.proctime])
                +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
          1. FlinkProjectJoinTransposeRule + ProjectRemoveRule:Project算子下推并裁剪
          // 對調(diào)Project和下方的Join算子,實現(xiàn)下推Project
          public FlinkProjectJoinTransposeRule(
                  PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
              super(operand(Project.class, operand(Join.class, any())), relFactory, null);
              this.preserveExprCondition = preserveExprCondition;
          }

          優(yōu)化后:

          LogicalJoin(condition=[=($0$5)], joinType=[inner])
          :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])

          接下來的Volcano規(guī)則會對邏輯計劃進行組合優(yōu)化,生成最優(yōu)的計劃??梢钥吹綀?zhí)行后,最優(yōu)結(jié)果為:

          12129 [main] DEBUG org.apache.calcite.plan.RelOptPlanner [] - Cheapest plan:
          FlinkLogicalJoin(condition=[=($0$5)], joinType=[inner]): rowcount = 3.0E7, cumulative cost = {4.0E8 rows, 5.0E8 cpu, 1.37E10 io, 0.0 network, 0.0 memory}, id = 403
            FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}, id = 378
            FlinkLogicalSnapshot(period=[$cor0.proctime]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 2.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}, id = 402
              FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}, id = 381

          // 最后結(jié)果:
          FlinkLogicalJoin(condition=[=($0$5)], joinType=[inner])
          :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
          +- FlinkLogicalSnapshot(period=[$cor0.proctime])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])    

          嘗試規(guī)則

          Rules                                                                   Attempts           Time (us)
          FlinkJoinPushExpressionsRule                                                   2                 553
          JoinConditionPushRule                                                          2                 152
          FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)                      1              54,956
          FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)                                 1               4,787
          FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)                             1               3,162
          FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)                   1               1,403
          SimplifyJoinConditionRule                                                      1                 249
          * Total                                                                        9              65,262

          其中:幾個Converter放在LOGICAL_CONVERTERS中,該集合包含了一系列將 Calcite node 轉(zhuǎn)換成 Flink node 的邏輯規(guī)則。

          1. 比如:FlinkLogicalSnapshotConverter:
          // 把 LogicalSnapshot 轉(zhuǎn)換成  FlinkLogicalSnapshot
          class FlinkLogicalSnapshotConverter
            extends ConverterRule(
            // 匹配 LogicalSnapshot 類型,且沒有Convention,輸出的為 FlinkConventions.LOGICAL
              classOf[LogicalSnapshot],
              Convention.NONE,
              FlinkConventions.LOGICAL,
              "FlinkLogicalSnapshotConverter") {

            def convert(rel: RelNode): RelNode = {
              val snapshot = rel.asInstanceOf[LogicalSnapshot]
              val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
              FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
            }
          }
          1. 增加處理時間實體化的算子
          // convert time indicators
          chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)
          // 如果是事件時間,且必要的情況下,這里會創(chuàng)建一個 sqlFunction 來實現(xiàn)
          rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)

          經(jīng)轉(zhuǎn)換:

          FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
          +- FlinkLogicalJoin(condition=[=($0$5)], joinType=[inner])
             :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
             +- FlinkLogicalSnapshot(period=[$cor0.proctime])
                +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

          物理規(guī)則優(yōu)化

          經(jīng)下述物理Volcano規(guī)則處理后

          FlinkJoinPushExpressionsRule          
          JoinConditionPushRule                                                  
          StreamPhysicalTableSourceScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
          FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)      
          StreamPhysicalSnapshotOnTableScanRule                       
          StreamPhysicalCalcRule(in:LOGICAL,out:STREAM_PHYSICAL)    
          FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)         
          StreamPhysicalDataStreamScanRule(in:LOGICAL,out:STREAM_PHYSICAL) 
          FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)                 
          FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)
          SimplifyJoinConditionRule 

          得到最優(yōu)結(jié)果:

          Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
          +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
             +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
          • StreamPhysicalCalcRule:將FlinkLogicalCalc轉(zhuǎn)換成StreamPhysicalCalc
          • SnapshotOnTableScanRule:將
          FlinkLogicalJoin
          +-  FlinkLogicalDataStreamTableScan
          +-  FlinkLogicalSnapshot
              +- FlinkLogicalTableSourceScan

          轉(zhuǎn)換成

          StreamPhysicalLookupJoin
          +- StreamPhysicalDataStreamScan

          這里是LookupJoin的關(guān)鍵轉(zhuǎn)換邏輯:

          // 該規(guī)則使用父類的匹配條件
          class SnapshotOnTableScanRule
            extends BaseSnapshotOnTableScanRule("StreamPhysicalSnapshotOnTableScanRule") {
          }
          // 可以看到,正好匹配上未優(yōu)化前的邏輯計劃
          abstract class BaseSnapshotOnTableScanRule(description: String)
            extends RelOptRule(
              operand(classOf[FlinkLogicalJoin],
                operand(classOf[FlinkLogicalRel], any()),
                operand(classOf[FlinkLogicalSnapshot],
                  operand(classOf[TableScan], any()))),
              description)
            with CommonLookupJoinRule 

          private def doTransform(
            join: FlinkLogicalJoin,
            input: FlinkLogicalRel,
            temporalTable: RelOptTable,
            calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
              
            val joinInfo = join.analyzeCondition

            val cluster = join.getCluster

            val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
            val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
             //將input從邏輯節(jié)點轉(zhuǎn)換成物理節(jié)點,這里會觸發(fā) StreamPhysicalDataStreamScanRule,
             //把FlinkLogicalTableSourceScan轉(zhuǎn)換成StreamPhysicalDataStreamScan
            val convInput = RelOptRule.convert(input, requiredTrait)
            new StreamPhysicalLookupJoin(
              cluster,
              providedTrait,
              convInput,
              temporalTable,
              calcProgram,
              joinInfo,
              join.getJoinType)
          }

          至此完成物理計劃的轉(zhuǎn)換

          翻譯物理計劃

          planner.translate()其中包括了:

          val execGraph = translateToExecNodeGraph(optimizedRelNodes)
          val transformations = translateToPlan(execGraph)

          在translateToExecNodeGraph中:會調(diào)用物理計劃生成最后節(jié)點的translateToExecNode方法。如

          • StreamPhysicalLookupJoin會轉(zhuǎn)換成StreamExecLookupJoin
            在translateToPlan中:調(diào)用ExecNode的translateToPlanInternal方法。以CommonExecLookupJoin為例:
          protected CommonExecLookupJoin(……){
              //這里忽略校驗和異步LookupFunction邏輯
              public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
                  // -----------創(chuàng)建lookupFunction Operator的工廠---------------
                  RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
              
                  UserDefinedFunction userDefinedFunction =
                          LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
                  UserDefinedFunctionHelper.prepareInstance(
                          planner.getTableConfig().getConfiguration(), userDefinedFunction);
              
                  boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
                  StreamOperatorFactory<RowData> operatorFactory;
              
                  operatorFactory =
                          createSyncLookupJoin(
                                  temporalTable,
                                  planner.getTableConfig(),
                                  lookupKeys,
                                  (TableFunction<Object>) userDefinedFunction,
                                  planner.getRelBuilder(),
                                  inputRowType,
                                  tableSourceRowType,
                                  resultRowType,
                                  isLeftOuterJoin,
                                  planner.getExecEnv().getConfig().isObjectReuseEnabled());
                  //-------------------------------------------------------
                  // 轉(zhuǎn)換成Transformation
                  Transformation<RowData> inputTransformation =
                          (Transformation<RowData>) inputEdge.translateToPlan(planner);
                  return new OneInputTransformation<>(
                          inputTransformation,
                          getDescription(),
                          operatorFactory,
                          InternalTypeInfo.of(resultRowType),
                          inputTransformation.getParallelism());
              }
          }
          //只羅列核心邏輯,主要分三塊
          private StreamOperatorFactory<RowData> createSyncLookupJoin() {
              // 通過codeGenerator,生成lookupFunction的函數(shù),包裝成FlatMap函數(shù)
              GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
                  LookupJoinCodeGenerator.generateSyncLookupFunction();
              // 生成表函數(shù)的輸出結(jié)果的Collector
              GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
                  LookupJoinCodeGenerator.generateCollector();
              // 最后會生成LookupJoinRunner的ProcessFunction
              // 如果在lookupJoin這一側(cè)(即右表)有Calc的話,該Runner中會帶有Calc的計算邏輯
              // 比如:SELECT * FROM T JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.b + 1
              // Fetcher會讀出LookupFunction中的原始數(shù)據(jù),再經(jīng)過calc計算后,再與主表(左流)的數(shù)據(jù)進行比對
              GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
                  LookupJoinCodeGenerator.generateCalcMapFunction(
                          config,
                          JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                          filterOnTemporalTable,
                          temporalTableOutputType,
                          tableSourceRowType);
              
              ProcessFunction<RowData, RowData> processFunc =
                      new LookupJoinWithCalcRunner(
                              generatedFetcher,
                              generatedCalc,
                              generatedCollector,
                              isLeftOuterJoin,
                              rightRowType.getFieldCount());
          }

          最后再Transformations->StreamGraph->JobGraph,與DataStream API的流程就統(tǒng)一了。


          如果這個文章對你有幫助,不要忘記 「在看」 「點贊」 「收藏」 三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級技能模型與學(xué)習(xí)指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時代可能真的來了
          我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
          我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?
          193篇文章暴揍Flink,這個合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
          我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?
          在所有Spark模塊中,我愿稱SparkSQL為最強!
          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實踐小百科全書
          標(biāo)簽體系下的用戶畫像建設(shè)小指南
          4萬字長文 | ClickHouse基礎(chǔ)&實踐&調(diào)優(yōu)全視角解析
          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
          我寫過的關(guān)于成長/面試/職場進階的文章
          當(dāng)我們在學(xué)習(xí)Hive的時候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 149
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  色播日韩| 丁香五月天视频 | 精品国产乱码一区二区三区小黄书 | 俺去俺来色官网 | 一本无码在线播放 |