中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RDD怎么向spark傳遞函數

發布時間:2021-12-16 17:01:42 來源:億速云 閱讀:145 作者:iii 欄目:云計算

本篇內容介紹了“RDD怎么向spark傳遞函數”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

惰性求值

RDD的轉換操作都是惰性求值的。
惰性求值意味著我們對RDD調用轉化操做(例如map操作)并不會立即執行,相反spark會在內部記錄下所要求執行的操作的相關信息。
把數據讀取到RDD的操作同樣也是惰性的,因此我們調用sc.textFile()時數據沒有立即讀取進來,而是有必要時才會讀取。和轉化操作一樣讀取數據操作也有可能被多次執行。這在寫代碼時要特別注意。

關于惰性求值,對新手來說可能有與直覺相違背之處。有接觸過函數式語言類如haskell的應該不會陌生。
在最初接觸spark時,我們也會有這樣的疑問。
也參與過這樣的討論:

 val sc = new SparkContext("local[2]", "test")
 val f:Int ? Int = (x:Int) ? x + 1
 val g:Int ? Int = (x:Int) ? x + 1
 val rdd = sc.parallelize(Seq(1,2,3,4),1)
 //1
 val res1 = rdd.map(x ? g(f(x))).collect
 //2
 val res2 = rdd.map(g).map(f).collect

第1和第2兩種操作均能得到我們想要的結果,但那種操作更好呢?
直觀上我們會覺得第1種操作更好,因為第一種操作可以僅僅需要一次迭代就能得到我們想要的結果。第二種操作需要兩次迭代操作才能完成。
是我們想象的這樣嗎?讓我們對函數f和g的調用加上打印。按照上面的假設。1和2的輸出分別是這樣的:

1:  f   g   f   g   f   g   f   g       
2:  g   g   g   g   f   f   f   f

代碼:

val sc = new SparkContext("local[2]", "test")
val f:Int ? Int = (x:Int) ? {
    print("f\t")
    x + 1
    }
val g:Int ? Int = (x:Int) ? {
  print("g\t")
  x + 1
}
val rdd = sc.parallelize(Seq(1,2,3,4), 1
//1
val res1 = rdd.map(x ? g(f(x))).collect()
//2
val res2 = rdd.map(f).map(g).collect()

將上面的代碼copy試著運行一下吧,我們在控制臺得到的結果是這樣的。

f   g   f   g   f   g   f   g
f   g   f   g   f   g   f   g

是不是大大出乎我們的意料?這說明什么?說明spark是懶性求值的! 我們在調用map(f)時并不會真正去計算, map(f)只是告訴spark數據是怎么計算出來的。map(f).map(g)其實就是在告訴spark數據先通過f在通過g計算出來的。然后在collect()時,spark在一次迭代中先后對數據調用f、g。

繼續回到我們最初的問題,既然兩種調用方式,在性能上毫無差異,那種調用方式更好呢?我們更推薦第二種調用方式,除了api更加清晰之外。在調用鏈很長的情況下,我們可以利用spark的檢查點機制,在中間添加檢查點,這樣數據恢復的代價更小。而第一種方式調用鏈一旦出錯,數據只能從頭計算。

那么spark到底施加了何種魔法,如此神奇?讓我們來撥開spark的層層面紗。最好的方式當然是看源碼了。以map為例:

RDD的map方法

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

和MapPartitionsRDD的compute方法

override def compute(split: Partition, context:
                TaskContext): Iterator[U] =
                f(context, split.index,
                firstParent[T].iterator(split, 
                context))

關鍵是這個 iter.map(cleanF)),我們調用一個map方法其實是在iter對象上調用一個map方法。iter對象是scala.collection.Iterator的一個實例。
在看一下Iterator的map方法

def map[B](f: A => B): Iterator[B]=  
    new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
}

聯想到我們剛才說的我們在RDD上調用一個map方法只是告訴spark數據是怎么計算出來的,并不會真正計算。是不是恍然大悟了。

向spark傳遞函數

我們可以把定義好的內聯函數、方法的引用或靜態方法傳遞給spark。就像scala的其它函數式API一樣。我們還要考慮一些細節,比如傳遞的函數及其引用的變量是可序列話的(實現了java的Serializable接口)。除此之外傳遞一個對象的方法或字段時,會包含對整個對象的引用。我們可以把該字段放到一個局部變量中,來避免傳遞包含該字段的整個對象。

scala中的函數傳遞

class SearchFunctions(val query:String){
    def isMatch(s:String) = s.contains(query)

    def getMatchFuncRef(rdd:RDD[String])
        :RDD[String]= {
        //isMatch 代表this.isMatch因此我們要傳遞整個this
        rdd.map(isMatch)
    }

    def getMatchFieldRef(rdd:RDD[String])={
    //query表示this.query因此我們要傳遞整個this
    rdd.map(x=>x.split(query))
    }

    def getMatchsNoRef(rdd:RDD[String])={
    //安全只要把我們需要的字段放到局部變量中
    val q = this.query
    rdd.map(x=>x.split(query))
    }
}

如果在scala中出現了NotSerializableException,通常問題就在我們傳遞了一個不可序列化類中的函數或字段。傳遞局部可序列變量或頂級對象中的函數始終是安全的。

持久化

如前所述,spark的RDD是惰性求值的,有時我們希望能過多次使用同一個RDD。如果只是簡單的對RDD調用行動操作,spark每次都會重算RDD和它的依賴。這在迭代算法中消耗巨大。 可以使用RDD.persist()讓spark把RDD緩存下來。

避免GroupByKey

讓我們來看看兩種workCount的方式,一種使用reduceByKey,另一種使用groupByKey。

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

雖然兩種方式都能產生正確的結果,但reduceByKey在大數據集時工作的更好。這時因為spark會在shuffling數據之前,為每一個分區添加一個combine操作。這將大大減少shuffling前的數據。

看下圖來理解 reduceBykey的過程

RDD怎么向spark傳遞函數

而groupBykey會shuff所有的數據,這大大加重了網絡傳輸的數據量。另外如果一個key對應很多value,這樣也可能引起out of memory。

如圖,groupby的過程

RDD怎么向spark傳遞函數

“RDD怎么向spark傳遞函數”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

精河县| 营山县| 赤水市| 呼和浩特市| 抚顺市| 龙岩市| 保康县| 西贡区| 泗水县| 出国| 惠州市| 怀来县| 阿拉尔市| 齐河县| 陇川县| 锡林浩特市| 永川市| 岳普湖县| 潢川县| 香格里拉县| 漯河市| 遵义县| 织金县| 张掖市| 卓尼县| 布尔津县| 香格里拉县| 泊头市| 临湘市| 庆安县| 南康市| 深水埗区| 灵川县| 星座| 扶沟县| 屏山县| 长兴县| 宝兴县| 文化| 阜宁县| 库车县|