<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark處理的一些業(yè)務(wù)場(chǎng)景

          共 5907字,需瀏覽 12分鐘

           ·

          2021-07-21 22:17

          Sparksql在處理一些具體的業(yè)務(wù)場(chǎng)景的時(shí)候,可以通過(guò)算子操作,或者RDD之間的轉(zhuǎn)換來(lái)完成負(fù)責(zé)業(yè)務(wù)的數(shù)據(jù)處理,在日常做需求的時(shí)候,整理出來(lái)一下幾個(gè)經(jīng)典的業(yè)務(wù)場(chǎng)景的解決方案,供大家參考。

          1、取商家任務(wù)(task=1,2,3)全部完成的最早時(shí)間(注意如果任務(wù)3沒(méi)有完成,則表中無(wú)3的數(shù)據(jù),這種情況下全部完成時(shí)間為空)

          業(yè)務(wù)背景:

          商家在開(kāi)通店鋪服務(wù)的時(shí)候,會(huì)由商家服務(wù)人員去跟進(jìn)商家完成開(kāi)店任務(wù),如:創(chuàng)建店鋪(task_id=1),完成交易(task_id=2),創(chuàng)建營(yíng)銷活動(dòng)(task_id=3),那么在考核服務(wù)人員是否做好服務(wù)的定義是:商家在一個(gè)月內(nèi)是否完成所有開(kāi)店的任務(wù),因此需要統(tǒng)計(jì)商家完成全部任務(wù)的最早時(shí)間,以判斷服務(wù)的好壞。

          原始數(shù)據(jù):

          原始數(shù)據(jù):
          table:test
          shop_id |task_id |finish_time
          001 |1 |2020-03-01 09:00:00
          001 |1 |2020-04-01 09:00:00
          001 |2 |2020-03-12 09:00:00
          001 |3 |2020-03-10 09:00:00
          001 |3 |2020-03-02 09:00:00
          002 |1 |2020-04-01 09:00:00

          輸出結(jié)果:
          shop_id |finish_time
          001 |2020-03-12 09:00:00
          002 |

          分析:
          1、每個(gè)店鋪都會(huì)有3個(gè)流程,只有流程走完才會(huì)有最早完成時(shí)間。

          2、每個(gè)流程都會(huì)有多次的完成時(shí)間,同一個(gè)店鋪同一個(gè)流程要取最早的完成時(shí)間。

          3、不同流程完成時(shí)間中取最早的完成時(shí)間為這個(gè)店鋪的最后的最早完成時(shí)間。

          解決方案:
          1、先按照shopid,task_id作為主鍵來(lái)獲取每個(gè)店鋪、每個(gè)任務(wù)節(jié)點(diǎn)的最早完成時(shí)間,那么得出結(jié)果如下:

          shop_id    |task_id    |finish_time
          001 |1 |2020-03-01 09:00:00
          001 |2 |2020-03-12 09:00:00
          001 |3 |2020-03-02 09:00:00
          002 |1 |2020-04-01 09:00:00

          2、然后按照shopid做為主鍵,對(duì)task_id進(jìn)行聚合統(tǒng)計(jì),對(duì)finish_time進(jìn)行排序獲取最新的時(shí)間,得出結(jié)果如下:

          shop_id    |task_num    |finish_time
          001 |3 |2020-03-12 09:00:00
          002 |1 |2020-04-01 09:00:00

          3、判斷task_num個(gè)數(shù)是否為3,如果為3,那么店鋪完成所有的業(yè)務(wù),就輸出這一行,如果不為3,那么未完成所有業(yè)務(wù),finish_time變?yōu)閚ull,結(jié)果如下:

          shop_id    |task_num    |finish_time
          001 |3 |2020-03-12 09:00:00
          002 |1 |

          Spark的處理邏輯:

            val DF = Spark.sql("select shop_id,task_id,unix_timestamp(finish_time) as ft from test")  val RDD = DF.rdd.map(f => ((f.getAs[String]("shop_id"),     f.getAs[Int]("task_id")),     f.getAs[Long]("ft"))).groupByKey().map(f => {    val shop_id = f._1._1    val task_id = f._1._2    val list = f._2.toList.sortWith(_ < _)    (shop_id,task_id,list.head)  }).map(f => (f._1,(f._2,f._3))).groupByKey().map(f => {    val shop_id = f._1    val list = f._2.toList.sortWith(_._2 > _._2)    if (list.length == 3)      {        (shop_id,list.length,list.head._2)      }    else      (shop_id,list.length,0L)  })


          2、取登陸用戶的最大連續(xù)登陸天數(shù)。

          業(yè)務(wù)場(chǎng)景:

          某C端APP,每天會(huì)記錄登陸用戶的登陸時(shí)間,然后需要統(tǒng)計(jì)用戶在一段周期內(nèi)的最長(zhǎng)連續(xù)登陸的天數(shù)/或者沒(méi)有登陸的天數(shù)。

          同時(shí)這個(gè)業(yè)務(wù)場(chǎng)景在監(jiān)控里面也可以使用:例如取數(shù)據(jù)表中最近連續(xù)穩(wěn)定(數(shù)據(jù)量不變)的天數(shù)等等。

          原始數(shù)據(jù):

          user_id    |ds001        |2020-03-01001        |2020-03-02001        |2020-03-03001        |2020-03-04001        |2020-03-06001        |2020-03-07002        |2020-03-01002        |2020-03-04001        |2020-03-05

          結(jié)果:

          user_id    |num001        |4002        |2

          分析:

          這塊主要處理的問(wèn)題是連續(xù)登陸的問(wèn)題,如何取判斷用戶是連續(xù)登陸。

          1、對(duì)用戶的登陸時(shí)間進(jìn)行排序;

          2、計(jì)算每?jī)蓚€(gè)時(shí)間的時(shí)間差,如果對(duì)應(yīng)的時(shí)間差為1天,那么就是連續(xù)登陸,如果大于1,則為非連續(xù);

          3、統(tǒng)計(jì)時(shí)間差對(duì)應(yīng)數(shù)組中連續(xù)為1的最大長(zhǎng)度就是最大的連續(xù)登陸天數(shù)。

          Spark的處理邏輯:

          val DF = Spark.sql("select uid,unix_timestamp(ds) as dt from test")  val RDD = DF.rdd.map(f => (f.getAs[String]("uid"),     f.getAs[Long]("dt"))).groupByKey().map(f => {    val uid = f._1    val ir = f._2.toBuffer.sortWith(_ < _)    var array: Array[Long] = Array()    var num = 0L    for (i <- 0 to ir.length - 2) {      val subTime = ir(i + 1) - ir(i)      val during = subTime/86400L      if (during == 1L){        num = num+1L        array = array :+ num      }      else{        num = 0L      }    }    (uid,array.max)  })

          原理:

          例如:array如下(也就是時(shí)間差對(duì)應(yīng)的數(shù)組):Array[Long] = Array(1, 2, 1, 1, 1, 2, 1, 1, 1, 1)
          var num = 0L var arr: Array[Long] = Array() for (i <- 0 to array.length - 1) { if (array(i) == 1L){ num = num + 1L arr = arr :+ num } else{ num = 0L } }輸出:arr:Array[Long] = Array(1, 1, 2, 3, 1, 2, 3, 4)而arr.max = 4 也就是最大連續(xù)登陸的天數(shù)。

          3、如何讓業(yè)務(wù)方能夠自由篩選當(dāng)天分鐘級(jí)別的新增訪問(wèn)用戶數(shù)。

          業(yè)務(wù)背景:

          在做flink的實(shí)時(shí)大屏統(tǒng)計(jì)的時(shí)候,只能選在到當(dāng)天當(dāng)前這個(gè)時(shí)刻的新增用戶數(shù)有多少,但是業(yè)務(wù)方需要通過(guò)時(shí)間篩選,可能在8點(diǎn)30的時(shí)候,需要去看到8點(diǎn)25的時(shí)候,今天新增了多少訪問(wèn)用戶,而且這個(gè)時(shí)間區(qū)間是隨機(jī)的,而且是到分鐘維度的。

          分析:
          如果數(shù)據(jù)量小的情況下:

          通過(guò)canal監(jiān)聽(tīng)業(yè)務(wù)庫(kù)的binlog,然后寫到Kafka通過(guò)flink進(jìn)行binlog解析,生成用戶的第一次登陸時(shí)間寫到mysql,供后端同學(xué)通過(guò)業(yè)務(wù)邏輯進(jìn)行篩選,就可以達(dá)到任意區(qū)間,任意范圍的新增訪問(wèn)用戶的圈選。

          但是在C端數(shù)據(jù)量偏大的情況下,顯然不能存儲(chǔ)全量數(shù)據(jù),就算存儲(chǔ)也不能按照hive的方式存儲(chǔ),uid + fisrt_time這種模式進(jìn)行存儲(chǔ)。

          那么數(shù)據(jù)量大的情況下,如何解決呢:

          1、可以按照分鐘進(jìn)行存儲(chǔ),數(shù)據(jù)的主鍵就是時(shí)間戳到分鐘級(jí)別的,然后統(tǒng)計(jì)每分鐘第一次訪問(wèn)的用戶量,那么一天的數(shù)據(jù)也就是1440行,每一行存的就是第一次訪問(wèn)時(shí)間在這個(gè)分鐘內(nèi)的用戶量。

          time_min    |num2020-08-18 09:01:00        |40022020-08-18 09:02:00        |50022020-08-18 09:03:00        |5202

          這樣存儲(chǔ)之后后端可以通過(guò)時(shí)間區(qū)間進(jìn)行篩選后相加得到某個(gè)分鐘級(jí)別區(qū)間端的第一次訪問(wèn)的用戶數(shù)據(jù)。

          2、不過(guò)上面的方案有個(gè)缺陷,雖然將用戶維度為主鍵修改為分鐘維度的主鍵,數(shù)據(jù)量減少了很多,但是可能業(yè)務(wù)方需要的不僅僅是用戶量,還要具體的用戶ID,來(lái)針對(duì)性進(jìn)行投放,那么上面的方案就不太適合了。

          針對(duì)上面的業(yè)務(wù)場(chǎng)景,可以選用Hbase進(jìn)行優(yōu)化。

          rowKey    |uid2020-08-18 09:01:00+timetamp        |{uid1,uid2,uid3}2020-08-18 09:02:00+timetamp        |{uid21,uid23,uid33}2020-08-18 09:03:00+timetamp        |{uid13,uid24,uid35}

          由于Hbase本身是列存儲(chǔ)的,如果將分鐘級(jí)別的時(shí)間戳作為RowKey,是可以很快的定位到數(shù)據(jù)所在的位置,不必進(jìn)行全表掃描,這樣查詢效率會(huì)很快。

          不過(guò)這個(gè)場(chǎng)景沒(méi)有驗(yàn)證過(guò),但是在用戶畫像的需求中是通過(guò)這個(gè)邏輯來(lái)實(shí)現(xiàn)秒級(jí)別的查詢的。

          4、遞歸的方式來(lái)解析JSON串(樹(shù)結(jié)構(gòu))

          業(yè)務(wù)背景:

          在處理IM需求的時(shí)候,需要對(duì)客服的評(píng)價(jià)進(jìn)行打分,而客服的評(píng)價(jià)系統(tǒng)是分為多個(gè)層級(jí),不同類型,當(dāng)初設(shè)計(jì)這個(gè)層級(jí)關(guān)系的時(shí)候是按照樹(shù)結(jié)構(gòu)進(jìn)行涉及的,最多能下層4集合,但是每一層的都會(huì)有具體行為的選擇和對(duì)應(yīng)的得分情況。某一個(gè)層級(jí)可以包含多個(gè)下屬層級(jí)。

          具體結(jié)構(gòu)如下:



          層級(jí)架構(gòu)如下:



          分析:
          1、本身是一個(gè)數(shù)組,數(shù)組的元素是JSON串,基本字段一致,每一層級(jí)都是包含基本字符串信息:level,id,lbalel,value,parentID,children。

          2、children的value也是一個(gè)數(shù)組,和上面的數(shù)組模式一樣同時(shí)包含全部字段。

          3、最后的層級(jí)最多到第四層結(jié)束,或者說(shuō)是判斷最后對(duì)應(yīng)的children的值是一個(gè)空數(shù)組結(jié)束。

          所以這個(gè)模式可以利用遞歸進(jìn)行調(diào)用解析,最后的判定條件是children的值是否為空為止。

          代碼模式:

          var res = new ArrayBuffer[(String,String,String,String,String)]()    def JsonFunc(Son: String,rest:ArrayBuffer[(String,String,String,String,String)]): ArrayBuffer[(String,String,String,String,String)] = {      val jsonArray = JSON.parseArray(Son)      if (jsonArray.size() == 0)  {        println("last")      }      else      {        for (i <- 0 to (jsonArray.size() - 1)) {          val jsonObj = jsonArray.getJSONObject(i)          val label = jsonObj.getOrDefault("label", null).toString          val value = jsonObj.getOrDefault("value", null).toString          val id = jsonObj.getOrDefault("id", null).toString          val level = jsonObj.getOrDefault("level", null).toString          val parentID = jsonObj.getOrDefault("parentID", null).toString          res =  res :+ (id,level,parentID,label,value)          JsonFunc(jsonObj.getOrDefault("children", null).toString,res)        }      }      res    }
          瀏覽 34
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费一级做a爰片性视频 | 一道本二区三区 | 国产肉体ⅩXXX137大胆视频 | 91九色丨国产丨爆乳 | 操女生小逼 |