中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Shuffle流程是什么

發布時間:2021-12-23 16:07:57 來源:億速云 閱讀:150 作者:iii 欄目:大數據

本篇內容介紹了“Shuffle流程是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

shuffle流程源碼解讀

1、從WordCountMapper類中的map方法中寫出kv后,進入shuffle流程	--context.write(outK,outV);
進入TaskInputOutputContext中的write()方法			--看下就過
進入WrappedMapper.java中的mapContext.write(key, value);方法	//112行
進入TaskInputOutputContextImpl.java 中output.write(key, value);方法 	//89行
最終定位到MapTask的write()方法內,	//726行
2、重點步驟,收集器對象將kv收集到緩沖區,并在收集前將kv的分區號計算出來.
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
第一次進入該方法時,因為沒有設置reduce的個數,所以最終返回的永遠是0號分區
3、定位到MapTask類中的collect方法并進入    		//1082行
bufferRemaining -= METASIZE;	//計算緩沖區剩余大小,該行代碼前面的代碼是對kv類型的一個判斷
如果bufferRemaining < 0 則開始進行溢寫操作,內部是對數據的一些校驗和計算
4、定位到startSpill(); --1126行  	//只有當溢寫數據大小滿足80%時,才會觸發該操作
WordCountMapper持續往緩沖區寫數據,當達到溢寫條件80%時,開始溢寫
5、進入到startSpill()方法內部		--MapTask類1590行
spillReady.signal(); //1602行  		--線程通信, 通知溢寫線程開始干活
//執行溢寫線程(MapTask內部類SpillThread)的run方法
//run方法中調用MapTask$MapOutputBuffer中的sortAndSpill()方法
直接執行下面的排序和溢寫方法		--sortAndSpill()方法  	--MapTask的1605行
6、定位到1615行
final SpillRecord spillRec = new SpillRecord(partitions); //根據分區數創建溢寫記錄對象
--排序按照分區排序,溢寫按照分區溢寫

final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//獲取溢寫文件名稱
 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619
 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),這時還沒有溢寫文件,只有目錄

out = rfs.create(filename);		//創建執行改步后,在上述的目錄下生成溢寫文件spill0.out文件

Shuffle流程是什么

7、繼續向下走,定位到MapTask類的1625行
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);		//溢寫前排序

8、定位到1629行,進入for循環	--按照分區進行溢寫

9、分析for循環內代碼,看具體溢寫過程
	9.1 先有一個writer對象,通過該對象來完成數據溢寫
		writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
	9.2 判斷是否有設置combinerRunner對象
		如果有,則按照設置的combinerRunner業務去處理;
		如果沒有,則走默認的溢寫規則

10、執行到1667行,即writer.close();方法,本次溢寫完畢,此時我們再去看溢寫文件spill0.out文件有數據

Shuffle流程是什么

11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小為:1M)) {}	//MapTask類的1685行
// 如果索引數據超過指定的內存大小,也需要溢寫到文件中.(該現象一般情況很難發生.)
12、當本次溢寫完畢之后,繼續回到WordCountMapper類中的map方法內的context.write(outk,outv);方法處

--說明:因為我們使用本地debug模式調試,所以看不到并行的效果,只能是串行效果,因此看到的是當內存內讀取滿足
80%時,發生溢寫操作,其實溢寫并未停止,只不過我們看不到,剩余的溢寫數據在20%內存進行
13、如上溢寫過程,在整個mapTask中會出現N次,具體多少看數據量. 如果map中最后的數據寫到緩沖區,但是沒有滿足
80%溢寫條件的情況,最終也需要將緩沖區的數據刷寫到磁盤(最后一次溢寫)。

最后一次會發生在 MapTask中關閉 NewOutputCollector對象的時候.
即在該行代碼處發生    output.close(mapperContext);	--MapTask的805行

14、進入output.close(mapperContext);方法內	--MapTask的732行
定位到collector.flush();方法 // 735行
-->將緩沖區的數據刷寫到磁盤-->重新走sortAndSpill()方法(最后一次刷寫)

Shuffle流程是什么 Shuffle流程是什么

上述流程,每發生一次溢寫就會生成一個溢寫小文件(溢寫文件內的數據是排好序的)
最終所有的數據都寫到磁盤中后,在磁盤上就是多個溢寫文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢寫全部完成之后,就進入歸并操作		--MapTask的1527行
mergeParts();方法,進入該方法,定位到MapTask的1844行
filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local
1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out

Shuffle流程是什么

16、繼續向下走,定位到MapTask的1880行
Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
   --歸并后,最終輸出的文件路徑
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00
01/attempt_local1440922619_0001_m_000000_0/output/file.out

17、繼續向下走,定位到MapTask的1882行
Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
   --歸并后,最終輸出文件的索引文件
/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00
01/attempt_local1440922619_0001_m_000000_0/output/file.out.index

18、創建file.out 文件
	FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

19、for (int parts = 0; parts < partitions; parts++) {}	//1925行,按照分區進行歸并排序

20、for循環內具體的歸并操作	//1950行
	RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, mergeFactor,
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter, sortSegments,
                         null, spilledRecordsCounter, sortPhase.phase(),
                         TaskType.MAP);
21、歸并后的數據寫出到文件
Writer<K, V> writer = new Writer<K, V>(job, finalPartitionOut, 
keyClass, valClass, codec,spilledRecordsCounter); //1961行

//歸并也可以使用combiner,但是前提條件是設置了combiner,并且溢寫次數大于等于3 
if (combinerRunner == null || numSpills < minSpillsForCombine(3)) {
      Merger.writeFile(kvIter, writer, reporter, job);
} else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(kvIter, combineCollector);
}

22、歸并完成
writer.close();		//1972行

Shuffle流程是什么

23、寫出索引文件
spillRec.writeToFile(finalIndexFile, job);	//1986行

24、刪除所有的溢寫文件spill0.out spill1.out ... spill0.out,只保留最終的輸出文件。
for(int i = 0; i < numSpills; i++) {
       rfs.delete(filename[i],true);
}

Shuffle流程是什么

“Shuffle流程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

高唐县| 鄂托克前旗| 乐至县| 巴东县| 沂水县| 叶城县| 长沙市| 乳山市| 车致| 浠水县| 芦溪县| 巴林右旗| 喀喇沁旗| 九龙城区| 吉木乃县| 贺州市| 太康县| 长宁县| 宁安市| 元氏县| 临猗县| 正安县| 芒康县| 赤水市| 兴海县| 子长县| 亚东县| 宣威市| 昭平县| 潜山县| 蛟河市| 乌恰县| 贵州省| 黄浦区| 石泉县| 钟山县| 金湖县| 容城县| 三原县| 铁力市| 武隆县|