
Notes on Handling Data Skew in Spark

2022-08-25 00:04


Data skew occurs in parallel data processing when uneven key hashing leaves individual Spark partitions with disproportionate amounts of data. Large volumes of records end up concentrated on one or a few compute nodes, whose processing speed falls far below the average, which in turn drags out the whole computation and hurts overall performance.


Problems Caused by Data Skew


One or more long-tail tasks delay the whole job, greatly increasing total runtime. A single task that processes too much data can also easily run out of memory (OOM).


Causes of Data Skew


Data skew generally arises from shuffle-type operators and SQL functions, specifically:



Type          | RDD                                      | SQL
Deduplication | distinct                                 | distinct
Aggregation   | groupByKey, reduceByKey, aggregateByKey  | group by
Join          | join, left join, right join              | join, left join, right join
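To make the failure mode concrete, here is a minimal sketch with hypothetical data: after the shuffle that groupByKey triggers, every record for a hot key is pulled into a single task.

// Minimal sketch (hypothetical data): ~90% of the records share key 0, so after
// the shuffle, the single task that owns key 0 receives ~900k values while the
// remaining tasks receive one value each.
val skewed = sparkSession.sparkContext.parallelize(
  (1 to 1000000).map(i => (if (i % 10 < 9) 0 else i, 1))
)
val grouped = skewed.groupByKey() // the partition owning key 0 does almost all the work
println(grouped.mapValues(_.size).sortBy(_._2, ascending = false).take(3).mkString("\n"))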


Obvious long-tail tasks can be observed in the Spark Web UI event timeline.



Locating Large Skewed Keys


          RDD進(jìn)行抽?。?/span>
          val cscTopKey: Array[(Int, Row)] = sampleSKew(sparkSession,"default.tab_spark","id")println(cscTopKey.mkString("\n"))   def sampleSKew( sparkSession: SparkSession, tableName: String, keyColumn: String ): Array[(Int, Row)] = {    val df: DataFrame = sparkSession.sql("select " + keyColumn + " from " + tableName)    val top10Key: Array[(Int, Row)] = df      .select(keyColumn).sample(withReplacement = false, 0.1).rdd      .map(k => (k, 1)).reduceByKey(_ + _)      .map(k => (k._2, k._1)).sortByKey(ascending = false)      .take(10)    top10Key  }
          SQL進(jìn)行抽?。?/span>
          SELECT  id,conut(1) as cnFROM  default.tab_spark_test_3GROUP BY id  ORDER BY cn DESCLIMIT 100;
          ###結(jié)果集100000,2000012100001,1600012100002,1


Single-Table Data Skew Optimization


To reduce the amount of shuffled data and the pressure on the reduce side, Spark SQL usually performs a partial aggregate (also called pre-aggregation) on the map side: before the shuffle, records with the same key within a partition are pre-aggregated, and only the partial results are shuffled to the reduce side for the final merge. This is similar to running a Combiner ahead of time in MapReduce, which is why HashAggregate nodes usually appear in pairs in the execution plan. But this has limits: if a key is duplicated at a very large scale, the Combiner cannot solve the underlying problem.
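As a quick check, the paired aggregation is visible in the physical plan. A hedged sketch, reusing the example table from above:

// Sketch: print the physical plan of a simple aggregation. A typical plan shows
// two HashAggregate nodes around an Exchange (shuffle): the first computes
// partial counts map-side, the second merges them reduce-side.
sparkSession.sql(
  "select id, count(1) as cn from default.tab_spark_test_3 group by id"
).explain()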


Solution: two-stage aggregation. Salt the key with a random prefix, aggregate on the salted key, then strip the prefix and aggregate again:


import scala.util.Random

sparkSession.udf.register("random_prefix", (value: Int, num: Int) => randomPrefixUDF(value, num))
sparkSession.udf.register("remove_random_prefix", (value: String) => removeRandomPrefixUDF(value))

// t1: add a random prefix to the key; t2: aggregate by the salted key;
// t3: strip the salt and aggregate again.
val sql =
  """
    |select
    |  id,
    |  sum(sell) totalSell
    |from
    |  (
    |    select
    |      remove_random_prefix(random_id) id,
    |      sell
    |    from
    |      (
    |        select
    |          random_id,
    |          sum(pic) sell
    |        from
    |          (
    |            select
    |              random_prefix(id, 6) random_id,
    |              pic
    |            from
    |              default.tab_spark_test_3
    |          ) t1
    |        group by random_id
    |      ) t2
    |  ) t3
    |group by
    |  id
  """.stripMargin

def randomPrefixUDF(value: Int, num: Int): String = {
  new Random().nextInt(num).toString + "_" + value
}

def removeRandomPrefixUDF(value: String): String = {
  value.split("_")(1)
}
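For comparison, a minimal sketch of the same two-stage idea expressed with the DataFrame API instead of UDFs (assuming the same table and columns as above):

import org.apache.spark.sql.functions._

// Stage 1: salt each id with a random 0-5 prefix and pre-aggregate on the salted key.
val partial = sparkSession.table("default.tab_spark_test_3")
  .withColumn("random_id", concat(floor(rand() * 6).cast("string"), lit("_"), col("id")))
  .groupBy("random_id").agg(sum("pic").as("sell"))

// Stage 2: strip the salt and aggregate the partial sums into the final result.
val result = partial
  .withColumn("id", split(col("random_id"), "_").getItem(1))
  .groupBy("id").agg(sum("sell").as("totalSell"))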


Join Data Skew Optimization

1. Applicable scenario


Applies when data skew appears during a join.


2. Solution logic


a. Based on the sampling results, split the skewed table into two datasets: one containing the skewed keys (the skew table) and one without them (the common table);
b. Prefix every key in the skew table with a random number, then take the Cartesian product of the other, not severely skewed dataset (the old table) with the full set of random prefixes (i.e., expand it N times into the new table);
c. Join the scattered skew table with the expanded new table, then union the result with (common table join old table).

The implementation approach for scattering the big keys and expanding the small table is as follows:


1. Scatter the big table: process each record one-in-one-out, prepending a random prefix to the big keys to scatter them;
2. Expand the small table: turn each row of the DataFrame into a collection, loop to append one salted copy of the row per random prefix (36 in the code below), and finally use flatMap to flatten the collection, achieving the expansion.
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * Scatter the big table and expand the small table to resolve the join skew.
 * Assumes the case classes SaleCourse and CourseShoppingCart (matching the
 * table schemas plus a rand_courseid field) are defined elsewhere.
 *
 * @param sparkSession
 */
def scatterBigAndExpansionSmall(sparkSession: SparkSession): Unit = {
  import sparkSession.implicits._
  val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
  val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
  val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

  // TODO 1. Split out the skewed keys (101 and 103, found by sampling)
  val commonCourseShoppingCart: Dataset[Row] = courseShoppingCart
    .filter(item => item.getAs[Long]("courseid") != 101 && item.getAs[Long]("courseid") != 103)
  val skewCourseShoppingCart: Dataset[Row] = courseShoppingCart
    .filter(item => item.getAs[Long]("courseid") == 101 || item.getAs[Long]("courseid") == 103)

  // TODO 2. Scatter the skewed keys into 36 buckets
  val newCourseShoppingCart = skewCourseShoppingCart.mapPartitions((partitions: Iterator[Row]) => {
    partitions.map(item => {
      val courseid = item.getAs[Long]("courseid")
      val randInt = Random.nextInt(36)
      CourseShoppingCart(courseid, item.getAs[String]("orderid"),
        item.getAs[String]("coursename"), item.getAs[String]("cart_discount"),
        item.getAs[String]("sellmoney"), item.getAs[String]("cart_createtime"),
        item.getAs[String]("dt"), item.getAs[String]("dn"), randInt + "_" + courseid)
    })
  })

  // TODO 3. Expand the small table 36x, one copy per prefix
  val newSaleCourse = saleCourse.flatMap(item => {
    val list = new ArrayBuffer[SaleCourse]()
    val courseid = item.getAs[Long]("courseid")
    val coursename = item.getAs[String]("coursename")
    val status = item.getAs[String]("status")
    val pointlistid = item.getAs[Long]("pointlistid")
    val majorid = item.getAs[Long]("majorid")
    val chapterid = item.getAs[Long]("chapterid")
    val chaptername = item.getAs[String]("chaptername")
    val edusubjectid = item.getAs[Long]("edusubjectid")
    val edusubjectname = item.getAs[String]("edusubjectname")
    val teacherid = item.getAs[Long]("teacherid")
    val teachername = item.getAs[String]("teachername")
    val coursemanager = item.getAs[String]("coursemanager")
    val money = item.getAs[String]("money")
    val dt = item.getAs[String]("dt")
    val dn = item.getAs[String]("dn")
    for (i <- 0 until 36) {
      list.append(SaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername, edusubjectid,
        edusubjectname, teacherid, teachername, coursemanager, money, dt, dn, i + "_" + courseid))
    }
    list
  })

  // TODO 4. Join the scattered skewed keys with the expanded table
  val df1: DataFrame = newSaleCourse
    .join(newCourseShoppingCart.drop("courseid").drop("coursename"), Seq("rand_courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid",
      "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
      "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")

  // TODO 5. Join the non-skewed keys with the original table
  val df2: DataFrame = saleCourse
    .join(commonCourseShoppingCart.drop("coursename"), Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid",
      "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
      "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")

  // TODO 6. Union the skewed-key result with the common-key result
  df1
    .union(df2)
    .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail")
}
