中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

DAG任務分解和Shuffle RDD怎么使用

發布時間:2021-12-30 10:08:08 來源:億速云 閱讀:161 作者:iii 欄目:云計算

這篇文章主要介紹“DAG任務分解和Shuffle RDD怎么使用”,在日常操作中,相信很多人在DAG任務分解和Shuffle RDD怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”DAG任務分解和Shuffle RDD怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

1、DagScheduler分析

DagScheduler功能主要是負責RDD的各個stage的分解和任務提交。Stage分解是從觸發任務調度過程的finalStage開始倒推尋找父stage,如果父stage沒有提交任務則循環提交缺失的父stage。每個stage有一個父RDD的概念,根據分區數的多少創建多個任務(Task)。

Task的調度實際是通過TaskSchedulerImp來完成的,TaskSchedulerImp里根據環境部署的不同又會使用不同的Backend,比如Yarn集群、獨立集群等其Backend是不一樣的,這里先有個概念,先不深究Backend。

這里先看看DagScheduler的核心邏輯把。里面首先要研究的一個方法:

def submitMissingTasks(stage: Stage, jobId: Int)

該方法就是提交stage執行,為什么叫這個名稱呢?說明這里的stage是需先需要提交執行的,沒有其他依賴的stage還未執行了。

submitMissingTasks方法會根據RDD的依賴關系創建兩種task,ResultTask和ShuffleMapTask。

一步步來,只看關鍵代碼,因為整體代碼太多了不利于理解關鍵邏輯。

1.1 生成序列化的taskBinary

taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

taskBinaryBytes待會是要封裝成對像分發到遠端Executor上執行的,所以必須是可序列化的。

兩者最主要區別就是:ShuffleMapStage的入參是依賴的shuffleDep;而ResultStage的入參是函數的定義func。

1.2 生成task

現在有了taskBinaryBytes,下一步就是生成Task了。

val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

兩種Task類型:ShuffleMapTask和ResultTask。這里要主要的是對Task而言,有多少分區(partition)就會生成多少個Task,Task是到分區維度的,而不是到RDD維度的,這個概念一定要明確。

1.3 提交Task

最后一步就是提交任務執行。這里就要用到taskScheduler了,當然了,這里的taskScheduler目前就是指TaskSchedulerImp。

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

DagScheduler里還有一個方法這里可以提一下,就是:

submitWaitingChildStages(stage)

這個方法是提交等待當前stage執行的等待stage,這樣DAG的整個調度過程就完整了。

2、Task執行

兩種Task類型:ShuffleMapTask和ResultTask。

2.1 ResultTask

我們先看ResultTask的執行,它相對比較簡單,核心方式是runTask,核心代碼:

override def runTask(context: TaskContext): U = {   
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    func(context, rdd.iterator(partition, context))
  }

反序列化出來RDD和func,然后執行rdd的iterator方法獲取數據集,并在此數據集上執行func函數,要注意實際上這是一次迭代過程而不是多次迭代過程。

2.2 ShuffleMapTask

ShuffleMapTask任務的執行相對復雜些。

核心方法還是runTask,核心代碼:

override def runTask(context: TaskContext): MapStatus = {    
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
  }

首先反序列化出RDD和依賴項ShuffleDependency。然后用ShuffleWriterProcessor寫數據到RDD。

這里的dep其實沒太大意義,主要就是來判斷是否要進行合并使用的,不影響理解整個shuffle流程,所以我們可以先不要管dep:

dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)

這里的rdd實際就是ShuffleMapTask所要生成的數據集。這句代碼到底是什么意思呢? ShuffleWriterProcessor實際上是將數據集寫到了BlockManager上去的,先看看ShuffleWriterProcessor的含義。

2.3 ShuffleWriterProcessor

ShuffleWriterProcessor的關鍵方法的定義先看一下。

def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],
      partitionId: Int, context: TaskContext,partition: Partition): MapStatus = {
    var writer: ShuffleWriter[Any, Any] = null   
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
        dep.shuffleHandle,
        partitionId,
        context,
        createMetricsReporter(context))
    writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get    
  }

ShuffleManager實際上就是BlockManager,管理塊空間的。

Write是Shuffle寫入器,寫到BlockManager去;rdd.iterator(partition, context)就是當前Shuffle類型的RDD定義的數據集,dep是rdd計算數據集時依賴的RDD(這里的dep沒多大意思先不管)。

這段代碼的作用就是將shuffle rdd數據集輸出到BlockManager上,在讀取RDD的數據時,如果該RDD是shuffle類型,則需要到BlockManager上去讀取,這里就是這個作用。

2.4 Shuffle RDD的相關概念

Shuffle類的RDD是指這類RDD的compute方法是依賴于其他RDD的,這里的其他RDD可以是多個。執行shuffle的RDD的計算過程的時候,是將一到多個依賴RDD的迭代器的輸出作為數據源迭代器,在此之上執行自己的操作。所以shuffle RDD的compute方法里一定會用到依賴RDD的iterator方法。

可以看看CoGroupedRDD的源碼,就能很快的理解shuffle的含義。

到此,關于“DAG任務分解和Shuffle RDD怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

太仆寺旗| 长葛市| 兴山县| 澄江县| 共和县| 专栏| 体育| 林芝县| 沙湾县| 易门县| 临泉县| 广饶县| 漠河县| 郓城县| 托里县| 佛冈县| 台北市| 黄陵县| 盐山县| 镶黄旗| 莱西市| 桦川县| 兰坪| 张家川| 武威市| 北辰区| 多伦县| 新绛县| 赫章县| 巴中市| 惠州市| 和龙市| 宁城县| 酒泉市| 新乡县| 佛冈县| 和田市| 洞口县| 房产| 五河县| 乐昌市|