您好,登錄后才能下訂單哦!
大數據開發中Spark調優常用手段是什么,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
spark調優常見手段,在生產中常常會遇到各種各樣的問題,有事前原因,有事中原因,也有不規范原因,spark調優總結下來可以從下面幾個點來調優。
分配更多的資源: 它是性能優化調優的王道,就是增加和分配更多的資源,這對于性能和速度上的提升是顯而易見的, 基本上,在一定范圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark作業之后,進行性能調優的時候,首先第一步,就是要來調節最優的資源配置; 在這個基礎之上,如果說你的spark作業,能夠分配的資源達到了你的能力范圍的頂端之后,無法再分配更多的資源了,公司資源有限;那么才是考慮去做后面的這些性能調優的點。 相關問題: (1)分配哪些資源? (2)在哪里可以設置這些資源? (3)剖析為什么分配這些資源之后,性能可以得到提升?
executor-memory、executor-cores、driver-memory
1.2 在哪里可以設置這些資源
在實際的生產環境中,提交spark任務時,使用spark-submit shell腳本,在里面調整對應的參數。 提交任務的腳本: spark-submit \ --master spark://node1:7077 \ --class com.hoult.WordCount \ --num-executors 3 \ 配置executor的數量 --driver-memory 1g \ 配置driver的內存(影響不大) --executor-memory 1g \ 配置每一個executor的內存大小 --executor-cores 3 \ 配置每一個executor的cpu個數 /export/servers/wordcount.jar
==Standalone模式==
先計算出公司spark集群上的所有資源 每臺節點的內存大小和cpu核數, 比如:一共有20臺worker節點,每臺節點8g內存,10個cpu。 實際任務在給定資源的時候,可以給20個executor、每個executor的內存8g、每個executor的使用的cpu個數10。
==Yarn模式==
先計算出yarn集群的所有大小,比如一共500g內存,100個cpu; 這個時候可以分配的最大資源,比如給定50個executor、每個executor的內存大小10g,每個executor使用的cpu個數為2。
使用原則
在資源比較充足的情況下,盡可能的使用更多的計算資源,盡量去調節到最大的大小
--executor-memory --total-executor-cores
spark作業中,各個stage的task的數量,也就代表了spark作業在各個階段stage的并行度! 當分配完所能分配的最大資源了,然后對應資源去調節程序的并行度,如果并行度沒有與資源相匹配,那么導致你分配下去的資源都浪費掉了。同時并行運行,還可以讓每個task要處理的數量變少(很簡單的原理。合理設置并行度,可以充分利用集群資源,減少每個task處理數據量,而增加性能加快運行速度。)
至少設置成與spark Application 的總cpu core 數量相同。 最理想情況,150個core,分配150task,一起運行,差不多同一時間運行完畢 官方推薦,task數量,設置成spark Application 總cpu core數量的2~3倍 。 比如150個cpu core ,基本設置task數量為300~500. 與理想情況不同的,有些task會運行快一點,比如50s就完了,有些task 可能會慢一點,要一分半才運行完,所以如果你的task數量,剛好設置的跟cpu core 數量相同,可能會導致資源的浪費。 因為比如150個task中10個先運行完了,剩余140個還在運行,但是這個時候,就有10個cpu core空閑出來了,導致浪費。如果設置2~3倍,那么一個task運行完以后,另外一個task馬上補上來,盡量讓cpu core不要空閑。同時盡量提升spark運行效率和速度。提升性能。
設置參數spark.default.parallelism 默認是沒有值的,如果設置了值為10,它會在shuffle的過程才會起作用。 比如 val rdd2 = rdd1.reduceByKey(_+_) 此時rdd2的分區數就是10 可以通過在構建SparkConf對象的時候設置,例如: new SparkConf().set("spark.defalut.parallelism","500")
使用rdd.repartition 來重新分區,該方法會生成一個新的rdd,使其分區數變大。 此時由于一個partition對應一個task,那么對應的task個數越多,通過這種方式也可以提高并行度。
http://spark.apache.org/docs/2.3.3/sql-programming-guide.html
通過設置參數 spark.sql.shuffle.partitions=500 默認為200; 可以適當增大,來提高并行度。 比如設置為 spark.sql.shuffle.partitions=500
專門針對sparkSQL來設置的
如上圖所示的計算邏輯: (1)當第一次使用rdd2做相應的算子操作得到rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的文件,然后對rdd1做對應的算子操作得到rdd2,再由rdd2計算之后得到rdd3。同樣為了計算得到rdd4,前面的邏輯會被重新計算。 (3)默認情況下多次對一個rdd執行算子操作,去獲取不同的rdd,都會對這個rdd及之前的父rdd全部重新計算一次。 這種情況在實際開發代碼的時候會經常遇到,但是我們一定要避免一個rdd重復計算多次,否則會導致性能急劇降低。 總結:可以把多次使用到的rdd,也就是公共rdd進行持久化,避免后續需要,再次重新計算,提升效率。
可以調用rdd的cache或者persist方法。
(1)cache方法默認是把數據持久化到內存中 ,例如:rdd.cache ,其本質還是調用了persist方法 (2)persist方法中有豐富的緩存級別,這些緩存級別都定義在StorageLevel這個object中,可以結合實際的應用場景合理的設置緩存級別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實現。
(1)如果正常將數據持久化在內存中,那么可能會導致內存的占用過大,這樣的話,也許會導致OOM內存溢出。 (2)當純內存無法支撐公共RDD數據完全存放的時候,就優先考慮使用序列化的方式在純內存中存儲。將RDD的每個partition的數據,序列化成一個字節數組;序列化后,大大減少內存的空間占用。 (3)序列化的方式,唯一的缺點就是,在獲取數據的時候,需要反序列化。但是可以減少占用的空間和便于網絡傳輸 (4)如果序列化純內存方式,還是導致OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。 (5)為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化 持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次; 持久化的每個數據單元,存儲一份副本,放在其他節點上面,從而進行容錯; 一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內存資源極度充足。 比如: StorageLevel.MEMORY_ONLY_2
在實際工作中可能會遇到這樣的情況,由于要處理的數據量非常大,這個時候可能會在一個stage中出現大量的task,比如有1000個task,這些task都需要一份相同的數據來處理業務,這份數據的大小為100M,該數據會拷貝1000份副本,通過網絡傳輸到各個task中去,給task使用。這里會涉及大量的網絡傳輸開銷,同時至少需要的內存為1000*100M=100G,這個內存開銷是非常大的。不必要的內存的消耗和占用,就導致了你在進行RDD持久化到內存,也許就沒法完全在內存中放下;就只能寫入磁盤,最后導致后續的操作在磁盤IO上消耗性能;這對于spark任務處理來說就是一場災難。 由于內存開銷比較大,task在創建對象的時候,可能會出現堆內存放不下所有對象,就會導致頻繁的垃圾回收器的回收GC。GC的時候一定是會導致工作線程停止,也就是導致Spark暫停工作那么一點時間。頻繁GC的話,對Spark作業的運行的速度會有相當可觀的影響。
Spark中分布式執行的代碼需要傳遞到各個executor的task上運行。對于一些只讀、固定的數據,每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播給各個executor。該executor上的各個task再從所在節點的BlockManager(負責管理某個executor對應的內存和磁盤上的數據)獲取變量,而不是從Driver獲取變量,從而提升了效率。
廣播變量,初始的時候,就在Drvier上有一份副本。通過在Driver把共享數據轉換成廣播變量。 task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,那么就從Driver遠程拉取廣播變量副本,并保存在本地的BlockManager中; 此后這個executor上的task,都會直接使用本地的BlockManager中的副本。那么這個時候所有該executor中的task都會使用這個廣播變量的副本。也就是說一個executor只需要在第一個task啟動時,獲得一份廣播變量數據,之后的task都從本節點的BlockManager中獲取相關數據。 executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,網絡距離越近越好。
比如一個任務需要50個executor,1000個task,共享數據為100M。 (1)在不使用廣播變量的情況下,1000個task,就需要該共享數據的1000個副本,也就是說有1000份數需要大量的網絡傳輸和內存開銷存儲。耗費的內存大小1000*100=100G. (2)使用了廣播變量后,50個executor就只需要50個副本數據,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的blockmanager上拉取廣播變量副本,網絡傳輸速度大大增加;內存開銷 50*100M=5G 總結: 不使用廣播變量的內存開銷為100G,使用后的內存開銷5G,這里就相差了20倍左右的網絡傳輸性能損耗和內存開銷,使用廣播變量后對于性能的提升和影響,還是很可觀的。 廣播變量的使用不一定會對性能產生決定性的作用。比如運行30分鐘的spark作業,可能做了廣播變量以后,速度快了2分鐘,或者5分鐘。但是一點一滴的調優,積少成多。最后還是會有效果的。
(1)能不能將一個RDD使用廣播變量廣播出去? 不能,因為RDD是不存儲數據的。可以將RDD的結果廣播出去。 (2)廣播變量只能在Driver端定義,不能在Executor端定義。 (3)在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值。 (4)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。 (5)如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。
例如
(1) 通過sparkContext的broadcast方法把數據轉換成廣播變量,類型為Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6)) (2) 然后executor上的BlockManager就可以拉取該廣播變量的副本獲取具體的數據。 獲取廣播變量中的值可以通過調用其value方法 val array: Array[Int] = broadcastArray.value
spark中的shuffle涉及到數據要進行大量的網絡傳輸,下游階段的task任務需要通過網絡拉取上階段task的輸出數據,shuffle過程,簡單來說,就是將分布在集群中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發shuffle操作。 如果有可能的話,要盡量避免使用shuffle類算子。 因為Spark作業運行過程中,最消耗性能的地方就是shuffle過程。
spark程序在開發的過程中使用reduceByKey、join、distinct、repartition等算子操作,這里都會產生shuffle,由于shuffle這一塊是非常耗費性能的,實際開發中盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷。
小案例
//錯誤的做法: // 傳統的join操作會導致shuffle操作。 // 因為兩個RDD中,相同的key都需要通過網絡拉取到一個節點上,由一個task進行join操作。 val rdd3 = rdd1.join(rdd2) //正確的做法: // Broadcast+map的join操作,不會導致shuffle操作。 // 使用Broadcast將一個數據量較小的RDD作為廣播變量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數據。 // 然后進行遍歷,如果發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那么就判定可以進行join。 // 此時就可以根據自己需要的方式,將rdd1當前數據與rdd2中可以連接的數據,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 注意,以上操作,建議僅僅在rdd2的數據量比較少(比如幾百M,或者一兩G)的情況下使用。 // 因為每個Executor的內存中,都會駐留一份rdd2的全量數據。
map-side預聚合
如果因為業務需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預聚合的算子。 所謂的map-side預聚合,說的是在每個節點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner。 map-side預聚合之后,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的數據數量,從而也就減少了磁盤IO以及網絡傳輸開銷。 通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每個節點本地的相同key進行預聚合。 而groupByKey算子是不會進行預聚合的,全量的數據會在集群的各個節點之間分發和傳輸,性能相對來說比較差。 比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進行單詞計數。其中第一張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有數據都會在集群節點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節點本地的相同key數據,都進行了預聚合,然后才傳輸到其他節點上進行全局聚合。
==groupByKey進行單詞計數原理==
==reduceByKey單詞計數原理==
reduceByKey/aggregateByKey 可以進行預聚合操作,減少數據的傳輸量,提升性能
groupByKey 不會進行預聚合操作,進行數據的全量拉取,性能比較低
mapPartitions類的算子,一次函數調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些。 但是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。因為單次函數調用就要處理掉一個partition所有的數據,如果內存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現OOM異常。所以使用這類操作時要慎重!
原理類似于“使用mapPartitions替代map”,也是一次函數調用處理一個partition的所有數據,而不是一次函數調用處理一條數據。 在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數中,將RDD中所有數據寫MySQL,那么如果是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷毀數據庫連接,性能是非常低下; 但是如果用foreachPartitions算子一次性處理一個partition的數據,那么對于每個partition,只要創建一個數據庫連接即可,然后執行批量插入操作,此時性能是比較高的。實踐中發現,對于1萬條左右的數據量寫MySQL,性能可以提升30%以上。
通常對一個RDD執行filter算子過濾掉RDD中較多數據后(比如30%以上的數據),建議使用coalesce算子,手動減少RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。 因為filter之后,RDD的每個partition中都會有很多數據被過濾掉,此時如果照常進行后續的計算,其實每個task處理的partition中的數據量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。 因此用coalesce減少partition數量,將RDD中的數據壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。 因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。
Spark在進行任務計算的時候,會涉及到數據跨進程的網絡傳輸、數據的持久化,這個時候就需要對數據進行序列化。Spark默認采用Java的序列化器。默認java序列化的優缺點如下: 其好處: 處理起來方便,不需要我們手動做其他操作,只是在使用一個對象和變量的時候,需要實現Serializble接口。 其缺點: 默認的序列化機制的效率不高,序列化的速度比較慢;序列化以后的數據,占用的內存空間相對還是比較大。 Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的數據要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以后,可以讓網絡傳輸的數據變少;在集群中耗費的內存資源大大減少。
Kryo序列化機制,一旦啟用以后,會生效的幾個地方: (1)算子函數中使用到的外部變量 算子中的外部變量可能來著與driver需要涉及到網絡傳輸,就需要用到序列化。 最終可以優化網絡傳輸的性能,優化集群中內存的占用和消耗 (2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER 將rdd持久化時,對應的存儲級別里,需要用到序列化。 最終可以優化內存的占用和消耗;持久化RDD占用的內存越少,task執行的時候,創建的對象,就不至于頻繁的占滿內存,頻繁發生GC。 (3) 產生shuffle的地方,也就是寬依賴 下游的stage中的task,拉取上游stage中的task產生的結果數據,跨網絡傳輸,需要用到序列化。最終可以優化網絡傳輸的性能
// 創建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue; fastutil能夠提供更小的內存占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set.
fastutil集合類,可以減小內存的占用,并且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度
(1)你可以使用Broadcast廣播變量優化; (2)可以使用Kryo序列化類庫,提升序列化性能和效率; (3)如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量; 首先從源頭上就減少內存的占用(fastutil),通過廣播變量進一步減少內存占用,再通過Kryo序列化類庫進一步減少內存占用。
在你的算子函數里,也就是task要執行的計算邏輯里面,如果有邏輯中,出現,要創建比較大的Map、List等集合, 可能會占用較大的內存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作; 那么此時,可以考慮將這些集合類型使用fastutil類庫重寫, 使用了fastutil集合類以后,就可以在一定程度上,減少task創建出來的集合類型的內存占用。 避免executor內存頻繁占滿,頻繁喚起GC,導致性能下降。
第一步:在pom.xml中引用fastutil的包 <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> 第二步:平時使用List (Integer)的替換成IntList即可。 List<Integer>的list對應的到fastutil就是IntList類型 使用說明: 基本都是類似于IntList的格式,前綴就是集合的元素類型; 特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。
Spark在Driver上對Application的每一個stage的task進行分配之前,都會計算出每個task要計算的是哪個分片數據,RDD的某個partition;Spark的task分配算法,優先會希望每個task正好分配到它要計算的數據所在的節點,這樣的話就不用在網絡間傳輸數據; 但是通常來說,有時事與愿違,可能task沒有機會分配到它的數據所在的節點,為什么呢,可能那個節點的計算資源和計算能力都滿了;所以這種時候,通常來說,Spark會等待一段時間,默認情況下是3秒(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后實在是等待不了了,就會選擇一個比較差的本地化級別,比如說將task分配到距離要計算的數據所在節點比較近的一個節點,然后進行計算。
(1)PROCESS_LOCAL:進程本地化 代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好 (2)NODE_LOCAL:節點本地化 代碼和數據在同一個節點中;比如說數據作為一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是數據和task在一個節點上的不同executor中;數據需要在進程間進行傳輸;性能其次 (3)RACK_LOCAL:機架本地化 數據和task在一個機架的兩個節點上;數據需要通過網絡在節點之間進行傳輸; 性能比較差 (4) ANY:無限制 數據和task可能在集群中的任何地方,而且不在一個機架中;性能最差
spark.locality.wait,默認是3s 首先采用最佳的方式,等待3s后降級,還是不行,繼續降級...,最后還是不行,只能夠采用最差的。
修改spark.locality.wait參數,默認是3s,可以增加 下面是每個數據本地化級別的等待時間,默認都是跟spark.locality.wait時間相同, 默認都是3s(可查看spark官網對應參數說明,如下圖所示) spark.locality.wait.node spark.locality.wait.process spark.locality.wait.rack
在代碼中設置: new SparkConf().set("spark.locality.wait","10") 然后把程序提交到spark集群中運行,注意觀察日志,spark作業的運行日志,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日志。 日志里面會顯示,starting task .... PROCESS LOCAL、NODE LOCAL..... 例如: Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes) 觀察大部分task的數據本地化級別 如果大多都是PROCESS_LOCAL,那就不用調節了。如果是發現,好多的級別都是NODE_LOCAL、ANY,那么最好就去調節一下數據本地化的等待時長。應該是要反復調節,每次調節完以后,再來運行,觀察日志 看看大部分的task的本地化級別有沒有提升;看看整個spark作業的運行時間有沒有縮短。 注意注意: 在調節參數、運行任務的時候,別本末倒置,本地化級別倒是提升了, 但是因為大量的等待時長,spark作業的運行時間反而增加了,那就還是不要調節了。
Executor的內存主要分為三塊
第一塊是讓task執行我們自己編寫的代碼時使用;
第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用
第三塊是讓RDD緩存時使用
在spark1.6版本以前 spark的executor使用的靜態內存模型,但是在spark1.6開始,多增加了一個統一內存模型。 通過spark.memory.useLegacyMode 這個參數去配置 默認這個值是false,代表用的是新的動態內存模型; 如果想用以前的靜態內存模型,那么就要把這個值改為true。
實際上就是把我們的一個executor分成了三部分, 一部分是Storage內存區域, 一部分是execution區域, 還有一部分是其他區域。如果使用的靜態內存模型,那么用這幾個參數去控制: spark.storage.memoryFraction:默認0.6 spark.shuffle.memoryFraction:默認0.2 所以第三部分就是0.2 如果我們cache數據量比較大,或者是我們的廣播變量比較大, 那我們就把spark.storage.memoryFraction這個值調大一點。 但是如果我們代碼里面沒有廣播變量,也沒有cache,shuffle又比較多,那我們要把spark.shuffle.memoryFraction 這值調大。
靜態內存模型的缺點
我們配置好了Storage內存區域和execution區域后,我們的一個任務假設execution內存不夠用了,但是它的Storage內存區域是空閑的,兩個之間不能互相借用,不夠靈活,所以才出來我們新的統一內存模型。
動態內存模型先是預留了300m內存,防止內存溢出。動態內存模型把整體內存分成了兩部分, 由這個參數表示spark.memory.fraction 這個指的默認值是0.6 代表另外的一部分是0.4, 然后spark.memory.fraction 這部分又劃分成為兩個小部分。這兩小部分共占整體內存的0.6 .這兩部分其實就是:Storage內存和execution內存。由spark.memory.storageFraction 這個參數去調配,因為兩個共占0.6。如果spark.memory.storageFraction這個值配的是0.5,那說明這0.6里面 storage占了0.5,也就是executor占了0.3 。
統一內存模型有什么特點呢?
Storage內存和execution內存 可以相互借用。不用像靜態內存模型那樣死板,但是是有規則的
為什么受傷的都是storage呢? 是因為execution里面的數據是馬上就要用的,而storage里的數據不一定馬上就要用。
以下是一份spark-submit命令的示例,大家可以參考一下,并根據自己的實際情況進行調節
bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
java.lang.OutOfMemoryError ExecutorLostFailure Executor exit code 為143 executor lost hearbeat time out shuffle file lost 如果遇到以上問題,很有可能就是內存除了問題,可以先嘗試增加內存。如果還是解決不了,那么請聽下一次數據傾斜調優的課。
關于大數據開發中Spark調優常用手段是什么問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。