<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          ES實現(xiàn)百億級數(shù)據(jù)實時分析實戰(zhàn)案例

          共 2688字,需瀏覽 6分鐘

           ·

          2021-01-15 00:48

          點擊上方藍色字體,選擇“設(shè)為星標
          回復”資源“獲取更多資源

          背景

          我們小組前段時間接到一個需求,希望能夠按照小時為單位,看到每個實驗中各種特征(單個或組合)的覆蓋率、正樣本占比、負樣本占比。我簡單解釋一下這三種指標的定義:
          • 覆蓋率:所有樣本中出現(xiàn)某一特征的樣本的比例

          • 正樣本占比:所有出現(xiàn)該特征的樣本中,正樣本的比例

          • 負樣本占比:所有出現(xiàn)該特征的樣本中,負樣本的比例

          光看這三個指標,大家可能會覺得這個需求很簡單,無非就是一個簡單的篩選、聚合而已。
          如果真的這么簡單,我也沒必要寫這篇文章單獨記錄了。問題的關(guān)鍵就在于,每小時有將近1億的數(shù)據(jù)量,而我們需要保存7天的數(shù)據(jù),數(shù)據(jù)總量預計超過了100億

          技術(shù)方案

          在了解清楚需求后,我們小組馬上對技術(shù)方案展開討論,討論過程中出現(xiàn)了3種方案:
          • 第一種:用Spark流式計算,計算每一種可能單個或組合特征的相關(guān)指標

          • 第二種:收到客戶端請求后,遍歷HDFS中相關(guān)數(shù)據(jù),進行離線計算

          • 第三種:將數(shù)據(jù)按照實驗+小時分索引存入ES,收到客戶端請求后,實時計算返回

          首先,第一種方案直接被diss,原因是一個實驗一般會出現(xiàn)幾百、上千個特征,而這些特征的組合何止幾億種,全部計算的話,可行性暫且不論,光是對資源的消耗就無法承受。
          第二種方案,雖然技術(shù)上是可行的,但離線計算所需時間較長,對用戶來說,體驗并不理想。并且,為了計算目標1%的數(shù)據(jù)而要遍歷所有數(shù)據(jù),對資源也存在很大浪費。
          第三種方案,將數(shù)據(jù)按照實驗+小時分索引后,可以將每個索引包含的數(shù)據(jù)量降到1000萬以下,再借助ES在查詢、聚合方面高效的能力,應(yīng)該可以實現(xiàn)秒級響應(yīng),并且用戶體驗也會非常好。
          技術(shù)方案由此確定。

          技術(shù)架構(gòu)

          1.用Spark從Kafka中接入原始數(shù)據(jù),之后對數(shù)據(jù)進行解析,轉(zhuǎn)換成我們的目標格式
          2.將數(shù)據(jù)按照實驗+小時分索引存入ES中
          3.接受到用戶請求后,將請求按照實驗+特征+小時組合,創(chuàng)建多個異步任務(wù),由這些異步任務(wù)并行從ES中過濾并聚合相關(guān)數(shù)據(jù),得到結(jié)果
          4.將異步任務(wù)的結(jié)果進行合并,返回給前端進行展示

          代碼實現(xiàn)

          異步任務(wù)
          // 啟動并行任務(wù)

          final Map>> futures = Maps.newHashMap();

          for(String metric : metrics) { // 遍歷要計算的指標

          final SampleRatio sampleRatio = getSampleRatio(metric);

          for (String exptId : expts) { // 遍歷目標實驗列表

          for (String id : features) { // 遍歷要分析的特征

          final String name = getMetricsName(exptId, sampleRatio, id);

          final List> resultList = Lists.newArrayList();

          for (Date hour : coveredHours) { // 將時間按照小時進行拆分

          final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);

          final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);

          // 啟動并行任務(wù)

          final Future future = TaskExecutor.submit(task);

          resultList.add(future);

          }

          futures.put(name, resultList);

          }

          }

          }

          final QueryRes queryRes = new QueryRes();

          final Iterator>>> it = futures.entrySet().iterator();

          while (it.hasNext()){

          // 省略結(jié)果處理流程

          }
          指標計算
          // 1\. 對文檔進行聚合運行,分別得到基礎(chǔ)文檔的數(shù)量,以及目標文檔數(shù)量

          final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);

          final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();

          searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);

          // 2\. 得到覆蓋率

          final String indexName = getIndexName(exptId, hour);

          final Search search = new Search.Builder(searchBuilder.toString())

          .addIndex(indexName).addType(getType()).build();

          final SearchResult result = jestClient.execute(search);

          if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){

          // 請求出錯

          log.warn(result.getErrorMessage());

          return 0f;

          }

          final MetricAggregation aggregations = result.getAggregations();

          // 3\. 解析結(jié)果

          final long dividend ;

          if(SampleRatio.ALL == sampleRatio){

          dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();

          }else {

          dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();

          }

          // 防止出現(xiàn)被除數(shù)為0時程序異常

          if(dividend <= 0){

          return 0f;

          }

          long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();

          return divisor / (float)dividend;
          聚合
          int label = 0;

          final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);

          // 包含指定特征的正樣本數(shù)量

          final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

          final List must = boolQuery.must();

          // 計算樣本數(shù)量

          TermQueryBuilder labelQuery = null;

          if(SampleRatio.POSITIVE == sampleRatio) {

          // 計算正樣本數(shù)量

          label = 1;

          labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);

          must.add(labelQuery);

          }else if(SampleRatio.NEGATIVE == sampleRatio) {

          // 計算負樣本數(shù)量

          labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);

          must.add(labelQuery);

          }

          must.add(existsQuery);

          final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());

          existsCountAgg.field(fieldName);

          final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);

          filterAgg.subAggregation(existsCountAgg);

          return filterAgg;

          上線效果

          上線后表現(xiàn)完全滿足預期,平均請求耗時在3秒左右,用戶體驗良好。感謝各位小伙伴的辛苦付出~~

          下圖是ES中部分索引的信息:



          突破性能瓶頸!ElasticSearch百億級數(shù)據(jù)檢索優(yōu)化案例
          ElasticSearch讀寫底層原理及性能調(diào)優(yōu)
          一文俯瞰Elasticsearch核心原理


          文章不錯?點個【在看】吧!???
          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲第一区视频 | 色综合网址 | 东京热视频专区 | 五月情丁香五月情婷婷 | 欧美日韩大鸡巴 |