您好,登錄后才能下訂單哦!
RDD的依賴關系分為兩類:寬依賴和窄依賴。我們可以這樣認為:
窄依賴每個 child RDD 的 partition 的生成操作都是可以并行的,而寬依賴則需要所有的 parent RDD partition shuffle 結果得到后再進行。
Dependency是一個抽象類:
// Denpendency.scala
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
它有兩個子類:NarrowDependency 和 ShuffleDenpendency,分別對應窄依賴和寬依賴。
定義了抽象方法getParents,輸入partitionId,用于獲得child RDD 的某個partition依賴的parent RDD的所有 partitions。
// Denpendency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
窄依賴又有兩個具體的實現:OneToOneDependency和RangeDependency。
(a)OneToOneDependency指child RDD的partition只依賴于parent RDD 的一個partition,產生OneToOneDependency的算子有map,filter,flatMap等。可以看到getParents實現很簡單,就是傳進去一個partitionId,再把partitionId放在List里面傳出去。
// Denpendency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
(b)RangeDependency指child RDD partition在一定的范圍內一對一的依賴于parent RDD partition,主要用于union。
// Denpendency.scala
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {//inStart表示parent RDD的開始索引,outStart表示child RDD 的開始索引
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)//表示于當前索引的相對位置
} else {
Nil
}
}
}
表示一個parent RDD的partition會被child RDD的partition使用多次。需要經過shuffle才能形成。
// Denpendency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] { //shuffle都是基于PairRDD進行的,所以傳入的RDD要是key-value類型的
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName) //獲取shuffleId
val shuffleId: Int = _rdd.context.newShuffleId() //向shuffleManager注冊shuffle信息
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
由于shuffle涉及到網絡傳輸,所以要有序列化serializer,為了減少網絡傳輸,可以map端聚合,通過mapSideCombine和aggregator控制,還有key排序相關的keyOrdering,以及重輸出的數據如何分區的partitioner,還有一些class信息。Partition之間的關系在shuffle處戛然而止,因此shuffle是劃分stage的依據。
首先,窄依賴允許在一個集群節點上以流水線的方式(pipeline)計算所有父分區。例如,逐個元素地執行map、然后filter操作;而寬依賴則需要首先計算好所有父分區數據,然后在節點之間進行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進行失效節點的恢復,即只需重新計算丟失RDD分區的父分區,而且不同節點之間可以并行計算;而對于一個寬依賴關系的Lineage圖,單個節點失效可能導致這個RDD的所有祖先丟失部分分區,因而需要整體重新計算。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。