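Troubleshooting a Spark SQL production issue: from symptom to root cause
Background
The SQL runs on the Spark Thrift Server of Spark 3.1.2.
Symptom
A Spark SQL statement containing many UNIONs failed. It has more than 50 UNIONs, and every subquery in the union contains a join. Each subquery looks roughly like this: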
SELECT a1.order_no
,a1.need_column
,a1.join_id
FROM temp.actul_a a1
join temp.actul_a a2 on a1.join_id = a2.join_id and a2.need_column = 'we need it'
WHERE a1.need_column ='others needs it'
Running the SQL fails with the following error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 22460 tasks (2.0 GiB) is bigger than spark.driver.maxResultSize (2.0 GiB)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
... 40 more (state=,code=0)
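Post-mortem and fix
Post-mortem
The tasks of all the RDDs taking part in the union together make up the tasks of the union itself. When a file-writing ResultTask finishes, it sends the metadata of the files it wrote (number of files, file sizes, row counts) back to the driver. Each time a task completes, the driver adds the size of that task's serialized result to a running total for the TaskSet and compares the total with spark.driver.maxResultSize. As soon as the total exceeds the limit, the job is aborted.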
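The error message itself gives a rough idea of how large each task's result was. The sketch below only does the arithmetic: the 2 GiB limit and the 22460 task count come from the error message, while treating every task's result as equally sized is an assumption.

```scala
object ImpliedTaskResultSize {
  // spark.driver.maxResultSize as reported in the error message (2.0 GiB).
  val maxResultSizeBytes: Long = 2L * 1024 * 1024 * 1024
  // Number of completed tasks counted when the limit was crossed.
  val tasksAtFailure: Long = 22460L

  /** Average result size per task, assuming all tasks return results of similar size. */
  def avgBytesPerTask(totalBytes: Long, tasks: Long): Double = totalBytes.toDouble / tasks

  // Roughly 95,614 bytes (about 93 KiB); a lower bound, since the real total was just over 2 GiB.
  val avg: Double = avgBytesPerTask(maxResultSizeBytes, tasksAtFailure)
}
```

No individual result is large. The limit is reached because tens of thousands of small results are summed.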
Fix
Lower the initial partition number used for partition coalescing, which was set to 1000:
set spark.sql.adaptive.coalescePartitions.initialPartitionNum=200;
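A back-of-the-envelope estimate shows why lowering the initial partition number helps. All figures here are assumptions. There are 50 union children, and each ends in a shuffle that keeps exactly initialPartitionNum partitions. That is an upper bound, since AQE may coalesce further. The per-task result size is taken to be the average implied by the error message and independent of the partition count.

```scala
object CoalesceEstimate {
  // spark.driver.maxResultSize in this job (2.0 GiB, from the error message).
  val maxResultSizeBytes: Long = 2L * 1024 * 1024 * 1024
  // Assumed average result size per task: 2 GiB / 22460 tasks, as implied by the error message.
  val avgResultBytes: Double = maxResultSizeBytes.toDouble / 22460

  /** Upper bound on write-stage tasks if every union child keeps all its post-shuffle partitions. */
  def totalTasks(unionChildren: Int, partitionsPerChild: Int): Long =
    unionChildren.toLong * partitionsPerChild

  /** Projected driver-side total, assuming the per-task result size does not depend on partition count. */
  def projectedResultBytes(tasks: Long): Double = tasks * avgResultBytes
}
```

Under these assumptions, 1000 partitions per child gives 50,000 tasks and about 4.8 GB of results, well over the limit. With 200 partitions per child, the job has 10,000 tasks and produces about 0.96 GB of results.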
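Analysis and explanation
Walking through the error path at the code level
Running EXPLAIN on the SQL gives the following physical plan. The statement is too long to include here, so run it on your own query: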
+----------------------------------------------------+
| plan |
+----------------------------------------------------+
| == Physical Plan ==
Execute OptimizedCreateHiveTableAsSelectCommand [Database: default, TableName: actul_a, InsertIntoHadoopFsRelationCommand]
+- AdaptiveSparkPlan isFinalPlan=false
+- Union
:...
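The Union node has more than 50 children. The two physical operators worth focusing on are OptimizedCreateHiveTableAsSelectCommand and UnionExec. Before analyzing them, look at the source that raises the error. Searching for the error message leads to: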
TaskSetManager.scala
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
  totalResultSize += size
  calculatedTasks += 1
  if (!isShuffleMapTasks && maxResultSize > 0 && totalResultSize > maxResultSize) {
    val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
      s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " +
      s"(${Utils.bytesToString(maxResultSize)})"
    logError(msg)
    abort(msg)
    false
  } else {
    true
  }
}
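The check is cumulative, so it can be reproduced outside Spark. Below is a hypothetical, simplified model of the accumulation logic above. ResultSizeGuard and ResultSizeGuardDemo are illustrative names and not Spark classes. The model keeps only the running total and the abort condition.

```scala
// Hypothetical, simplified model of TaskSetManager.canFetchMoreResults; not Spark's class.
final class ResultSizeGuard(maxResultSize: Long, isShuffleMapTasks: Boolean) {
  private var totalResultSize = 0L
  private var calculatedTasks = 0

  /** Adds one task's result size; returns false at the point where Spark would abort the stage. */
  def canFetchMoreResults(size: Long): Boolean = synchronized {
    totalResultSize += size
    calculatedTasks += 1
    !(!isShuffleMapTasks && maxResultSize > 0 && totalResultSize > maxResultSize)
  }

  def tasksSeen: Int = calculatedTasks
}

object ResultSizeGuardDemo {
  /** Feeds result sizes in completion order; returns how many tasks had been seen when the limit tripped. */
  def firstFailingTask(maxResultSize: Long, sizes: Seq[Long]): Option[Int] = {
    val guard = new ResultSizeGuard(maxResultSize, isShuffleMapTasks = false)
    val idx = sizes.indexWhere(size => !guard.canFetchMoreResults(size))
    if (idx < 0) None else Some(idx + 1)
  }
}
```

The limit applies to the sum over all results in the TaskSet, not to any single result. In the incident, the 22,460th completed task pushed the total past 2 GiB. A limit of 0 disables the check.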
canFetchMoreResults is called from TaskResultGetter.enqueueSuccessfulTask, which is in turn triggered by the statusUpdate method in TaskSchedulerImpl.scala:
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit = {
  ...
  if (TaskState.isFinished(state)) {
    cleanupTaskState(tid)
    taskSet.removeRunningTask(tid)
    if (state == TaskState.FINISHED) {
      taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
      taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
    }
  }
  ...
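Only tasks that finish successfully go through enqueueSuccessfulTask, and so only their results are counted by canFetchMoreResults. Below is a minimal model of that dispatch. It uses a local copy of the task states, and StatusUpdateModel is a hypothetical name.

```scala
object StatusUpdateModel {
  // Local copy of the task states; Spark's own TaskState enumeration uses the same names.
  object TaskState extends Enumeration {
    val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)
    def isFinished(state: Value): Boolean = finishedStates.contains(state)
  }

  sealed trait Route
  case object SuccessQueue extends Route // enqueueSuccessfulTask, then canFetchMoreResults
  case object FailureQueue extends Route // enqueueFailedTask
  case object NotFinished extends Route  // nothing is enqueued yet

  /** Which queue a status update ends up in, following the branches of statusUpdate above. */
  def route(state: TaskState.Value): Route =
    if (!TaskState.isFinished(state)) NotFinished
    else if (state == TaskState.FINISHED) SuccessQueue
    else FailureQueue
}
```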
Note that the tasks (tids) handled here are tracked per TaskSet, and TaskSets are taken from the scheduling pool in resourceOffers:
val sortedTaskSets = rootPool.getSortedTaskSetQueue
How does a TaskSet get into the pool? Through the submitTasks method:
override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
    + "resource profile " + taskSet.resourceProfileId)
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ...
submitTasks, in turn, is called by the DAGScheduler:
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
  stage.resourceProfileId))
The tasks argument is built from partitionsToCompute (in DAGScheduler.submitMissingTasks):
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
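The key point of this code is that a stage gets one task per partition that still has to be computed. Here is a toy version of that mapping. SimpleResultTask and SimpleTaskSet are hypothetical stand-ins for Spark's ResultTask and TaskSet, and they keep only the fields needed here.

```scala
object TaskPerPartition {
  // Hypothetical stand-ins for Spark's ResultTask and TaskSet, keeping only what matters here.
  final case class SimpleResultTask(stageId: Int, partitionId: Int)
  final case class SimpleTaskSet(stageId: Int, tasks: Seq[SimpleResultTask])

  /** Builds one task for every partition not yet finished, like the partitionsToCompute.map above. */
  def buildTaskSet(stageId: Int, numPartitions: Int, finished: Set[Int]): SimpleTaskSet = {
    val partitionsToCompute = (0 until numPartitions).filterNot(finished.contains)
    SimpleTaskSet(stageId, partitionsToCompute.map(p => SimpleResultTask(stageId, p)))
  }
}
```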
partitionsToCompute is ultimately derived from rdd.partitions, which calls getPartitions. For a union, that is UnionRDD.getPartitions. Now look at UnionExec:
case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  ...
  protected override def doExecute(): RDD[InternalRow] =
    sparkContext.union(children.map(_.execute()))
}
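sparkContext.union returns a UnionRDD. UnionRDD.getPartitions concatenates the partitions of all its parent RDDs, so the UnionRDD's partition count is the sum of theirs. Combined with the TaskSet analysis above, the stage reading the UnionRDD has as many tasks as all the RDDs participating in the union put together.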
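The partition bookkeeping of a union can be modelled in a few lines. UnionPart below is a simplified, hypothetical stand-in for Spark's UnionPartition. It records which parent RDD each union partition comes from and which of that parent's partitions it maps to.

```scala
object UnionPartitionsModel {
  // Simplified, hypothetical stand-in for Spark's UnionPartition.
  final case class UnionPart(index: Int, parentRddIndex: Int, parentPartitionIndex: Int)

  /** Lays the parents' partitions end to end, the way UnionRDD.getPartitions does. */
  def unionPartitions(parentPartitionCounts: Seq[Int]): IndexedSeq[UnionPart] = {
    val pairs = for {
      (count, rddIndex) <- parentPartitionCounts.zipWithIndex
      partition <- 0 until count
    } yield (rddIndex, partition)
    pairs.toIndexedSeq.zipWithIndex.map { case ((rddIndex, partition), i) =>
      UnionPart(i, rddIndex, partition)
    }
  }
}
```

With 50 children of 1000 partitions each, the union has 50,000 partitions, and the stage that writes them out has 50,000 tasks.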
So why does the job exceed spark.driver.maxResultSize (2.0 GiB)?
Look at the OptimizedCreateHiveTableAsSelectCommand node. It ultimately calls the run method of InsertIntoHadoopFsRelationCommand:
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
  ...
  val updatedPartitionPaths =
    FileFormatWriter.write(
      sparkSession = sparkSession,
      plan = child,
      fileFormat = fileFormat,
      committer = committer,
      outputSpec = FileFormatWriter.OutputSpec(
        committerOutputPath.toString, customPartitionLocations, outputColumns),
      hadoopConf = hadoopConf,
      partitionColumns = partitionColumns,
      bucketSpec = bucketSpec,
      statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
      options = options)
  ...
FileFormatWriter.write eventually calls SparkContext.runJob. Every task returns a WriteTaskResult, which carries the partitions written, the file sizes and counts, and the number of rows:
val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
sparkSession.sparkContext.runJob(
  rddWithNonEmptyPartitions,
  (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
    executeTask(
      description = description,
      jobIdInstant = jobIdInstant,
      sparkStageId = taskContext.stageId(),
      sparkPartitionId = taskContext.partitionId(),
      sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
      committer,
      iterator = iter)
  },
  rddWithNonEmptyPartitions.partitions.indices,
  (index, res: WriteTaskResult) => {
    committer.onTaskCommit(res.commitMsg)
    ret(index) = res
  })
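This runJob call pairs a per-partition function with a driver-side result handler, so every task's return value ends up on the driver. The sketch below shows that pattern locally, without Spark. localRunJob and collectPerPartition are hypothetical names, and the partitions are plain in-memory sequences.

```scala
import scala.reflect.ClassTag

object RunJobModel {
  /** Hypothetical local stand-in for SparkContext.runJob(rdd, func, partitions, resultHandler):
    * runs func on each partition and hands (partitionIndex, result) to resultHandler. */
  def localRunJob[T, U](partitions: IndexedSeq[Seq[T]],
                        func: Iterator[T] => U,
                        resultHandler: (Int, U) => Unit): Unit =
    partitions.indices.foreach(i => resultHandler(i, func(partitions(i).iterator)))

  /** Collects one result per partition into an array, like `ret(index) = res` above. */
  def collectPerPartition[T, U: ClassTag](partitions: IndexedSeq[Seq[T]],
                                          func: Iterator[T] => U): Array[U] = {
    val ret = new Array[U](partitions.length)
    localRunJob[T, U](partitions, func, (index, res) => ret(index) = res)
    ret
  }
}
```

The driver holds one result per partition, so the amount of result data it receives grows linearly with the number of partitions.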
executeTask contains the logic that actually performs the write:
...
try {
  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    // Execute the task to write rows out and commit the task.
    while (iterator.hasNext) {
      dataWriter.write(iterator.next())
    }
    dataWriter.commit()
  })(catchBlock = {
    ...
dataWriter.write writes the rows, and dataWriter.commit returns the WriteTaskResult produced by the task.
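A task's result is only metadata, but its size is not fixed. It grows with what the task wrote, for example the number of partition paths. The sketch below gives a crude payload estimate for a hypothetical WriteSummary case class. WriteSummary is not Spark's WriteTaskResult, and its field names are illustrative. A real serialized result also carries class metadata and the commit message, so it is larger than this estimate.

```scala
import java.nio.charset.StandardCharsets

object TaskResultSizeModel {
  // Hypothetical per-task write summary; the field names are illustrative, not Spark's.
  final case class WriteSummary(updatedPartitions: Set[String], numFiles: Int, numBytes: Long, numRows: Long)

  /** Crude payload estimate: UTF-8 bytes of the partition paths plus the numeric fields
    * (Int = 4 bytes, two Longs = 8 bytes each). Serialization overhead is ignored. */
  def estimatedBytes(s: WriteSummary): Long =
    s.updatedPartitions.iterator.map(_.getBytes(StandardCharsets.UTF_8).length.toLong).sum + 4 + 8 + 8
}
```

Summed over tens of thousands of tasks, even a few tens of KB per task add up to gigabytes on the driver.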
On the ResultTask side, this function is the func invoked as func(context, rdd.iterator(partition, context)):
ResultTask.scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
runTask is invoked through Task.run, which is called from the run method of the TaskRunner in Executor.scala:
val value = Utils.tryWithSafeFinally {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = taskDescription.attemptNumber,
    metricsSystem = env.metricsSystem,
    resources = taskDescription.resources,
    plugins = plugins)
  threwException = false
  res
  ...
After a series of size checks, the serialized result is sent through execBackend.statusUpdate to the driver's CoarseGrainedSchedulerBackend:
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName. Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId,
      new ChunkedByteBuffer(serializedDirectResult.duplicate()),
      StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName. $resultSize bytes result sent to driver")
    serializedDirectResult
  }
}
executorSource.SUCCEEDED_TASKS.inc(1L)
setTaskFinishedAndClearInterruptStatus()
plugins.foreach(_.onTaskSucceeded())
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
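The executor checks each result only on its own, while the sum across tasks is checked on the driver. Below is a minimal restatement of the branch above with the thresholds passed in as parameters. ResultRouting is a hypothetical name. The 1 MiB direct-result threshold used in the checks is an assumed value for maxDirectResultSize.

```scala
object ResultRouting {
  sealed trait Route
  case object DroppedTooLarge extends Route // bytes discarded, only the size is reported
  case object ViaBlockManager extends Route // IndirectTaskResult, the driver fetches the block
  case object DirectToDriver extends Route  // result bytes travel with the status update

  /** Mirrors the if/else chain above: maxResultSize is checked first, then maxDirectResultSize. */
  def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): Route =
    if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else DirectToDriver
}
```

A result of roughly 95 KB per task, as in this incident, takes the direct path every time. No single task trips the executor-side check. Only the driver-side total in canFetchMoreResults does.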
From there the result reaches TaskSchedulerImpl.statusUpdate and then canFetchMoreResults, which closes the loop at the code level.
But why are there so many tasks? The cause is the value of spark.sql.adaptive.coalescePartitions.initialPartitionNum.
We had set it to 1000. With AQE and partition coalescing enabled, every shuffle (here mainly the joins) starts with 1000 post-shuffle partitions. When the local shuffle reader is used, the partition count is not reduced much either. See SQLConf.scala:
def numShufflePartitions: Int = {
  if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
  } else {
    defaultNumShufflePartitions
  }
}
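Here is the same decision as a standalone function. The configuration values are passed in explicitly instead of being read from SQLConf, and ShufflePartitionConfig is a hypothetical name.

```scala
object ShufflePartitionConfig {
  /** Restatement of SQLConf.numShufflePartitions with its inputs made explicit. */
  def numShufflePartitions(adaptiveEnabled: Boolean,
                           coalesceEnabled: Boolean,
                           initialPartitionNum: Option[Int],
                           defaultShufflePartitions: Int): Int =
    if (adaptiveEnabled && coalesceEnabled) initialPartitionNum.getOrElse(defaultShufflePartitions)
    else defaultShufflePartitions
}
```

The inputs correspond to spark.sql.adaptive.enabled, spark.sql.adaptive.coalescePartitions.enabled, spark.sql.adaptive.coalescePartitions.initialPartitionNum and spark.sql.shuffle.partitions. With initialPartitionNum set to 1000 and both flags on, every shuffle starts from 1000 partitions regardless of spark.sql.shuffle.partitions.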


