Flink+Hologres億級用戶實(shí)時(shí)UV精確去重最佳實(shí)踐
因?yàn)闃I(yè)務(wù)需求不同,通常會分為兩種場景:
離線計(jì)算場景:以T+1為主,計(jì)算歷史數(shù)據(jù) 實(shí)時(shí)計(jì)算場景:實(shí)時(shí)計(jì)算日常新增的數(shù)據(jù),對用戶標(biāo)簽去重
針對離線計(jì)算場景,Hologres基于RoaringBitmap,提供超高基數(shù)的UV計(jì)算,只需進(jìn)行一次最細(xì)粒度的預(yù)聚合計(jì)算,也只生成一份最細(xì)粒度的預(yù)聚合結(jié)果表,就能達(dá)到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支持超高基數(shù)UV計(jì)算(基于RoaringBitmap實(shí)現(xiàn))
對于實(shí)時(shí)計(jì)算場景,可以使用Flink+Hologres方式,并基于RoaringBitmap,實(shí)時(shí)對用戶標(biāo)簽去重。這樣的方式,可以較細(xì)粒度的實(shí)時(shí)得到用戶UV、PV數(shù)據(jù),同時(shí)便于根據(jù)需求調(diào)整最小統(tǒng)計(jì)窗口(如最近5分鐘的UV),實(shí)現(xiàn)類似實(shí)時(shí)監(jiān)控的效果,更好的在大屏等BI展示。相較于以天、周、月等為單位的去重,更適合在活動(dòng)日期進(jìn)行更細(xì)粒度的統(tǒng)計(jì),并且通過簡單的聚合,也可以得到較大時(shí)間單位的統(tǒng)計(jì)結(jié)果。
主體思想
Flink將流式數(shù)據(jù)轉(zhuǎn)化為表與維表進(jìn)行JOIN操作,再轉(zhuǎn)化為流式數(shù)據(jù)。此舉可以利用Hologres維表的insertIfNotExists特性結(jié)合自增字段實(shí)現(xiàn)高效的uid映射。
Flink把關(guān)聯(lián)的結(jié)果數(shù)據(jù)按照時(shí)間窗口進(jìn)行處理,根據(jù)查詢維度使用RoaringBitmap進(jìn)行聚合,并將查詢維度以及聚合的uid存放在聚合結(jié)果表,其中聚合出的uid結(jié)果放入Hologres的RoaringBitmap類型的字段中。
查詢時(shí),與離線方式相似,直接按照查詢條件查詢聚合結(jié)果表,并對其中關(guān)鍵的RoaringBitmap字段做or運(yùn)算后并統(tǒng)計(jì)基數(shù),即可得出對應(yīng)用戶數(shù)。
處理流程如下圖所示

