中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark生產作業容錯能力的負面影響有哪些

發布時間:2021-12-16 14:05:31 來源:億速云 閱讀:105 作者:iii 欄目:大數據

這篇文章主要講解了“Spark生產作業容錯能力的負面影響有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Spark生產作業容錯能力的負面影響有哪些”吧!

1. Spark TaskLocality

在 Spark 中數據本地性通過 TaskLocality 來表示,有如下幾個級別,

  • PROCESS_LOCAL

  • NODE_LOCAL

  • NO_PREF

  • RACK_LOCAL

  • ANY

從上到下數據本地性依次遞減。

Spark 在執行前通過數據的分區信息進行計算 Task 的 Locality,Task 總是會被優先分配到它要計算的數據所在節點以盡可能地減少網絡 IO。這個計算的過程通過 spark.locality.wait 默認為3s,控制這個計算的過程。

2. Spark 內部容錯

原理這里不細講,簡而言之就是重試。Spark 規定了同一個 Job 中同一個 Stage 連續失敗重試的上限(spark.stage.maxConsecutiveAttempts),默認為4,也規定了一個 Stage 中 同一個 Task 可以失敗重試的次數(spark.task.maxFailures),默認為4。當其中任何一個閾值達到上限,Spark 都會使整個 Job 失敗,停止可能的“無意義”的重試。

3. 數據本地性和容錯的沖突

我們首先來看一個例子,如圖所示,圖為 Spark Stage 頁面下 Task Page 的詳細視圖。

  • 第一列表示該 Task 進行了4次重試,所以這個 Task 對應的 Job 也因此失敗了。

  • 第三列表示該 Task 的數據本地性,都是 NODE_LOCAL 級別,對于一個從HDFS讀取數據的任務,顯然獲得了最優的數據本地性

  • 第四列表示的是 Executor ID,我們可以看到我們任務的重試被分配到ID 為5和6兩個 Executor 上

  • 第五列表示我們運行這些重試的 Task 所在的 Executor 所在的物理機地址,我們可以看到他們都被調度到了同一個

  • 最后列表示每次重試失敗的錯誤棧

3.1 問題一:單個 Task 重試為什么失敗?

結合硬件層面的排查,發現是 NodeManager 物理節點上掛在的 /mnt/dfs/4,出現硬件故障導致盤只讀,ShuffleMapTask 在即將完成時,將index文件和data文件commit時,獲取index的臨時文件時候發生FileNotFoundException   

java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory)    at java.io.FileOutputStream.open0(Native Method)    at java.io.FileOutputStream.open(FileOutputStream.java:270)    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245)    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)    at org.apache.spark.scheduler.Task.run(Task.scala:109)

3.2 問題二:為什么該 Task 的4次重試都在同一個物理節點?

這是由于 Driver 在調度該 Task 的時候進行了數據本地性的運算,而且在  spark.locality.wait 默認為3s的時間約束內成功獲得了NODE_LOCAL級別的數據本地性,故而都調度到了同一個  NodeManger 物理節點。

