
E-commerce Data Analysis in Practice: Building a Real-Time Data Warehouse with Flink SQL


2020-08-20 15:49


A real-time data warehouse exists mainly to solve the poor data freshness of a traditional warehouse; it is typically used for real-time OLAP analysis, real-time dashboards, and real-time monitoring of business metrics. Although its architecture and technology choices differ from those of a traditional offline warehouse, the underlying methodology for building a warehouse is the same. This article shares a demo that builds a real-time data warehouse from scratch with Flink SQL, covering the entire pipeline of data collection, storage, computation, and visualization. From this article you will learn:

• The basic architecture of a real-time data warehouse
• The data processing flow of a real-time data warehouse
• The new SQL features in Flink 1.11
• A known bug in Flink 1.11
• A complete, hands-on example

The ancients spared no effort in their studies; what is begun in youth is mastered only in old age.

Knowledge gained from books always feels shallow; to truly understand something, you must practice it yourself.

Case Overview

This article uses an e-commerce business as the example to show the data processing flow of a real-time data warehouse. Since the goal is to illustrate the construction process, the computations are deliberately kept simple. To keep the case reproducible and complete, detailed steps are given, and for ease of demonstration all operations are performed in the Flink SQL CLI.

Architecture Design

The architecture is as follows: canal first parses the MySQL binlog and writes the change data to Kafka. Flink SQL then cleans and joins the raw data and writes the resulting detail-level wide table back to Kafka. Dimension data is stored in MySQL; Flink SQL joins the detail wide table with the dimension tables, writes the aggregated results to MySQL, and finally FineBI visualizes them.

Business Data Preparation

• Orders table (order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'order status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'delivery address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'order remarks',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'trade number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'expiration time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'shipment tracking number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  `province_id` int(20) DEFAULT NULL COMMENT 'province/region id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='orders table';
• Order detail table (order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name (denormalized)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image name (denormalized)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (sku price at order time)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'purchase quantity',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';
• Product table (sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'sku id (item id)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spu id',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'product specification description',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'brand id (denormalized)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'level-3 category id (denormalized)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'default display image (denormalized)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='product table';
• Level-1 category table (base_category1)
CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(10) NOT NULL COMMENT 'category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-1 category table';
• Level-2 category table (base_category2)
CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-2 category name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'level-1 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-2 category table';
• Level-3 category table (base_category3)
CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-3 category name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'level-2 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-3 category table';
• Province table (base_province)
CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'region id',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'administrative area code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
• Region table (base_region)
CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'region id',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Note: the DDL statements above are executed in MySQL. The complete table-creation scripts and mock-data generator can be downloaded here:

Link: https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA  Extraction code: zuqw

Data Processing Flow

ODS Layer Data Synchronization

canal parses the MySQL binlog and writes the change records into the corresponding Kafka topics. For space reasons, the canal configuration itself is not covered here. After synchronization, each source table's changes end up in its own topic (mydw.order_info, mydw.order_detail, mydw.sku_info, and so on).

DIM Layer: Dimension Table Preparation

In this example the dimension tables are stored in MySQL; in production, HBase is more commonly used for dimension data. Two dimension tables are needed: a region dimension table and a product (SKU) dimension table. They are prepared as follows:

• Region dimension table

First, the data from the mydw.base_province and mydw.base_region topics is extracted into MySQL, using Flink SQL Kafka source tables with the canal-json format. Note: before running the load statements, the target tables must already exist in MySQL; this article uses a MySQL database named dim to hold the dimension data (a sketch of the corresponding MySQL DDL is given after the Flink SQL statements below). The statements are as follows:

-- -------------------------
--   Province
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_province',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Province
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_province', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Province
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
--   Region
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_region',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Region
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
  `id` INT,
  `region_name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_region', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Region
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;
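As noted above, the JDBC sink tables must already exist in the dim database before the INSERT statements run. The full scripts are in the download package; as a reference only, here is a minimal sketch of the MySQL DDL, with column types chosen to match the Flink schemas above rather than copied from the original scripts:

-- Run in MySQL (sketch; adjust types and charset as needed)
CREATE DATABASE IF NOT EXISTS dim;
USE dim;

CREATE TABLE IF NOT EXISTS base_province (
  id        INT PRIMARY KEY,   -- primary key so the upsert sink can overwrite rows
  name      VARCHAR(20),
  region_id INT,
  area_code VARCHAR(20)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS base_region (
  id          INT PRIMARY KEY,
  region_name VARCHAR(20)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;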

Through the steps above, the raw data needed for the dimension table is now stored in MySQL. Next we create the dimension table itself in MySQL: using the two tables above, we create a view, dim_province, to act as the dimension table:

-- ---------------------------------
-- DIM layer: region dimension table,
-- created as a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br
     JOIN base_province bp ON br.id = bp.region_id
;

The dim_province dimension table is now ready; when performing the dimension join we only need to declare a JDBC source for it in Flink SQL. The product dimension table is built in exactly the same way:

-- -------------------------
--  Level-1 category
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category1',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--  Level-1 category
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
  `id` BIGINT,
  `name` STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category1', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  Level-1 category
--   Load data into the MySQL sink
-- -------------------------

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
--  Level-2 category
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category2',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--  Level-2 category
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category2', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  Level-2 category
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
--  Level-3 category
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category3',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--  Level-3 category
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'base_category3', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  Level-3 category
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
--   Product (SKU)
--   Kafka source
-- -------------------------

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.sku_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Product (SKU)
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'sku_info', -- target table in MySQL
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Product (SKU)
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
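As with the region tables, the JDBC sink tables above (base_category1, base_category2, base_category3, sku_info) must already exist in the dim database before the loads run. A minimal sketch for sku_info only, with types matching the Flink schema above (the category tables follow the same pattern; not taken from the original scripts):

-- Run in MySQL, database `dim` (sketch)
CREATE TABLE IF NOT EXISTS sku_info (
  id               BIGINT PRIMARY KEY,
  spu_id           BIGINT,
  price            DECIMAL(10,0),
  sku_name         VARCHAR(200),
  sku_desc         VARCHAR(2000),
  weight           DECIMAL(10,2),
  tm_id            BIGINT,
  category3_id     BIGINT,
  sku_default_img  VARCHAR(200),
  create_time      DATETIME
) ENGINE=InnoDB DEFAULT CHARSET=utf8;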

Through the steps above, the base tables needed for the product dimension have been synchronized to MySQL (again, the target tables have to be created in advance, as sketched above). Next, using these base tables, we create a view named dim_sku_info in MySQL's dim database to serve as the dimension table for later use.

-- ---------------------------------
-- DIM layer: product dimension table,
-- created as a view in MySQL
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM sku_info si
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id = c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
;
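Both views can be spot-checked in MySQL before wiring them into Flink (a simple sketch):

-- Run in MySQL, database `dim` (sketch): spot-check the dimension views
SELECT * FROM dim_province LIMIT 5;
SELECT * FROM dim_sku_info LIMIT 5;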

At this point, all the dimension data we need is ready. Next we process the DWD-layer data.

DWD Layer Data Processing

With the dimension tables in place, we now process the raw ODS data into the DWD-layer detail wide table. The steps are as follows:

-- -------------------------
--   Order detail
--   Kafka source
-- -------------------------

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail` (
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_detail',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Order info
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- ---------------------------------
-- DWD layer: paid-order detail table dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD layer: paid-order detail table
-- Load data into dwd_paid_order_detail
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT *
    FROM ods_order_info
    WHERE order_status = '2' -- paid
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od
    ON oi.id = od.order_id;
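Before moving on to the ADS layer, the wide table can be sanity-checked with a continuous query in the Flink SQL CLI (a simple sketch; it just streams the joined rows to the console):

-- Flink SQL CLI (sketch): inspect the DWD wide table
SELECT order_id, user_id, province_id, sku_id, sku_name, sku_num, order_price, pay_time
FROM dwd_paid_order_detail;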

ADS Layer Data

Through the steps above we created the dwd_paid_order_detail wide table and stored it in Kafka. Next we join this wide table with the dimension tables to produce the ADS application-layer data.

• ads_province_index

First, create the corresponding ADS target table, ads_province_index, in MySQL:

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt)
);

Then load the data into the MySQL ADS target table:

-- Operations in the Flink SQL CLI
-- ---------------------------------
-- Declare the MySQL ADS-layer table with DDL
-- Metrics: 1. daily order count per province
--          2. daily order amount per province
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail: paid-order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_province_index
-- Temporary per-province order summary table
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Load data into the temporary summary table
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count, -- order count
      sum(order_price * sku_num) order_amount, -- order amount
      TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time, 'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- Reads the temporary summary table back as a source
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE,
    proctime as PROCTIME()   -- computed column providing a processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM layer: region dimension table,
-- declared as a JDBC source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);

-- ---------------------------------
-- Load data into ads_province_index
-- Dimension table JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
  ON dp.province_id = pc.province_id;

Once the job is submitted, you can watch it run in the Flink Web UI.

Then check the data in the ADS-layer ads_province_index table:
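For example, the daily per-province metrics can be inspected directly in MySQL (a simple sketch against the DDL above):

-- Run in MySQL (sketch): inspect the ADS result table
SELECT dt, province_name, region_name, order_count, order_amount
FROM ads.ads_province_index
ORDER BY dt, order_amount DESC;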

• ads_sku_index

First, create the corresponding ADS target table, ads_sku_index, in MySQL:

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100),
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id, dt)
);

Then load the data into the MySQL ADS target table:

-- ---------------------------------
-- Declare the MySQL ADS-layer table with DDL
-- Metrics: 1. daily order count per product
--          2. daily order amount per product
--          3. daily quantity sold per product
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail: paid-order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- Per-product metric summary
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Load data
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count, -- order count
      sum(order_price * sku_num) order_amount, -- order amount
      sum(sku_num) order_sku_num,
      TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time, 'yyyy-MM-dd')
;

-- ---------------------------------
-- tmp_sku_index_source
-- Reads the temporary summary table back as a source
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- computed column providing a processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer: product dimension table,
-- declared as a JDBC source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- Load data into ads_sku_index
-- Dimension table JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id
  ;

Once the job is submitted, you can watch it run in the Flink Web UI.

Then check the data in the ADS-layer ads_sku_index table:
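Again, a quick look at the per-SKU results in MySQL (a simple sketch against the DDL above):

-- Run in MySQL (sketch): inspect the per-SKU metrics
SELECT dt, sku_id, sku_name, order_count, order_amount, sku_count
FROM ads.ads_sku_index
ORDER BY dt, order_amount DESC;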

FineBI Visualization

The ADS result tables in MySQL can then be connected to FineBI as a data source to build the real-time dashboards.

Other Notes

A Bug in Flink 1.11.0

When using Flink 1.11.0, inserting a changelog source into an upsert sink fails with the following exception:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
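For reference, the pattern that triggers it is roughly a changelog (e.g. canal-json) source written straight into an upsert sink such as a JDBC table with a primary key. The following is only an illustrative sketch; the t_pick_order schema, topic, and connection parameters are hypothetical:

-- Sketch only: changelog source -> upsert (JDBC) sink, which hits the 1.11.0 planner bug
CREATE TABLE t_pick_order (
  order_no STRING,
  status   INT
) WITH (
  'connector' = 'kafka',
  'topic' = 't_pick_order',                 -- hypothetical topic
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'canal-json'
);

CREATE TABLE order_status_sink (
  order_no STRING,
  status   INT,
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',    -- hypothetical target
  'table-name' = 'order_status'
);

INSERT INTO order_status_sink
SELECT order_no, status FROM t_pick_order;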

This bug has since been fixed; the fix is available in Flink 1.11.1.

Summary

This article walked through a demo of building a real-time data warehouse. It should give you a clear picture of the data processing flow and, building on that, a deeper understanding of CDC with Flink SQL. The steps are detailed enough to follow hands-on, so you can explore the construction of a real-time warehouse in practice.
