中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark(四):shuffle

發布時間:2020-06-01 10:49:05 來源:網絡 閱讀:601 作者:afeiye 欄目:大數據

shuflle write

spark(四):shuffle

  1. 上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數為 2,可以同時運行兩個 task。
  2. 在一個 core 上連續執行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 形成 ShuffleBlock i,后執行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。

shuflle read

  1. 在什么時候 fetch 數據?當 parent stage 的所有 ShuffleMapTasks 結束后再 fetch。
  2. 邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。使用可以 aggregate 的數據結構,比如 HashMap,每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 <Key, Value> record,直接將其放進 HashMap 里面。如果該 HashMap 已經存在相應的 Key,那么直接進行 aggregate 也就是 func(hashMap.get(Key), Value)
  3. fetch 來的數據存放到哪里?剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區,經過處理后的數據放在內存 + 磁盤上。
  4. 怎么獲得要 fetch 的數據的存放位置?reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數據位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給MapOutputTrackerMaster。

Shuffle read 中的 HashMap

ashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數據結構。Spark 設計了兩種:一種是全內存的 AppendOnlyMap,另一種是內存+磁盤的 ExternalAppendOnlyMap。
spark(四):shuffle

  1. 類似 HashMap,但沒有remove(key)方法。其實現原理很簡單,開一個大 Object 數組,藍色部分存儲 Key,白色部分存儲 Value。
  2. 如果 Array 的利用率達到 70%,那么就擴張一倍,并對所有 key 進行 rehash 后,重新排列每個 key 的位置。
    spark(四):shuffle
  3. ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。
  4. 如果 AppendOnlyMap 快被裝滿時檢查一下內存剩余空間是否可以夠擴展,夠就直接在內存中擴展,不夠就 sort 一下 AppendOnlyMap,將其內部所有 records 都 spill 到磁盤上。
  5. 每次 spill 完在磁盤上生成一個 spilledMap 文件,然后重新 new 出來一個 AppendOnlyMap。
  6. 最后一個 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經被處理完,因為每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進行 aggregate,并不是與所有的 records 進行 aggregate(一些 records 已經被 spill 到磁盤上了)。因此當需要 aggregate 的最終結果時,需要對 AppendOnlyMap 和所有的 spilledMaps 進行全局 merge-aggregate。
  7. 全局 merge-aggregate 的流程:先將 AppendOnlyMap 中的 records 進行 sort,形成 sortedMap。
  8. 然后分別從 sortedMap 和各個 spilledMap 讀出一部分數據(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key)
  9. mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個個放入 mergeBuffers 中,放入的時候與已經存在于 mergeBuffers 中的 StreamBuffer 進行 merge-combine

在Sort Based Shuffle的Shuffle Write階段,map端的任務會按照Partition id以及key對記錄進行排序。同時將全部結果寫到一個數據文件中,同時生成一個索引文件,reduce端的Task可以通過該索引文件獲取相關的數據。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新沂市| 怀安县| 西青区| 十堰市| 西宁市| 峨边| 平和县| 彝良县| 和政县| 博野县| 贡山| 新龙县| 青冈县| 手游| 龙里县| 威海市| 灵寿县| 盐池县| 招远市| 商南县| 浦江县| 金川县| 黄骅市| 浠水县| 平罗县| 肃南| 九龙坡区| 肥乡县| 南雄市| 丹江口市| 溆浦县| 南召县| 南乐县| 乐安县| 上高县| 绥芬河市| 武宁县| 隆尧县| 洛扎县| 黔西县| 杭州市|