中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行Spark Shuffle 原理分析

發布時間:2021-12-03 19:10:51 來源:億速云 閱讀:168 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關如何進行Spark Shuffle 原理分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Spark Shuffle原理分析

 Shuffle就是對數據進行重組,由于分布式計算的特性和要求,在實現細節上更加繁瑣和復雜。
 在MapReduce框架,Shuffle是連接Map和Reduce之間的橋梁,Map階段通過shuffle讀取數據并輸出到對應的Reduce;而Reduce階段負責從Map端拉取數據并進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網絡I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。Spark也會有自己的shuffle實現過程。
 在DAG調度的過程中,Stage階段的劃分是根據是否有shuffle過程,也就是存在wide Dependency寬依賴的時候,需要進行shuffle,這時候會將作業job劃分成多個Stage,每一個stage內部有很多可以并行運行的task。
 
 stage與stage之間的過程就是shuffle階段,在Spark的中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發展有兩種實現的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。

HashShuffle機制

 在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。
 該ShuffleManager-HashShuffleManager有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
 SortShuffleManager相較于HashShuffleManager來說,有了一定的改進。主要就在于每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
  • Hash shuffle

    • 一種是普通運行機制

    • 另一種是合并的運行機制。

    • HashShuffleManager的運行機制主要分成兩種

    • 合并機制主要是通過復用buffer來優化Shuffle過程中產生的小文件的數量。

    • Hash shuffle是不具有排序的Shuffle。

普通機制的Hash shuffle

圖解
   這里我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。

    圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進行 Hash 計算(分區器:hash/numreduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數據,想把不同的數據匯聚然后計算出最終的結果,所以ReduceTask 會在屬于自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這里有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。
shuffle Write階段
 主要就是在一個stage結束計算之后,為了下一個stage可以執行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數據按key進行“分區”。所謂“分區”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于reduce端的stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。

     那么每個執行shuffle write的task,要為下一個stage創建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那么當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那么每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。
shuffle Read階段
 shuffle read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行key的聚合或連接等操作。由于shuffle write的過程中,task給Reduce端的stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬于自己的那一個磁盤文件即可。

  shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數據,然后通過內存中的一個Map進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并放到buffer緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,并得到最終的結果。
注意
(1)buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計算的效率,buffer的默認大小32k。
(2)分區器:根據hash/numRedcue取模決定數據由幾個Reduce處理,也決定了寫入幾個buffer中
(3)block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數計算公式:
                 block file=M*R
 (4) M為map task的數量,R為Reduce的數量,一般Reduce的數量等于buffer的數量,都是由分區器決定的
Hash shuffle普通機制的問題
(1).Shuffle階段在磁盤上會產生海量的小文件,建立通信和拉取數據的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)

(2).可能導致OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內存中,會導致堆內存不足,相應會導致頻繁的GC,GC會導致OOM。由于內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題

合并機制的Hash shuffle

 合并機制就是復用buffer緩沖區,開啟合并機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。

圖解
 這里有6個這里有6個shuffleMapTask,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer里,然后把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這里有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。
注意
(1).啟動HashShuffle的合并機制ConsolidatedShuffle的配置
   spark.shuffle.consolidateFiles=true(2).block file=Core*R
 Core為CPU的核數,R為Reduce的數量
Hash shuffle合并機制的問題
 如果 Reducer 端的并行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。

Sort shuffle

  • SortShuffleManager的運行機制主要分成兩種,

    • 一種是普通運行機制

    • 另一種是bypass運行機制

Sort shuffle的普通機制

 在該模式下,數據會先寫入一個數據結構,聚合算子寫入Map,一邊通過Map局部聚合,一邊寫入內存。Join算子寫入ArrayList直接寫入內存中。然后需要判斷是否達到閾值(5M),如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。

在溢寫磁盤前,先根據key進行排序,排序過后的數據,會分批寫入到磁盤文件中。默認批次為10000條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩沖區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個task過程會產生多個臨時文件
。

最后在每個task中,將所有的臨時文件合并,這就是merge過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個task的數據在文件中的索引start offset和end offset。

 這樣算來如果第一個stage 50個task,每個Executor執行一個task,那么無論下游有幾個task,就需要50*2=100個磁盤文件。
