a. 管理Spark的緩存,可以基于內存,也可以基于磁盤;
// Excerpt of the shuffle-write step: obtain a ShuffleWriter from the shuffle manager,
// feed it the records of this partition, then stop the writer.
var writer: ShuffleWriter[Any, Any] = null
try {
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  // This is the corresponding call inside ShuffleMapTask.runTask(): rdd.iterator(...)
  // supplies the partition's records, which are passed to the writer.
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
} catch {
  // ................... (exception handling elided in this excerpt)
}
/**
 * Entry point for reading one partition of this RDD.
 *
 * When a storage level other than `StorageLevel.NONE` is set, the partition is requested
 * through `getOrCompute`; otherwise it goes straight to `computeOrReadCheckpoint`.
 *
 * @param split   the partition to read
 * @param context the context of the running task
 * @return an iterator over the elements of `split`
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
/**
 * Returns an iterator over `partition`, asking the block manager for the block first and
 * falling back to `computeOrReadCheckpoint` when the block manager invokes the supplied
 * closure.
 *
 * @param partition the partition whose data is requested
 * @param context   the context of the running task; also used to record input metrics
 * @return an `InterruptibleIterator` over the partition's elements
 */
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  // Stays true unless the closure below runs, i.e. unless the data had to be computed.
  var readCachedBlock = true
  // The persisted data is requested from the BlockManager. If that succeeds it is returned
  // directly; if it fails, computeOrReadCheckpoint is called to compute it.
  // Author's note - why would in-memory data be missing? When memory runs short, data is
  // spilled to disk if the storage level allows it; if spilling to disk was not requested,
  // the data is dropped and has to be recomputed.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    // Nothing could be fetched, so recompute. Reaching this point means the RDD has a
    // storage level set (it was persisted).
    // Author's note: after the data is computed, getOrElseUpdate persists it again through
    // its makeIterator parameter (the author marks this part as not fully understood).
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        // The block came from the cache: account for the bytes read now and count each
        // record as it is consumed.
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        // The block was computed by the closure above; no input metrics are recorded here.
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      // The block manager handed the iterator back instead of a stored block.
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}
4. 這里繼續跟蹤getOrElseUpdate(),看它如何獲取持久化的數據,代碼如下:
/**
 * Returns the block identified by `blockId` if `get()` finds it; otherwise evaluates
 * `makeIterator` and tries to store the result at the given storage level.
 *
 * Author's notes (translated): get() is called to fetch the block from local or remote
 * storage and, if found, it is returned directly. If nothing could be read the data must be
 * recomputed; since execution reached this point the RDD is a persisted one, so after the
 * data is computed it is persisted again through `makeIterator` (the author marks this part
 * as not fully understood).
 *
 * @param blockId      identifier of the block to read or create
 * @param level        storage level used when the block has to be stored
 * @param classTag     class tag of the block's elements
 * @param makeIterator produces the block's values when the block is not already available
 * @return `Left(blockResult)` when the block was found or successfully stored,
 *         `Right(iter)` when the put failed and the values are handed back to the caller
 */
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  // Attempt to read the block from local or remote storage. If it's present, then we don't need
  // to go through the local-get-or-put path.
  // get() is called here to fetch the block from local or remote storage; a hit is returned
  // directly.
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
      // Need to compute the block.
  }
  // Author's note: this handles data that could not be fetched locally or remotely and is
  // therefore recomputed by computeOrReadCheckpoint. Because the RDD is persisted and the
  // previously persisted data may have been lost, the data is persisted again here according
  // to the storage level. This part is easier to follow when read together with the
  // makeIterator closure that getOrCompute passes in (the listing in step 2 above).
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = getLocalValues(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      // We already hold a read lock on the block from the doPut() call and getLocalValues()
      // acquires the lock again, so we need to call releaseLock() here so that the net number
      // of lock acquisitions is 1 (since the caller will only call release() once).
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(iter)
  }
}
/**
 * Produces the data of `split`: through the first parent's iterator when
 * `isCheckpointedAndMaterialized` is true, otherwise by calling `compute`.
 *
 * @param split   the partition to produce
 * @param context the context of the running task
 * @return an iterator over the elements of `split`
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  // If a checkpoint has been set up, the data is obtained from the checkpoint.
  // (Checkpointing is not explained here; a separate article covers it.)
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    // The data was never checkpointed, so it has to be computed again, using this RDD's own
    // operator.
    compute(split, context)
  }
}