Hive 拉鏈表實(shí)踐
背景
拉鏈表可以避免按每一天存儲所有記錄造成的海量存儲問題,同時(shí)也是處理緩慢變化數(shù)據(jù)(SCD2)的一種常見方式。
應(yīng)用場景
現(xiàn)假設(shè)有如下場景:一個(gè)企業(yè)擁有5000萬會員信息,每天有20萬會員資料變更,需要在數(shù)倉中記錄會員表的歷史變化以備分析使用,即每天都要保留一個(gè)快照供查詢,反映歷史數(shù)據(jù)的情況。
在此場景中,需要反映5000萬會員的歷史變化,如果保留快照,存儲兩年就需要2X365X5000W條數(shù)據(jù)存儲空間,數(shù)據(jù)量為365億,如果存儲更長時(shí)間,則無法估計(jì)需要的存儲空間。而利用拉鏈算法存儲,每日只向歷史表中添加新增和變化的數(shù)據(jù),每日不過20萬條,存儲4年也只需要3億存儲空間。
實(shí)現(xiàn)步驟
在拉鏈表中,每一條數(shù)據(jù)都有一個(gè)生效日期(effective_date)和失效日期(expire_date)。假設(shè)在一個(gè)用戶表中,在2019年11月8日新增了兩個(gè)用戶,如下表所示,則這兩條記錄的生效時(shí)間為當(dāng)天,由于到2019年11月8日為止,這兩條就還沒有被修改過,所以失效時(shí)間為一個(gè)給定的比較大的值,比如:3000-12-31
| member_id | phoneno | create_time | update_time |
| 10001 | 13300000001 | 2019-11-08 | 3000-12-31 |
| 10002 | 13500000002 | 2019-11-08 | 3000-12-31 |
第二天(2019-11-09),用戶10001被刪除了,用戶10002的電話號碼被修改成13600000002.為了保留歷史狀態(tài),用戶10001的失效時(shí)間被修改為2019-11-09,用戶10002則變成了兩條記錄,如下表所示:
| member_id | phoneno | create_time | update_time |
| 10001 | 13300000001 | 2019-11-08 | 2019-11-09 |
| 10002 | 13500000002 | 2019-11-08 | 2019-11-09 |
| 10002 | 13600000002 | 2019-11-09 | 3000-12-31 |
第三天(2019-11-10),又新增了用戶10003,則用戶表數(shù)據(jù)如小表所示:
| member_id | phoneno | create_time | update_time |
| 10001 | 13300000001 | 2019-11-08 | 2019-11-09 |
| 10002 | 13500000002 | 2019-11-08 | 2019-11-09 |
| 10002 | 13600000002 | 2019-11-09 | 3000-12-31 |
| 10003 | 13300000006 | 2019-11-10 | 3000-12-31 |
如果要查詢最新的數(shù)據(jù),那么只要查詢失效時(shí)間為3000-12-31的數(shù)據(jù)即可,如果要查11月8號的歷史數(shù)據(jù),則篩選生效時(shí)間<= 2019-11-08并且失效時(shí)間>2019-11-08的數(shù)據(jù)即可。如果查詢11月9號的數(shù)據(jù),那么篩選條件則是生效時(shí)間<=2019-11-09并且失效時(shí)間>2019-11-09
表結(jié)構(gòu)
MySQL源member表
CREATE TABLE member(
member_id VARCHAR ( 64 ),
phoneno VARCHAR ( 20 ),
create_time datetime,
update_time datetime );ODS層增量表member_delta,每天一個(gè)分區(qū)
CREATE TABLE member_delta
(member_id string,
phoneno string,
create_time string,
update_time string)
PARTITIONED BY (DAY string);
臨時(shí)表
CREATE TABLE member_his_tmp
(member_id string,
phoneno string,
effective_date date,
expire_date date
);DW層歷史拉鏈表
CREATE TABLE member_his
(member_id string,
phoneno string,
effective_date date,
expire_date date);Demo數(shù)據(jù)準(zhǔn)備
2019-11-08的數(shù)據(jù)為:
| member_id | phoneno | create_time | update_time |
| 10001 | 13500000001 | 2019-11-08 14:47:55 | 2019-11-08 14:47:55 |
| 10002 | 13500000002 | 2019-11-08 14:48:33 | 2019-11-08 14:48:33 |
| 10003 | 13500000003 | 2019-11-08 14:48:53 | 2019-11-08 14:48:53 |
| 10004 | 13500000004 | 2019-11-08 14:49:02 | 2019-11-08 14:49:02 |
2019-11-09的數(shù)據(jù)為:其中藍(lán)色代表新增數(shù)據(jù),紅色代表修改的數(shù)據(jù)
| member_id | phoneno | create_time | update_time |
| 10001 | 13500000001 | 2019-11-08 14:47:55 | 2019-11-08 14:47:55 |
| 10002 | 13600000002 | 2019-11-08 14:48:33 | 2019-11-09 14:48:33 |
| 10003 | 13500000003 | 2019-11-08 14:48:53 | 2019-11-08 14:48:53 |
| 10004 | 13500000004 | 2019-11-08 14:49:02 | 2019-11-08 14:49:02 |
| 10005 | 13500000005 | 2019-11-09 08:54:03 | 2019-11-09 08:54:03 |
| 10006 | 13500000006 | 2019-11-09 09:54:25 | 2019-11-09 09:54:25 |
2019-11-10的數(shù)據(jù):其中藍(lán)色代表新增數(shù)據(jù),紅色代表修改的數(shù)據(jù)
| member_id | phoneno | create_time | update_time |
| 10001 | 13500000001 | 2019-11-08 14:47:55 | 2019-11-08 14:47:55 |
| 10002 | 13600000002 | 2019-11-08 14:48:33 | 2019-11-09 14:48:33 |
| 10003 | 13500000003 | 2019-11-08 14:48:53 | 2019-11-08 14:48:53 |
| 10004 | 13600000004 | 2019-11-08 14:49:02 | 2019-11-10 14:49:02 |
| 10005 | 13500000005 | 2019-11-09 08:54:03 | 2019-11-09 08:54:03 |
| 10006 | 13500000006 | 2019-11-09 09:54:25 | 2019-11-09 09:54:25 |
| 10007 | 13500000007 | 2019-11-10 17:41:49 | 2019-11-10 17:41:49 |
全量初始裝載
在啟用拉鏈表時(shí),先對其進(jìn)行初始裝載,比如以2019-11-08為開始時(shí)間,那么將MySQL源表全量抽取到ODS層member_delta表的2018-11-08的分區(qū)中,然后初始裝載DW層的拉鏈表member_his
INSERT overwrite TABLE member_his
SELECT
member_id,
phoneno,
to_date ( create_time ) AS effective_date,
'3000-12-31'
FROM
member_delta
WHERE
DAY = '2019-11-08'
查詢初始的歷史拉鏈表數(shù)據(jù)

