中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

大數據開發中Spark調優常用手段是什么

發布時間:2021-12-17 09:11:47 來源:億速云 閱讀:141 作者:柒染 欄目:大數據

大數據開發中Spark調優常用手段是什么,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

Spark調優

spark調優常見手段,在生產中常常會遇到各種各樣的問題,有事前原因,有事中原因,也有不規范原因,spark調優總結下來可以從下面幾個點來調優。

1. 分配更多的資源

分配更多的資源:
  它是性能優化調優的王道,就是增加和分配更多的資源,這對于性能和速度上的提升是顯而易見的,
  基本上,在一定范圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark作業之后,進行性能調優的時候,首先第一步,就是要來調節最優的資源配置;
  在這個基礎之上,如果說你的spark作業,能夠分配的資源達到了你的能力范圍的頂端之后,無法再分配更多的資源了,公司資源有限;那么才是考慮去做后面的這些性能調優的點。

相關問題:
(1)分配哪些資源?
(2)在哪里可以設置這些資源?
(3)剖析為什么分配這些資源之后,性能可以得到提升?

1.1 分配哪些資源

executor-memory、executor-cores、driver-memory

1.2 在哪里可以設置這些資源

在實際的生產環境中,提交spark任務時,使用spark-submit shell腳本,在里面調整對應的參數。
   
 提交任務的腳本:
 spark-submit \
 --master spark://node1:7077 \
 --class com.hoult.WordCount \
 --num-executors 3 \    配置executor的數量
 --driver-memory 1g \   配置driver的內存(影響不大)
 --executor-memory 1g \ 配置每一個executor的內存大小
 --executor-cores 3 \   配置每一個executor的cpu個數
 /export/servers/wordcount.jar

1.2 參數調節到多大,算是最大

  • ==Standalone模式==

先計算出公司spark集群上的所有資源 每臺節點的內存大小和cpu核數,
   比如:一共有20臺worker節點,每臺節點8g內存,10個cpu。
   實際任務在給定資源的時候,可以給20個executor、每個executor的內存8g、每個executor的使用的cpu個數10。
  • ==Yarn模式==

先計算出yarn集群的所有大小,比如一共500g內存,100個cpu;
   這個時候可以分配的最大資源,比如給定50個executor、每個executor的內存大小10g,每個executor使用的cpu個數為2。
  • 使用原則

在資源比較充足的情況下,盡可能的使用更多的計算資源,盡量去調節到最大的大小

1.3 為什么調大資源以后性能可以提升

--executor-memory

--total-executor-cores

2. 提高并行度

2.1 Spark的并行度指的是什么

spark作業中,各個stage的task的數量,也就代表了spark作業在各個階段stage的并行度!
    當分配完所能分配的最大資源了,然后對應資源去調節程序的并行度,如果并行度沒有與資源相匹配,那么導致你分配下去的資源都浪費掉了。同時并行運行,還可以讓每個task要處理的數量變少(很簡單的原理。合理設置并行度,可以充分利用集群資源,減少每個task處理數據量,而增加性能加快運行速度。)

2.2 如何提高并行度

2.2.1 可以設置task的數量

至少設置成與spark Application 的總cpu core 數量相同。
最理想情況,150個core,分配150task,一起運行,差不多同一時間運行完畢
官方推薦,task數量,設置成spark Application 總cpu core數量的2~3倍 。
  
比如150個cpu core ,基本設置task數量為300~500. 與理想情況不同的,有些task會運行快一點,比如50s就完了,有些task 可能會慢一點,要一分半才運行完,所以如果你的task數量,剛好設置的跟cpu core 數量相同,可能會導致資源的浪費。
  因為比如150個task中10個先運行完了,剩余140個還在運行,但是這個時候,就有10個cpu core空閑出來了,導致浪費。如果設置2~3倍,那么一個task運行完以后,另外一個task馬上補上來,盡量讓cpu core不要空閑。同時盡量提升spark運行效率和速度。提升性能。

2.2.2 如何設置task數量來提高并行度