好處
1. 小文件明顯變少了,一個task只生成一個file文件
2. file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費一些性能,但是查找變快很多

bypass模式的sortShuffle

bypass機制運行條件
  • shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值

  • 不是聚合類的shuffle算子(比如reduceByKey)

好處
    該機制與sortshuffle的普通機制相比,在shuffleMapTask不多的情況下,首先寫的機制是不同,其次不會進行排序。這樣就可以節約一部分性能開銷。
總結
在shuffleMapTask數量小于默認值200時,啟用bypass模式的sortShuffle(原因是數據量本身比較少,沒必要進行sort全排序,因為數據量少本身查詢速度就快,正好省了sort的那部分性能開銷。)

該機制與普通SortShuffleManager運行機制的不同在于:
  第一: 磁盤寫機制不同;
  第二: 不會進行sort排序;

Shuffle調優

Shuffle核心組件

碰到ShuffleDenpendency就進行stage的劃分,ShuffleMapStage: 為shuffle提供數據的中間stage,ResultStage: 為一個action操作計算結果的stage。

Shuffle原理剖析

MapOutputTracker

解決的一個問題是resut task如何知道從哪個Executor去拉取Shuffle data

ShuffleWriter

(1)HashShuffleWriter

特點:根據Hash分區,分區數是m * n 個。

val counts: RDD[(String, Int)] 
 = wordCount.reduceByKey(new HashPartitioner(2), (x, y) => x + y)

(2)SortShuffleWriter

特點:

a、文件數量為m

b、如果需要排序或者需要combine,那么每一個partition數據排序要自己實現。(SortShuffleWriter里的sort指的是對partition的分區號進行排序)

c、數據先放在內存,內存不夠則寫到磁盤中,最后再全部寫到磁盤。

(3)BypassMergeSortShuffleWriter

這種模式同時具有HashShuffleWriter和SortShuffleter的特點。因為其實HashShufflerWriter的性能不錯,但是如果task數太多的話,性能話下降,所以Spark在task數較少的時候自動使用了這種模式,一開始還是像HashShufflerWriter那種生成多個文件,但是最后會把多個文件合并成一個文件。然后下游來讀取文件。默認map的分區需要小于spark.shuffle.sort.bypassMergeThreshold(默認是200),因為如何分區數太多,產生的小文件就會很多性能就會下降。

Spark Shuffle參數調優

spark.shuffle.file.buffer

  • 默認值:32k

  • 參數說明:該參數用于設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。

  • 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默認值:48m

  • 參數說明:該參數用于設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。

  • 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默認值:3

  • 參數說明:shuffle read task從shuffle write task所在節點拉取屬于自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。

  • 調優建議:對于那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由于JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對于針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

  • 默認值:5s

  • 參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是5s。

  • 調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。

spark.shuffle.memoryFraction

(Spark1.6是這個參數,1.6以后參數變了,具體參考上一講Spark內存模型知識)

  • 默認值:0.2

  • 參數說明:該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例,默認是20%。

  • 調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給shuffle read的聚合操作更多內存,以避免由于內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升10%左右。

spark.shuffle.manager

  • 默認值:sort

  • 參數說明:該參數用于設置ShuffleManager的類型。Spark 1.5以后,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。Spark1.6以后把hash方式給移除了,tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。

  • 調優建議:由于SortShuffleManager默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果你的業務邏輯不需要對數據進行排序,那么建議參考后面的幾個參數調優,通過bypass機制或優化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因為之前發現了一些相應的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默認值:200

  • 參數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小于這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創建單獨的索引文件。

  • 調優建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于shuffle read task的數量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

上述就是小編為大家分享的如何進行Spark Shuffle 原理分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

万安县| 柳河县| 平定县| 淄博市| 涟源市| 鄂托克旗| 屯留县| 同德县| 浪卡子县| 封丘县| 沈阳市| 湘乡市| 织金县| 麦盖提县| 利津县| 攀枝花市| 彰武县| 宁津县| 通江县| 碌曲县| 涿鹿县| 高碑店市| 闽清县| 哈尔滨市| 庐江县| 上饶市| 将乐县| 新巴尔虎左旗| 沽源县| 南安市| 故城县| 静宁县| 吉木萨尔县| 高雄市| 长寿区| 新余市| 阜城县| 昭平县| 辽宁省| 沐川县| 兰西县|