E-commerce Data Analysis Case Study: Building a Real-Time Data Warehouse with SQL
點(diǎn)擊上方“數(shù)據(jù)管道”,選擇“置頂星標(biāo)”公眾號(hào)
干貨福利,第一時(shí)間送達(dá)

實(shí)時(shí)數(shù)倉(cāng)主要是為了解決傳統(tǒng)數(shù)倉(cāng)數(shù)據(jù)時(shí)效性低的問(wèn)題,實(shí)時(shí)數(shù)倉(cāng)通常會(huì)用在實(shí)時(shí)的OLAP分析、實(shí)時(shí)的數(shù)據(jù)看板、業(yè)務(wù)指標(biāo)實(shí)時(shí)監(jiān)控等場(chǎng)景。雖然關(guān)于實(shí)時(shí)數(shù)倉(cāng)的架構(gòu)及技術(shù)選型與傳統(tǒng)的離線數(shù)倉(cāng)會(huì)存在差異,但是關(guān)于數(shù)倉(cāng)建設(shè)的基本方法論是一致的。本文會(huì)分享基于Flink SQL從0到1搭建一個(gè)實(shí)時(shí)數(shù)倉(cāng)的demo,涉及數(shù)據(jù)采集、存儲(chǔ)、計(jì)算、可視化整個(gè)處理流程。通過(guò)本文你可以了解到:
實(shí)時(shí)數(shù)倉(cāng)的基本架構(gòu) 實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)處理流程 Flink1.11的SQL新特性 Flink1.11存在的bug 完整的操作案例
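The ancients spared no effort in their studies; work begun in youth bears fruit only in old age.
What is learned from books always feels shallow; to truly understand something, you must do it yourself.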
Case Overview
This article uses an e-commerce business as an example to show how data flows through a real-time data warehouse. Because the goal is to illustrate how the warehouse is built, the computations are deliberately kept simple. To keep the example reproducible and complete, every step is described in detail. For ease of demonstration, all operations are performed in the Flink SQL CLI.
Architecture Design
The architecture is shown in the figure below. First, Canal parses the MySQL binlog and writes the changes to Kafka. Flink SQL then cleans and joins the raw data and writes the resulting detail wide table back to Kafka. Dimension data is stored in MySQL; Flink SQL joins the detail wide table with the dimension tables and writes the aggregated results to MySQL. Finally, FineBI visualizes the results.

Business Data Preparation
Orders table (order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'Consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'Consignee phone',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'Total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'Order status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'User ID',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'Payment method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'Delivery address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'Order remarks',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'Trade number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'Order description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'Operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'Expiration time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'Shipment tracking number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'Parent order ID',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'Image path',
  `province_id` int(20) DEFAULT NULL COMMENT 'Region',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Orders table';
Order details table (order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'Order ID',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name (redundant)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'Image name (redundant)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'Purchase price (SKU price at order time)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'Quantity purchased',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Order details table';
Products table (sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'Price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'Product specification',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'Weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'Brand (redundant)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'Level-3 category ID (redundant)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'Default image (redundant)',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Products table';
Level-1 category table (base_category1)
CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` varchar(10) NOT NULL COMMENT 'Category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Level-1 categories';
Level-2 category table (base_category2)
CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` varchar(200) NOT NULL COMMENT 'Level-2 category name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'Level-1 category ID',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Level-2 categories';
Level-3 category table (base_category3)
CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` varchar(200) NOT NULL COMMENT 'Level-3 category name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'Level-2 category ID',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Level-3 categories';
Provinces table (base_province)
CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'Province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'Region ID',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'Administrative division code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Regions table (base_region)
CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'Region ID',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'Region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Note: the DDL statements above are run in MySQL. The complete DDL and mock-data generation scripts are available at:
Link: https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA  Extraction code: zuqw
Data Processing Pipeline
ODS Layer Data Synchronization
Canal parses the MySQL binlog and writes the changes to the corresponding Kafka topics. For reasons of space, the details are not covered here. The synchronized result is shown in the figure below:
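To make the canal-json format used below more concrete, here is a minimal Python sketch of how a single Canal message could be turned into changelog rows. The sample message is made up; the field names (`data`, `old`, `type`) follow Canal's flat JSON layout, and the mapping (INSERT to +I, DELETE to -D, UPDATE to -U followed by +U) is meant to mirror what Flink's canal-json format does. It is an illustration, not Flink's implementation.

```python
import json

# Hypothetical canal-json message for an UPDATE on mydw.base_province.
# Field names follow Canal's flat JSON layout; the values are made up.
SAMPLE = json.dumps({
    "data": [{"id": "1", "name": "Beijing", "region_id": "1", "area_code": "110000"}],
    "old": [{"name": "Peking"}],
    "database": "mydw",
    "table": "base_province",
    "type": "UPDATE",
})


def decode_canal_json(message: str):
    """Turn one canal-json message into a list of (row_kind, row) entries.

    INSERT -> +I, DELETE -> -D, UPDATE -> -U (before image) then +U (after image).
    Canal lists only the changed columns in "old", so the before image is the
    new row overlaid with those old values.
    """
    msg = json.loads(message)
    kind = msg["type"]
    rows = msg.get("data") or []
    olds = msg.get("old") or []
    out = []
    for i, row in enumerate(rows):
        if kind == "INSERT":
            out.append(("+I", row))
        elif kind == "DELETE":
            out.append(("-D", row))
        elif kind == "UPDATE":
            before = {**row, **(olds[i] if i < len(olds) else {})}
            out.append(("-U", before))
            out.append(("+U", row))
        # Other message types (e.g. DDL events) are ignored in this sketch.
    return out


for entry in decode_canal_json(SAMPLE):
    print(entry)
```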

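DIM Layer: Preparing the Dimension Tables
In this example the dimension tables are stored in MySQL; in production, dimension data is often stored in HBase instead. Two dimension tables are used: a region dimension and a product dimension. They are built as follows.
Region dimension table
First, load the data from the Kafka topics mydw.base_province and mydw.base_region into MySQL, using the Flink SQL Kafka connector with the canal-json format. Note: the target tables must be created in MySQL before loading. In this article, the MySQL database that holds the dimension data is named dim. The statements are: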
-- -------------------------
--   Provinces
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_province',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Provinces
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT,
    `area_code` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
--   Provinces
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;
-- -------------------------
--   Regions
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_region',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Regions
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
--   Regions
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;
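Because each MySQL sink above declares `PRIMARY KEY (id) NOT ENFORCED`, the JDBC connector writes in upsert mode: a row is inserted or overwritten by its key, and a delete removes the row with that key. The Python sketch below models this with a dict standing in for the MySQL table; the function name `apply_upsert` and the sample rows are illustrative assumptions.

```python
from typing import Dict, Iterable, Tuple

Row = Dict[str, object]


def apply_upsert(table: Dict[object, Row], changes: Iterable[Tuple[str, Row]], key: str) -> None:
    """Apply changelog entries to an in-memory table keyed by `key`.

    Upsert-sink semantics in this sketch: +I and +U overwrite the row with the
    same key, -D deletes it, and -U is skipped because the following +U
    carries the complete new row.
    """
    for kind, row in changes:
        k = row[key]
        if kind in ("+I", "+U"):
            table[k] = dict(row)
        elif kind == "-D":
            table.pop(k, None)
        # "-U" needs no action in upsert mode


province: Dict[object, Row] = {}
apply_upsert(province, [
    ("+I", {"id": 1, "name": "Peking", "region_id": 1}),
    ("+I", {"id": 2, "name": "Tianjin", "region_id": 1}),
    ("-U", {"id": 1, "name": "Peking", "region_id": 1}),
    ("+U", {"id": 1, "name": "Beijing", "region_id": 1}),
    ("-D", {"id": 2, "name": "Tianjin", "region_id": 1}),
], key="id")
print(province)

# Keyed by a non-unique column, two different rows collapse into one.
by_brand: Dict[object, Row] = {}
apply_upsert(by_brand, [("+I", {"id": 1, "tm_id": 5}), ("+I", {"id": 2, "tm_id": 5})], key="tm_id")
print(by_brand)
```

The last example is why the key declared on a JDBC sink should be the column that actually identifies a row.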
After the steps above, the raw data needed for the dimension tables is stored in MySQL. Next, create the dimension table in MySQL: from the two tables above, create a view named dim_province to serve as the dimension table:
-- ---------------------------------
-- DIM layer: region dimension table
-- Create a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br
     JOIN base_province bp ON br.id = bp.region_id
;
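The view can be sanity-checked without a MySQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in for the dim database (SQLite's dialect differs from MySQL's, but this view uses only portable SQL) together with made-up sample rows.

```python
import sqlite3

# In-memory SQLite database standing in for the MySQL `dim` database.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE base_region (id INTEGER PRIMARY KEY, region_name TEXT);
CREATE TABLE base_province (id INTEGER, name TEXT, region_id INTEGER, area_code TEXT);

-- Same shape as the dim_province view above.
CREATE VIEW dim_province AS
SELECT
  bp.id          AS province_id,
  bp.name        AS province_name,
  br.id          AS region_id,
  br.region_name AS region_name,
  bp.area_code   AS area_code
FROM base_region br
JOIN base_province bp ON br.id = bp.region_id;
""")

# Made-up sample rows for illustration only; region 9 does not exist.
conn.executemany("INSERT INTO base_region VALUES (?, ?)", [(1, "North China"), (2, "East China")])
conn.executemany(
    "INSERT INTO base_province VALUES (?, ?, ?, ?)",
    [(1, "Beijing", 1, "110000"), (2, "Shanghai", 2, "310000"), (3, "Nowhere", 9, "000000")],
)

for row in conn.execute("SELECT * FROM dim_province ORDER BY province_id"):
    print(row)
```

Because the view uses an inner join, a province whose region_id has no match in base_region (province 3 here) does not appear in dim_province.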
The dim_province dimension table is now ready. To use it in a dimension-table join, declare it in Flink SQL as a JDBC source. The product dimension is built the same way:
-- -------------------------
--   Level-1 categories
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category1',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Level-1 categories
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
--   Level-1 categories
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
-- -------------------------
--   Level-2 categories
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category2',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Level-2 categories
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
--   Level-2 categories
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
-- -------------------------
--   Level-3 categories
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.base_category3',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Level-3 categories
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
--   Level-3 categories
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
-- -------------------------
--   Products
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.sku_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Products
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
  PRIMARY KEY (id) NOT ENFORCED -- id uniquely identifies a SKU
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
--   Products
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
The steps above sync the base tables for the product dimension into MySQL; again, the target tables must be created beforehand. Next, use these base tables to create a view named dim_sku_info in the MySQL dim database, which serves as the dimension table later on:
-- ---------------------------------
-- DIM layer: product dimension table
-- Create a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM
(
  sku_info si
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id = c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);
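The product view can be checked the same way. In the sketch below (again SQLite through Python's sqlite3 module, with made-up rows and a reduced sku_info schema), each level's id and name are taken from that level's own alias, so c1_id and c1_name come from base_category1.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE base_category1 (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE base_category2 (id INTEGER PRIMARY KEY, name TEXT, category1_id INTEGER);
CREATE TABLE base_category3 (id INTEGER PRIMARY KEY, name TEXT, category2_id INTEGER);
-- Reduced sku_info: only the columns the view needs.
CREATE TABLE sku_info (id INTEGER PRIMARY KEY, spu_id INTEGER, price NUMERIC,
                       sku_name TEXT, weight NUMERIC, tm_id INTEGER, category3_id INTEGER);

-- Each level's id/name comes from its own alias (c1, c2, c3).
CREATE VIEW dim_sku_info AS
SELECT si.id, si.sku_name, si.category3_id AS c3_id, si.weight, si.tm_id, si.price, si.spu_id,
       c3.name AS c3_name, c2.id AS c2_id, c2.name AS c2_name,
       c1.id AS c1_id, c1.name AS c1_name
FROM sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id = c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id;
""")

# Made-up hierarchy: Electronics (1) > Phones (2) > Smartphones (3).
conn.execute("INSERT INTO base_category1 VALUES (2, 'Electronics')")
conn.execute("INSERT INTO base_category2 VALUES (13, 'Phones', 2)")
conn.execute("INSERT INTO base_category3 VALUES (61, 'Smartphones', 13)")
conn.execute("INSERT INTO sku_info VALUES (1, 1, 3999, 'Phone X', 0.24, 5, 61)")

print(conn.execute("SELECT id, c3_id, c2_id, c1_id, c1_name FROM dim_sku_info").fetchall())
```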
At this point all the dimension data we need is ready, and we can move on to the DWD layer.
DWD Layer Processing
With the dimension tables in place, we now process the raw ODS data into the DWD detail wide table, as follows:
-- -------------------------
--   Order details
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail` (
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_detail',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- -------------------------
--   Orders
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- ---------------------------------
-- DWD layer: paid order detail table dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
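Unlike the plain json format, changelog-json keeps the change type of every row, so a downstream job reading this topic still sees updates and retractions. As far as we know, in Flink 1.11 this format comes from the separate flink-cdc-connectors project (flink-format-changelog-json) rather than from Flink itself. The Python sketch below assumes a record layout of the form {"data": {...}, "op": "+I"}; treat that layout and the helper names as assumptions.

```python
import json
from typing import Dict, Tuple

# Row kinds of a Flink changelog: insert, update-before, update-after, delete.
VALID_OPS = {"+I", "-U", "+U", "-D"}


def encode_changelog(op: str, row: Dict[str, object]) -> str:
    """Serialize one change as {"data": {...}, "op": "..."} (assumed layout)."""
    if op not in VALID_OPS:
        raise ValueError(f"unknown row kind: {op}")
    return json.dumps({"data": row, "op": op}, ensure_ascii=False)


def decode_changelog(message: str) -> Tuple[str, Dict[str, object]]:
    """Inverse of encode_changelog: return (row_kind, row)."""
    msg = json.loads(message)
    if msg["op"] not in VALID_OPS:
        raise ValueError(f"unknown row kind: {msg['op']}")
    return msg["op"], msg["data"]


record = encode_changelog("+I", {"detail_id": 1, "order_id": 10, "sku_num": 2, "order_price": "99.90"})
print(record)
print(decode_changelog(record))
```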
-- ---------------------------------
-- DWD layer: paid order details
-- Load data into dwd_paid_order_detail
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  CAST(oi.create_time AS STRING),
  CAST(oi.operate_time AS STRING) -- operate_time of a paid order is used as pay_time
FROM
    (
    SELECT *
    FROM ods_order_info
    WHERE order_status = '2' -- paid
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od
    ON oi.id = od.order_id;
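The INSERT above is a regular (non-windowed) stream-stream join: Flink keeps rows from both inputs in state, and whenever a row arrives on either side it is joined with the matching rows already stored for the other side. The class below is a simplified, insert-only Python sketch of that behavior (no retractions, no state cleanup); the class and method names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Optional


class PaidOrderDetailJoin:
    """Insert-only sketch of a regular inner join of order_info and order_detail on order_id."""

    def __init__(self) -> None:
        self.orders: Dict[int, dict] = {}                        # paid orders seen so far
        self.details: Dict[int, List[dict]] = defaultdict(list)  # details seen so far, by order_id

    @staticmethod
    def _widen(oi: dict, od: dict) -> dict:
        # Output columns follow dwd_paid_order_detail.
        return {
            "detail_id": od["id"], "order_id": oi["id"], "user_id": oi["user_id"],
            "province_id": oi["province_id"], "sku_id": od["sku_id"],
            "sku_name": od["sku_name"], "sku_num": od["sku_num"],
            "order_price": od["order_price"], "create_time": oi["create_time"],
            "pay_time": oi["operate_time"],
        }

    def on_order_info(self, oi: dict) -> List[dict]:
        if oi["order_status"] != "2":  # WHERE order_status = '2' (paid)
            return []
        self.orders[oi["id"]] = oi
        # Join with every detail row that arrived earlier.
        return [self._widen(oi, od) for od in self.details[oi["id"]]]

    def on_order_detail(self, od: dict) -> List[dict]:
        self.details[od["order_id"]].append(od)
        oi: Optional[dict] = self.orders.get(od["order_id"])
        # Join with the paid order if it has already arrived.
        return [self._widen(oi, od)] if oi is not None else []


join = PaidOrderDetailJoin()
detail = {"id": 100, "order_id": 1, "sku_id": 7, "sku_name": "Phone X", "sku_num": 2, "order_price": "10.00"}
order = {"id": 1, "user_id": 9, "province_id": 3, "order_status": "2",
         "create_time": "2020-08-06 09:58:00", "operate_time": "2020-08-06 10:00:00"}
print(join.on_order_detail(detail))  # the order has not arrived yet, so nothing is emitted
print(join.on_order_info(order))     # joins with the buffered detail row
```

Since nothing is ever evicted here, state keeps growing; a real job should configure idle-state retention for such joins.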

ADS Layer
The steps above produced the detail wide table dwd_paid_order_detail, stored in Kafka. Next, we join this wide table with the dimension tables to produce the ADS (application) layer data.
ads_province_index
First, create the ADS target table ads_province_index in MySQL:
CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt)
);
Load data into the ADS target table in MySQL:
-- Run in the Flink SQL CLI
-- ---------------------------------
-- Declare the MySQL ADS table with DDL
-- Metrics: 1. number of orders per province per day
--          2. order amount per province per day
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
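With `PRIMARY KEY (province_id, dt)` declared, the JDBC sink writes each aggregate as an upsert; for MySQL the connector uses `INSERT ... ON DUPLICATE KEY UPDATE`, so every (province_id, dt) pair keeps exactly one row that is overwritten as the numbers change. The sketch below reproduces this with SQLite's equivalent `ON CONFLICT ... DO UPDATE` clause (SQLite 3.24 or later), a reduced column set, and made-up values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE ads_province_index (
  province_id  INTEGER,
  order_amount REAL,
  order_count  INTEGER,
  dt           TEXT,
  PRIMARY KEY (province_id, dt)
)""")

# SQLite counterpart of MySQL's INSERT ... ON DUPLICATE KEY UPDATE.
UPSERT = """
INSERT INTO ads_province_index (province_id, order_amount, order_count, dt)
VALUES (?, ?, ?, ?)
ON CONFLICT (province_id, dt) DO UPDATE SET
  order_amount = excluded.order_amount,
  order_count  = excluded.order_count
"""

# Successive aggregate results, as an updating stream would deliver them.
for row in [(1, 100.0, 1, "2020-08-06"), (1, 250.0, 2, "2020-08-06"), (2, 80.0, 1, "2020-08-06")]:
    conn.execute(UPSERT, row)

print(conn.execute("SELECT * FROM ads_province_index ORDER BY province_id").fetchall())
```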
-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Temporary order summary table
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,         -- number of orders
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Load the temporary order summary table
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count,      -- number of orders
      sum(order_price * sku_num) order_amount,   -- order amount
      TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time, 'yyyy-MM-dd')
;
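The GROUP BY above produces an updating result rather than an append-only one: each new detail row changes its group's order count and amount, so the previous result is retracted and a new one emitted. That is why tmp_province_index uses the changelog-json format. The Python class below sketches this for COUNT(DISTINCT order_id) and SUM(order_price * sku_num) per (province_id, day); it handles inserts only, and its names are hypothetical.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, List, Set, Tuple

Key = Tuple[int, str]  # (province_id, pay_date)


class ProvinceDailyAgg:
    """Sketch of GROUP BY province_id, pay_date with COUNT(DISTINCT order_id) and SUM(amount).

    Each input row updates its group's state. The first result of a group is
    emitted as +I; later results retract the previous one (-U) and emit the new one (+U).
    """

    def __init__(self) -> None:
        self.orders: Dict[Key, Set[int]] = defaultdict(set)
        self.amount: Dict[Key, Decimal] = defaultdict(Decimal)
        self.last: Dict[Key, Tuple[int, Decimal]] = {}

    def add(self, province_id: int, pay_time: str, order_id: int,
            order_price: Decimal, sku_num: int) -> List[Tuple[str, tuple]]:
        # Date part of 'yyyy-MM-dd HH:mm:ss', roughly what TO_DATE(pay_time, 'yyyy-MM-dd') yields.
        key = (province_id, pay_time[:10])
        self.orders[key].add(order_id)
        self.amount[key] += order_price * sku_num
        new = (len(self.orders[key]), self.amount[key])
        out = []
        if key in self.last:
            out.append(("-U", key + self.last[key]))
            out.append(("+U", key + new))
        else:
            out.append(("+I", key + new))
        self.last[key] = new
        return out


agg = ProvinceDailyAgg()
print(agg.add(1, "2020-08-06 10:00:00", 1, Decimal("10.00"), 2))
print(agg.add(1, "2020-08-06 11:00:00", 1, Decimal("5.50"), 1))  # same order: count stays 1
```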
-- ---------------------------------
-- tmp_province_index_source
-- Use the temporary summary table as a source
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,         -- number of orders
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE,
    proctime AS PROCTIME()      -- computed column providing a processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer: region dimension table
-- Declare the region dimension source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- Load data into ads_province_index
-- Dimension-table join
-- ---------------------------------
INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  CAST(pc.pay_date AS VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime AS dp
  ON dp.province_id = pc.province_id;
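`FOR SYSTEM_TIME AS OF pc.proctime` turns this into a lookup join: for every incoming row, Flink queries the dimension table by key at processing time. The Python sketch below imitates this with an in-memory SQLite table standing in for MySQL; the function name `lookup_join` and the sample data are hypothetical. It shows that an inner lookup join drops rows without a match, and that a later change to the dimension affects only rows processed after the change, not results already emitted.

```python
import sqlite3
from typing import Iterable, List, Tuple

# SQLite stands in for the MySQL dim_province view.
dim = sqlite3.connect(":memory:")
dim.execute("CREATE TABLE dim_province (province_id INTEGER PRIMARY KEY, province_name TEXT, region_name TEXT)")
dim.execute("INSERT INTO dim_province VALUES (1, 'Beijing', 'North China')")


def lookup_join(rows: Iterable[Tuple[int, int, str]]) -> List[tuple]:
    """Enrich (province_id, order_count, pay_date) rows with one point lookup per row.

    The dimension is read when each row is processed; rows without a match
    are dropped, as in an inner JOIN.
    """
    out = []
    for province_id, order_count, pay_date in rows:
        hit = dim.execute(
            "SELECT province_name, region_name FROM dim_province WHERE province_id = ?",
            (province_id,),
        ).fetchone()
        if hit is not None:
            out.append((province_id, *hit, order_count, pay_date))
    return out


first = lookup_join([(1, 3, "2020-08-06"), (2, 1, "2020-08-06")])  # province 2 has no match
dim.execute("UPDATE dim_province SET region_name = 'North' WHERE province_id = 1")
second = lookup_join([(1, 4, "2020-08-06")])  # sees the updated dimension row
print(first)
print(second)
```

The Flink JDBC connector can also cache lookup results (options `lookup.cache.max-rows` and `lookup.cache.ttl`); with caching enabled, a row may see a cached dimension value until the entry expires.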
After submitting the jobs, check the Flink Web UI:

Query the data in the ADS table ads_province_index:

ads_sku_index
First, create the ADS target table ads_sku_index in MySQL:
CREATE TABLE ads.ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100),
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (sku_id, dt)
);
Load data into the ADS target table in MySQL:
-- ---------------------------------
-- Declare the MySQL ADS table with DDL
-- Metrics: 1. number of orders per product per day
--          2. order amount per product per day
--          3. quantity sold per product per day
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt VARCHAR,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Product metrics
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,         -- number of orders
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,       -- quantity sold
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Load the data
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,      -- number of orders
      sum(order_price * sku_num) order_amount,   -- order amount
      sum(sku_num) order_sku_num,                -- quantity sold
      TO_DATE(pay_time, 'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time, 'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- Use the temporary summary table as a source
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,         -- number of orders
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,       -- quantity sold
    pay_date DATE,
    proctime AS PROCTIME()      -- computed column providing a processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer: product dimension table
-- Declare the product dimension source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- Load data into ads_sku_index
-- Dimension-table join
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  CAST(sc.pay_date AS VARCHAR)
FROM
tmp_sku_index_source sc
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime AS ds
  ON ds.id = sc.sku_id
;
After submitting the jobs, check the Flink Web UI:

Query the data in the ADS table ads_sku_index:

FineBI Results

Other Notes
A bug in Flink 1.11.0
With Flink 1.11.0, inserting a changelog source into an upsert sink fails with the following exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
This bug has been fixed; the fix is available in Flink 1.11.1.
Summary
This article presented a demo of building a real-time data warehouse. It shows how data is processed in a real-time warehouse and, along the way, gives a deeper understanding of CDC (change data capture) with Flink SQL. The examples are detailed enough to follow hands-on, so you can explore building a real-time data warehouse in practice.