設置參數spark.default.parallelism
   默認是沒有值的,如果設置了值為10,它會在shuffle的過程才會起作用。
   比如 val rdd2 = rdd1.reduceByKey(_+_) 
   此時rdd2的分區數就是10
   
可以通過在構建SparkConf對象的時候設置,例如:
   new SparkConf().set("spark.defalut.parallelism","500")

2.2.3 給RDD重新設置partition的數量

使用rdd.repartition 來重新分區,該方法會生成一個新的rdd,使其分區數變大。
此時由于一個partition對應一個task,那么對應的task個數越多,通過這種方式也可以提高并行度。

2.2.4 提高sparksql運行的task數量

http://spark.apache.org/docs/2.3.3/sql-programming-guide.html

通過設置參數 spark.sql.shuffle.partitions=500  默認為200;
可以適當增大,來提高并行度。 比如設置為 spark.sql.shuffle.partitions=500

專門針對sparkSQL來設置的

3. RDD的重用和持久化

3.1 實際開發遇到的情況說明

如上圖所示的計算邏輯:
(1)當第一次使用rdd2做相應的算子操作得到rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的文件,然后對rdd1做對應的算子操作得到rdd2,再由rdd2計算之后得到rdd3。同樣為了計算得到rdd4,前面的邏輯會被重新計算。

(3)默認情況下多次對一個rdd執行算子操作,去獲取不同的rdd,都會對這個rdd及之前的父rdd全部重新計算一次。
這種情況在實際開發代碼的時候會經常遇到,但是我們一定要避免一個rdd重復計算多次,否則會導致性能急劇降低。

總結:可以把多次使用到的rdd,也就是公共rdd進行持久化,避免后續需要,再次重新計算,提升效率。

3.2 如何對rdd進行持久化

  • 可以調用rdd的cache或者persist方法。

(1)cache方法默認是把數據持久化到內存中 ,例如:rdd.cache ,其本質還是調用了persist方法
(2)persist方法中有豐富的緩存級別,這些緩存級別都定義在StorageLevel這個object中,可以結合實際的應用場景合理的設置緩存級別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實現。

3.3 rdd持久化的時可以采用序列化

(1)如果正常將數據持久化在內存中,那么可能會導致內存的占用過大,這樣的話,也許會導致OOM內存溢出。
(2)當純內存無法支撐公共RDD數據完全存放的時候,就優先考慮使用序列化的方式在純內存中存儲。將RDD的每個partition的數據,序列化成一個字節數組;序列化后,大大減少內存的空間占用。
(3)序列化的方式,唯一的缺點就是,在獲取數據的時候,需要反序列化。但是可以減少占用的空間和便于網絡傳輸
(4)如果序列化純內存方式,還是導致OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。
(5)為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化
  持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;
  持久化的每個數據單元,存儲一份副本,放在其他節點上面,從而進行容錯;
  一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內存資源極度充足。
   比如: StorageLevel.MEMORY_ONLY_2

4. 廣播變量的使用

4.1 場景描述

在實際工作中可能會遇到這樣的情況,由于要處理的數據量非常大,這個時候可能會在一個stage中出現大量的task,比如有1000個task,這些task都需要一份相同的數據來處理業務,這份數據的大小為100M,該數據會拷貝1000份副本,通過網絡傳輸到各個task中去,給task使用。這里會涉及大量的網絡傳輸開銷,同時至少需要的內存為1000*100M=100G,這個內存開銷是非常大的。不必要的內存的消耗和占用,就導致了你在進行RDD持久化到內存,也許就沒法完全在內存中放下;就只能寫入磁盤,最后導致后續的操作在磁盤IO上消耗性能;這對于spark任務處理來說就是一場災難。

    由于內存開銷比較大,task在創建對象的時候,可能會出現堆內存放不下所有對象,就會導致頻繁的垃圾回收器的回收GC。GC的時候一定是會導致工作線程停止,也就是導致Spark暫停工作那么一點時間。頻繁GC的話,對Spark作業的運行的速度會有相當可觀的影響。

