<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于Flink快速開(kāi)發(fā)實(shí)時(shí)TopN程序最簡(jiǎn)單的思路

          共 5196字,需瀏覽 11分鐘

           ·

          2020-09-29 14:16

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

          回復(fù)”資源“獲取更多資源

          11be74a037024c28e98ac74e6f2f02e3.webp

          a32bc2717e887d7fc9be7390db996a96.webp

          大數(shù)據(jù)技術(shù)與架構(gòu)點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)開(kāi)發(fā)領(lǐng)域最強(qiáng)公眾號(hào)!

          d44dce709aff2e8ab28bc5a36d96cdb4.webp

          大數(shù)據(jù)真好玩點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)真好玩!21483bfb68dc01820d10e9dad3916056.webp


          TopN 是統(tǒng)計(jì)報(bào)表和大屏非常常見(jiàn)的功能,主要用來(lái)實(shí)時(shí)計(jì)算排行榜。流式的TopN可以使業(yè)務(wù)方在內(nèi)存中按照某個(gè)統(tǒng)計(jì)指標(biāo)(如出現(xiàn)次數(shù))計(jì)算排名并快速出發(fā)出更新后的排行榜。

          我們以統(tǒng)計(jì)詞頻為例展示一下如何快速開(kāi)發(fā)一個(gè)計(jì)算TopN的flink程序。

          Flink支持各種各樣的流數(shù)據(jù)接口作為數(shù)據(jù)的數(shù)據(jù)源,本次demo我們采用內(nèi)置的socketTextStream作為數(shù)據(jù)數(shù)據(jù)源。

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時(shí)間語(yǔ)義
          DataStream text = env.socketTextStream(hostName, port); //監(jiān)聽(tīng)指定socket端口作為輸入

          與離線wordcount類似,程序首先需要把輸入的整句文字按照分隔符split成一個(gè)一個(gè)單詞,然后按照單詞為key實(shí)現(xiàn)累加。

          DataStream> ds = text
          .flatMap(new LineSplitter()); //將輸入語(yǔ)句split成一個(gè)一個(gè)單詞并初始化count值為1的Tuple2類型
          private static final class LineSplitter implements
          FlatMapFunction> {

          @Override
          public void flatMap(String value, Collector> out) {
          // normalize and split the line
          String[] tokens = value.toLowerCase().split("\\W+");

          // emit the pairs
          for (String token : tokens) {
          if (token.length() > 0) {
          out.collect(new Tuple2(token, 1));
          }
          }
          }
          }
          DataStream> wcount = ds
          .keyBy(0) //按照Tuple2的第一個(gè)元素為key,也就是單詞
          .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
          //key之后的元素進(jìn)入一個(gè)總時(shí)間長(zhǎng)度為600s,每20s向后滑動(dòng)一次的滑動(dòng)窗口
          .sum(1);// 將相同的key的元素第二個(gè)count值相加

          全局TopN

          數(shù)據(jù)流經(jīng)過(guò)前面的處理后會(huì)每20s計(jì)算一次各個(gè)單詞的count值并發(fā)送到下游窗口。

            DataStream> ret = wcount
          .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
          //所有key元素進(jìn)入一個(gè)20s長(zhǎng)的窗口(選20秒是因?yàn)樯嫌未翱诿?0s計(jì)算一輪數(shù)據(jù),topN窗口一次計(jì)算只統(tǒng)計(jì)一個(gè)窗口時(shí)間內(nèi)的變化)
          .process(new TopNAllFunction(5));//計(jì)算該窗口TopN

          windowAll是一個(gè)全局并發(fā)為1的特殊操作,也就是所有元素都會(huì)進(jìn)入到一個(gè)窗口內(nèi)進(jìn)行計(jì)算。

          private static class TopNAllFunction
          extends
          ProcessAllWindowFunction, Tuple2, TimeWindow> {

          private int topSize = 10;

          public TopNAllFunction(int topSize) {
          // TODO Auto-generated constructor stub

          this.topSize = topSize;
          }

          @Override
          public void process(
          ProcessAllWindowFunction, Tuple2, TimeWindow>.Context arg0,
          Iterable> input,
          Collector> out) throws Exception {
          // TODO Auto-generated method stub

          TreeMap> treemap = new TreeMap>(
          new Comparator() {

          @Override
          public int compare(Integer y, Integer x) {
          // TODO Auto-generated method stub
          return (x < y) ? -1 : 1;
          }

          }); //treemap按照key降序排列,相同count值不覆蓋

          for (Tuple2 element : input) {
          treemap.put(element.f1, element);
          if (treemap.size() > topSize) { //只保留前面TopN個(gè)元素
          treemap.pollLastEntry();
          }
          }

          for (Entry> entry : treemap
          .entrySet()) {
          out.collect(entry.getValue());
          }

          }

          }

          分組TopN

          在部分場(chǎng)景下,用戶希望根據(jù)不同的分組進(jìn)行排序,計(jì)算出每個(gè)分組的一個(gè)排行榜。

            wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分組
          .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口統(tǒng)計(jì)上游數(shù)據(jù)
          .process(new TopNFunction(5)) //分組TopN統(tǒng)計(jì)
          private static class TupleKeySelectorByStart implements
          KeySelector, String> {

          @Override
          public String getKey(Tuple2 value) throws Exception {
          // TODO Auto-generated method stub
          return value.f0.substring(0, 1); //取首字母做key
          }
          }
          /**
          *
          *針對(duì)keyby window的TopN函數(shù),繼承自ProcessWindowFunction
          *
          */
          private static class TopNFunction
          extends
          ProcessWindowFunction, Tuple2, String, TimeWindow> {

          private int topSize = 10;

          public TopNFunction(int topSize) {
          // TODO Auto-generated constructor stub
          this.topSize = topSize;
          }

          @Override
          public void process(
          String arg0,
          ProcessWindowFunction, Tuple2, String, TimeWindow>.Context arg1,
          Iterable> input,
          Collector> out) throws Exception {
          // TODO Auto-generated method stub

          TreeMap> treemap = new TreeMap>(
          new Comparator() {

          @Override
          public int compare(Integer y, Integer x) {
          // TODO Auto-generated method stub
          return (x < y) ? -1 : 1;
          }

          });

          for (Tuple2 element : input) {
          treemap.put(element.f1, element);
          if (treemap.size() > topSize) {
          treemap.pollLastEntry();
          }
          }

          for (Entry> entry : treemap
          .entrySet()) {
          out.collect(entry.getValue());
          }
          }
          }

          上面的代碼實(shí)現(xiàn)了按照首字母分組,取每組元素count最高的TopN方法。

          嵌套TopN

          全局topN的缺陷是,由于windowall是一個(gè)全局并發(fā)為1的操作,所有的數(shù)據(jù)只能匯集到一個(gè)節(jié)點(diǎn)進(jìn)行 TopN 的計(jì)算,那么計(jì)算能力就會(huì)受限于單臺(tái)機(jī)器,容易產(chǎn)生數(shù)據(jù)熱點(diǎn)問(wèn)題。

          解決思路就是使用嵌套 TopN,或者說(shuō)兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用于分散熱點(diǎn)。例如可以先加一層分組 TopN,第一層會(huì)計(jì)算出每一組的 TopN,而后在第二層中進(jìn)行合并匯總,得到最終的全網(wǎng)TopN。第二層雖然仍是單點(diǎn),但是大量的計(jì)算量由第一層分擔(dān)了,而第一層是可以水平擴(kuò)展的。

          e2d417e9371692fca39820ebd2b0b616.webpd2c067f596ba19533cc9a419995fd6db.webp

          版權(quán)聲明:

          本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨(dú)家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責(zé)任。編輯|冷眼丶微信公眾號(hào)|import_bigdata


          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連



          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??

          瀏覽 65
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕无码一区二区三区一本久道不卡 | 日韩美女少妇 | 免费一区二区三区四区五区 | 屁屁影院—线路①屁屁影院 | 亚洲免费黄色电影 |