
Spark + Kudu Advertising Project in Practice: Notes (Part 1)


          2020-08-20 06:50


大數據技術與架構 (Big Data Technology and Architecture)

暴走大數據 (Baozou Big Data)

1. Introduction

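This project works as follows. The ad data is stored as JSON files on HDFS. Spark runs the ETL and analysis steps, and the results are stored in Kudu. Finally, the ad data analysis and storage job is scheduled to run automatically at 3:00 a.m. every day.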
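This part of the notes does not show how the 3:00 a.m. trigger is set up. In practice a crontab entry or a workflow scheduler is the usual choice. If a JVM-side scheduler were used instead, the delay until the next run could be computed with java.time. The sketch below shows that calculation. `NextRun` and `delayUntilNext` are hypothetical names, not part of the project.

```scala
import java.time.{Duration, LocalDateTime, LocalTime}

// Hypothetical helper, not part of the original project.
object NextRun {
  // Time remaining from `now` until the next daily run at `runAt` (03:00 by default).
  def delayUntilNext(now: LocalDateTime, runAt: LocalTime = LocalTime.of(3, 0)): Duration = {
    val todayRun = now.toLocalDate.atTime(runAt)
    // If today's slot has already passed (or is exactly now), the next run is tomorrow.
    val next = if (now.isBefore(todayRun)) todayRun else todayRun.plusDays(1)
    Duration.between(now, next)
  }

  def main(args: Array[String]): Unit = {
    // 2020-08-20 06:50 -> next run at 2020-08-21 03:00
    println(delayUntilNext(LocalDateTime.of(2020, 8, 20, 6, 50))) // PT20H10M
  }
}
```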

2. Project Requirements

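Data ETL: the raw files are JSON. Each record must be parsed and its IP resolved against the IP library.

Statistics on the geographic distribution by province and city

Statistics on the geographic distribution of ad delivery

Statistics on the distribution of ad delivery by app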
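In Spark, each of these statistics comes down to a group-and-count, for example `groupBy("provincename", "cityname").count()` on the ETL output. The sketch below shows the same idea without a cluster, using plain Scala collections. `AdLog` is a hypothetical, trimmed-down record that holds only the fields the example needs, and the sample rows are made up.

```scala
// Hypothetical, trimmed-down log record with only the fields used here.
final case class AdLog(provincename: String, cityname: String, appname: String)

object RegionStats {
  // Records per (province, city), largest count first, ties ordered by name.
  def countByProvinceCity(logs: Seq[AdLog]): Seq[((String, String), Int)] =
    logs
      .groupBy(l => (l.provincename, l.cityname))
      .map { case (key, rows) => key -> rows.size }
      .toSeq
      .sortBy { case ((province, city), n) => (-n, province, city) }

  // Records per app name, largest count first, ties ordered by name.
  def countByApp(logs: Seq[AdLog]): Seq[(String, Int)] =
    logs
      .groupBy(_.appname)
      .map { case (app, rows) => app -> rows.size }
      .toSeq
      .sortBy { case (app, n) => (-n, app) }

  def main(args: Array[String]): Unit = {
    // Made-up rows, for illustration only.
    val logs = Seq(
      AdLog("Fujian", "Fuzhou", "appA"),
      AdLog("Fujian", "Fuzhou", "appB"),
      AdLog("Fujian", "Xiamen", "appA"),
      AdLog("Guangdong", "Guangzhou", "appA")
    )
    countByProvinceCity(logs).foreach { case ((p, c), n) => println(s"$p/$c: $n") }
    countByApp(logs).foreach { case (app, n) => println(s"$app: $n") }
  }
}
```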

3. Project Architecture

4. Log Fields

The sample record below is pretty-printed for readability. `spark.read.json` reads one JSON object per line by default, so each record sits on a single line in data-test.json.

    {
      "sessionid": "qld2dU4cfhEa3yhADzgphOf3ySv9vMml",
      "advertisersid": 66,
      "adorderid": 142848,
      "adcreativeid": 212312,
      "adplatformproviderid": 174663,
      "sdkversion": "Android 5.0",
      "adplatformkey": "PLMyYnDKQgOPL55frHhxkUIQtBThHfHq",
      "putinmodeltype": 1,
      "requestmode": 1,
      "adprice": 8410.0,
      "adppprice": 5951.0,
      "requestdate": "2018-10-07",
      "ip": "182.91.190.221",
      "appid": "XRX1000014",
      "appname": "支付寶 - 讓生活更簡單",
      "uuid": "QtxDH9HUueM2IffUe8z2UqLKuZueZLqq",
      "device": "HUAWEI GX1手機",
      "client": 1,
      "osversion": "",
      "density": "",
      "pw": 1334,
      "ph": 750,
      "lang": "",
      "lat": "",
      "provincename": "",
      "cityname": "",
      "ispid": 46007,
      "ispname": "移動",
      "networkmannerid": 1,
      "networkmannername": "4G",
      "iseffective": 1,
      "isbilling": 1,
      "adspacetype": 3,
      "adspacetypename": "全屏",
      "devicetype": 1,
      "processnode": 3,
      "apptype": 0,
      "district": "district",
      "paymode": 1,
      "isbid": 1,
      "bidprice": 6812.0,
      "winprice": 89934.0,
      "iswin": 0,
      "cur": "rmb",
      "rate": 0.0,
      "cnywinprice": 0.0,
      "imei": "",
      "mac": "52:54:00:41:ba:02",
      "idfa": "",
      "openudid": "FIZHDPIKQYVNHOHOOAWMTQDFTPNWAABZTAFVHTEL",
      "androidid": "",
      "rtbprovince": "",
      "rtbcity": "",
      "rtbdistrict": "",
      "rtbstreet": "",
      "storeurl": "",
      "realip": "182.92.196.236",
      "isqualityapp": 0,
      "bidfloor": 0.0,
      "aw": 0,
      "ah": 0,
      "imeimd5": "",
      "macmd5": "",
      "idfamd5": "",
      "openudidmd5": "",
      "androididmd5": "",
      "imeisha1": "",
      "macsha1": "",
      "idfasha1": "",
      "openudidsha1": "",
      "androididsha1": "",
      "uuidunknow": "",
      "userid": "vtUO8pPXfwdsPnvo6ttNGhAAnHi8NVbA",
      "reqdate": null,
      "reqhour": null,
      "iptype": 1,
      "initbidprice": 0.0,
      "adpayment": 175547.0,
      "agentrate": 0.0,
      "lomarkrate": 0.0,
      "adxrate": 0.0,
      "title": "中信建投首次公開發行股票發行結果 本次發行價格為5.42元/股",
      "keywords": "IPO,中信建投證券,股票,投資,財經",
      "tagid": "rBRbAEQhkcAaeZ6XlTrGXOxyw6w9JQ7x",
      "callbackdate": "2018-10-07",
      "channelid": "123528",
      "mediatype": 2,
      "email": "[email protected]",
      "tel": "13105823726",
      "age": "29",
      "sex": "0"
    }

5. Resolving IPs with the IP Rule Library

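This project resolves IPs with an IP rule library. In production you would normally use an IP service from a specialized provider instead. One entry in the IP rule library looks like this: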

          1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302

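The third column is the start address of the IP range and the fourth column is the end address, both written as decimal integers. The ETL code in LogETLApp.scala also reads the 7th, 8th and 10th columns (indexes 6, 7 and 9): province, city and ISP.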
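The sketch below makes the column layout concrete. It parses the sample entry above into a hypothetical `IpRule` case class, using the same indexes as the ETL code. The third and fourth columns are just the dotted range bounds written in base 256. For example, 1.0.1.0 is 1·2^24 + 0·2^16 + 1·2^8 + 0 = 16777472, and 1.0.3.255 is 16777216 + 3·256 + 255 = 16778239.

```scala
// Hypothetical representation of one entry in the IP rule library.
final case class IpRule(startIp: Long, endIp: Long, province: String, city: String, isp: String)

object IpRuleParser {
  // '|' is a regex metacharacter, so it is escaped. The limit of -1 also keeps
  // trailing empty fields, so the column indexes stay stable on sparse lines.
  def parse(line: String): IpRule = {
    val f = line.split("\\|", -1)
    // Indexes 2 and 3 are the range bounds; 6, 7 and 9 are province, city and ISP.
    IpRule(f(2).toLong, f(3).toLong, f(6), f(7), f(9))
  }

  def main(args: Array[String]): Unit = {
    val line = "1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302"
    println(parse(line))
  }
}
```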

Create LogETLApp.scala:

    package com.imooc.bigdata.cp08

    import com.imooc.bigdata.cp08.utils.IPUtils
    import org.apache.spark.sql.SparkSession

    object LogETLApp {

      def main(args: Array[String]): Unit = {
        // Start Spark in local mode
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("LogETLApp")
          .getOrCreate()

        // Load the JSON data directly with the Data Source API
        var jsonDF = spark.read.json("data-test.json")
        //jsonDF.printSchema()
        //jsonDF.show(false)

        // Import the implicit conversions (needed for toDF and $"col")
        import spark.implicits._

        // Load the IP library; converting the RDD into a DataFrame is recommended
        val ipRowRDD = spark.sparkContext.textFile("ip.txt")
        val ipRuleDF = ipRowRDD.map(x => {
          val splits = x.split("\\|")
          val startIP = splits(2).toLong
          val endIP = splits(3).toLong
          val province = splits(6)
          val city = splits(7)
          val isp = splits(9)
          (startIP, endIP, province, city, isp)
        }).toDF("start_ip", "end_ip", "province", "city", "isp")
        //ipRuleDF.show(false)

        // Convert the IP string in the JSON to a decimal number with a Spark SQL UDF
        import org.apache.spark.sql.functions._
        def getLongIp() = udf((ip: String) => {
          IPUtils.ip2Long(ip)
        })

        // Add a column holding the decimal IP
        jsonDF = jsonDF.withColumn("ip_long", getLongIp()($"ip"))

        // Resolve the province, city and ISP for the IP on each log line:
        // join the two DataFrames on the condition that the JSON IP lies within a rule's range
        jsonDF.join(ipRuleDF, jsonDF("ip_long")
          .between(ipRuleDF("start_ip"), ipRuleDF("end_ip")))
          .show(false)

        spark.stop()
      }
    }
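The `between` join above is a non-equi join. Spark cannot plan it as a hash or sort-merge join, so it typically falls back to a (broadcast) nested-loop join that tests every log line against every IP range. A common alternative, which is not what this article does, is to keep the rules sorted by start address and ship them to the executors (for example as a broadcast variable). Each IP's range can then be found with a binary search. The sketch below shows only the lookup, in plain Scala. It assumes the ranges do not overlap. `IpRange` and `IpLookup` are hypothetical names, and the second range in the sample is made up.

```scala
// Hypothetical range record: [start, end] inclusive, plus the attributes to attach.
final case class IpRange(start: Long, end: Long, province: String, city: String, isp: String)

object IpLookup {
  // `ranges` must be sorted by `start` and non-overlapping.
  // Returns the range containing `ip`, if any, in O(log n) comparisons.
  def find(ranges: IndexedSeq[IpRange], ip: Long): Option[IpRange] = {
    var lo = 0
    var hi = ranges.length - 1
    var result: Option[IpRange] = None
    while (lo <= hi && result.isEmpty) {
      val mid = (lo + hi) >>> 1
      val r = ranges(mid)
      if (ip < r.start) hi = mid - 1     // target lies in the lower half
      else if (ip > r.end) lo = mid + 1  // target lies in the upper half
      else result = Some(r)              // start <= ip <= end
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val ranges = Vector(
      IpRange(16777472L, 16778239L, "Fujian", "Fuzhou", "Telecom"), // 1.0.1.0 - 1.0.3.255
      IpRange(16779264L, 16781311L, "ProvinceB", "CityB", "IspB")   // made-up second range
    ).sortBy(_.start)
    println(find(ranges, 16777473L).map(_.city)) // Some(Fuzhou)
    println(find(ranges, 16778240L))             // None
  }
}
```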

IPUtils.scala is the utility class that converts a dotted IP string into a decimal number:

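    package com.imooc.bigdata.cp08.utils

    object IPUtils {

      // Dotted string -> decimal number
      def ip2Long(ip: String): Long = {
        val splits = ip.split("[.]")
        var ipNum = 0L
        for (i <- 0 until splits.length) {
          // "|" is bitwise OR: a bit is 1 if either side has a 1, and 0 only if both are 0
          // "<<" shifts the whole value left; it binds tighter than "|",
          // so this is splits(i) | (ipNum << 8)
          // In other words, after each octet the result moves up 8 bits to make room for the next one
          ipNum = splits(i).toLong | ipNum << 8L
        }
        ipNum
      }

      def main(args: Array[String]): Unit = {
        println(ip2Long("1.1.1.1")) // 16843009
      }
    }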
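When spot-checking lookups, it helps to convert in the other direction too. The sketch below writes `ip2Long` as a fold. It also adds a hypothetical inverse, `long2Ip`, which is not part of the original IPUtils. `long2Ip` recovers each octet by shifting right 24, 16, 8 and 0 bits and masking with 0xFF. Both functions assume well-formed IPv4 input.

```scala
object IpConvert {
  // Same idea as IPUtils.ip2Long: shift the accumulator up 8 bits, then OR in the next octet.
  def ip2Long(ip: String): Long =
    ip.split('.').foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  // Hypothetical inverse: extract the octets from most to least significant.
  def long2Ip(n: Long): String =
    Seq(24, 16, 8, 0).map(shift => (n >> shift) & 0xFF).mkString(".")

  def main(args: Array[String]): Unit = {
    println(ip2Long("1.1.1.1"))                 // 16843009
    println(long2Ip(16843009L))                 // 1.1.1.1
    println(long2Ip(ip2Long("182.91.190.221"))) // 182.91.190.221
  }
}
```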

The same lookup can also be written as a SQL statement:

    // The SQL approach
    jsonDF.createOrReplaceTempView("logs")
    ipRuleDF.createOrReplaceTempView("ips")
    val sql = SQLUtils.SQL
    spark.sql(sql).show(false)

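Put the SQL in SQLUtils. Since ip_long has already been computed, the query is essentially one left join. The DataFrame version above uses the default inner join. A left join differs in one way: it also keeps log lines whose IP falls in no range, with null province, city and ISP.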

    package com.imooc.bigdata.cp08.utils

    // Project-specific SQL utilities
    object SQLUtils {

      lazy val SQL = "select " +
        "logs.ip ," +
        "logs.sessionid," +
        "logs.advertisersid," +
        "logs.adorderid," +
        "logs.adcreativeid," +
        "logs.adplatformproviderid" +
        ",logs.sdkversion" +
        ",logs.adplatformkey" +
        ",logs.putinmodeltype" +
        ",logs.requestmode" +
        ",logs.adprice" +
        ",logs.adppprice" +
        ",logs.requestdate" +
        ",logs.appid" +
        ",logs.appname" +
        ",logs.uuid, logs.device, logs.client, logs.osversion, logs.density, logs.pw, logs.ph" +
        ",ips.province as provincename" +
        ",ips.city as cityname" +
        ",ips.isp as isp" +
        ",logs.ispid, logs.ispname" +
        ",logs.networkmannerid, logs.networkmannername, logs.iseffective, logs.isbilling" +
        ",logs.adspacetype, logs.adspacetypename, logs.devicetype, logs.processnode, logs.apptype" +
        ",logs.district, logs.paymode, logs.isbid, logs.bidprice, logs.winprice, logs.iswin, logs.cur" +
        ",logs.rate, logs.cnywinprice, logs.imei, logs.mac, logs.idfa, logs.openudid,logs.androidid" +
        ",logs.rtbprovince,logs.rtbcity,logs.rtbdistrict,logs.rtbstreet,logs.storeurl,logs.realip" +
        ",logs.isqualityapp,logs.bidfloor,logs.aw,logs.ah,logs.imeimd5,logs.macmd5,logs.idfamd5" +
        ",logs.openudidmd5,logs.androididmd5,logs.imeisha1,logs.macsha1,logs.idfasha1,logs.openudidsha1" +
        ",logs.androididsha1,logs.uuidunknow,logs.userid,logs.iptype,logs.initbidprice,logs.adpayment" +
        ",logs.agentrate,logs.lomarkrate,logs.adxrate,logs.title,logs.keywords,logs.tagid,logs.callbackdate" +
        ",logs.channelid,logs.mediatype,logs.email,logs.tel,logs.sex,logs.age " +
        "from logs left join " +
        "ips on logs.ip_long between ips.start_ip and ips.end_ip "
    }
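Building the select list by concatenating string fragments by hand makes it easy to drop a comma or a space. One alternative is to keep the column names in a `Seq` and generate the statement from it. The sketch below assumes that approach. `SqlGen`, `logColumns` and `ipColumns` are hypothetical names, and only a few of the log columns are listed.

```scala
object SqlGen {
  // A few of the log columns, for illustration; a real list would hold every column kept in the ODS table.
  val logColumns: Seq[String] = Seq("ip", "sessionid", "advertisersid", "appid", "appname")

  // Columns taken from the IP rule table, renamed to the names used in the ODS schema.
  val ipColumns: Seq[(String, String)] =
    Seq("province" -> "provincename", "city" -> "cityname", "isp" -> "isp")

  // Join the prefixed column names with ", " so separators can't be forgotten.
  def buildSql(logCols: Seq[String], ipCols: Seq[(String, String)]): String = {
    val selectList =
      logCols.map(c => s"logs.$c") ++ ipCols.map { case (src, alias) => s"ips.$src as $alias" }
    s"select ${selectList.mkString(", ")} " +
      "from logs left join ips on logs.ip_long between ips.start_ip and ips.end_ip"
  }

  def main(args: Array[String]): Unit =
    println(buildSql(logColumns, ipColumns))
}
```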

6. Writing to Kudu

Start Kudu:

    cd /etc/init.d/
    ll
    sudo ./kudu-master start
    sudo ./kudu-tserver start

Then check that the Kudu web UI opens on port 8050. By default, 8050 is the tablet server's web UI; the master's is on 8051.

    import java.util
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.client.KuduClient.KuduClientBuilder
    import org.apache.spark.sql.SaveMode

    val result = jsonDF.join(ipRuleDF, jsonDF("ip_long")
      .between(ipRuleDF("start_ip"), ipRuleDF("end_ip")))
      //.show(false)

    // Create the Kudu table (dropping it first if it already exists)
    val masterAddresses = "hadoop000"
    val tableName = "ods"
    val client = new KuduClientBuilder(masterAddresses).build()

    if (client.tableExists(tableName)) {
      client.deleteTable(tableName)
    }

    // One replica, hash-partitioned on "ip" into 3 buckets
    val partitionId = "ip"
    val schema = SchemaUtils.ODSSchema
    val options = new CreateTableOptions()
    options.setNumReplicas(1)

    val parcols = new util.LinkedList[String]()
    parcols.add(partitionId)
    options.addHashPartitions(parcols, 3)

    client.createTable(tableName, schema, options)
    // Release the client once the table exists; the Spark writer opens its own connection
    client.close()

    // Write the data to Kudu
    result.write.mode(SaveMode.Append)
      .format("org.apache.kudu.spark.kudu")
      .option("kudu.table", tableName)
      .option("kudu.master", masterAddresses)
      .save()
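`addHashPartitions(parcols, 3)` spreads the rows over 3 buckets (tablets) by hashing the `ip` column. The plain-Scala sketch below illustrates the idea. It is only an illustration: it uses `String.hashCode`, whereas Kudu applies its own hash function, so the bucket a given IP lands in here will not match Kudu's. `HashBuckets` is a hypothetical name.

```scala
object HashBuckets {
  // Assign a key to one of `numBuckets` buckets. floorMod keeps the result
  // non-negative even when hashCode is negative.
  def bucketOf(key: String, numBuckets: Int): Int =
    Math.floorMod(key.hashCode, numBuckets)

  def main(args: Array[String]): Unit = {
    val ips = Seq("182.91.190.221", "182.92.196.236", "1.1.1.1", "1.0.1.0")
    // The same key always lands in the same bucket; different keys spread across buckets.
    ips.groupBy(bucketOf(_, 3)).toSeq.sortBy(_._1).foreach { case (b, keys) =>
      println(s"bucket $b -> ${keys.mkString(", ")}")
    }
  }
}
```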

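ODSSchema in SchemaUtils is defined as follows. Note that `ip` is the table's only primary-key column, so log lines that share an IP map to the same Kudu row. Depending on the write operation, later rows either overwrite the earlier one or are rejected or ignored as duplicate keys.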

    import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
    import org.apache.kudu.{Schema, Type}
    import scala.collection.JavaConverters._

    object SchemaUtils {

      // Column layout of the ODS table; "ip" is the (only) primary-key column and must come first
      lazy val ODSSchema: Schema = {
        val columns = List(
          new ColumnSchemaBuilder("ip", Type.STRING).nullable(false).key(true).build(),
          new ColumnSchemaBuilder("sessionid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("advertisersid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("adorderid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("adcreativeid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("adplatformproviderid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("sdkversion", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("adplatformkey", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("putinmodeltype", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("requestmode", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("adprice", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("adppprice", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("requestdate", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("appid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("appname", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("uuid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("device", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("client", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("osversion", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("density", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("pw", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("ph", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("provincename", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("cityname", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("ispid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("ispname", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("isp", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("networkmannerid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("networkmannername", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("iseffective", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("isbilling", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("adspacetype", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("adspacetypename", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("devicetype", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("processnode", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("apptype", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("district", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("paymode", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("isbid", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("bidprice", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("winprice", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("iswin", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("cur", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("rate", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("cnywinprice", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("imei", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("mac", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("idfa", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("openudid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("androidid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("rtbprovince", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("rtbcity", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("rtbdistrict", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("rtbstreet", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("storeurl", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("realip", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("isqualityapp", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("bidfloor", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("aw", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("ah", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("imeimd5", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("macmd5", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("idfamd5", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("openudidmd5", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("androididmd5", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("imeisha1", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("macsha1", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("idfasha1", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("openudidsha1", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("androididsha1", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("uuidunknow", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("userid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("iptype", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("initbidprice", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("adpayment", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("agentrate", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("lomarkrate", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("adxrate", Type.DOUBLE).nullable(false).build(),
          new ColumnSchemaBuilder("title", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("keywords", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("tagid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("callbackdate", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("channelid", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("mediatype", Type.INT64).nullable(false).build(),
          new ColumnSchemaBuilder("email", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("tel", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("sex", Type.STRING).nullable(false).build(),
          new ColumnSchemaBuilder("age", Type.STRING).nullable(false).build()
        ).asJava
        new Schema(columns)
      }
    }
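The column types follow from how Spark reads the JSON. By default `spark.read.json` infers integral numbers as `bigint` (LongType) and decimals as `double`, which is why those columns are Kudu `INT64` and `DOUBLE`. Quoted values such as `age` and `channelid` stay strings. The sketch below is a hypothetical helper that maps Spark's simple type names to the Kudu type names used above. It is plain Scala with no Kudu dependency, and it covers only the three types this log produces.

```scala
object KuduTypeMapping {
  // Spark DataType.simpleString -> Kudu Type name, for the types this log produces.
  private val mapping: Map[String, String] = Map(
    "bigint" -> "INT64",
    "double" -> "DOUBLE",
    "string" -> "STRING"
  )

  // Left carries an error message for types the table design does not cover.
  def kuduTypeFor(sparkType: String): Either[String, String] =
    mapping.get(sparkType).toRight(s"no mapping for Spark type '$sparkType'")

  def main(args: Array[String]): Unit = {
    // Types as Spark would infer them for a few fields of the sample record.
    val fields = Seq("advertisersid" -> "bigint", "adprice" -> "double", "age" -> "string")
    fields.foreach { case (name, t) => println(s"$name: ${kuduTypeFor(t)}") }
  }
}
```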

After the write succeeds, check the table in the Kudu web UI.

Finally, read the table back in IDEA to confirm that the data was written:

    spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", masterAddresses)
      .option("kudu.table", tableName)
      .load().show()

The rows are read back from the table, which shows that the import succeeded.

7. Refactoring

To refactor, create KuduUtils.scala. Its sink method takes the result DataFrame, tableName, master, schema and partitionId as parameters.

    package com.imooc.bigdata.cp08.utils

    import java.util

    import org.apache.kudu.Schema
    import org.apache.kudu.client.{CreateTableOptions, KuduClient}
    import org.apache.kudu.client.KuduClient.KuduClientBuilder
    import org.apache.spark.sql.{DataFrame, SaveMode}

    object KuduUtils {

      /**
        * Write a DataFrame to Kudu
        * @param data        the DataFrame result set
        * @param tableName   the target Kudu table
        * @param master      the Kudu master address
        * @param schema      the Kudu schema
        * @param partitionId the partition (hash) column of the Kudu table
        */
      def sink(data: DataFrame,
               tableName: String,
               master: String,
               schema: Schema,
               partitionId: String): Unit = {
        val client: KuduClient = new KuduClientBuilder(master).build()

        // Drop the table if it already exists, then recreate it
        if (client.tableExists(tableName)) {
          client.deleteTable(tableName)
        }

        val options = new CreateTableOptions()
        options.setNumReplicas(1)

        val parcols = new util.LinkedList[String]()
        parcols.add(partitionId)
        options.addHashPartitions(parcols, 3)

        client.createTable(tableName, schema, options)
        // Release the client once the table exists; the Spark writer opens its own connection
        client.close()

        // Write the data to Kudu
        data.write.mode(SaveMode.Append)
          .format("org.apache.kudu.spark.kudu")
          .option("kudu.table", tableName)
          .option("kudu.master", master)
          .save()

        // spark.read.format("org.apache.kudu.spark.kudu")
        //   .option("kudu.master", master)
        //   .option("kudu.table", tableName)
        //   .load().show()
      }
    }
Call it from the main method:

    val masterAddresses = "hadoop000"
    val tableName = "ods"
    val partitionId = "ip"
    val schema = SchemaUtils.ODSSchema

    KuduUtils.sink(result, tableName, masterAddresses, schema, partitionId)

Then check again that the data was uploaded.
