Spark處理的一些業(yè)務(wù)場(chǎng)景
Sparksql在處理一些具體的業(yè)務(wù)場(chǎng)景的時(shí)候,可以通過(guò)算子操作,或者RDD之間的轉(zhuǎn)換來(lái)完成負(fù)責(zé)業(yè)務(wù)的數(shù)據(jù)處理,在日常做需求的時(shí)候,整理出來(lái)一下幾個(gè)經(jīng)典的業(yè)務(wù)場(chǎng)景的解決方案,供大家參考。
1、取商家任務(wù)(task=1,2,3)全部完成的最早時(shí)間(注意如果任務(wù)3沒(méi)有完成,則表中無(wú)3的數(shù)據(jù),這種情況下全部完成時(shí)間為空)
業(yè)務(wù)背景:
商家在開(kāi)通店鋪服務(wù)的時(shí)候,會(huì)由商家服務(wù)人員去跟進(jìn)商家完成開(kāi)店任務(wù),如:創(chuàng)建店鋪(task_id=1),完成交易(task_id=2),創(chuàng)建營(yíng)銷活動(dòng)(task_id=3),那么在考核服務(wù)人員是否做好服務(wù)的定義是:商家在一個(gè)月內(nèi)是否完成所有開(kāi)店的任務(wù),因此需要統(tǒng)計(jì)商家完成全部任務(wù)的最早時(shí)間,以判斷服務(wù)的好壞。
原始數(shù)據(jù):
原始數(shù)據(jù):
table:test
shop_id |task_id |finish_time
001 |1 |2020-03-01 09:00:00
001 |1 |2020-04-01 09:00:00
001 |2 |2020-03-12 09:00:00
001 |3 |2020-03-10 09:00:00
001 |3 |2020-03-02 09:00:00
002 |1 |2020-04-01 09:00:00
輸出結(jié)果:
shop_id |finish_time
001 |2020-03-12 09:00:00
002 |分析:
1、每個(gè)店鋪都會(huì)有3個(gè)流程,只有流程走完才會(huì)有最早完成時(shí)間。
2、每個(gè)流程都會(huì)有多次的完成時(shí)間,同一個(gè)店鋪同一個(gè)流程要取最早的完成時(shí)間。
3、不同流程完成時(shí)間中取最早的完成時(shí)間為這個(gè)店鋪的最后的最早完成時(shí)間。
解決方案:
1、先按照shopid,task_id作為主鍵來(lái)獲取每個(gè)店鋪、每個(gè)任務(wù)節(jié)點(diǎn)的最早完成時(shí)間,那么得出結(jié)果如下:
shop_id |task_id |finish_time
001 |1 |2020-03-01 09:00:00
001 |2 |2020-03-12 09:00:00
001 |3 |2020-03-02 09:00:00
002 |1 |2020-04-01 09:00:00
2、然后按照shopid做為主鍵,對(duì)task_id進(jìn)行聚合統(tǒng)計(jì),對(duì)finish_time進(jìn)行排序獲取最新的時(shí)間,得出結(jié)果如下:
shop_id |task_num |finish_time
001 |3 |2020-03-12 09:00:00
002 |1 |2020-04-01 09:00:003、判斷task_num個(gè)數(shù)是否為3,如果為3,那么店鋪完成所有的業(yè)務(wù),就輸出這一行,如果不為3,那么未完成所有業(yè)務(wù),finish_time變?yōu)閚ull,結(jié)果如下:
shop_id |task_num |finish_time
001 |3 |2020-03-12 09:00:00
002 |1 |Spark的處理邏輯:
val DF = Spark.sql("select shop_id,task_id,unix_timestamp(finish_time) as ft from test")val RDD = DF.rdd.map(f => ((f.getAs[String]("shop_id"),f.getAs[Int]("task_id")),f.getAs[Long]("ft"))).groupByKey().map(f => {val shop_id = f._1._1val task_id = f._1._2val list = f._2.toList.sortWith(_ < _)(shop_id,task_id,list.head)}).map(f => (f._1,(f._2,f._3))).groupByKey().map(f => {val shop_id = f._1val list = f._2.toList.sortWith(_._2 > _._2)if (list.length == 3){(shop_id,list.length,list.head._2)}else(shop_id,list.length,0L)})
2、取登陸用戶的最大連續(xù)登陸天數(shù)。
業(yè)務(wù)場(chǎng)景:
某C端APP,每天會(huì)記錄登陸用戶的登陸時(shí)間,然后需要統(tǒng)計(jì)用戶在一段周期內(nèi)的最長(zhǎng)連續(xù)登陸的天數(shù)/或者沒(méi)有登陸的天數(shù)。
同時(shí)這個(gè)業(yè)務(wù)場(chǎng)景在監(jiān)控里面也可以使用:例如取數(shù)據(jù)表中最近連續(xù)穩(wěn)定(數(shù)據(jù)量不變)的天數(shù)等等。
原始數(shù)據(jù):
user_id |ds001 |2020-03-01001 |2020-03-02001 |2020-03-03001 |2020-03-04001 |2020-03-06001 |2020-03-07002 |2020-03-01002 |2020-03-04001 |2020-03-05
結(jié)果:
user_id |num001 |4002 |2
分析:
這塊主要處理的問(wèn)題是連續(xù)登陸的問(wèn)題,如何取判斷用戶是連續(xù)登陸。
1、對(duì)用戶的登陸時(shí)間進(jìn)行排序;
2、計(jì)算每?jī)蓚€(gè)時(shí)間的時(shí)間差,如果對(duì)應(yīng)的時(shí)間差為1天,那么就是連續(xù)登陸,如果大于1,則為非連續(xù);
3、統(tǒng)計(jì)時(shí)間差對(duì)應(yīng)數(shù)組中連續(xù)為1的最大長(zhǎng)度就是最大的連續(xù)登陸天數(shù)。
Spark的處理邏輯:
val DF = Spark.sql("select uid,unix_timestamp(ds) as dt from test")val RDD = DF.rdd.map(f => (f.getAs[String]("uid"),f.getAs[Long]("dt"))).groupByKey().map(f => {val uid = f._1val ir = f._2.toBuffer.sortWith(_ < _)var array: Array[Long] = Array()var num = 0Lfor (i <- 0 to ir.length - 2) {val subTime = ir(i + 1) - ir(i)val during = subTime/86400Lif (during == 1L){num = num+1Larray = array :+ num}else{num = 0L}}(uid,array.max)})
原理:
例如:array如下(也就是時(shí)間差對(duì)應(yīng)的數(shù)組):Array[Long] = Array(1, 2, 1, 1, 1, 2, 1, 1, 1, 1)var num = 0Lvar arr: Array[Long] = Array()for (i <- 0 to array.length - 1) {if (array(i) == 1L){num = num + 1Larr = arr :+ num}else{num = 0L}}輸出:arr:Array[Long] = Array(1, 1, 2, 3, 1, 2, 3, 4)而arr.max = 4 也就是最大連續(xù)登陸的天數(shù)。
3、如何讓業(yè)務(wù)方能夠自由篩選當(dāng)天分鐘級(jí)別的新增訪問(wèn)用戶數(shù)。
業(yè)務(wù)背景:
在做flink的實(shí)時(shí)大屏統(tǒng)計(jì)的時(shí)候,只能選在到當(dāng)天當(dāng)前這個(gè)時(shí)刻的新增用戶數(shù)有多少,但是業(yè)務(wù)方需要通過(guò)時(shí)間篩選,可能在8點(diǎn)30的時(shí)候,需要去看到8點(diǎn)25的時(shí)候,今天新增了多少訪問(wèn)用戶,而且這個(gè)時(shí)間區(qū)間是隨機(jī)的,而且是到分鐘維度的。
分析:
如果數(shù)據(jù)量小的情況下:
通過(guò)canal監(jiān)聽(tīng)業(yè)務(wù)庫(kù)的binlog,然后寫到Kafka通過(guò)flink進(jìn)行binlog解析,生成用戶的第一次登陸時(shí)間寫到mysql,供后端同學(xué)通過(guò)業(yè)務(wù)邏輯進(jìn)行篩選,就可以達(dá)到任意區(qū)間,任意范圍的新增訪問(wèn)用戶的圈選。
但是在C端數(shù)據(jù)量偏大的情況下,顯然不能存儲(chǔ)全量數(shù)據(jù),就算存儲(chǔ)也不能按照hive的方式存儲(chǔ),uid + fisrt_time這種模式進(jìn)行存儲(chǔ)。
那么數(shù)據(jù)量大的情況下,如何解決呢:
1、可以按照分鐘進(jìn)行存儲(chǔ),數(shù)據(jù)的主鍵就是時(shí)間戳到分鐘級(jí)別的,然后統(tǒng)計(jì)每分鐘第一次訪問(wèn)的用戶量,那么一天的數(shù)據(jù)也就是1440行,每一行存的就是第一次訪問(wèn)時(shí)間在這個(gè)分鐘內(nèi)的用戶量。
time_min |num2020-08-18 09:01:00 |40022020-08-18 09:02:00 |50022020-08-18 09:03:00 |5202
這樣存儲(chǔ)之后后端可以通過(guò)時(shí)間區(qū)間進(jìn)行篩選后相加得到某個(gè)分鐘級(jí)別區(qū)間端的第一次訪問(wèn)的用戶數(shù)據(jù)。
2、不過(guò)上面的方案有個(gè)缺陷,雖然將用戶維度為主鍵修改為分鐘維度的主鍵,數(shù)據(jù)量減少了很多,但是可能業(yè)務(wù)方需要的不僅僅是用戶量,還要具體的用戶ID,來(lái)針對(duì)性進(jìn)行投放,那么上面的方案就不太適合了。
針對(duì)上面的業(yè)務(wù)場(chǎng)景,可以選用Hbase進(jìn)行優(yōu)化。
rowKey |uid2020-08-18 09:01:00+timetamp |{uid1,uid2,uid3}2020-08-18 09:02:00+timetamp |{uid21,uid23,uid33}2020-08-18 09:03:00+timetamp |{uid13,uid24,uid35}
由于Hbase本身是列存儲(chǔ)的,如果將分鐘級(jí)別的時(shí)間戳作為RowKey,是可以很快的定位到數(shù)據(jù)所在的位置,不必進(jìn)行全表掃描,這樣查詢效率會(huì)很快。
不過(guò)這個(gè)場(chǎng)景沒(méi)有驗(yàn)證過(guò),但是在用戶畫像的需求中是通過(guò)這個(gè)邏輯來(lái)實(shí)現(xiàn)秒級(jí)別的查詢的。
4、遞歸的方式來(lái)解析JSON串(樹(shù)結(jié)構(gòu))
業(yè)務(wù)背景:
在處理IM需求的時(shí)候,需要對(duì)客服的評(píng)價(jià)進(jìn)行打分,而客服的評(píng)價(jià)系統(tǒng)是分為多個(gè)層級(jí),不同類型,當(dāng)初設(shè)計(jì)這個(gè)層級(jí)關(guān)系的時(shí)候是按照樹(shù)結(jié)構(gòu)進(jìn)行涉及的,最多能下層4集合,但是每一層的都會(huì)有具體行為的選擇和對(duì)應(yīng)的得分情況。某一個(gè)層級(jí)可以包含多個(gè)下屬層級(jí)。
具體結(jié)構(gòu)如下:

