<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一次Spark SQL線上問題排查和定位

          共 22096字,需瀏覽 45分鐘

           ·

          2022-07-31 18:42

          全網(wǎng)最全大數(shù)據(jù)面試提升手冊!

          背景

          該sql運行在spark版本 3.1.2下的thrift server下

          現(xiàn)象

          在運行包含多個union 的spark sql的時候報錯(該sql包含了50多個uinon,且每個union字查詢中會包含join操作),其中union中子查詢sql類似如下:

          SELECT  a1.order_no
                              ,a1.need_column
                              ,a1.join_id
                      FROM    temp.actul_a a1 
                      join temp.actul_a a2 on a1.join_id = a2.join_id and a2.need_column = 'we need it' 
                      WHERE a1.need_column ='others needs it'

          運行對應(yīng)的sql,報錯如下:

          Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 22460 tasks (2.0 GiB) is bigger than spark.driver.maxResultSize (2.0 GiB)
           at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
           at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
           at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
           at scala.Option.foreach(Option.scala:407)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
           at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
           ... 40 more (state=,code=0)

          問題復(fù)盤以及解決

          問題復(fù)盤
          • 參與union操作的所有RDD的任務(wù)組成了union操作的所有任務(wù)
          • 每個寫文件的任務(wù)在resultTask執(zhí)行完之后把文件的元數(shù)據(jù)(包括,文件個數(shù),文件大小,文件行數(shù))回傳給driver
          • driver會計算一個taskSet里面的所有的resultTask任務(wù)結(jié)果的元數(shù)據(jù)(每完成一個task計算一下)是超過spark.driver.maxResultSize值,如果超過就直接報錯
          解決

          把分區(qū)合并的初始分區(qū)減少,目前是1000。set spark.sql.adaptive.coalescePartitions.initialPartitionNum=200;

          分析以及解釋
          • 報錯的流程復(fù)盤(代碼級別)
            對對應(yīng)的sql進行explain,由于代碼過長,自己復(fù)制粘貼即可,得到如下物理計劃:
          +----------------------------------------------------+
          |                        plan                        |
          +----------------------------------------------------+
          | == Physical Plan ==
          Execute OptimizedCreateHiveTableAsSelectCommand [Database: default, TableName: actul_a, InsertIntoHadoopFsRelationCommand]
          +- AdaptiveSparkPlan isFinalPlan=false
             +- Union
                :...

          其中union的操作的子節(jié)點有50多個,著重觀察OptimizedCreateHiveTableAsSelectCommand 和unionExec兩個物理計劃,在分析這兩個物理計劃之前,先分析一下報錯的地方的源碼,直接搜索可以找到

          TaskSetManager.scala

           def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
              totalResultSize += size
              calculatedTasks += 1
              if (!isShuffleMapTasks && maxResultSize > 0 && totalResultSize > maxResultSize) {
                val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
                  s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " +
                  s"(${Utils.bytesToString(maxResultSize)})"
                logError(msg)
                abort(msg)
                false
              } else {
                true
              }
            }

          而canFetchMoreResults這個方法最終會被TaskSchedulerImpl.scala的statusUpdate方法調(diào)用:

          def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit = {
               ...
                      if (TaskState.isFinished(state)) {
                        cleanupTaskState(tid)
                        taskSet.removeRunningTask(tid)
                        if (state == TaskState.FINISHED) {
                          taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
                        } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                          taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
                        }
                      }
              ...

          注意到這里所有的tid都是在同一個taskSet中,而taskSet是從資源池在獲取的,如下:

          val sortedTaskSets = rootPool.getSortedTaskSetQueue

          那資源池的taskset是從哪里進去的?在submitTasks方法中:

          override def submitTasks(taskSet: TaskSet): Unit = {
              val tasks = taskSet.tasks
              logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
                + "resource profile " + taskSet.resourceProfileId)
              this.synchronized {
                val manager = createTaskSetManager(taskSet, maxTaskFailures)
                val stage = taskSet.stageId
                val stageTaskSets =
                  taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
                stageTaskSets(taskSet.stageAttemptId) = manager
                schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

          而這里的taskSet由DAGSchduler調(diào)用:

          taskScheduler.submitTasks(new TaskSet(
                  tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
                  stage.resourceProfileId))

          而這里的tasks參數(shù)是由:partitionsToCompute來的:

          val tasks: Seq[Task[_]] = try {
                val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
                stage match {
                  case stage: ShuffleMapStage =>
                    stage.pendingPartitions.clear()
                    partitionsToCompute.map { id =>
                      val locs = taskIdToLocations(id)
                      val part = partitions(id)
                      stage.pendingPartitions += id
                      new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                        taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                        Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
                    }

                  case stage: ResultStage =>
                    partitionsToCompute.map { id =>
                      val p: Int = stage.partitions(id)
                      val part = partitions(p)
                      val locs = taskIdToLocations(id)
                      new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                        taskBinary, part, locs, id, properties, serializedTaskMetrics,
                        Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                        stage.rdd.isBarrier())
                    }
                }

          而最終partitionsToCompute調(diào)用了rdd.partitions方法, 最終調(diào)用getPartitions方法,這個方法會在unionRDD有體現(xiàn),我們再來看UnionExec:

          case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
            ···

            protected override def doExecute(): RDD[InternalRow] =
              sparkContext.union(children.map(_.execute()))
          }

          sparkContext.union會返回UnionRDD,而uninRDD對應(yīng)的getPartitions方法就是所有的rdd的分區(qū)之和,再結(jié)合之前分析的taskset,可以得到unionRDD的task數(shù)量就是所有參與union操作的RDD的task的個數(shù)。

          那為啥會出現(xiàn)超出spark.driver.maxResultSize (2.0 GiB)的問題呢?

          再看OptimizedCreateHiveTableAsSelectCommand 執(zhí)行計劃,該計劃最終會調(diào)用InsertIntoHadoopFsRelationCommand的Run方法:

          override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
             ···
                val updatedPartitionPaths =
                  FileFormatWriter.write(
                    sparkSession = sparkSession,
                    plan = child,
                    fileFormat = fileFormat,
                    committer = committer,
                    outputSpec = FileFormatWriter.OutputSpec(
                      committerOutputPath.toString, customPartitionLocations, outputColumns),
                    hadoopConf = hadoopConf,
                    partitionColumns = partitionColumns,
                    bucketSpec = bucketSpec,
                    statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
                    options = options)
          ···

          而FileFormatWriter.write最終調(diào)用spark的runJob方法,以及每個task會返回WriteTaskResult(包括了寫入的文件的分區(qū),大小,個數(shù),以及數(shù)據(jù)行):

          val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
                sparkSession.sparkContext.runJob(
                  rddWithNonEmptyPartitions,
                  (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
                    executeTask(
                      description = description,
                      jobIdInstant = jobIdInstant,
                      sparkStageId = taskContext.stageId(),
                      sparkPartitionId = taskContext.partitionId(),
                      sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
                      committer,
                      iterator = iter)
                  },
                  rddWithNonEmptyPartitions.partitions.indices,
                  (index, res: WriteTaskResult) => {
                    committer.onTaskCommit(res.commitMsg)
                    ret(index) = res
                  })

          executeTask的方法才是真正執(zhí)行write任務(wù)的邏輯:

          ···
          try {
                Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
                  // Execute the task to write rows out and commit the task.
                  while (iterator.hasNext) {
                    dataWriter.write(iterator.next())
                  }
                  dataWriter.commit()
                })(catchBlock = {
          ···

          dataWriter.write方法真正的寫數(shù)據(jù),dataWrite.commit方法返回對應(yīng)的Task執(zhí)行時候產(chǎn)生的WriteTaskResult信息。

          對應(yīng)到ResultTask就是func(context, rdd.iterator(partition, context)):

          ResultTask.scala

           override def runTask(context: TaskContext): U = {
              // Deserialize the RDD and the func using the broadcast variables.
              val threadMXBean = ManagementFactory.getThreadMXBean
              val deserializeStartTimeNs = System.nanoTime()
              val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
                threadMXBean.getCurrentThreadCpuTime
              } else 0L
              val ser = SparkEnv.get.closureSerializer.newInstance()
              val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
                ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
              _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
              _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
                threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
              } else 0L

              func(context, rdd.iterator(partition, context))
            }

          而該方法最終被Executor.scala的Run方法調(diào)用:

          val value = Utils.tryWithSafeFinally {
                    val res = task.run(
                      taskAttemptId = taskId,
                      attemptNumber = taskDescription.attemptNumber,
                      metricsSystem = env.metricsSystem,
                      resources = taskDescription.resources,
                      plugins = plugins)
                    threwException = false
                    res

          而結(jié)果最終經(jīng)過一系列的判斷,最終回傳給CoarseGrainedSchedulerBackend:

          val serializedResult: ByteBuffer = {
                    if (maxResultSize > 0 && resultSize > maxResultSize) {
                      logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
                        s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
                        s"dropping it.")
                      ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
                    } else if (resultSize > maxDirectResultSize) {
                      val blockId = TaskResultBlockId(taskId)
                      env.blockManager.putBytes(
                        blockId,
                        new ChunkedByteBuffer(serializedDirectResult.duplicate()),
                        StorageLevel.MEMORY_AND_DISK_SER)
                      logInfo(s"Finished $taskName$resultSize bytes result sent via BlockManager)")
                      ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
                    } else {
                      logInfo(s"Finished $taskName$resultSize bytes result sent to driver")
                      serializedDirectResult
                    }
                  }

                  executorSource.SUCCEEDED_TASKS.inc(1L)
                  setTaskFinishedAndClearInterruptStatus()
                  plugins.foreach(_.onTaskSucceeded())
                  execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

          最終傳給TaskSchedulerImpl的statusUpdate方法,之后再到canFetchMoreResults,從而在代碼上形成了閉環(huán)。

          但是為什么導(dǎo)致這么多的task數(shù)量呢?原因在于spark.sql.adaptive.coalescePartitions.initialPartitionNum的設(shè)置

          我們設(shè)置是1000,導(dǎo)致在AQE和開啟分區(qū)合并的前提下,會導(dǎo)致主要是涉及shuffle的操作的時候,shuffle完之后的分區(qū)數(shù)就是1000,而在開啟了localShuffleReader的前提下,該分區(qū)的也不會大量的減少,具體見SQLConf.scala:

          def numShufflePartitions: Int = {
              if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
                getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
              } else {
                defaultNumShufflePartitions
              }
            }
          如果這個文章對你有幫助,不要忘記 「在看」 「點贊」 「收藏」 三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級技能模型與學(xué)習(xí)指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時代可能真的來了
          我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
          我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?
          193篇文章暴揍Flink,這個合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
          我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?
          在所有Spark模塊中,我愿稱SparkSQL為最強!
          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實踐小百科全書
          標簽體系下的用戶畫像建設(shè)小指南
          4萬字長文 | ClickHouse基礎(chǔ)&實踐&調(diào)優(yōu)全視角解析
          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
          我寫過的關(guān)于成長/面試/職場進階的文章
          當(dāng)我們在學(xué)習(xí)Hive的時候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 62
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲无码在线高清 | 日韩无码黄色电影 | 中文字幕无码在线视频 | 91成人区 | 气质女人操逼 |