您好,登錄后才能下訂單哦!
好程序員大數據學習路線分享MapReduce全過程解析,移動數據與移動計算
在學習大數據的時候接觸了移動數據和移動計算這兩種聯系緊密而又有很大不同的概念,其中移動計算也叫做本地計算。
在以前的數據處理中時使用的移動數據,其實就是將需要處理的數據傳輸到存放不同處理數據方式邏輯的各個節點上。這樣做的效率很低,特別是大數據中的數據量是很大的,至少都是GB以上,更大的是TB、PB甚至更大,而且磁盤I/O、網絡I/O的效率是很低的,這樣處理起來就需要很長的時間,遠遠不能滿足我們的要求。而移動計算就出現了。
移動計算,也叫做本地計算,是數據就存放在節點上不再變動,而是將處理邏輯程序傳輸到各個數據節點上。由于處理程序的大小肯定不會特別的大,這樣就可以實現很快將程序傳輸到存放數據的各個節點上去,然后本地執行處理數據,效率高。現在的大數據處理技術都是采用這種方式。
?
言簡意賅的說:
Map階段:
1、Read:讀取數據源,將數據進行filter成一個個的K/V
2、Map:在map函數中,處理解析的K/V,并產生新的K/V
3、Collect:輸出結果,存于環形內緩沖區
4、Spill:內存區滿,數據寫到本地磁盤,并生產臨時文件
5、Combine:合并臨時文件,確保生產一個數據文件
?
Reduce階段:
1、Shuffle:Copy階段,Reduce Task到各個Map Task遠程復制一分數據,針對某一份數據,2、若其大小超過一定閥值,則寫磁盤;否則放到內存
3、Merge:合并內存和磁盤上的文件,防止內存占用過多或磁盤文件過多
4、Sort:Map Task階段進行局部排序,Reduce Task階段進行一次歸并排序
5、Reduce:將數據給reduce函數
6、Write:reduce函數將其計算的結果寫到HDFS上
?
深度解析的說:
MapTask階段
(1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。?
(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,并產生一系列新的key/value。
(3)Collect收集階段:在用戶編寫map()函數中,當數據處理完成后,一般會調用 OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用 Partitioner),并寫入一個環形內存緩沖區中。?
(4)Spill階段:即“溢寫”,當環形緩沖區滿后,MapReduce 會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,并在必要時對數據進行合并、壓縮等操作。
?
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,然后按照key進行排序。這樣,經過排序后,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。?
步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。如果用戶設置了Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。?
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。?
(5)Combine階段:當所有數據處理完成后,MapTask對所有臨時文件進行一次合并,以確保最終只會生成一個數據文件。當所有數據處理完后,MapTask會將所有臨時文件合并成一個大文件,并保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。在進行文件合并過程中,MapTask以分區為單位進行合并。對于某個分區,它將采用多輪遞歸合并的方式。每輪合并io.sort.factor(默認100)個文件,并將產生的文件重新加入待合并列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。讓每個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。
?
Shuffle階段(map端的輸出到reduce的輸入)
1)maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2)從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件?
3)多個溢出文件會被合并成大的溢出文件?
4)在溢出過程中,及合并的過程中,都要調用partitioner進行分區和針對key進行排序?
5)reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據?
6)reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)?
7)合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)?
3)注意Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。緩沖區的大小可以通過參數調整,參數:io.sort.mb默認100M。
?
ReduceTask階段
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,并針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。?
(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個后臺線程對內存和磁盤上的文件進行合并,以防止內存使用過多或磁盤上文件過多。?
(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行聚集的一組數據。為了將key相同的數據聚在一起,Hadoop采用了基于排序的策略。由于各個MapTask已經實現對自己的處理結果進行了局部排序,因此,ReduceTask只需對所有數據進行一次歸并排序即可。?
(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。