中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark2.x中怎么實現CacheManager源碼深度剖析

發布時間:2021-12-16 20:39:09 來源:億速云 閱讀:143 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關Spark2.x中怎么實現CacheManager源碼深度剖析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

一、概述

    CacheManager主要發生在利用RDD的數據執行算子的時候,之前我們講過在ShufffleWriter進行數據寫時,會調用RDD對應的Iterator()方法,獲取RDD對應的數據,CacheManager主要干三件事:

    a. 管理Spark的緩存,可以基于內存,也可以基于磁盤;

    b.底層是通過BlockManager進行數據的讀寫操作;

    c.Task運行會調用RDD中的iterator方法進行數據的計算;

二、CacheManager源碼剖析

1.之前我們講解的ShuffleMapTask中的runTask方法時,ShuffleWriter寫數據的參數傳入的是rdd.iterator()方法計算出來的那個partition數據,代碼如下:

  var writer: ShuffleWriter[Any, Any] = null    try {      val manager = SparkEnv.get.shuffleManager      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)      //這里就是ShuffleMapTask類的runTask()方法中對應的代碼調用      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])      writer.stop(success = true).get    } catch {      ...................    }

2.這里看RDD類中的iterator方法,代碼如下:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {    //判斷下如果StorageLevel.NONE這說明RDD,之前肯定是進行了持久化    //getOrCompute中會通過CacheManager獲取之前持久化的數據    if (storageLevel != StorageLevel.NONE) {      getOrCompute(split, context)      //如果沒有進行過持久化,就需要通過父RDD定義的算子去獲取數據      //注意這里如果有CheckPoint,會通過CheckPoint獲取,checkPoint獲取不到才去重新計算      } else {      computeOrReadCheckpoint(split, context)    }  }

3.跟進去看下持久化的RDD的處理,getOrCompute()函數,代碼如下:

 private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {    val blockId = RDDBlockId(id, partition.index)    var readCachedBlock = true    //CacheManger這里是通過BlockManager獲取持久化數據,    //如果獲取成功直接返回,如果獲取失敗,調用computeOrReadCheckpoint進行計算     //內存數據為啥會丟失? 之前我們知道內存中的數據如果空間不夠的話,同時如果指定可以將數據緩存到磁盤,會溢寫到磁盤,   //如果未指定溢寫到磁盤,這些數據就會丟失掉 就需要重新計算    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {      readCachedBlock = false      //獲取不到重新計算,這里要注意,代碼執行到這里說明這個RDD肯定是經過持久化的      //這里計算出數據后,會在getOrElseUpdate里面通過makeIterator參數對數據進行重新持久化(這里理解的不太透徹)      computeOrReadCheckpoint(partition, context)    }) match {      case Left(blockResult) =>        if (readCachedBlock) {          val existingMetrics = context.taskMetrics().inputMetrics          existingMetrics.incBytesRead(blockResult.bytes)          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {            override def next(): T = {              existingMetrics.incRecordsRead(1)              delegate.next()            }          }        } else {          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])        }      case Right(iter) =>        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])    }  }

4.這里繼續跟蹤getOrElseUpdate()獲取持久化的數據 ,代碼如下:

//這里會調用get()方法從本地或者遠程獲取block數據,直接返回//如果沒有讀取到數據就需要重新計算數據,由于代碼執行到這里,rdd肯定是經過持久化的//這里計算出數據后,通過makeIterator迭代器,重新進行持久化(這里理解的不太透徹) def getOrElseUpdate[T](      blockId: BlockId,      level: StorageLevel,      classTag: ClassTag[T],      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {    // Attempt to read the block from local or remote storage. If it's present, then we don't need    // to go through the local-get-or-put path.    //這里會調用get()方法從本地或者遠程獲取block數據,直接返回    get[T](blockId)(classTag) match {      case Some(block) =>        return Left(block)      case _ =>        // Need to compute the block.    }    //這里的處理意思是:對于本地遠程沒有獲取到數據,然后computeOrReadCheckpoint重新計算的數據    //由于RDD是持久化的,原來的持久化數據可能丟了,這里根據持久化級別重新進行數據的持久化    //這里代碼有點不太好理解 要結合上面2中第12-14行代碼 一起理解    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {      case None =>        // doPut() didn't hand work back to us, so the block already existed or was successfully        // stored. Therefore, we now hold a read lock on the block.        val blockResult = getLocalValues(blockId).getOrElse {          // Since we held a read lock between the doPut() and get() calls, the block should not          // have been evicted, so get() not returning the block indicates some internal error.          releaseLock(blockId)          throw new SparkException(s"get() failed for block $blockId even though we held a lock")        }        // We already hold a read lock on the block from the doPut() call and getLocalValues()        // acquires the lock again, so we need to call releaseLock() here so that the net number        // of lock acquisitions is 1 (since the caller will only call release() once).        releaseLock(blockId)        Left(blockResult)      case Some(iter) =>        // The put failed, likely because the data was too large to fit in memory and could not be        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so        // that they can decide what to do with the values (e.g. process them without caching).       Right(iter)    }  }

5.這里回過頭來看computeOrReadCheckpoint方法,如果計算數據的,代碼如下:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =  {  //如果設置了CheckPoint,從Checkpoint中獲取數據  //這里CheckPoint相關的知識,先不講解,后面有篇文章單獨講解    if (isCheckpointedAndMaterialized) {          firstParent[T].iterator(split, context)    } else {    //如果數據沒有進行過Checkpoint,這里只能重新計算一次    //這里就是根據自己的rdd算子重新計算      compute(split, context)    }  }

6.CacheManager數據計算的大體流程:

    1).如果RDD進行過持久化,根據持久化級別通過BlockManager從本地或者遠程獲取數據,如果數據獲取不到,則需要重新計算,由于這里RDD進行過持久化,只是由于某種原因丟失,還需要根據持久化級別重新進行一次數據的持久化。

    2).如果RDD沒有進行持久化,就需要重新計算,重新計算時,這里如果RDD進行了CheckPoint,則優先獲取CheckPoint過的數據,如果沒有,則需要從RDD的父RDD執行我們定義的算子來重新計算Partition數據。

關于Spark2.x中怎么實現CacheManager源碼深度剖析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金华市| 土默特左旗| 沙坪坝区| 综艺| 大荔县| 宣汉县| 郸城县| 鹿邑县| 平遥县| 莎车县| 金门县| 拉萨市| 天台县| 湘潭县| 策勒县| 青阳县| 洱源县| 原平市| 金华市| 凉城县| 泽州县| 乌拉特后旗| 东阿县| 黔江区| 四会市| 东台市| 兴和县| 卢龙县| 永平县| 广水市| 阿鲁科尔沁旗| 上蔡县| 宿迁市| 玛纳斯县| 凭祥市| 苍梧县| 池州市| 青铜峡市| 湄潭县| 周宁县| 岚皋县|