4.2 廣播變量引入

Spark中分布式執行的代碼需要傳遞到各個executor的task上運行。對于一些只讀、固定的數據,每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播給各個executor。該executor上的各個task再從所在節點的BlockManager(負責管理某個executor對應的內存和磁盤上的數據)獲取變量,而不是從Driver獲取變量,從而提升了效率。
廣播變量,初始的時候,就在Drvier上有一份副本。通過在Driver把共享數據轉換成廣播變量。

  task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,那么就從Driver遠程拉取廣播變量副本,并保存在本地的BlockManager中;
  
  此后這個executor上的task,都會直接使用本地的BlockManager中的副本。那么這個時候所有該executor中的task都會使用這個廣播變量的副本。也就是說一個executor只需要在第一個task啟動時,獲得一份廣播變量數據,之后的task都從本節點的BlockManager中獲取相關數據。

  executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,網絡距離越近越好。

4.3 使用廣播變量后的性能分析

比如一個任務需要50個executor,1000個task,共享數據為100M。
(1)在不使用廣播變量的情況下,1000個task,就需要該共享數據的1000個副本,也就是說有1000份數需要大量的網絡傳輸和內存開銷存儲。耗費的內存大小1000*100=100G.

(2)使用了廣播變量后,50個executor就只需要50個副本數據,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的blockmanager上拉取廣播變量副本,網絡傳輸速度大大增加;內存開銷 50*100M=5G

總結:
  不使用廣播變量的內存開銷為100G,使用后的內存開銷5G,這里就相差了20倍左右的網絡傳輸性能損耗和內存開銷,使用廣播變量后對于性能的提升和影響,還是很可觀的。
  
  廣播變量的使用不一定會對性能產生決定性的作用。比如運行30分鐘的spark作業,可能做了廣播變量以后,速度快了2分鐘,或者5分鐘。但是一點一滴的調優,積少成多。最后還是會有效果的。

4.4 廣播變量使用注意事項

(1)能不能將一個RDD使用廣播變量廣播出去?

       不能,因為RDD是不存儲數據的。可以將RDD的結果廣播出去。

(2)廣播變量只能在Driver端定義,不能在Executor端定義。

(3)在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值。

(4)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。

(5)如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。

4.5 如何使用廣播變量

  • 例如

(1) 通過sparkContext的broadcast方法把數據轉換成廣播變量,類型為Broadcast,
  val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
  
(2) 然后executor上的BlockManager就可以拉取該廣播變量的副本獲取具體的數據。
    獲取廣播變量中的值可以通過調用其value方法
   val array: Array[Int] = broadcastArray.value

5. 盡量避免使用shuffle類算子

5.1 shuffle描述

spark中的shuffle涉及到數據要進行大量的網絡傳輸,下游階段的task任務需要通過網絡拉取上階段task的輸出數據,shuffle過程,簡單來說,就是將分布在集群中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發shuffle操作。
  
  如果有可能的話,要盡量避免使用shuffle類算子。
  因為Spark作業運行過程中,最消耗性能的地方就是shuffle過程。

5.2 哪些算子操作會產生shuffle

spark程序在開發的過程中使用reduceByKey、join、distinct、repartition等算子操作,這里都會產生shuffle,由于shuffle這一塊是非常耗費性能的,實際開發中盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷。

5.3 如何避免產生shuffle

  • 小案例

//錯誤的做法:
// 傳統的join操作會導致shuffle操作。
// 因為兩個RDD中,相同的key都需要通過網絡拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)
    
//正確的做法:
// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將一個數據量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數據。
// 然后進行遍歷,如果發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那么就判定可以進行join。
// 此時就可以根據自己需要的方式,將rdd1當前數據與rdd2中可以連接的數據,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建議僅僅在rdd2的數據量比較少(比如幾百M,或者一兩G)的情況下使用。
// 因為每個Executor的內存中,都會駐留一份rdd2的全量數據。

5.4 使用map-side預聚合的shuffle操作

  • map-side預聚合

