您好,登錄后才能下訂單哦!
這篇文章主要講解了“Spark生產作業容錯能力的負面影響有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Spark生產作業容錯能力的負面影響有哪些”吧!
在 Spark 中數據本地性通過 TaskLocality 來表示,有如下幾個級別,
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
從上到下數據本地性依次遞減。
Spark 在執行前通過數據的分區信息進行計算 Task 的 Locality,Task 總是會被優先分配到它要計算的數據所在節點以盡可能地減少網絡 IO。這個計算的過程通過 spark.locality.wait
默認為3s,控制這個計算的過程。
原理這里不細講,簡而言之就是重試。Spark 規定了同一個 Job 中同一個 Stage 連續失敗重試的上限(spark.stage.maxConsecutiveAttempts
),默認為4,也規定了一個 Stage 中 同一個 Task 可以失敗重試的次數(spark.task.maxFailures
),默認為4。當其中任何一個閾值達到上限,Spark 都會使整個 Job 失敗,停止可能的“無意義”的重試。
我們首先來看一個例子,如圖所示,圖為 Spark Stage 頁面下 Task Page 的詳細視圖。
第一列表示該 Task 進行了4次重試,所以這個 Task 對應的 Job 也因此失敗了。
第三列表示該 Task 的數據本地性,都是 NODE_LOCAL 級別,對于一個從HDFS讀取數據的任務,顯然獲得了最優的數據本地性
第四列表示的是 Executor ID,我們可以看到我們任務的重試被分配到ID 為5和6兩個 Executor 上
第五列表示我們運行這些重試的 Task 所在的 Executor 所在的物理機地址,我們可以看到他們都被調度到了同一個
最后列表示每次重試失敗的錯誤棧
結合硬件層面的排查,發現是 NodeManager 物理節點上掛在的 /mnt/dfs/4,出現硬件故障導致盤只讀,ShuffleMapTask 在即將完成時,將index文件和data文件commit時,獲取index的臨時文件時候發生FileNotFoundException
。
java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109)
3.2 問題二:為什么該 Task 的4次重試都在同一個物理節點?
這是由于 Driver 在調度該 Task 的時候進行了數據本地性的運算,而且在
spark.locality.wait
默認為3s的時間約束內成功獲得了NODE_LOCAL級別的數據本地性,故而都調度到了同一個
NodeManger
物理節點。
1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e
spark.diskStore.subDirectories
默認為64控制.data
結尾,另一個就是這個與之對應的 .index
文件。96是 ShuffleID 表標識是哪個Shuffle 過程,2685是 MapID 對應的是 一個RDD 所以有分區中其中一個的順序號, 而0是一個固定值,原本表示是ReduceID,Spark Sort Based Shuffle 的實現不需要依賴這個值,所以被固定為了0。通過Shuffle ID和 MapId,Shufle Write 階段就可以生成類似shuffle_96_2685_0.index這樣的文件,而Shuffle Read 階段也可以通過兩個ID 定位到這個文件。scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {
| for (i <- (arr.length - 1) to 1 by -1) {
| val j = rand.nextInt(i + 1)
| val tmp = arr(j)
| arr(j) = arr(i)
| arr(i) = tmp
| }
| arr
| }
randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]
scala> randomizeInPlace(res11)
res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)
res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)
res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)
res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)
res27: Array[Int] = Array(2, 3, 4, 1)
感謝各位的閱讀,以上就是“Spark生產作業容錯能力的負面影響有哪些”的內容了,經過本文的學習后,相信大家對Spark生產作業容錯能力的負面影響有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。