層級(jí)架構(gòu)如下:

分析:
1、本身是一個(gè)數(shù)組,數(shù)組的元素是JSON串,基本字段一致,每一層級(jí)都是包含基本字符串信息:level,id,lbalel,value,parentID,children。
2、children的value也是一個(gè)數(shù)組,和上面的數(shù)組模式一樣同時(shí)包含全部字段。
3、最后的層級(jí)最多到第四層結(jié)束,或者說(shuō)是判斷最后對(duì)應(yīng)的children的值是一個(gè)空數(shù)組結(jié)束。
所以這個(gè)模式可以利用遞歸進(jìn)行調(diào)用解析,最后的判定條件是children的值是否為空為止。
代碼模式:
var res = new ArrayBuffer[(String,String,String,String,String)]()def JsonFunc(Son: String,rest:ArrayBuffer[(String,String,String,String,String)]): ArrayBuffer[(String,String,String,String,String)] = {val jsonArray = JSON.parseArray(Son)if (jsonArray.size() == 0) {println("last")}else{for (i <- 0 to (jsonArray.size() - 1)) {val jsonObj = jsonArray.getJSONObject(i)val label = jsonObj.getOrDefault("label", null).toStringval value = jsonObj.getOrDefault("value", null).toStringval id = jsonObj.getOrDefault("id", null).toStringval level = jsonObj.getOrDefault("level", null).toStringval parentID = jsonObj.getOrDefault("parentID", null).toStringres = res :+ (id,level,parentID,label,value)JsonFunc(jsonObj.getOrDefault("children", null).toString,res)}}res}
