<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink+Hologres億級用戶實(shí)時(shí)UV精確去重最佳實(shí)踐

          共 16853字,需瀏覽 34分鐘

           ·

          2021-06-15 21:23

          因?yàn)闃I(yè)務(wù)需求不同,通常會分為兩種場景:

          • 離線計(jì)算場景:以T+1為主,計(jì)算歷史數(shù)據(jù)
          • 實(shí)時(shí)計(jì)算場景:實(shí)時(shí)計(jì)算日常新增的數(shù)據(jù),對用戶標(biāo)簽去重

          針對離線計(jì)算場景,Hologres基于RoaringBitmap,提供超高基數(shù)的UV計(jì)算,只需進(jìn)行一次最細(xì)粒度的預(yù)聚合計(jì)算,也只生成一份最細(xì)粒度的預(yù)聚合結(jié)果表,就能達(dá)到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支持超高基數(shù)UV計(jì)算(基于RoaringBitmap實(shí)現(xiàn))

          對于實(shí)時(shí)計(jì)算場景,可以使用Flink+Hologres方式,并基于RoaringBitmap,實(shí)時(shí)對用戶標(biāo)簽去重。這樣的方式,可以較細(xì)粒度的實(shí)時(shí)得到用戶UV、PV數(shù)據(jù),同時(shí)便于根據(jù)需求調(diào)整最小統(tǒng)計(jì)窗口(如最近5分鐘的UV),實(shí)現(xiàn)類似實(shí)時(shí)監(jiān)控的效果,更好的在大屏等BI展示。相較于以天、周、月等為單位的去重,更適合在活動(dòng)日期進(jìn)行更細(xì)粒度的統(tǒng)計(jì),并且通過簡單的聚合,也可以得到較大時(shí)間單位的統(tǒng)計(jì)結(jié)果。

          主體思想

          Flink將流式數(shù)據(jù)轉(zhuǎn)化為表與維表進(jìn)行JOIN操作,再轉(zhuǎn)化為流式數(shù)據(jù)。此舉可以利用Hologres維表的insertIfNotExists特性結(jié)合自增字段實(shí)現(xiàn)高效的uid映射。

          Flink把關(guān)聯(lián)的結(jié)果數(shù)據(jù)按照時(shí)間窗口進(jìn)行處理,根據(jù)查詢維度使用RoaringBitmap進(jìn)行聚合,并將查詢維度以及聚合的uid存放在聚合結(jié)果表,其中聚合出的uid結(jié)果放入Hologres的RoaringBitmap類型的字段中。

          查詢時(shí),與離線方式相似,直接按照查詢條件查詢聚合結(jié)果表,并對其中關(guān)鍵的RoaringBitmap字段做or運(yùn)算后并統(tǒng)計(jì)基數(shù),即可得出對應(yīng)用戶數(shù)。

          處理流程如下圖所示

          方案最佳實(shí)踐

          1.創(chuàng)建相關(guān)基礎(chǔ)表

          1)創(chuàng)建表uid_mapping為uid映射表,用于映射uid到32位int類型。

          RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續(xù))。常見的業(yè)務(wù)系統(tǒng)或者埋點(diǎn)中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構(gòu)建一張映射表。映射表利用Hologres的SERIAL類型(自增的32位int)來實(shí)現(xiàn)用戶映射的自動(dòng)管理和穩(wěn)定映射。

          由于是實(shí)時(shí)數(shù)據(jù), 設(shè)置該表為行存表,以提高Flink維表實(shí)時(shí)JOIN的QPS。

          BEGIN;
          CREATE TABLE public.uid_mapping (
          uid text NOT NULL,
          uid_int32 serial,
          PRIMARY KEY (uid)
          );
          --將uid設(shè)為clustering_key和distribution_key便于快速查找其對應(yīng)的int32值
          CALL set_table_property('public.uid_mapping''clustering_key''uid');
          CALL set_table_property('public.uid_mapping''distribution_key''uid');
          CALL set_table_property('public.uid_mapping''orientation''row');
          COMMIT;

          2)創(chuàng)建表dws_app為基礎(chǔ)聚合表,用于存放在基礎(chǔ)維度上聚合后的結(jié)果。

          使用RoaringBitmap前需要?jiǎng)?chuàng)建RoaringBitmap extention,同時(shí)也需要Hologres實(shí)例為0.10版本

          CREATE EXTENSION IF NOT EXISTS roaringbitmap;

          為了更好性能,建議根據(jù)基礎(chǔ)聚合表數(shù)據(jù)量合理的設(shè)置Shard數(shù),但建議基礎(chǔ)聚合表的Shard數(shù)設(shè)置不超過計(jì)算資源的Core數(shù)。推薦使用以下方式通過Table Group來設(shè)置Shard數(shù)

          --新建shard數(shù)為16的Table Group,
          --因?yàn)闇y試數(shù)據(jù)量百萬級,其中后端計(jì)算資源為100core,設(shè)置shard數(shù)為16
          BEGIN;
          CREATE TABLE tg16 (a int);                             --Table Group哨兵表
          call set_table_property('tg16''shard_count''16'); 
          COMMIT;

          相比離線結(jié)果表,此結(jié)果表增加了時(shí)間戳字段,用于實(shí)現(xiàn)以Flink窗口周期為單位的統(tǒng)計(jì)。結(jié)果表DDL如下:

          BEGIN;
          create table dws_app(
            country text,
            prov text,
            city text, 
            ymd text NOT NULL,  --日期字段
            timetz TIMESTAMPTZ,  --統(tǒng)計(jì)時(shí)間戳,可以實(shí)現(xiàn)以Flink窗口周期為單位的統(tǒng)計(jì)
            uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
            primary key(country, prov, city, ymd, timetz)--查詢維度和時(shí)間作為主鍵,防止重復(fù)插入數(shù)據(jù)
          );
          CALL set_table_property('public.dws_app''orientation''column');
          --日期字段設(shè)為clustering_key和event_time_column,便于過濾
          CALL set_table_property('public.dws_app''clustering_key''ymd');
          CALL set_table_property('public.dws_app''event_time_column''ymd');
          --等價(jià)于將表放在shard數(shù)為16的table group
          call set_table_property('public.dws_app''colocate_with''tg16');
          --group by字段設(shè)為distribution_key
          CALL set_table_property('public.dws_app''distribution_key''country,prov,city');
          COMMIT;

          2.Flink實(shí)時(shí)讀取數(shù)據(jù)并更新dws_app基礎(chǔ)聚合表

          完整示例源碼請見alibabacloud-hologres-connectors examples

          1)Flink 流式讀取數(shù)據(jù)源(DataStream),并轉(zhuǎn)化為源表(Table)

          //此處使用csv文件作為數(shù)據(jù)源,也可以是kafka等
          DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
          // 與維表join需要添加proctime字段,詳見https://help.aliyun.com/document_detail/62506.html
          Table odsTable =
              tableEnv.fromDataStream(
              odsStream,
              $("uid"),
              $("country"),
              $("prov"),
              $("city"),
              $("ymd"),
              $("proctime").proctime());
          // 注冊到catalog環(huán)境
          tableEnv.createTemporaryView("odsTable", odsTable);

          2)將源表與Hologres維表(uid_mapping)進(jìn)行關(guān)聯(lián)

          其中維表使用insertIfNotExists參數(shù),即查詢不到數(shù)據(jù)時(shí)自行插入,uid_int32字段便可以利用Hologres的serial類型自增創(chuàng)建。

          // 創(chuàng)建Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入
          String createUidMappingTable =
              String.format(
              "create table uid_mapping_dim("
              + "  uid string,"
              + "  uid_int32 INT"
              + ") with ("
              + "  'connector'='hologres',"
              + "  'dbname' = '%s'," //Hologres DB名
              + "  'tablename' = '%s',"//Hologres 表名
              + "  'username' = '%s'," //當(dāng)前賬號access id
              + "  'password' = '%s'," //當(dāng)前賬號access key
              + "  'endpoint' = '%s'," //Hologres endpoint
              + "  'insertifnotexists'='true'"
              + ")",
              database, dimTableName, username, password, endpoint);
          tableEnv.executeSql(createUidMappingTable);
          // 源表與維表join
          String odsJoinDim =
              "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
              + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
              + "  ON ods.uid = dim.uid";
          Table joinRes = tableEnv.sqlQuery(odsJoinDim);

          3)將關(guān)聯(lián)結(jié)果轉(zhuǎn)化為DataStream,通過Flink時(shí)間窗口處理,結(jié)合RoaringBitmap進(jìn)行聚合

          DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
              source
              // 篩選需要統(tǒng)計(jì)的維度(country, prov, city, ymd)
              .keyBy(0, 1, 2, 3)
              // 滾動(dòng)時(shí)間窗口;此處由于使用讀取csv模擬輸入流,采用ProcessingTime,實(shí)際使用中可使用EventTime
              .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
              // 觸發(fā)器,可以在窗口未結(jié)束時(shí)獲取聚合結(jié)果
              .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
              .aggregate(
              // 聚合函數(shù),根據(jù)key By篩選的維度,進(jìn)行聚合
              new AggregateFunction<
                  Tuple5<String, String, String, String, Integer>,
                  RoaringBitmap,
                  RoaringBitmap>() {
                      @Override
                      public RoaringBitmap createAccumulator() {
                          return new RoaringBitmap();
                      }
                      @Override
                      public RoaringBitmap add(
                          Tuple5<String, String, String, String, Integer> in,
                          RoaringBitmap acc) {
                          // 將32位的uid添加到RoaringBitmap進(jìn)行去重
                          acc.add(in.f4);
                          return acc;
                      }
                      @Override
                      public RoaringBitmap getResult(RoaringBitmap acc) {
                          return acc;
                      }
                      @Override
                      public RoaringBitmap merge(
                          RoaringBitmap acc1, RoaringBitmap acc2) {
                          return RoaringBitmap.or(acc1, acc2);
                      }
               },
              //窗口函數(shù),輸出聚合結(jié)果
              new WindowFunction<
                  RoaringBitmap,
                  Tuple6<String, String, String, String, Timestamp, byte[]>,
                  Tuple,
                  TimeWindow>() {
                      @Override
                      public void apply(
                          Tuple keys,
                          TimeWindow timeWindow,
                          Iterable<RoaringBitmap> iterable,
                          Collector<
                          Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                          throws Exception {
                          RoaringBitmap result = iterable.iterator().next();
                          // 優(yōu)化RoaringBitmap
                          result.runOptimize();
                          // 將RoaringBitmap轉(zhuǎn)化為字節(jié)數(shù)組以存入Holo中
                          byte[] byteArray = new byte[result.serializedSizeInBytes()];
                          result.serialize(ByteBuffer.wrap(byteArray));
                          // 其中 Tuple6.f4(Timestamp) 字段表示以窗口長度為周期進(jìn)行統(tǒng)計(jì),以秒為單位
                          out.collect(
                              new Tuple6<>(
                                  keys.getField(0),
                                  keys.getField(1),
                                  keys.getField(2),
                                  keys.getField(3),
                                  new Timestamp(
                                      timeWindow.getEnd() / 1000 * 1000),
                                  byteArray));
                  }
              });

          4)寫入結(jié)果表

          需要注意的是,Hologres中RoaringBitmap類型在Flink中對應(yīng)Byte數(shù)組類型

          // 計(jì)算結(jié)果轉(zhuǎn)換為表
          Table resTable =
              tableEnv.fromDataStream(
                  processedSource,
                  $("country"),
                  $("prov"),
                  $("city"),
                  $("ymd"),
                  $("timest"),
                  $("uid32_bitmap"));
          // 創(chuàng)建Hologres結(jié)果表, 其中Hologres的RoaringBitmap類型通過Byte數(shù)組存入
          String createHologresTable =
              String.format(
                  "create table sink("
                  + "  country string,"
                  + "  prov string,"
                  + "  city string,"
                  + "  ymd string,"
                  + "  timetz timestamp,"
                  + "  uid32_bitmap BYTES"
                  + ") with ("
                  + "  'connector'='hologres',"
                  + "  'dbname' = '%s',"
                  + "  'tablename' = '%s',"
                  + "  'username' = '%s',"
                  + "  'password' = '%s',"
                  + "  'endpoint' = '%s',"
                  + "  'connectionSize' = '%s',"
                  + "  'mutatetype' = 'insertOrReplace'"
                  + ")",
              database, dwsTableName, username, password, endpoint, connectionSize);
          tableEnv.executeSql(createHologresTable);
          // 寫入計(jì)算結(jié)果到dws表
          tableEnv.executeSql("insert into sink select * from " + resTable);

          3.數(shù)據(jù)查詢

          查詢時(shí),從基礎(chǔ)聚合表(dws_app)中按照查詢維度做聚合計(jì)算,查詢bitmap基數(shù),得出group by條件下的用戶數(shù)

          查詢某天內(nèi)各個(gè)城市的uv

          --運(yùn)行下面RB_AGG運(yùn)算查詢,可執(zhí)行參數(shù)先關(guān)閉三階段聚合開關(guān)(默認(rèn)關(guān)閉),性能更好
          set hg_experimental_enable_force_three_stage_agg=off  
          SELECT  country
                  ,prov
                  ,city
                  ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
          FROM    dws_app
          WHERE   ymd = '20210329'
          GROUP BY country
                   ,prov
                   ,city
          ;

          查詢某段時(shí)間內(nèi)各個(gè)省份的uv

          --運(yùn)行下面RB_AGG運(yùn)算查詢,可執(zhí)行參數(shù)先關(guān)閉三階段聚合開關(guān)(默認(rèn)關(guān)閉),性能更好
          set hg_experimental_enable_force_three_stage_agg=off 
          SELECT  country
                  ,prov
                  ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
          FROM    dws_app
          WHERE   time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
          GROUP BY country
                   ,prov
          ;

          本文鏈接:https://zhuanlan.zhihu.com/p/377588369

          瀏覽 58
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人操人人干人人 | 婷婷丁香无码 | 国产肉体XXX137大胆 | 欧美精品中文字幕在线观看 | 夜夜国自一区 1080P |