中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark中Cache與Persist的巔峰對決

發布時間:2020-06-14 13:06:05 來源:網絡 閱讀:594 作者:Stitch_x 欄目:大數據

Cache的產生背景

我們先做一個簡單的測試讀取一個本地文件做一次collect操作:

val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
val rdd=sc.textFile("file:///home/hadoop/data/input.txt")

上面我們進行了兩次相同的操作,觀察日志我們發現這樣一句話Submitting ResultStage 0 (file:///home/hadoop/data/input.txt MapPartitionsRDD[1] at textFile at <console>:25), which has no missing parents,每次都要去本地讀取input.txt文件,這里大家能想到存在什么問題嗎? 如果我的文件很大,每次都對相同的RDD進行同一個action操作,那么每次都要到本地讀取文件,得到相同的結果。不斷進行這樣的重復操作,耗費資源浪費時間啊。這時候我們可能想到能不能把RDD保存在內存中呢?答案是可以的,這就是我們所要學習的cache。

Cache的作用

通過上面的講解我們知道, 有時候很多地方都會用到同一個RDD, 那么每個地方遇到Action操作的時候都會對同一個算子計算多次, 這樣會造成效率低下的問題。通過cache操作可以把RDD持久化到內存或者磁盤。

現在我們利用上面說的例子,把rdd進行cache操作

rdd.cache這時候我們打開192.168.137.130:4040界面查看storage界面中是否有我們的剛才cache的文件,發現并沒有。這時候我們進行一個action操作rdd.count。繼續查看storage是不是有東西了哈
Spark中Cache與Persist的巔峰對決
并且給我們列出了很多信息,存儲級別(后面詳解),大小(會發現要比源文件大,這也是一個調優點)等等。

說到這里小伙伴能能想到什么呢? cacha是一個Tranformation還是一個Action呢?相信大伙應該知道了。

cache這個方法也是個Tranformation,當第一次遇到Action算子的時才會進行持久化,所以說我們第一次進行了cache操作在ui中并沒有看到結果,進行了count操作才有。

源碼詳細解析

Spark版本:2.2.0

源碼分析

/**
  * Persist this RDD with the default storage level (`MEMORY_ONLY`).
  */
 def cache(): this.type = persist()

從源碼中可以明顯看出cache()調用了persist(), 想要知道二者的不同還需要看一下persist函數:(這里注釋cache的storage level)

/**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

可以看到persist()內部調用了persist(StorageLevel.MEMORY_ONLY),是不是和上面對上了哈,這里我們能夠得出cache和persist的區別了:cache只有一個默認的緩存級別MEMORY_ONLY ,而persist可以根據情況設置其它的緩存級別。

我相信小伙伴們肯定很好奇這個緩存級別到底有多少種呢?我們繼續懟源碼看看:

/**
 * :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
 * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
 * to replicate the RDD partitions on multiple nodes.
 *
 * The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants
 * for commonly useful storage levels. To create your own storage level object, use the
 * factory method of the singleton object (`StorageLevel(...)`).
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable

我們先來看看存儲類型,源碼中我們可以看出有五個參數,分別代表:

useDisk:使用硬盤(外存);

useMemory:使用內存;

useOffHeap:使用堆外內存,這是Java虛擬機里面的概念,堆外內存意味著把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。這部分內存也會被頻繁的使用而且也可能導致OOM,它是通過存儲在堆中的DirectByteBuffer對象進行引用,可以避免堆和堆外數據進行來回復制;

deserialized:反序列化,其逆過程序列化(Serialization)是java提供的一種機制,將對象表示成一連串的字節;而反序列化就表示將字節恢復為對象的過程。序列化是對象永久化的一種機制,可以將對象及其屬性保存起來,并能在反序列化后直接恢復這個對象;

replication:備份數(在多個節點上備份,默認為1)。

我們接著看看緩存級別:

/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

可以看到這里列出了12種緩存級別,但這些有什么區別呢?可以看到每個緩存級別后面都跟了一個StorageLevel的構造函數,里面包含了4個或5個參數,和上面說的存儲類型是相對應的,四個參數是因為有一個是有默認值的。

好吧這里我又想問小伙伴們一個問題了,這幾種存儲方式什么意思呢?該如何選擇呢?

官網上進行了詳細的解釋。我這里介紹一個有興趣的同學可以去官網看看哈。

MEMORY_ONLY

使用反序列化的Java對象格式,將數據保存在內存中。如果內存不夠存放所有的數據,某些分區將不會被緩存,并且將在需要時重新計算。這是默認級別。

MEMORY_AND_DISK

使用反序列化的Java對象格式,優先嘗試將數據保存在內存中。如果內存不夠存放所有的數據,會將數據寫入磁盤文件中,下次對這個RDD執行算子時,持久化在磁盤文件中的數據會被讀取出來使用。

MEMORY_ONLY_SER((Java and Scala))

基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,但是會加大cpu負擔。

一個簡單的案例感官行的認識存儲級別的差別:

19M     page_views.dat

val rdd1=sc.textFile("file:///home/hadoop/data/page_views.dat")
rdd1.persist().count

ui查看緩存大小:

Spark中Cache與Persist的巔峰對決
是不是明顯變大了,我們先刪除緩存rdd1.unpersist()

使用MEMORY_ONLY_SER級別

import org.apache.spark.storage.StorageLevel
rdd1.persist(StorageLevel.MEMORY_ONLY_SER)
rdd1.count

Spark中Cache與Persist的巔峰對決

這里我就用這兩種方式進行對比,大家可以試試其他方式。

那如何選擇呢?哈哈官網也說了。

你可以在內存使用和CPU效率之間來做出不同的選擇不同的權衡。

默認情況下,性能最高的當然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續算子操作,都是基于純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要復制一份數據副本,并遠程傳送到其他節點上。但是這里必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數據比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM內存溢出異常。

如果使用MEMORY_ONLY級別時發生了內存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化后再保存在內存中,此時每個partition僅僅是一個字節數組而已,大大減少了對象數量,并降低了內存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續算子可以基于純內存進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的數據量過多的話,還是可能會導致OOM內存溢出的異常。

不要泄漏到磁盤,除非你在內存中計算需要很大的花費,或者可以過濾大量數據,保存部分相對重要的在內存中。否則存儲在磁盤中計算速度會很慢,性能急劇降低。

后綴為_2的級別,必須將所有數據都復制一份副本,并發送到其他節點上,數據復制以及網絡傳輸會導致較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。

刪除緩存中的數據

spark自動監視每個節點上的緩存使用,并以最近最少使用的(LRU)方式丟棄舊數據分區。如果您想手動刪除RDD,而不是等待它從緩存中掉出來,請使用 RDD.unpersist()方法。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

陕西省| 兴业县| 衡山县| 武胜县| 潼南县| 五常市| 旬邑县| 密云县| 东方市| 湖口县| 萨迦县| 健康| 繁峙县| 登封市| 祁东县| 平南县| 凤冈县| 安仁县| 马山县| 黄冈市| 怀集县| 卢湾区| 华容县| 上林县| 福安市| 军事| 宜昌市| 沽源县| 会宁县| 霍林郭勒市| 出国| 驻马店市| 青龙| 通城县| 金塔县| 郧西县| 嵩明县| 阿合奇县| 淳安县| 石台县| 汉寿县|