您好,登錄后才能下訂單哦!
[TOC]
Spark非常重要的一個功能特性就是可以將RDD持久化在內存中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到內存中,并且在之后對該RDD的反復使用中,直接使用內存緩存的partition。這樣的話,對于針對一個RDD反復執行多個操作的場景,就只要對RDD計算一次即可,后面直接使用該RDD,而不需要反復計算多次該RDD。
巧妙使用RDD持久化,甚至在某些場景下,可以將spark應用程序的性能提升10倍。對于迭代式算法和快速交互式應用來說,RDD持久化,是非常重要的。
要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那么Spark會自動通過其源RDD,使用transformation操作重新計算該partition。
cache()和persist()的區別在于,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,同時就是調用persist(MEMORY_ONLY),將數據持久化到內存中。如果需要從內存中去除緩存,那么可以使用unpersist()方法。
1、第一次加載大量的數據到RDD中
2、頻繁的動態更新RDD Cache數據,不適合使用Spark Cache、Spark lineage
? 默認情況下,性能最高的當然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續算子操作,都是基于純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要復制一份數據副本,并遠程傳送到其他節點上。但是這里必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數據比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM內存溢出異常。
? 如果使用MEMORY_ONLY級別時發生了內存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化后再保存在內存中,此時每個partition僅僅是一個字節數組而已,大大減少了對象數量,并降低了內存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續算子可以基于純內存進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的數據量過多的話,還是可能會導致OOM內存溢出的異常。
? 如果純內存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數據量很大,內存無法完全放下。序列化后的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。
? 通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基于磁盤文件進行數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有RDD。后綴為_2的級別,必須將所有數據都復制一份副本,并發送到其他節點上,數據復制以及網絡傳輸會導致較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark RDD的持久化
*/
object _01SparkPersistOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
var start = System.currentTimeMillis()
val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
// linesRDD.cache()
// linesRDD.persist(StorageLevel.MEMORY_ONLY)
// 執行第一次RDD的計算
val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// retRDD.cache()
// retRDD.persist(StorageLevel.DISK_ONLY)
retRDD.count()
println("第一次計算消耗的時間:" + (System.currentTimeMillis() - start) + "ms")
// 執行第二次RDD的計算
start = System.currentTimeMillis()
// linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
retRDD.count()
println("第二次計算消耗的時間:" + (System.currentTimeMillis() - start) + "ms")
// 持久化使用結束之后,要想卸載數據
// linesRDD.unpersist()
sc.stop()
}
}
設置相關的持久化策略,再觀察執行時間就可以有一個較為直觀的理解。
提供了兩種有限類型的共享變量,廣播變量和累加器。
介紹之前,先直接看下面一個例子:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 共享變量
* 我們在dirver中聲明的這些局部變量或者成員變量,可以直接在transformation中使用,
* 但是經過transformation操作之后,是不會將最終的結果重新賦值給dirver中的對應的變量。
* 因為通過action,觸發了transformation的操作,transformation的操作,都是通過
* DAGScheduler將代碼打包 序列化 交由TaskScheduler傳送到各個Worker節點中的Executor去執行,
* 在transformation中執行的這些變量,是自己節點上的變量,不是dirver上最初的變量,我們只不過是將
* driver上的對應的變量拷貝了一份而已。
*
*
* 這個案例也反映出,我們需要有一些操作對應的變量,在driver和executor上面共享
*
* spark給我們提供了兩種解決方案——兩種共享變量
* 廣播變量
* 累加器
*/
object _02SparkShareVariableOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
val linesRDD = sc.textFile("D:/data/spark/hello.txt")
val wordsRDD = linesRDD.flatMap(_.split(" "))
var num = 0
val parisRDD = wordsRDD.map(word => {
num += 1
println("map--->num = " + num)
(word, 1)
})
val retRDD = parisRDD.reduceByKey(_ + _)
println("num = " + num)
retRDD.foreach(println)
println("num = " + num)
sc.stop()
}
}
輸出結果如下:
num = 0
map--->num = 1
map--->num = 1
map--->num = 2
map--->num = 2
map--->num = 3
map--->num = 4
(hello,3)
(you,1)
(me,1)
(he,1)
num = 0
Spark的另一種共享變量是廣播變量。通常情況下,當一個RDD的很多操作都需要使用driver中定義的變量時,每次操作,driver都要把變量發送給worker節點一次,如果這個變量中的數據很大的話,會產生很高的傳輸負載,導致執行效率降低。使用廣播變量可以使程序高效地將一個很大的只讀數據發送給多個worker節點,而且對每個worker節點只需要傳輸一次,每次操作時executor可以直接獲取本地保存的數據副本,不需要多次傳輸。
這樣理解, 一個worker中的executor,有5個task運行,假如5個task都需要這從份共享數據,就需要向5個task都傳遞這一份數據,那就十分浪費網絡資源和內存資源了。使用了廣播變量后,只需要向該worker傳遞一次就可以了。
創建并使用廣播變量的過程如下:
在一個類型T的對象obj上使用SparkContext.brodcast(obj)方法,創建一個Broadcast[T]類型的廣播變量,obj必須滿足Serializable。 通過廣播變量的.value()方法訪問其值。 另外,廣播過程可能由于變量的序列化時間過程或者序列化變量的傳輸過程過程而成為瓶頸,而Spark Scala中使用的默認的Java序列化方法通常是低效的,因此可以通過spark.serializer屬性為不同的數據類型實現特定的序列化方法(如Kryo)來優化這一過程。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark廣播變量
*
* 需求:
* 用戶表:
* id name age gender(0|1)
*
* 要求,輸出用戶信息,gender必須為男或者女,不能為0,1
*/
object _03SparkBroadcastOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
val userList = List(
"001,劉向前,18,0",
"002,馮 劍,28,1",
"003,李志杰,38,0",
"004,郭 鵬,48,2"
)
val genderMap = Map("0" -> "女", "1" -> "男")
val genderMapBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap)
val userRDD = sc.parallelize(userList)
val retRDD = userRDD.map(info => {
val prefix = info.substring(0, info.lastIndexOf(",")) // "001,劉向前,18"
val gender = info.substring(info.lastIndexOf(",") + 1)
val genderMapValue = genderMapBC.value
val newGender = genderMapValue.getOrElse(gender, "男")
prefix + "," + newGender
})
retRDD.foreach(println)
sc.stop()
}
}
輸出結果如下:
001,劉向前,18,女
003,李志杰,38,女
002,馮 劍,28,男
004,郭 鵬,48,男
下面是一個更加精簡的案例:
package cn.xpleaf.spark.p5
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author xpleaf
* @date 2019/1/10 4:53 PM
*/
object SampleSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${SampleSpark.getClass.getSimpleName}")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val genderMap = Map("0" -> "女", "1" -> "男")
val genderMapBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap)
val rdd = sc.parallelize(Seq(("0", "Amy"), ("0", "Spring"), ("0", "Sunny"), ("1", "Mike"), ("1", "xpleaf")))
val retRDD = rdd.map{
case (sex, name) =>
val genderMapValue = genderMapBC.value
(genderMapValue.getOrElse(sex, "男"), name)
}
retRDD.foreach(println)
sc.stop()
}
}
輸出結果如下:
(女,Amy)
(女,Sunny)
(女,Spring)
(男,Mike)
(男,xpleaf)
當然這個案例只是演示一下代碼的使用,并不能看出其運行的機制。
不過可以分析一下其原理,假如在執行map操作時,在某個Worker的一個Executor上有分配5個task來進行計算,在不使用廣播變量的情況下,因為Driver會將我們的代碼通過DAGScheduler劃分會不同stage,交由taskScheduler,taskScheduler再將封裝好的一個個task分發到Worker的Excutor中,也就是說,這個過程當中,我們的genderMap也會被封裝到這個task中,顯然這個過程的粒度是task級別的,每個task都會封裝一個genderMap,在該變量數據量不大的情況下,是沒有問題的,然后,當數據量很大時,同時向一個Excutor上傳遞5份這樣相同的數據,這是很浪費網絡中的帶寬資源的;廣播變量的使用可以避免這一問題的發生,將genderMap廣播出去之后,其只需要發送給Excutor即可,它會保存在Excutor的BlockManager中,此時,Excutor下面的task就可以共享這個變量了,這顯然可以帶來一定性能的提升。
這里放上從網上找的一個圖,就不自己畫了,原理跟上面講的是一樣的:
Spark提供的Accumulator,主要用于多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能。但是確給我們提供了多個task對一個變量并行操作的功能。但是task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。
非常類似于在MR中的一個Counter計數器,主要用于統計各個程序片段被調用的次數,和整體進行比較,來對數據進行一個評估。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark共享變量之累加器Accumulator
*
* 需要注意的是,累加器的執行必須需要Action觸發
*/
object _04SparkAccumulatorOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
// 要對這些變量都*7,同時統計能夠被3整除的數字的個數
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
val listRDD:RDD[Int] = sc.parallelize(list)
var counter = 0
val counterAcc = sc.accumulator[Int](0)
val mapRDD = listRDD.map(num => {
counter += 1
if(num % 3 == 0) {
counterAcc.add(1)
}
num * 7
})
// 下面這種操作又執行了一次RDD計算,所以可以考慮上面的方案,減少一次RDD的計算
// val ret = mapRDD.filter(num => num % 3 == 0).count()
mapRDD.foreach(println)
println("counter===" + counter)
println("counterAcc===" + counterAcc.value)
sc.stop()
}
}
輸出結果如下:
49
56
7
63
14
70
21
77
28
84
35
91
42
counter===0
counterAcc===4
下面是一個更加精簡的案例:
package cn.xpleaf.spark.p5
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author xpleaf
* @date 2019/1/10 6:14 PM
*/
object SampleSpark2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${SampleSpark2.getClass.getSimpleName}")
.setMaster("local[2]")
val sc = new SparkContext(conf)
// 累加器,用來統計rdd中的偶數
val counterAcc = sc.accumulator[Int](0)
// 普通的counter變量
var counter = 0
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
// 需要觸發transformation的執行
rdd.map {
num =>
if (num % 2 == 0) {
// 累加器和普通counter變量都加1
counterAcc.add(1)
counter += 1
}
}.count()
println(s"counterAcc: ${counterAcc.value}, counter: $counter")
sc.stop()
}
}
輸出結果如下:
counterAcc: 3, counter: 0
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。