如果因為業務需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預聚合的算子。

  所謂的map-side預聚合,說的是在每個節點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner。
  map-side預聚合之后,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的數據數量,從而也就減少了磁盤IO以及網絡傳輸開銷。
  通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每個節點本地的相同key進行預聚合。
  而groupByKey算子是不會進行預聚合的,全量的數據會在集群的各個節點之間分發和傳輸,性能相對來說比較差。
  
  比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進行單詞計數。其中第一張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有數據都會在集群節點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節點本地的相同key數據,都進行了預聚合,然后才傳輸到其他節點上進行全局聚合。
  • ==groupByKey進行單詞計數原理==

  • ==reduceByKey單詞計數原理==

6. 使用高性能的算子

6.1 使用reduceByKey/aggregateByKey替代groupByKey

  • reduceByKey/aggregateByKey 可以進行預聚合操作,減少數據的傳輸量,提升性能

  • groupByKey 不會進行預聚合操作,進行數據的全量拉取,性能比較低

6.2 使用mapPartitions替代普通map

mapPartitions類的算子,一次函數調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些。
  但是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。因為單次函數調用就要處理掉一個partition所有的數據,如果內存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現OOM異常。所以使用這類操作時要慎重!

6.3 使用foreachPartition替代foreach

原理類似于“使用mapPartitions替代map”,也是一次函數調用處理一個partition的所有數據,而不是一次函數調用處理一條數據。
  在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數中,將RDD中所有數據寫MySQL,那么如果是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷毀數據庫連接,性能是非常低下;  但是如果用foreachPartitions算子一次性處理一個partition的數據,那么對于每個partition,只要創建一個數據庫連接即可,然后執行批量插入操作,此時性能是比較高的。實踐中發現,對于1萬條左右的數據量寫MySQL,性能可以提升30%以上。

6.4 使用filter之后進行coalesce操作

通常對一個RDD執行filter算子過濾掉RDD中較多數據后(比如30%以上的數據),建議使用coalesce算子,手動減少RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。
  因為filter之后,RDD的每個partition中都會有很多數據被過濾掉,此時如果照常進行后續的計算,其實每個task處理的partition中的數據量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。
  因此用coalesce減少partition數量,將RDD中的數據壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。

6.5 使用repartitionAndSortWithinPartitions替代repartition與sort類操作

repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。
  因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。

7. 使用Kryo優化序列化性能

7.1 spark序列化介紹

Spark在進行任務計算的時候,會涉及到數據跨進程的網絡傳輸、數據的持久化,這個時候就需要對數據進行序列化。Spark默認采用Java的序列化器。默認java序列化的優缺點如下:
其好處:
  處理起來方便,不需要我們手動做其他操作,只是在使用一個對象和變量的時候,需要實現Serializble接口。
其缺點:
  默認的序列化機制的效率不高,序列化的速度比較慢;序列化以后的數據,占用的內存空間相對還是比較大。

Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的數據要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以后,可以讓網絡傳輸的數據變少;在集群中耗費的內存資源大大減少。

7.2 Kryo序列化啟用后生效的地方

Kryo序列化機制,一旦啟用以后,會生效的幾個地方:
(1)算子函數中使用到的外部變量
  算子中的外部變量可能來著與driver需要涉及到網絡傳輸,就需要用到序列化。
      最終可以優化網絡傳輸的性能,優化集群中內存的占用和消耗
    