方案最佳實(shí)踐
1.創(chuàng)建相關(guān)基礎(chǔ)表
1)創(chuàng)建表uid_mapping為uid映射表,用于映射uid到32位int類型。
RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續(xù))。常見的業(yè)務(wù)系統(tǒng)或者埋點(diǎn)中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構(gòu)建一張映射表。映射表利用Hologres的SERIAL類型(自增的32位int)來實(shí)現(xiàn)用戶映射的自動(dòng)管理和穩(wěn)定映射。
由于是實(shí)時(shí)數(shù)據(jù), 設(shè)置該表為行存表,以提高Flink維表實(shí)時(shí)JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--將uid設(shè)為clustering_key和distribution_key便于快速查找其對應(yīng)的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
2)創(chuàng)建表dws_app為基礎(chǔ)聚合表,用于存放在基礎(chǔ)維度上聚合后的結(jié)果。
使用RoaringBitmap前需要?jiǎng)?chuàng)建RoaringBitmap extention,同時(shí)也需要Hologres實(shí)例為0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
為了更好性能,建議根據(jù)基礎(chǔ)聚合表數(shù)據(jù)量合理的設(shè)置Shard數(shù),但建議基礎(chǔ)聚合表的Shard數(shù)設(shè)置不超過計(jì)算資源的Core數(shù)。推薦使用以下方式通過Table Group來設(shè)置Shard數(shù)
--新建shard數(shù)為16的Table Group,
--因?yàn)闇y試數(shù)據(jù)量百萬級,其中后端計(jì)算資源為100core,設(shè)置shard數(shù)為16
BEGIN;
CREATE TABLE tg16 (a int); --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16');
COMMIT;
相比離線結(jié)果表,此結(jié)果表增加了時(shí)間戳字段,用于實(shí)現(xiàn)以Flink窗口周期為單位的統(tǒng)計(jì)。結(jié)果表DDL如下:
BEGIN;
create table dws_app(
country text,
prov text,
city text,
ymd text NOT NULL, --日期字段
timetz TIMESTAMPTZ, --統(tǒng)計(jì)時(shí)間戳,可以實(shí)現(xiàn)以Flink窗口周期為單位的統(tǒng)計(jì)
uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
primary key(country, prov, city, ymd, timetz)--查詢維度和時(shí)間作為主鍵,防止重復(fù)插入數(shù)據(jù)
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段設(shè)為clustering_key和event_time_column,便于過濾
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等價(jià)于將表放在shard數(shù)為16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段設(shè)為distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
2.Flink實(shí)時(shí)讀取數(shù)據(jù)并更新dws_app基礎(chǔ)聚合表
完整示例源碼請見alibabacloud-hologres-connectors examples
1)Flink 流式讀取數(shù)據(jù)源(DataStream),并轉(zhuǎn)化為源表(Table)
//此處使用csv文件作為數(shù)據(jù)源,也可以是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 與維表join需要添加proctime字段,詳見https://help.aliyun.com/document_detail/62506.html
Table odsTable =
tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime());
// 注冊到catalog環(huán)境
tableEnv.createTemporaryView("odsTable", odsTable);
2)將源表與Hologres維表(uid_mapping)進(jìn)行關(guān)聯(lián)
其中維表使用insertIfNotExists參數(shù),即查詢不到數(shù)據(jù)時(shí)自行插入,uid_int32字段便可以利用Hologres的serial類型自增創(chuàng)建。
// 創(chuàng)建Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入
String createUidMappingTable =
String.format(
"create table uid_mapping_dim("
+ " uid string,"
+ " uid_int32 INT"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s'," //Hologres DB名
+ " 'tablename' = '%s',"//Hologres 表名
+ " 'username' = '%s'," //當(dāng)前賬號access id
+ " 'password' = '%s'," //當(dāng)前賬號access key
+ " 'endpoint' = '%s'," //Hologres endpoint
+ " 'insertifnotexists'='true'"
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表與維表join
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
3)將關(guān)聯(lián)結(jié)果轉(zhuǎn)化為DataStream,通過Flink時(shí)間窗口處理,結(jié)合RoaringBitmap進(jìn)行聚合
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
source
// 篩選需要統(tǒng)計(jì)的維度(country, prov, city, ymd)
.keyBy(0, 1, 2, 3)
// 滾動(dòng)時(shí)間窗口;此處由于使用讀取csv模擬輸入流,采用ProcessingTime,實(shí)際使用中可使用EventTime
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// 觸發(fā)器,可以在窗口未結(jié)束時(shí)獲取聚合結(jié)果
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
.aggregate(
// 聚合函數(shù),根據(jù)key By篩選的維度,進(jìn)行聚合
new AggregateFunction<
Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {
@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}
@Override
public RoaringBitmap add(
Tuple5<String, String, String, String, Integer> in,
RoaringBitmap acc) {
// 將32位的uid添加到RoaringBitmap進(jìn)行去重
acc.add(in.f4);
return acc;
}
@Override
public RoaringBitmap getResult(RoaringBitmap acc) {
return acc;
}
@Override
public RoaringBitmap merge(
RoaringBitmap acc1, RoaringBitmap acc2) {
return RoaringBitmap.or(acc1, acc2);
}
},
//窗口函數(shù),輸出聚合結(jié)果
new WindowFunction<
RoaringBitmap,
Tuple6<String, String, String, String, Timestamp, byte[]>,
Tuple,
TimeWindow>() {
@Override
public void apply(
Tuple keys,
TimeWindow timeWindow,
Iterable<RoaringBitmap> iterable,
Collector<
Tuple6<String, String, String, String, Timestamp, byte[]>> out)
throws Exception {
RoaringBitmap result = iterable.iterator().next();
// 優(yōu)化RoaringBitmap
result.runOptimize();
// 將RoaringBitmap轉(zhuǎn)化為字節(jié)數(shù)組以存入Holo中
byte[] byteArray = new byte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// 其中 Tuple6.f4(Timestamp) 字段表示以窗口長度為周期進(jìn)行統(tǒng)計(jì),以秒為單位
out.collect(
new Tuple6<>(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
new Timestamp(
timeWindow.getEnd() / 1000 * 1000),
byteArray));
}
});
需要注意的是,Hologres中RoaringBitmap類型在Flink中對應(yīng)Byte數(shù)組類型
// 計(jì)算結(jié)果轉(zhuǎn)換為表
Table resTable =
tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("timest"),
$("uid32_bitmap"));
// 創(chuàng)建Hologres結(jié)果表, 其中Hologres的RoaringBitmap類型通過Byte數(shù)組存入
String createHologresTable =
String.format(
"create table sink("
+ " country string,"
+ " prov string,"
+ " city string,"
+ " ymd string,"
+ " timetz timestamp,"
+ " uid32_bitmap BYTES"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'connectionSize' = '%s',"
+ " 'mutatetype' = 'insertOrReplace'"
+ ")",
database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 寫入計(jì)算結(jié)果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);
查詢時(shí),從基礎(chǔ)聚合表(dws_app)中按照查詢維度做聚合計(jì)算,查詢bitmap基數(shù),得出group by條件下的用戶數(shù)
查詢某天內(nèi)各個(gè)城市的uv
--運(yùn)行下面RB_AGG運(yùn)算查詢,可執(zhí)行參數(shù)先關(guān)閉三階段聚合開關(guān)(默認(rèn)關(guān)閉),性能更好
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,city
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY country
,prov
,city
;
查詢某段時(shí)間內(nèi)各個(gè)省份的uv
--運(yùn)行下面RB_AGG運(yùn)算查詢,可執(zhí)行參數(shù)先關(guān)閉三階段聚合開關(guān)(默認(rèn)關(guān)閉),性能更好
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country
,prov
;
本文鏈接:https://zhuanlan.zhihu.com/p/377588369