增量抽取數(shù)據(jù)
每天,從源系統(tǒng)member表中,將前一天的增量數(shù)據(jù)抽取到ODS層的增量數(shù)據(jù)表member_delta對應(yīng)的分區(qū)中。這里的增量需要通過member表中的創(chuàng)建時(shí)間和修改時(shí)間來確定,或者使用sqoop job監(jiān)控update時(shí)間來進(jìn)行增聯(lián)抽取。比如,本案例中2019-11-09和2019-11-10為兩個(gè)分區(qū),分別存儲了2019-11-09和2019-11-10日的增量數(shù)據(jù)。2019-11-09分區(qū)的數(shù)據(jù)為:

2019-11-10分區(qū)的數(shù)據(jù)為:

增量刷新歷史拉鏈數(shù)據(jù)
2019-11-09增量刷新歷史拉鏈表將數(shù)據(jù)放進(jìn)臨時(shí)表
INSERT overwrite TABLE member_his_tmp
SELECT *
FROM
(
-- 2019-11-09增量數(shù)據(jù),代表最新的狀態(tài),該數(shù)據(jù)的生效時(shí)間是2019-11-09,過期時(shí)間為3000-12-31
-- 這些增量的數(shù)據(jù)需要被全部加載到歷史拉鏈表中
SELECT member_id,
phoneno,
'2019-11-09' effective_date,
'3000-12-31' expire_date
FROM member_delta
WHERE DAY='2019-11-09'
UNION ALL
-- 用當(dāng)前為生效狀態(tài)的拉鏈數(shù)據(jù),去left join 增量數(shù)據(jù),
-- 如果匹配得上,則表示該數(shù)據(jù)已發(fā)生了更新,
-- 此時(shí),需要將發(fā)生更新的數(shù)據(jù)的過期時(shí)間更改為當(dāng)前時(shí)間.
-- 如果匹配不上,則表明該數(shù)據(jù)沒有發(fā)生更新,此時(shí)過期時(shí)間不變
SELECT a.member_id,
a.phoneno,
a.effective_date,
if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date
FROM
(SELECT *
FROM member_his
WHERE expire_date='3000-12-31') a
LEFT JOIN
(SELECT *
FROM member_delta
WHERE DAY='2019-11-09') b ON a.member_id=b.member_id)his
將數(shù)據(jù)覆蓋到歷史拉鏈表
INSERT overwrite TABLE member_his
SELECT *
FROM member_his_tmp
查看歷史拉鏈表