3.3 問題三:為什么總是“本地重試”,不是“異地重試”?
這個過程從邏輯上講,其實已經不是“本地重試”,而恰恰是“異地重試”了。這我們可以從4次的重試的 Executor ID 上進行判斷,第0、1和3次是在 ID 6上進行的,而第2次是在 ID 5上發生的。但由于ID 5和6都在同一個 NodeManger 節點,所以我們看起來像是“本地重試”。另一個原因就是上面所說的數據本地性的成功解析,所以這些 Task 的每次重試都高概率的來到這個節點。  
所有 Spark Task 級別的重試從邏輯上都應該屬于“異地重試”,他們都需要通過 Driver 重新調度到新的 Executor 進行重試。我們所觀測到的“本地”和“異地”是屬于“現象”而非“本質”,影響這種現象的條件有比如下面幾個(不一定全面):1. 數據本地性 2. Executor 由于 NodeLabel 限制,只在若干有限的物理機上分配 3. ResourceManager 調度時剛好把所有的 Executor 都分配到某個節點上。
3.4 問題5:為什么4次失敗都操作同一個壞的盤?
該 NodeManger 實際上有/mnt/dfs/{0-11}, 一共12塊盤,從物理檢查上看,整個過程中也只有/mnt/dfs/4有異常告警,那為啥 Spark 這么傻?這么多好盤不用,專挑一塊壞的盤死磕?
我們可以先看下出錯的文件,我們包這個文件分成5個部分來看,  
  
  
  1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e
  • 第一行,是 Yarn NodeManger 所配置的LOCAL_DIR的一部分,完整的應該包括12塊盤
  • 第二行,是 Spark 生成的 BlockManger 的根目錄之一,其他盤符下也有類似的一個目錄
  • 第三行,是一個根目錄下的一級子目錄,數量由 spark.diskStore.subDirectories 默認為64控制
  • 第四行,Spark Shuffle 過程產生的兩個重要的文件之一,一個是數據文件 .data 結尾,另一個就是這個與之對應的 .index 文件。96是 ShuffleID 表標識是哪個Shuffle 過程,2685是 MapID 對應的是 一個RDD 所以有分區中其中一個的順序號, 而0是一個固定值,原本表示是ReduceID,Spark Sort Based Shuffle 的實現不需要依賴這個值,所以被固定為了0。通過Shuffle ID和 MapId,Shufle Write 階段就可以生成類似shuffle_96_2685_0.index這樣的文件,而Shuffle Read 階段也可以通過兩個ID 定位到這個文件。
  • 第五行, 是Index文件的對應臨時文件的UUID標識。
基于這樣的邏輯,對于某次Shuffle 過程的某個分區(Partition)的最終輸出文件名其實是可以預測的也是固定的,比如我們這個 case 中,第96次shuffle的第2685分區的 index 文件的文件名即為shuffle_96_2685_0.index。
Spark 在寫和讀這個文件的時候,基于相同的定位邏輯(算法)來保證依賴關系,  
第一步確定根目錄,Spark 通過文件名的hash絕對值與盤符數的模,作為索引卻確定根目錄  
  scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6
而根目錄的數組對于一個 Executor 的這個生命周期內而言是確定的,它是一個由簡單隨機算法將所有路徑打散的一個固定數組。所以一旦文件名稱確定,Executor 不換的話,根目錄一定是確定的。所以都固定的去訪問/mnt/dfs/4這個壞盤。  
但這只解釋了一個 Executor 所被分配 Task 失敗的原因,我們的 Task 還在不同的 executor 上進行過嘗試。
3.5 問題5:為什么兩個 Executor 上的重試都失敗了?
其實這個問題只是概率的問題, Spark 用類似下面算法打亂所有LOCAL_DIRS的配置,如下面的的簡單測試,這種碰撞的概率還是極高的,我們ID 5,6,的 Executor 下 DiskBlockManager 包含的 localDirs(6)應該都對應于 /mnt/dfs/4 這個壞盤。  
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {     |     for (i <- (arr.length - 1) to 1 by -1) {     |       val j = rand.nextInt(i + 1)     |       val tmp = arr(j)     |       arr(j) = arr(i)     |       arr(i) = tmp     |     }     |     arr     |   }randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]scala> randomizeInPlace(res11)res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)res27: Array[Int] = Array(2, 3, 4, 1)

感謝各位的閱讀,以上就是“Spark生產作業容錯能力的負面影響有哪些”的內容了,經過本文的學習后,相信大家對Spark生產作業容錯能力的負面影響有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

天峨县| 绩溪县| 永清县| 延川县| 潞城市| 安丘市| 苏尼特右旗| 武汉市| 砀山县| 罗定市| 洛南县| 高清| 西贡区| 金湖县| 舒兰市| 山西省| 洛川县| 柘城县| 武夷山市| 焉耆| 昆明市| 清徐县| 江陵县| 清涧县| 北辰区| 福州市| 海阳市| 茶陵县| 鹤山市| 惠来县| 德庆县| 南雄市| 石渠县| 宾阳县| 南江县| 宜都市| 壤塘县| 遂宁市| 大丰市| 梨树县| 昌宁县|