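This article is based on Delta 0.7.0 and Spark 3.0.1. Spark 3.x introduced dynamic partition pruning (DPP). In the earlier article on the logical plan side of Spark's dynamic partition pruning, we saw that a DynamicPruningSubquery is inserted during the logical planning phase. Here we look at how DynamicPruningSubquery is optimized and implemented in the physical planning phase.
Let's go straight to the apply method of PlanDynamicPruningFilters: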
override def apply(plan: SparkPlan): SparkPlan = {
  if (!SQLConf.get.dynamicPartitionPruningEnabled) {
    return plan
  }

  plan transformAllExpressions {
    case DynamicPruningSubquery(
        value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
      val sparkPlan = QueryExecution.createSparkPlan(
        sparkSession, sparkSession.sessionState.planner, buildPlan)
      // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
      // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
      val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
        plan.find {
          case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
            left.sameResult(sparkPlan)
          case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
            right.sameResult(sparkPlan)
          case _ => false
        }.isDefined

      if (canReuseExchange) {
        val mode = broadcastMode(buildKeys, buildPlan)
        val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
        // plan a broadcast exchange of the build side of the join
        val exchange = BroadcastExchangeExec(mode, executedPlan)
        val name = s"dynamicpruning#${exprId.id}"
        // place the broadcast adaptor for reusing the broadcast results on the probe side
        val broadcastValues =
          SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
        DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
      } else if (onlyInBroadcast) {
        // it is not worthwhile to execute the query, so we fall-back to a true literal
        DynamicPruningExpression(Literal.TrueLiteral)
      } else {
        // we need to apply an aggregate on the buildPlan in order to be column pruned
        val alias = Alias(buildKeys(broadcastKeyIndex),
          buildKeys(broadcastKeyIndex).toString)()
        val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
        DynamicPruningExpression(expressions.InSubquery(
          Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
      }
  }
}
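If dynamic partition pruning is not enabled (spark.sql.optimizer.dynamicPartitionPruning.enabled is false), the rule returns the plan unchanged.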
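As a minimal sketch of how the two switches read by this rule might be set, the snippet below configures them on a local SparkSession. The application name and the local master are assumptions made for illustration only; the two configuration keys are the ones behind SQLConf.get.dynamicPartitionPruningEnabled and SQLConf.get.exchangeReuseEnabled.

```scala
import org.apache.spark.sql.SparkSession

object DppConfigSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, used only to show the settings.
    val spark = SparkSession.builder()
      .appName("dpp-config-sketch")
      .master("local[*]")
      // Read by the rule as SQLConf.get.dynamicPartitionPruningEnabled.
      .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
      // Read by the rule as SQLConf.get.exchangeReuseEnabled.
      .config("spark.sql.exchange.reuse", "true")
      .getOrCreate()

    // Print the effective values to confirm what the session will use.
    println(spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled"))
    println(spark.conf.get("spark.sql.exchange.reuse"))

    spark.stop()
  }
}
```

Setting the first key to false should make the rule return the plan untouched, which is a simple way to compare plans with and without pruning.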
QueryExecution.createSparkPlan( sparkSession, sparkSession.sessionState.planner, buildPlan)
This builds a physical plan (SparkPlan) from the logical plan buildPlan.
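Next the rule decides whether the exchange can be reused (canReuseExchange). This holds when spark.sql.exchange.reuse is true, buildKeys is non-empty, and the plan contains a BroadcastHashJoinExec whose build side produces the same result (sameResult) as the physical plan just created for the pruning subquery. If so, it proceeds as follows.
QueryExecution.prepareExecutedPlan prepares the physical plan for execution and yields executedPlan.
The rule then constructs a BroadcastExchangeExec, a SubqueryBroadcastExec (broadcastValues), an InSubqueryExec, and finally a DynamicPruningExpression. Internally, BroadcastExchangeExec performs Spark's broadcast operation. Note: this BroadcastExchangeExec is later optimized by the ReuseExchange rule and is ultimately invoked by BroadcastQueryStageExec, so the join and the pruning filter share the same broadcast value.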
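To make the effect of this branch concrete, here is a toy sketch in plain Scala, not Spark's implementation. It assumes a partition can be identified by a single integer key and that the build side is a small in-memory sequence of rows; TablePartition, PartitionPruningSketch, and prune are hypothetical names introduced only for this illustration.

```scala
// Hypothetical stand-in for one partition of the probe-side table.
final case class TablePartition(key: Int, files: Seq[String])

object PartitionPruningSketch {
  // buildRows plays the role of the (small) build side of the join:
  // each row is (join key, payload).
  def prune(
      partitions: Seq[TablePartition],
      buildRows: Seq[(Int, String)]): Seq[TablePartition] = {
    // Analogous to collecting the build-side key values once and sharing them.
    val broadcastKeys: Set[Int] = buildRows.map(_._1).toSet
    // Analogous to the IN-style filter on the partition column:
    // only partitions whose key appears on the build side are kept.
    partitions.filter(p => broadcastKeys.contains(p.key))
  }
}
```

For example, with partitions keyed 1, 2, and 3 and build rows keyed 1, 3, and 3, prune keeps only the partitions keyed 1 and 3, so the partition keyed 2 would never be scanned.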
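If the exchange cannot be reused and onlyInBroadcast is true, the rule falls back to DynamicPruningExpression(Literal.TrueLiteral), which means no pruning takes place.
Otherwise, that is, when no broadcast can be reused but pruning is still expected to speed up the query, the rule aggregates buildPlan on the key to be filtered and wraps the resulting InSubquery in a DynamicPruningExpression.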
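The three outcomes described above can be summarized in a small decision function. This is a simplified model in plain Scala under stated assumptions, not Spark code: the outcome types and the boolean parameter hasMatchingBroadcastJoin (standing in for the plan.find check on BroadcastHashJoinExec) are hypothetical names introduced for illustration.

```scala
// Hypothetical labels for the three branches of the rule.
sealed trait PruningOutcome
case object ReuseBroadcast extends PruningOutcome     // InSubqueryExec over SubqueryBroadcastExec
case object NoPruning extends PruningOutcome          // Literal.TrueLiteral
case object AggregateSubquery extends PruningOutcome  // InSubquery over an Aggregate

object PruningDecision {
  def decide(
      exchangeReuseEnabled: Boolean,
      buildKeysNonEmpty: Boolean,
      hasMatchingBroadcastJoin: Boolean,
      onlyInBroadcast: Boolean): PruningOutcome = {
    // Mirrors the canReuseExchange condition: all three parts must hold.
    val canReuseExchange =
      exchangeReuseEnabled && buildKeysNonEmpty && hasMatchingBroadcastJoin

    if (canReuseExchange) ReuseBroadcast
    else if (onlyInBroadcast) NoPruning
    else AggregateSubquery
  }
}
```

Note that onlyInBroadcast only matters once reuse has been ruled out: decide(true, true, true, true) yields ReuseBroadcast, while decide(true, true, false, true) yields NoPruning.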
This completes the analysis of the physical plan optimization for dynamic partition pruning.