2019-11-10增量刷新歷史拉鏈表
將數(shù)據(jù)放進(jìn)臨時(shí)表
INSERT overwrite TABLE member_his_tmp
SELECT *
FROM
(
-- 2019-11-10增量數(shù)據(jù),代表最新的狀態(tài),該數(shù)據(jù)的生效時(shí)間是2019-11-10,過期時(shí)間為3000-12-31
-- 這些增量的數(shù)據(jù)需要被全部加載到歷史拉鏈表中
SELECT member_id,
phoneno,
'2019-11-10' effective_date,
'3000-12-31' expire_date
FROM member_delta
WHERE DAY='2019-11-10'
UNION ALL
-- 用當(dāng)前為生效狀態(tài)的拉鏈數(shù)據(jù),去left join 增量數(shù)據(jù),
-- 如果匹配得上,則表示該數(shù)據(jù)已發(fā)生了更新,
-- 此時(shí),需要將發(fā)生更新的數(shù)據(jù)的過期時(shí)間更改為當(dāng)前時(shí)間.
-- 如果匹配不上,則表明該數(shù)據(jù)沒有發(fā)生更新,此時(shí)過期時(shí)間不變
SELECT a.member_id,
a.phoneno,
a.effective_date,
if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date
FROM
(SELECT *
FROM member_his
WHERE expire_date='3000-12-31') a
LEFT JOIN
(SELECT *
FROM member_delta
WHERE DAY='2019-11-10') b ON a.member_id=b.member_id)his查看歷史拉鏈表

將以上腳本封裝成shell調(diào)度的腳本
#!/bin/bash
#如果是輸入的日期按照取輸入日期;如果沒輸入日期取當(dāng)前時(shí)間的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
INSERT overwrite TABLE member_his_tmp
SELECT *
FROM
(
-- 2019-11-10增量數(shù)據(jù),代表最新的狀態(tài),該數(shù)據(jù)的生效時(shí)間是2019-11-10,過期時(shí)間為3000-12-31
-- 這些增量的數(shù)據(jù)需要被全部加載到歷史拉鏈表中
SELECT member_id,
phoneno,
'$do_date' effective_date,
'3000-12-31' expire_date
FROM member_delta
WHERE DAY='$do_date'
UNION ALL
-- 用當(dāng)前為生效狀態(tài)的拉鏈數(shù)據(jù),去left join 增量數(shù)據(jù),
-- 如果匹配得上,則表示該數(shù)據(jù)已發(fā)生了更新,
-- 此時(shí),需要將發(fā)生更新的數(shù)據(jù)的過期時(shí)間更改為當(dāng)前時(shí)間.
-- 如果匹配不上,則表明該數(shù)據(jù)沒有發(fā)生更新,此時(shí)過期時(shí)間不變
SELECT a.member_id,
a.phoneno,
a.effective_date,
if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date
FROM
(SELECT *
FROM member_his
WHERE expire_date='3000-12-31') a
LEFT JOIN
(SELECT *
FROM member_delta
WHERE DAY='$do_date') b ON a.member_id=b.member_id)his;
"
$hive -e "$sql"
