<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          大數(shù)據(jù)實(shí)戰(zhàn) -- 基于 Flink 的短視頻生產(chǎn)消費(fèi)監(jiān)控

          共 26946字,需瀏覽 54分鐘

           ·

          2020-11-24 12:14


          本文詳細(xì)介紹了實(shí)時監(jiān)控類指標(biāo)的數(shù)據(jù)流轉(zhuǎn)鏈路以及技術(shù)方案,大多數(shù)的實(shí)時監(jiān)控類指標(biāo)都可按照本文中的幾種方案實(shí)現(xiàn)。

          短視頻生產(chǎn)消費(fèi)監(jiān)控

          短視頻帶來了全新的傳播場域和節(jié)目形態(tài),小屏幕、快節(jié)奏成為行業(yè)潮流的同時,也催生了新的用戶消費(fèi)習(xí)慣,為創(chuàng)作者和商戶帶來收益。而多元化的短視頻也可以為品牌方提供營銷機(jī)遇。

          其中對于垂類生態(tài)短視頻的生產(chǎn)消費(fèi)熱點(diǎn)的監(jiān)控分析目前成為了實(shí)時數(shù)據(jù)處理很常見的一個應(yīng)用場景,比如對某個圈定的垂類生態(tài)下的視頻生產(chǎn)或者視頻消費(fèi)進(jìn)行監(jiān)控,對熱點(diǎn)視頻生成對應(yīng)的優(yōu)化推薦策略,促進(jìn)熱點(diǎn)視頻的生產(chǎn)或者消費(fèi),構(gòu)建整個生產(chǎn)消費(fèi)數(shù)據(jù)鏈路的閉環(huán),從而提高創(chuàng)作者收益以及消費(fèi)者留存。

          本文將完整分析垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)的整條鏈路流轉(zhuǎn)方式,并基于 Flink 提供幾種對于垂類視頻生產(chǎn)消費(fèi)監(jiān)控的方案設(shè)計。通過本文,你可以了解到:

          • 垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路閉環(huán)

          • 實(shí)時監(jiān)控短視頻生產(chǎn)消費(fèi)的方案設(shè)計

          • 不同監(jiān)控量級場景下的代碼實(shí)現(xiàn)

          • flink 學(xué)習(xí)資料

          項(xiàng)目簡介

          垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路流轉(zhuǎn)架構(gòu)圖如下,此數(shù)據(jù)流轉(zhuǎn)圖也適用于其他場景:

          鏈路

          在上述場景中,用戶生產(chǎn)和消費(fèi)短視頻,從而客戶端、服務(wù)端以及數(shù)據(jù)庫會產(chǎn)生相應(yīng)的行為操作日志,這些日志會通過日志抽取中間件抽取到消息隊(duì)列中,我們目前的場景中是使用 Kafka 作為消息隊(duì)列;然后使用 flink 對垂類生態(tài)中的視頻進(jìn)行生產(chǎn)或消費(fèi)監(jiān)控(內(nèi)容生產(chǎn)通常是圈定垂類作者 id 池,內(nèi)容消費(fèi)通常是圈定垂類視頻 id 池),最后將實(shí)時聚合數(shù)據(jù)產(chǎn)出到下游;下游可以以數(shù)據(jù)服務(wù),實(shí)時看板的方式展現(xiàn),運(yùn)營同學(xué)或者自動化工具最終會幫助我們分析當(dāng)前垂類下的生產(chǎn)或者消費(fèi)熱點(diǎn),從而生成推薦策略。

          方案設(shè)計

          架構(gòu)

          其中數(shù)據(jù)源如下:

          • Kafka 為全量內(nèi)容生產(chǎn)和內(nèi)容消費(fèi)的日志。
          • Rpc/Http/Mysql/配置中心/Redis/HBase 為需要監(jiān)控的垂類生態(tài)內(nèi)容 id 池(內(nèi)容生產(chǎn)則為作者 id 池,內(nèi)容消費(fèi)則為視頻 id 池),其主要是提供給運(yùn)營同學(xué)動態(tài)配置需要監(jiān)控的 id 范圍,其可以在 flink 中進(jìn)行實(shí)時查詢,解析運(yùn)營同學(xué)想要的監(jiān)控指標(biāo)范圍,以及監(jiān)控的指標(biāo)和計算方式,然后加工數(shù)據(jù)產(chǎn)出,可以支持隨時配置,實(shí)時數(shù)據(jù)隨時計算產(chǎn)出。

          其中數(shù)據(jù)匯為聚類好的內(nèi)容生產(chǎn)或者消費(fèi)熱點(diǎn)話題或者事件指標(biāo):

          • Redis/HBase 主要是以低延遲(Redis 5ms p99,HBase 100ms p99,不同公司的服務(wù)能力不同)并且高 QPS 提供數(shù)據(jù)服務(wù),給 Server 端或者線上用戶提供低延遲的數(shù)據(jù)查詢。
          • Druid/Mysql 可以做為 OLAP 引擎為 BI 分析提供靈活的上卷下鉆聚合分析能力,供運(yùn)營同學(xué)配置可視化圖表使用。
          • Kafka 可以以流式數(shù)據(jù)產(chǎn)出,從而提供給下游繼續(xù)消費(fèi)或者進(jìn)行特征提取。

          廢話不多說,我們直接上方案和代碼,下述幾種方案按照監(jiān)控 id 范圍量級區(qū)分,不同的量級對應(yīng)著不同的方案,其中的代碼示例為 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要監(jiān)控邏輯都相同。

          方案 1

          適合監(jiān)控 id 數(shù)據(jù)量小的場景(幾千 id),其實(shí)現(xiàn)方式是在 flink 任務(wù)初始化時將需要監(jiān)控的 id 池或動態(tài)配置中心的 id 池加載到內(nèi)存當(dāng)中,之后只需要在內(nèi)存中判斷內(nèi)容生產(chǎn)或者消費(fèi)數(shù)據(jù)是否在這個監(jiān)控池當(dāng)中。

          ProcessWindowFunction p = new ProcessWindowFunction
               
                () {
                
              
               // 配置中心動態(tài) id 池
               private Config > needMonitoredIdsConfig;

               @Override
               public void open(Configuration parameters) throws Exception {
                   this.needMonitoredIdsConfig = ConfigBuilder
                          .buildSet( "needMonitoredIds", Long.class);
              }

               @Override
               public void process(Long bucket, Context context, Iterable  iterable, Collector  collector)  throws Exception {
                  Set  needMonitoredIds = needMonitoredIdsConfig.get();
                   /**
                   * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                   */

              }
          }

          監(jiān)控的 id 池可以按照固定或者可配置從而分出兩種獲取方式:第一種是在 flink 任務(wù)開始時就全部加載進(jìn)內(nèi)存中,這種方式適合監(jiān)控 id 池不變的情況;第二種是使用動態(tài)配置中心,每次都從配置中心訪問到最新的監(jiān)控 id 池,其可以滿足動態(tài)配置或者更改 id 池的需求,并且這種實(shí)現(xiàn)方式通常可以實(shí)時感知到配置更改,幾乎無延遲。

          方案 2

          適合監(jiān)控 id 數(shù)據(jù)量適中(幾十萬 id),監(jiān)控數(shù)據(jù)范圍會不定時發(fā)生變動的場景。其實(shí)現(xiàn)方式是在 flink 算子中定時訪問接口獲取最新的監(jiān)控 id 池,以獲取最新監(jiān)控數(shù)據(jù)范圍。

          ProcessWindowFunction p = new ProcessWindowFunction
               
                () {
                

               private  long lastRefreshTimestamp;

               private Set  needMonitoredIds;

               @Override
               public void open(Configuration parameters) throws Exception {
                   super.open(parameters);
                   this.refreshNeedMonitoredIds(System.currentTimeMillis());
              }

               @Override
               public void process(Long bucket, Context context, Iterable  iterable, Collector  collector)  throws Exception {
                   long windowStart = context.window().getStart();
                   this.refreshNeedMonitoredIds(windowStart);
                   /**
                   * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                   */

              }

               public void refreshNeedMonitoredIds(long windowStart) {
                   // 每隔 10 秒訪問一次
                   if (windowStart -  this.lastRefreshTimestamp >=  10000L) {
                       this.lastRefreshTimestamp = windowStart;
                       this.needMonitoredIds = Rpc.get(...)
                  }
              }
          }

          根據(jù)上述代碼實(shí)現(xiàn)方式,按照時間間隔的方式刷新 id 池,其缺點(diǎn)在于不能實(shí)時感知監(jiān)控 id 池的變化,所以刷新時間可能會和需求場景強(qiáng)耦合(如果 id 池會頻繁更新,那么就需要縮小刷新時間間隔)。也可根據(jù)需求場景在每個窗口開始前刷新 id 池,這樣可保證每個窗口中的 id 池中的數(shù)據(jù)一直保持更新。

          方案 3

          方案 3 對方案 2 的一個優(yōu)化(幾十萬 id,我們生產(chǎn)環(huán)境中最常用的)。其實(shí)現(xiàn)方式是在 flink 中使用 broadcast 算子定時訪問監(jiān)控 id 池,并將 id 池以廣播的形式下發(fā)給下游參與計算的各個算子。其優(yōu)化點(diǎn)在于:比如任務(wù)的并行度為 500,每 1s 訪問一次,采用方案 2 則訪問監(jiān)控 id 池接口的 QPS 為 500,在使用 broadcast 算子之后,其訪問 QPS 可以減少到 1,可以大大減少對接口的訪問量,減輕接口壓力。

          public class Example {

              @Slf4j
              static class NeedMonitorIdsSource implements SourceFunction<Map<LongSet<Long>>> {

                  private volatile boolean isCancel;

                  @Override
                  public void run(SourceContext >> sourceContext)  throws Exception {
                      while (!this.isCancel) {
                          try {
                              TimeUnit.SECONDS.sleep(1);
                              Set  needMonitorIds = Rpc.get(...);
                               // 可以和上一次訪問的數(shù)據(jù)做比較查看是否有變化,如果有變化,才發(fā)送出去
                               if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                                  sourceContext.collect( new HashMap >() {{
                                      put(0L, needMonitorIds);
                                  }});
                              }
                          }  catch (Throwable e) {
                               // 防止接口訪問失敗導(dǎo)致的錯誤導(dǎo)致 flink job 掛掉
                              log.error( "need monitor ids error", e);
                          }
                      }
                  }

                   @Override
                   public void cancel() {
                       this.isCancel =  true;
                  }
              }

               public static void main(String[] args) {
                  ParameterTool parameterTool = ParameterTool.fromArgs(args);
                  InputParams inputParams =  new InputParams(parameterTool);
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

                   final MapStateDescriptor > broadcastMapStateDescriptor =  new MapStateDescriptor<>(
                           "config-keywords",
                          BasicTypeInfo.LONG_TYPE_INFO,
                          TypeInformation.of( new TypeHint >() {
                          }));

                   /********************* kafka source *********************/
                  BroadcastStream >> broadcastStream = env
                          .addSource( new NeedMonitorIdsSource())  // redis photoId 數(shù)據(jù)廣播
                          .setParallelism( 1)
                          .broadcast(broadcastMapStateDescriptor);

                  DataStream  logSourceDataStream = SourceFactory.getSourceDataStream(...);

                   /********************* dag *********************/
                  DataStream  resultDataStream = logSourceDataStream
                          .keyBy(KeySelectorFactory.getStringKeySelector(CommonModel::getKeyField))
                          .connect(broadcastStream)
                          .process( new KeyedBroadcastProcessFunction >, CommonModel>() {

                               private Set  needMonitoredIds;

                               @Override
                               public void open(Configuration parameters) throws Exception {
                                   super.open(parameters);
                                   this.needMonitoredIds = Rpc.get(...)
                              }

                               @Override
                               public void processElement(CommonModel commonModel, ReadOnlyContext readOnlyContext, Collector  collector)  throws Exception {
                                   // 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                              }

                               @Override
                               public void processBroadcastElement(Map > longSetMap, Context context, Collector  collector)  throws Exception {
                                   // 需要監(jiān)控的字段
                                  Set  needMonitorIds = longSetMap.get( 0L);
                                   if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                                       this.needMonitoredIds = needMonitorIds;
                                  }
                              }
                          });

                   /********************* kafka sink *********************/
                  SinkFactory.setSinkDataStream(...);
                  
                  env.execute(inputParams.jobName);
              }

          }

          方案 4

          適合于超大監(jiān)控范圍的數(shù)據(jù)(幾百萬,我們自己的生產(chǎn)實(shí)踐中使用擴(kuò)量到 500 萬)。其原理是將監(jiān)控范圍接口按照 id 按照一定規(guī)則分桶。flink 消費(fèi)到日志數(shù)據(jù)后將 id 按照 監(jiān)控范圍接口 id 相同的分桶方法進(jìn)行分桶 keyBy,這樣在下游算子中每個算子中就可以按照桶變量值,從接口中拿到對應(yīng)桶的監(jiān)控 id 數(shù)據(jù),這樣 flink 中并行的每個算子只需要獲取到自己對應(yīng)的桶的數(shù)據(jù),可以大大減少請求的壓力。

          public class Example {

              public static void main(String[] args) {
                  ParameterTool parameterTool = ParameterTool.fromArgs(args);
                  InputParams inputParams = new InputParams(parameterTool);
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

                  final MapStateDescriptor > broadcastMapStateDescriptor =  new MapStateDescriptor<>(
                           "config-keywords",
                          BasicTypeInfo.LONG_TYPE_INFO,
                          TypeInformation.of( new TypeHint >() {
                          }));

                   /********************* kafka source *********************/

                  DataStream  logSourceDataStream = SourceFactory.getSourceDataStream(...);

                   /********************* dag *********************/
                  DataStream  resultDataStream = logSourceDataStream
                          .keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
                          .timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
                          .process( new ProcessWindowFunction () {

                               private  long lastRefreshTimestamp;

                               private Set  oneBucketNeedMonitoredIds;

                               @Override
                               public void open(Configuration parameters) throws Exception {
                                   super.open(parameters);
                              }

                               @Override
                               public void process(Long bucket, Context context, Iterable  iterable, Collector  collector)  throws Exception {
                                   long windowStart = context.window().getStart();
                                   this.refreshNeedMonitoredIds(windowStart, bucket);
                                   /**
                                   * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                                   */

                              }

                               public void refreshNeedMonitoredIds(long windowStart, long bucket) {
                                   // 每隔 10 秒訪問一次
                                   if (windowStart -  this.lastRefreshTimestamp >=  10000L) {
                                       this.lastRefreshTimestamp = windowStart;
                                       this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
                                  }
                              }
                          });

                   /********************* kafka sink *********************/
                  SinkFactory.setSinkDataStream(...);

                  env.execute(inputParams.jobName);
              }
          }

          總結(jié)

          本文首先介紹了,在短視頻領(lǐng)域中,短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路的整個閉環(huán),并且其數(shù)據(jù)鏈路閉環(huán)一般情況下也適用于其他場景;以及對應(yīng)的實(shí)時監(jiān)控方案的設(shè)計和不同場景下的代碼實(shí)現(xiàn),包括:

          • 垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路閉環(huán):用戶操作行為日志的流轉(zhuǎn),日志上傳,實(shí)時計算,以及流轉(zhuǎn)到 BI,數(shù)據(jù)服務(wù),最后數(shù)據(jù)賦能的整個流程

          • 實(shí)時監(jiān)控方案設(shè)計:監(jiān)控類實(shí)時計算流程中各類數(shù)據(jù)源,數(shù)據(jù)匯的選型

          • 監(jiān)控 id 池在不同量級場景下具體代碼實(shí)現(xiàn)

          學(xué)習(xí)資料

          flink

          • https://github.com/flink-china/flink-training-course/blob/master/README.md
          • https://ververica.cn/developers-resources/
          • https://space.bilibili.com/33807709


          --end--


          掃描下方二維碼
          添加好友,備注【交流
          可私聊交流,也可進(jìn)資源豐富學(xué)習(xí)群

          瀏覽 50
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  激情久久网 | 国产精品免费一区二区六十路 | 免费在线观看视频无码 | 精品国产视频 | 小泽玛利亚av在线 |