中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark的動態分區裁剪下物理計劃怎么實現

發布時間:2021-12-09 16:47:14 來源:億速云 閱讀:179 作者:iii 欄目:大數據

本篇內容介紹了“spark的動態分區裁剪下物理計劃怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

背景

本文基于delta 0.7.0 spark 3.0.1 spark 3.x引入了動態分區裁剪,在 spark 的動態分區裁剪上(Dynamic partition pruning)-邏輯計劃我們提到在邏輯計劃階段會加入DynamicPruningSubquery,今天我們分析一下在物理階段怎么對DynamicPruningSubquery進行優化以及實現的

分析

直接轉到PlanDynamicPruningFilters的apply方法:

override def apply(plan: SparkPlan): SparkPlan = {
    if (!SQLConf.get.dynamicPartitionPruningEnabled) {
      return plan
    }

    plan transformAllExpressions {
      case DynamicPruningSubquery(
          value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
        val sparkPlan = QueryExecution.createSparkPlan(
          sparkSession, sparkSession.sessionState.planner, buildPlan)
        // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
        // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
        val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.find {
            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
              left.sameResult(sparkPlan)
            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
              right.sameResult(sparkPlan)
            case _ => false
          }.isDefined

        if (canReuseExchange) {
          val mode = broadcastMode(buildKeys, buildPlan)
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
          // plan a broadcast exchange of the build side of the join
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
          val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
        } else if (onlyInBroadcast) {
          // it is not worthwhile to execute the query, so we fall-back to a true literal
          DynamicPruningExpression(Literal.TrueLiteral)
        } else {
          // we need to apply an aggregate on the buildPlan in order to be column pruned
          val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
          val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
          DynamicPruningExpression(expressions.InSubquery(
            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
        }
    }
  }
  1. 如果沒有開啟動態分區裁剪,則直接跳過

  2. QueryExecution.createSparkPlan( sparkSession, sparkSession.sessionState.planner, buildPlan) 通過邏輯計劃構造物理計劃

  3. 判斷是否reuseExchange,如果spark.sql.exchange.reuse配置為true,且存在join的是broadcastHashjoin,而且計算結果和要進行過濾的物理計劃的結果一樣,則進行下一步,

  • 進行物理計劃執行前的準備, 得到executedPlan

  • 構建BroadcastExchangeExec,broadcastValues,InSubqueryExec,DynamicPruningExpression,BroadcastExchangeExec內部就是進行spark的broadcast操作 注意:這里的BroadcastExchangeExec會在ReuseExchange規則中被優化, 最終會被BroadcastQueryStageExec調用,從而公用同一個broacast的值

  1. 如果以上不滿足,默認DynamicPruningExpression(Literal.TrueLiteral),也就是不會進行裁剪

  2. 如果不是broadcastHashjoin,但是能夠加速,則按照需要過濾的key做一次聚合,之后再組成DynamicPruningExpression

至此動態裁剪的物理計劃優化就分析完了

“spark的動態分區裁剪下物理計劃怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

湟中县| 华池县| 金溪县| 涿州市| 安丘市| 井陉县| 勐海县| 驻马店市| 金溪县| 高淳县| 肇源县| 沿河| 新野县| 中西区| 三门峡市| 喀喇沁旗| 文昌市| 米泉市| 上犹县| 阿坝| 汾阳市| 鱼台县| 托克托县| 横峰县| 黄梅县| 旬阳县| 晋州市| 莲花县| 舞钢市| 抚州市| 延寿县| 天台县| 大冶市| 宁德市| 巴林右旗| 格尔木市| 金塔县| 梁平县| 樟树市| 吴旗县| 岳西县|