(2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
  將rdd持久化時,對應的存儲級別里,需要用到序列化。
      最終可以優化內存的占用和消耗;持久化RDD占用的內存越少,task執行的時候,創建的對象,就不至于頻繁的占滿內存,頻繁發生GC。
    
(3)  產生shuffle的地方,也就是寬依賴
  下游的stage中的task,拉取上游stage中的task產生的結果數據,跨網絡傳輸,需要用到序列化。最終可以優化網絡傳輸的性能

7.3 如何開啟Kryo序列化機制

// 創建SparkConf對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 注冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

8. 使用fastutil優化數據格式

8.1 fastutil介紹

fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;

fastutil能夠提供更小的內存占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set.

8.2 fastutil好處

fastutil集合類,可以減小內存的占用,并且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度

8.3 Spark中應用fastutil的場景和使用

8.3.1 算子函數使用了外部變量

(1)你可以使用Broadcast廣播變量優化;

(2)可以使用Kryo序列化類庫,提升序列化性能和效率;

(3)如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量;

首先從源頭上就減少內存的占用(fastutil),通過廣播變量進一步減少內存占用,再通過Kryo序列化類庫進一步減少內存占用。

8.3.2 算子函數里使用了比較大的集合Map/List

在你的算子函數里,也就是task要執行的計算邏輯里面,如果有邏輯中,出現,要創建比較大的Map、List等集合,
可能會占用較大的內存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作; 
那么此時,可以考慮將這些集合類型使用fastutil類庫重寫,

使用了fastutil集合類以后,就可以在一定程度上,減少task創建出來的集合類型的內存占用。 
避免executor內存頻繁占滿,頻繁喚起GC,導致性能下降。

8.3.3 fastutil的使用

第一步:在pom.xml中引用fastutil的包
    <dependency>
      <groupId>fastutil</groupId>
      <artifactId>fastutil</artifactId>
      <version>5.0.9</version>
    </dependency>
    
第二步:平時使用List (Integer)的替換成IntList即可。 
  List<Integer>的list對應的到fastutil就是IntList類型
  
  
使用說明:
基本都是類似于IntList的格式,前綴就是集合的元素類型; 
特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。

9. 調節數據本地化等待時長

Spark在Driver上對Application的每一個stage的task進行分配之前,都會計算出每個task要計算的是哪個分片數據,RDD的某個partition;Spark的task分配算法,優先會希望每個task正好分配到它要計算的數據所在的節點,這樣的話就不用在網絡間傳輸數據;

  但是通常來說,有時事與愿違,可能task沒有機會分配到它的數據所在的節點,為什么呢,可能那個節點的計算資源和計算能力都滿了;所以這種時候,通常來說,Spark會等待一段時間,默認情況下是3秒(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后實在是等待不了了,就會選擇一個比較差的本地化級別,比如說將task分配到距離要計算的數據所在節點比較近的一個節點,然后進行計算。

9.1 本地化級別

(1)PROCESS_LOCAL:進程本地化
  代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好
(2)NODE_LOCAL:節點本地化
  代碼和數據在同一個節點中;比如說數據作為一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是數據和task在一個節點上的不同executor中;數據需要在進程間進行傳輸;性能其次
(3)RACK_LOCAL:機架本地化  
  數據和task在一個機架的兩個節點上;數據需要通過網絡在節點之間進行傳輸; 性能比較差
(4)  ANY:無限制
  數據和task可能在集群中的任何地方,而且不在一個機架中;性能最差

9.2 數據本地化等待時長

spark.locality.wait,默認是3s
首先采用最佳的方式,等待3s后降級,還是不行,繼續降級...,最后還是不行,只能夠采用最差的。

9.3 如何調節參數并且測試

修改spark.locality.wait參數,默認是3s,可以增加

下面是每個數據本地化級別的等待時間,默認都是跟spark.locality.wait時間相同,
默認都是3s(可查看spark官網對應參數說明,如下圖所示)
spark.locality.wait.node
spark.locality.wait.process
spark.locality.wait.rack
在代碼中設置:
new SparkConf().set("spark.locality.wait","10")

然后把程序提交到spark集群中運行,注意觀察日志,spark作業的運行日志,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日志。 
日志里面會顯示,starting task .... PROCESS LOCAL、NODE LOCAL.....
例如:
Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes)

觀察大部分task的數據本地化級別 
如果大多都是PROCESS_LOCAL,那就不用調節了。如果是發現,好多的級別都是NODE_LOCAL、ANY,那么最好就去調節一下數據本地化的等待時長。應該是要反復調節,每次調節完以后,再來運行,觀察日志 
看看大部分的task的本地化級別有沒有提升;看看整個spark作業的運行時間有沒有縮短。

注意注意:
在調節參數、運行任務的時候,別本末倒置,本地化級別倒是提升了, 但是因為大量的等待時長,spark作業的運行時間反而增加了,那就還是不要調節了。

10. 基于Spark內存模型調優

10.1 spark中executor內存劃分

  • Executor的內存主要分為三塊

    • 第一塊是讓task執行我們自己編寫的代碼時使用;

    • 第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用

    • 第三塊是讓RDD緩存時使用

10.2 spark的內存模型

在spark1.6版本以前 spark的executor使用的靜態內存模型,但是在spark1.6開始,多增加了一個統一內存模型。
  通過spark.memory.useLegacyMode 這個參數去配置
      默認這個值是false,代表用的是新的動態內存模型;
      如果想用以前的靜態內存模型,那么就要把這個值改為true。

10.2.1 靜態內存模型

實際上就是把我們的一個executor分成了三部分,
  一部分是Storage內存區域,
  一部分是execution區域,
  還有一部分是其他區域。如果使用的靜態內存模型,那么用這幾個參數去控制:
  
spark.storage.memoryFraction:默認0.6
spark.shuffle.memoryFraction:默認0.2  
所以第三部分就是0.2

如果我們cache數據量比較大,或者是我們的廣播變量比較大,
  那我們就把spark.storage.memoryFraction這個值調大一點。
  但是如果我們代碼里面沒有廣播變量,也沒有cache,shuffle又比較多,那我們要把spark.shuffle.memoryFraction 這值調大。
  • 靜態內存模型的缺點

我們配置好了Storage內存區域和execution區域后,我們的一個任務假設execution內存不夠用了,但是它的Storage內存區域是空閑的,兩個之間不能互相借用,不夠靈活,所以才出來我們新的統一內存模型。

10.2.2 統一內存模型

動態內存模型先是預留了300m內存,防止內存溢出。動態內存模型把整體內存分成了兩部分,
由這個參數表示spark.memory.fraction 這個指的默認值是0.6 代表另外的一部分是0.4,

然后spark.memory.fraction 這部分又劃分成為兩個小部分。這兩小部分共占整體內存的0.6 .這兩部分其實就是:Storage內存和execution內存。由spark.memory.storageFraction 這個參數去調配,因為兩個共占0.6。如果spark.memory.storageFraction這個值配的是0.5,那說明這0.6里面 storage占了0.5,也就是executor占了0.3 。
  • 統一內存模型有什么特點呢?

Storage內存和execution內存 可以相互借用。不用像靜態內存模型那樣死板,但是是有規則的
為什么受傷的都是storage呢?

是因為execution里面的數據是馬上就要用的,而storage里的數據不一定馬上就要用。

10.2.3 任務提交腳本參考

  • 以下是一份spark-submit命令的示例,大家可以參考一下,并根據自己的實際情況進行調節

bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

10.2.4 個人經驗

java.lang.OutOfMemoryError
ExecutorLostFailure
Executor exit code 為143
executor lost
hearbeat time out
shuffle file lost

如果遇到以上問題,很有可能就是內存除了問題,可以先嘗試增加內存。如果還是解決不了,那么請聽下一次數據傾斜調優的課。

關于大數據開發中Spark調優常用手段是什么問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阿城市| 叙永县| 大丰市| 上饶县| 金沙县| 汉源县| 琼中| 聊城市| 体育| 柳江县| 盐池县| 莒南县| 民乐县| 清苑县| 东阳市| 临武县| 东莞市| 盈江县| 华坪县| 岳普湖县| 九龙城区| 江达县| 稻城县| 上思县| 内乡县| 青冈县| 综艺| 锦屏县| 民权县| 广丰县| 丰镇市| 乌苏市| 莱阳市| 巴林左旗| 麟游县| 全南县| 特克斯县| 临湘市| 云林县| 枞阳县| 新密市|