Data Skew? Spark 3.0 AQE Has the Cure


CBO Basics
Which algorithm should a join use: BroadcastJoin, ShuffleHashJoin, or SortMergeJoin? Different strategies demand very different amounts of system resources and deliver vastly different performance. The same SQL statement may finish in a few seconds with the right strategy, while a poor choice can drive the system to OOM.
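As a hedged sketch of how much the strategy matters (assuming an active SparkSession named spark; the table and column names here are made up for illustration), explicitly broadcasting a small dimension table turns the join into a BroadcastHashJoin and avoids shuffling the large side at all:

import org.apache.spark.sql.functions.broadcast

// A large fact table and a small dimension table; the sizes are assumptions.
val impressions = spark.table("fact_impressions")
val campaigns = spark.table("dim_campaign")

// Without a hint, the planner picks a strategy from size estimates; a bad
// estimate can produce a shuffle-heavy plan or even an OOM on the build side.
// broadcast() forces a BroadcastHashJoin: the small table is shipped to every
// executor and the large table is joined in place, without a shuffle.
val joined = impressions.join(broadcast(campaigns), Seq("campaign_id"))
joined.explain()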
For a snowflake or star schema, in what order should a multi-table join run? Different join orders mean different execution efficiency. Take A join B join C, where A and B are both large and C is small: A join B clearly needs a lot of resources to compute and will not finish quickly. If we instead run A join C join B, A join C returns quickly because C is small, and its result set is small too; joining that small result with B is obviously faster than the first plan.
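The reordering argument can be written down directly. This is only an illustrative sketch, again assuming an active SparkSession named spark, with hypothetical tables A, B, C registered in the catalog and joined on the same key k:

val a = spark.table("A") // large
val b = spark.table("B") // large
val c = spark.table("C") // small

// Naive order: the expensive large-large join A join B runs first.
val slow = a.join(b, "k").join(c, "k")

// Better order: A join C is cheap and yields a small intermediate result,
// which is then joined with B.
val fast = a.join(c, "k").join(b, "k")

Picking this order automatically is exactly what cost-based join reordering tries to do, and it depends on accurate statistics, which is where a purely static, compile-time CBO struggles.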
With CBO this hard to get right, how does Spark solve it? Spark 3.0's answer is AQE (Adaptive Query Execution), which re-optimizes the plan at runtime with three key features:
Dynamically coalescing shuffle partitions
Dynamically switching join strategies
Dynamically optimizing skew joins
Take the number of shuffle partitions as an example of why a one-size-fits-all static setting hurts:
If there are too few partitions, each partition carries too much data, which may force large amounts of data to spill to disk and slow the query down.
If there are too many partitions, each partition carries very little data, which creates a lot of extra network overhead, puts pressure on the Spark task scheduler, and again slows the query down.
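A minimal sketch of the first feature (the local session and toy aggregation below are assumptions for illustration, not from the article): with the default spark.sql.shuffle.partitions of 200, a small group-by would normally run 200 reduce tasks, while AQE coalesces the post-shuffle partitions at runtime based on the actual map output sizes.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-coalesce-sketch")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()

// 100 distinct keys over 1 million rows: the shuffled data is tiny.
val counts = spark.range(0, 1000000L)
  .selectExpr("id % 100 AS key")
  .groupBy("key")
  .count()

// Triggering execution finalizes the adaptive plan; the post-shuffle partition
// count is expected to be far below 200 (also visible in the SQL tab of the UI).
println(counts.rdd.getNumPartitions)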





AQE is applied only to a query that satisfies both of the following:
It is a non-streaming query
It contains at least one exchange (such as a join, aggregation, or window operator) or a subquery
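For illustration only (assuming an active SparkSession spark; the table name events and its columns are made up), the first query below has no exchange and no subquery, so it is left untouched, while the second contains a shuffle exchange from the aggregation and is eligible to be wrapped by AQE:

// Plain scan + filter: no exchange, so AQE does not wrap this plan.
spark.table("events").where("dt = '2020-06-01'").select("user_id").collect()

// The aggregation adds a shuffle exchange, so the physical plan is wrapped in
// an AdaptiveSparkPlanExec node when spark.sql.adaptive.enabled is true.
spark.table("events").groupBy("dt").count().collect()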
Spark AQE Source Code Implementation
// QueryExecution class: InsertAdaptiveSparkPlan is injected at the head of the physical-plan preparation rules
lazy val executedPlan: SparkPlan = {
executePhase(QueryPlanningTracker.PLANNING) {
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
}
protected def preparations: Seq[Rule[SparkPlan]] = {
QueryExecution.preparations(sparkSession,
Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
}
// QueryExecution companion object
private[execution] def preparations(
sparkSession: SparkSession,
adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
Seq(
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf)
)
}
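Because InsertAdaptiveSparkPlan sits at the head of that rule list, a quick way to confirm it fired (a sketch, not from the article) is to look for the AdaptiveSparkPlan wrapper in explain() output before the query runs:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.range(0, 1000L).groupBy("id").count().explain()
// Prints a physical plan roughly of the form (abbreviated):
// AdaptiveSparkPlan isFinalPlan=false
// +- HashAggregate(keys=[id], functions=[count(1)])
//    +- Exchange hashpartitioning(id, 200), true
//       ...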
// InsertAdaptiveSparkPlan
override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
// ...some checking
case _ if shouldApplyAQE(plan, isSubquery) =>
if (supportAdaptive(plan)) {
try {
// Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
// Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
val subqueryMap = buildSubqueryMap(plan)
val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
val preprocessingRules = Seq(
planSubqueriesRule)
// Run pre-processing rules.
val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
logDebug(s"Adaptive execution enabled for plan: $plan")
AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
} catch {
case SubqueryAdaptiveNotSupportedException(subquery) =>
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
s"but is not supported for sub-query: $subquery.")
plan
}
} else {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
s"but is not supported for query: $plan.")
plan
}
case _ => plan
}
// AdaptiveSparkPlanExec
private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
// isFinalPlan is false the first time getFinalPhysicalPlan is called; once this method has run to
// completion the set of stages no longer changes, so later calls return the final plan directly
if (isFinalPlan) return currentPhysicalPlan
// In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
// `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
// created in the middle of the execution.
context.session.withActive {
val executionId = getExecutionId
var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
var result = createQueryStages(currentPhysicalPlan)
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
val errors = new mutable.ArrayBuffer[Throwable]()
var stagesToReplace = Seq.empty[QueryStageExec]
while (!result.allChildStagesMaterialized) {
currentPhysicalPlan = result.newPlan
// Which stages to run next is determined by createQueryStages(plan: SparkPlan)
if (result.newStages.nonEmpty) {
stagesToReplace = result.newStages ++ stagesToReplace
// onUpdatePlan updates the UI through the listener
executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
// Start materialization of all new stages and fail fast if any stages failed eagerly
result.newStages.foreach { stage =>
try {
// materialize() submits the stage as a separate job and returns a SimpleFutureAction to receive its result
// QueryStageExec: materialize() -> doMaterialize() ->
// ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec
// SparkContext: -> submitMapStage(shuffleDependency)
stage.materialize().onComplete { res =>
if (res.isSuccess) {
events.offer(StageSuccess(stage, res.get))
} else {
events.offer(StageFailure(stage, res.failed.get))
}
}(AdaptiveSparkPlanExec.executionContext)
} catch {
case e: Throwable =>
cleanUpAndThrowException(Seq(e), Some(stage.id))
}
}
}
// Wait on the next completed stage, which indicates new stats are available and probably
// new stages can be created. There might be other stages that finish at around the same
// time, so we process those stages too in order to reduce re-planning.
// Block here until some stage finishes
val nextMsg = events.take()
val rem = new util.ArrayList[StageMaterializationEvent]()
events.drainTo(rem)
(Seq(nextMsg) ++ rem.asScala).foreach {
case StageSuccess(stage, res) =>
stage.resultOption = Some(res)
case StageFailure(stage, ex) =>
errors.append(ex)
}
// In case of errors, we cancel all running stages and throw exception.
if (errors.nonEmpty) {
cleanUpAndThrowException(errors, None)
}
// Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
// than that of the current plan; otherwise keep the current physical plan together with
// the current logical plan since the physical plan's logical links point to the logical
// plan it has originated from.
// Meanwhile, we keep a list of the query stages that have been created since last plan
// update, which stands for the "semantic gap" between the current logical and physical
// plans. And each time before re-planning, we replace the corresponding nodes in the
// current logical plan with logical query stages to make it semantically in sync with
// the current physical plan. Once a new plan is adopted and both logical and physical
// plans are updated, we can clear the query stage list because at this point the two plans
// are semantically and physically in sync again.
// Replace the finished stages in the logical plan with LogicalQueryStage nodes
val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
// Invoke the optimizer and planner again to re-optimize
val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
if (newCost < origCost ||
(newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
cleanUpTempTags(newPhysicalPlan)
currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
stagesToReplace = Seq.empty[QueryStageExec]
}
// Now that some stages have finished, we can try creating new stages.
// Go into the next iteration; for every stage that has finished, resultOption is set and allChildStagesMaterialized becomes true
result = createQueryStages(currentPhysicalPlan)
}
// Run the final plan when there's no more unfinished stages.
// All upstream stages have finished; use the collected stats to optimize the physical plan and settle on the final physical plan
currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
isFinalPlan = true
executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
currentPhysicalPlan
}
}
// SparkContext
/**
* Submit a map stage for execution. This is currently an internal API only, but might be
* promoted to DeveloperApi in the future.
*/
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
: SimpleFutureAction[MapOutputStatistics] = {
assertNotStopped()
val callSite = getCallSite()
var result: MapOutputStatistics = null
val waiter = dagScheduler.submitMapStage(
dependency,
(r: MapOutputStatistics) => { result = r },
callSite,
localProperties.get)
new SimpleFutureAction[MapOutputStatistics](waiter, result)
}
// DAGScheduler
def submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C],
callback: MapOutputStatistics => Unit,
callSite: CallSite,
properties: Properties): JobWaiter[MapOutputStatistics] = {
val rdd = dependency.rdd
val jobId = nextJobId.getAndIncrement()
if (rdd.partitions.length == 0) {
throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
}
// We create a JobWaiter with only one "task", which will be marked as complete when the whole
// map stage has completed, and will be passed the MapOutputStatistics for that stage.
// This makes it easier to avoid race conditions between the user code and the map output
// tracker that might result if we told the user the stage had finished, but then they queried
// the map output tracker and some node failures had caused the output statistics to be lost.
val waiter = new JobWaiter[MapOutputStatistics](
this, jobId, 1,
(_: Int, r: MapOutputStatistics) => callback(r))
eventProcessLoop.post(MapStageSubmitted(
jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
waiter
}
// AdaptiveSparkPlanExec
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, context.subqueryCache),
CoalesceShufflePartitions(context.session),
// The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
// added by `CoalesceShufflePartitions`. So they must be executed after it.
OptimizeSkewedJoin(conf),
OptimizeLocalShuffleReader(conf),
ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
CollapseCodegenStages(conf)
)
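OptimizeSkewedJoin in that list is the rule behind the "dynamically optimizing skew joins" feature. Its behavior is driven by a handful of configs; the values below are illustrative settings, not tuned recommendations:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// A partition is treated as skewed when its size exceeds both
// skewedPartitionFactor times the median partition size and
// skewedPartitionThresholdInBytes; skewed partitions are then split into
// smaller tasks instead of being processed by a single straggler.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")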


Spark 3.0 AQE at FreeWheel: Application and Practice
Main upgrade changes
"spark.sql.adaptive.enabled": true,
"spark.sql.adaptive.coalescePartitions.enabled": true,
"spark.sql.adaptive.coalescePartitions.minPartitionNum": 1,
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"
In the reduce stage, the number of tasks dropped sharply from 40,320 without AQE to 4,580, roughly an order of magnitude fewer.
In the figure below, the lower half shows the task counts for Spark 2.x without AQE, and the upper half shows Spark 3.x with the AQE feature turned on.

Looking at the more detailed runtime breakdown, the time spent on the same aggregate operations after the shuffle reader also dropped from 4.44h to 2.56h, saving nearly half.
On the left are the detailed runtime metrics for Spark 2.x; on the right are the metrics after enabling AQE, where reads go through the custom shuffle reader.

Performance Gains

Results in Practice
For the historical-data pipeline, large batches (200–400 GB per hour) see performance gains of up to 40%; for small batches (under 100 GB per hour) the improvement is less pronounced, and across all of a day's batches the average gain is around 27.5%.
Forecast data sees an average performance gain of about 30%. Because the input sources differ, historical and forecast data currently run in two separate pipelines and produce different numbers of tables, so the two were evaluated separately.



