Because MapReduce must sort, MapTask and ReduceTask form a workflow architecture rather than a dataflow architecture. Until a MapTask has finished and its output has been sorted and merged, the ReduceTask has no input data; even if the ReduceTask has already been created, it can only sleep and wait for the MapTask to complete before it can fetch data from the MapTask's node. A MapTask's final output is a single merged spill file, which can be fetched over HTTP. For this reason a ReduceTask is normally started only when the MapTasks are close to finishing; starting it earlier just wastes container resources. (A hedged configuration sketch of this slow-start behaviour follows.)
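The threshold at which reducers are allowed to start is controlled by the slow-start setting. A minimal sketch, assuming the standard property name mapreduce.job.reduce.slowstart.completedmaps (whose default, as far as I recall, is 0.05); the 0.95 below is only an illustrative value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SlowStartDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Launch reducers only after 95% of the MapTasks have completed,
    // so their containers are not held idle while waiting for map output.
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.95f);
    Job job = Job.getInstance(conf, "slow-start-demo");
    // ... set mapper, reducer, input and output as usual, then job.waitForCompletion(true)
  }
}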
A ReduceTask runs as a thread in the YarnChild JVM. We start looking at the Reduce side from ReduceTask.run.
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
// add the phases the reduce process will go through, so the TaskTracker can be told how far it has got
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
// set up and start the reporter thread so we can communicate with the TaskTracker
boolean useNewApi = job.getUseNewReducer();
// when the job client initializes the job, the new API is used by default; see Job.setUseNewAPI()
initialize(job, getJobID(), reporter, useNewApi);
// initialize the task: mostly output-related setup, such as creating the committer and setting the working directory
// check if it is a cleanupJobTask
// the following if statements branch on the task type; they are all methods of Task, so they apply equally to MapTask and ReduceTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return; // this attempt exists only for job cleanup; stop once it is done
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
// mainly creates the FileSystem object for the working directory
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
// set the task's current phase to the cleanup phase and delete the working directory
}
Only from here on does the attempt actually become a reducer.
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//create the combineCollector if a combiner is configured
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
//look up mapreduce.job.reduce.shuffle.consumer.plugin.class in the configuration; the default is Shuffle.class
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
//create the shuffle plugin object
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
//create the context object, a ShuffleConsumerPlugin.Context
shuffleConsumerPlugin.init(shuffleContext);
//what actually gets called here is Shuffle's init function; its key parts are excerpted below.
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
//create the scheduler that the shuffle needs
merger = createMergeManager(context);
//create the merger used inside the shuffle; the body of createMergeManager is:
return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
//create the MergeManagerImpl object and the merge threads
rIter = shuffleConsumerPlugin.run();
//copy the output files from all the Mappers and merge-sort them; wait here until that has finished
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();
//the sort phase is complete
setPhase(TaskStatus.Phase.REDUCE);
//enter the reduce phase
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
//Reduce is the last phase of the Reduce task. Here it prepares the map output keyClass ("mapred.output.key.class" or "mapred.mapoutput.key.class"), the valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class") and the grouping Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class")
if (useNewApi) {
//choose between runNewReducer and runOldReducer according to useNewApi; we analyze runNewReducer
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
//it writes some information to the progress reporter; obtains a TaskAttemptContext and uses it to create the reducer, the output RecordWriter and the counters that track the output; then creates the Context used to collect the reduce results; finally reducer.run(reducerContext) starts the reduce
} else { // the old API
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
(1) The reduce side has three phases (copy: remotely fetching the Map outputs; sort: sorting all of that data; reduce: the aggregation done by the reducer we wrote). A Progress is registered for each of the three phases so the state can be reported to the TaskTracker.
(2) The part of the code above corresponding to lines 15-40 is essentially the same as in the source-level analysis of the MapTask run; refer to that article.
(3) codec = initCodec() checks whether the map output is compressed; if so it returns a codec instance, otherwise null. Here we discuss the uncompressed case (a configuration sketch for the compressed case follows this list).
(4) We discuss fully distributed Hadoop, i.e. isLocal == false: a ReduceCopier object reduceCopier is constructed and reduceCopier.fetchOutputs() copies each Mapper's output to the local node.
(5) The copy phase then finishes, the next phase is set to sort, and the status is updated.
(6) The KV iterator is chosen according to isLocal; in the fully distributed case reduceCopier.createKVIterator(job, rfs, reporter) is used.
(7) When the sort phase finishes, the next phase is set to reduce and the status is updated.
(8) Some configuration is then read and, depending on whether the new API is in use, a different path is taken. Here it is the new API, so runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass) runs the reducer.
(9) done(umbilical, reporter) does the clean-up at the end of the task: it updates the counters with updateCounters(); if the task needs to commit, it sets the task state to COMMIT_PENDING, reports completion through TaskUmbilicalProtocol, waits for permission and then calls commit; it sets the task-finished flag, stops the Reporter thread, sends a final status report (sendLastUpdate), and reports the finished state through TaskUmbilicalProtocol (sendDone).
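For context on item (3): whether initCodec() finds a codec depends on how map-output compression was configured when the job was submitted. A minimal sketch, assuming the standard property names mapreduce.map.output.compress and mapreduce.map.output.compress.codec; SnappyCodec is only an example and requires native Snappy support on the cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class MapOutputCompressionDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Compress the intermediate map output; the reduce side decompresses it during shuffle.
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
  }
}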
Some people divide the Reduce Task into five phases. 1. The shuffle phase, also called the copy phase: a slice of data is copied remotely from each MapTask; if its size exceeds a threshold it is written to disk, otherwise it is kept in memory. 2. The merge phase: while data is still being copied, the Reduce Task runs two background threads that merge files in memory and on disk, to keep memory usage and the number of disk files under control. 3. The sort phase: the input to the user-written reduce method is grouped by key, so the copied data must be sorted; a merge sort is used, because each Map Task's output is already sorted. 4. The reduce phase: each group of data is handed in turn to the user-written reduce method. 5. The write phase: the result is written to HDFS.
Those five phases are fairly fine-grained; the code splits the work into three phases: copy, sort and reduce. When we run an MR job from Eclipse, the reduce progress shown on the console is likewise divided into these three phases, each worth 33.3%.
Here shuffleConsumerPlugin is an instance of some class implementing ShuffleConsumerPlugin. It can be chosen through the configuration option mapreduce.job.reduce.shuffle.consumer.plugin.class; by default Shuffle is used (see the sketch right after this paragraph). As we analyzed in the code, shuffleConsumerPlugin.run, normally Shuffle.run, has to complete before runNewReducer or runOldReducer can run, because this is the step that transports each Mapper's merged spill file to the Reducer side over HTTP; only once the data is there can the reducer run. The Shuffle object is, so to speak, the MapTasks' porter. Note also that it does not ship data and let the Reducer process it at the same time: it first moves over all of the MapTasks' data and merge-sorts it, and only then feeds it to the corresponding Reducer.
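As a small illustration of that option (a hedged sketch; the value shown is simply the default plugin's fully qualified name, and any custom plugin would have to implement ShuffleConsumerPlugin):

import org.apache.hadoop.conf.Configuration;

public class ShufflePluginDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Which ShuffleConsumerPlugin the ReduceTask instantiates; Shuffle is the default.
    conf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
        "org.apache.hadoop.mapreduce.task.reduce.Shuffle");
  }
}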
In general MapTasks and ReduceTasks are in a many-to-many relationship: say there are M Mappers and N Reducers. N Reducers correspond to N partitions, so every Mapper's output is divided into N partitions and each Reducer is responsible for one of them. Each Reducer fetches its own slice from every Mapper, so it ends up with M pieces of data from M different Mappers; merging those M pieces yields one complete partition, sorted where necessary, and only then does it become that Reducer's actual input. This moving and regrouping of data is called the shuffle. The shuffle is expensive: it transfers a large amount of data and consumes a lot of network bandwidth, and it also introduces latency, because the M Mappers finish at different speeds, the shuffle cannot finish before all of them have, and the Reduce cannot start before the shuffle has finished. Strictly speaking that latency is not caused by the shuffle itself: if the Reducer did not need every partition's data in place and sorted, it would not have to wait for the slowest Mapper; it is the price paid for sorting. (A sketch of how a key is assigned to one of the N partitions follows.)
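To make the M x N picture concrete: which of the N partitions a (key, value) pair lands in is decided on the map side by the partitioner. The default HashPartitioner boils down to the following; this is a sketch written from memory of the Hadoop source, so treat it as illustrative rather than authoritative:

import org.apache.hadoop.mapreduce.Partitioner;

public class SketchHashPartitioner<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the result is non-negative, then take the
    // remainder modulo the number of reducers (= the number of partitions).
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}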
So the shuffle plays a very important role in the MapReduce framework. Let's first look at an outline of Shuffle:
public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter
private ShuffleConsumerPlugin.Context context;
private TaskAttemptID reduceId;
private JobConf jobConf;
private TaskUmbilicalProtocol umbilical;
private ShuffleSchedulerImpl<K, V> scheduler;
private MergeManager<K, V> merger;
private Task reduceTask; //Used for status updates
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
public void init(ShuffleConsumerPlugin.Context context)
public RawKeyValueIterator run() throws IOException, InterruptedException
In ReduceTask.run we saw the call to shuffle.init; inside init the ShuffleSchedulerImpl and MergeManagerImpl objects are created. What they are for is explained later.
After that comes the call to shuffle.run. Although Shuffle has a run method, it is not itself a thread; it merely borrows the name.
Let's follow ReduceTask.run -> Shuffle.run:
public RawKeyValueIterator run() throws IOException, InterruptedException {
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
//looking at EventFetcher we see that it extends Thread, so it is a thread
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
//create a pool of fetcher threads
if (isLocal) {
//if the Mapper and the Reducer are on the same machine, fetch locally
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
//LocalFetcher is an extension of Fetcher and is also a thread.
fetchers[0].start(); //there is only one local Fetcher
} else {
//the Mappers and the Reducer are not on the same machine, so we have to fetch across several nodes
for (int i=0; i < numFetchers; ++i) {
//start all the Fetchers
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
//create a Fetcher thread
fetchers[i].start();
//several cross-node Fetchers are needed, and every one of them is started
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
//wait until all the Fetchers are done, reporting progress while waiting so the attempt does not time out
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
//shut down the eventFetcher: the shuffle is done and all the MapTasks' data has been copied over
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown(); //shut down every fetcher
}
// stop the scheduler
scheduler.close();
//the shuffle scheduler is no longer needed either, so close it
copyPhase.complete(); // copy is already complete
//the file-copy phase is over
What follows is the merge sort of the Reduce side.
taskStatus.setPhase(TaskStatus.Phase.SORT);
//enter the sort phase
reduceTask.statusUpdate(umbilical);
//report to the MRAppMaster through the umbilical protocol and update the status
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
//merge and sort; when it finishes it returns kvIter, an iterator over the merged data.
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
There are only two ways the data could move from MapTask to ReduceTask: the MapTask pushes it, or the ReduceTask pulls it. Hadoop uses the second, i.e. file copying. Before Shuffle.run is entered, ReduceTask.run has already called its init function, shuffleConsumerPlugin.init(shuffleContext), which creates the scheduler and the merger used for merge-sorting; once inside run, an EventFetcher thread and a number of Fetcher threads are created. A Fetcher's job is to pull, i.e. to fetch data from MapTask nodes. Be clear, though, that although EventFetcher is also named a fetcher, what it fetches are events, not the data itself; you can think of it as the event-driven control around the fetching process.
The number of Fetcher threads is not fixed either. In Uber mode the MapTask and the ReduceTask are on the same node and there is only one MapTask, so a single Fetcher is enough, and that Fetcher is a LocalFetcher. Outside Uber mode there may be many MapTasks, generally not on the same node as the ReduceTask; the number of Fetchers is then configurable and defaults to 5 (a configuration sketch follows). The fetchers array acts as a Fetcher thread pool.
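That default of 5 is MRJobConfig.SHUFFLE_PARALLEL_COPIES in the code above. A minimal sketch of changing it for a job, assuming the usual property name mapreduce.reduce.shuffle.parallelcopies; 10 is only an illustrative value:

import org.apache.hadoop.conf.Configuration;

public class FetcherCountDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Number of Fetcher threads a ReduceTask uses to copy map output in parallel.
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
  }
}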
After the EventFetcher and the Fetcher pool have been created, the code enters a while loop that does nothing but wait, so all the real work happens in those threads, namely the EventFetcher and the Fetchers. The EventFetcher plays the crucial coordinating role.
Here is an excerpt of the EventFetcher source with only the essential parts kept:
class EventFetcher extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler scheduler;
private final int maxEventsToFetch;
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) { //as long as the thread has not been stopped or interrupted
try {
int numNewMaps = getMapCompletionEvents();
//get the map-completion events; next let's look at the getMapCompletionEvents source:
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
//ask the MRAppMaster, through the umbilical protocol, for reports of completed maps
events = update.getMapTaskCompletionEvents();
//get the details of which MapTask attempts have finished running
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
//an assertion
// Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
//for each event in the report
scheduler.resolve(event);
//this calls ShuffleSchedulerImpl.resolve, whose source is:
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED: //the map attempt succeeded
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp()); //build its URI
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
//record which host this MapTask ran on, for the Fetchers to use; the getBaseURI source:
static URI getBaseURI(TaskAttemptID reduceId, String url) {
StringBuffer baseUrl = new StringBuffer(url);
if (!url.endsWith("/")) {
baseUrl.append("/");
}
baseUrl.append("mapOutput?job=");
baseUrl.append(reduceId.getJobID());
baseUrl.append("&reduce=");
baseUrl.append(reduceId.getTaskID().getId());
baseUrl.append("&map=");
URI u = URI.create(baseUrl.toString());
return u;
The various pieces of information are collected and appended to the URL, which is then wrapped in a URI object.
}
Back to resolve:
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
//keep track of the longest map run time
break;
case FAILED:
case KILLED:
case OBSOLETE: //the MapTask attempt failed, was killed, or its output is obsolete
obsoleteMapOutput(event.getTaskAttemptId()); //mark that attempt's output as obsolete
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'"); //log it
break;
case TIPFAILED: //the whole task failed (all of its attempts)
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'"); //log it
break;
}
}
Back to getMapCompletionEvents:
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) { //the event reports success
++numNewMaps; //one more completed map
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
Back to EventFetcher.run:
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
if (++failures >= MAX_RETRIES) {
throw new IOException("too many failures downloading events", ie);//失敗數量大于重試的數量
}
// sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD);
}
}
}
} catch (InterruptedException e) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
return;
}
}
A MapTask has no direct relationship with the ReduceTasks; it does not know which nodes they run on and only reports its progress to the MRAppMaster. The ReduceTask uses the umbilical protocol's getMapCompletionEvents operation to ask the MRAppMaster for reports of MapTasks that have finished running. A few MapTask attempts may fail, but the great majority succeed, and for every successful one a Fetcher is dispatched to collect its output. This bookkeeping is done by the scheduler, i.e. the ShuffleSchedulerImpl object; there is not much to it, it is just an ordinary object.
fetchers works like a thread pool; it holds several threads (5 by default) that wait for the EventFetcher's notifications and go to fetch data as soon as a MapTask has completed.
Let's look at the run method of the Fetcher thread class:
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
//get from the scheduler a node whose MapTask has completed successfully.
metrics.threadBusy();
//mark this thread as busy
// Shuffle
copyFromHost(host);
//start copying that node's data
} finally {
if (host != null) { //we did get a host from the scheduler
scheduler.freeHost(host);
//hand the host back to the scheduler and mark it as free again
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
The key part here is copyFromHost, the function that actually fetches the data.
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
//this runs on the ReduceTask's node
retryStartTime = 0;
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
//get the list of completed MapTasks on that host.
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return; //nothing completed on this host, so just return
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
}
// List of maps to be fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
//the set of MapTasks that have completed and are still waiting to be shuffled.
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
//build the URL of the node the MapTasks ran on; let's look at the getMapOutputURL source:
private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId); //append this map attempt's id to the URL
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
//log it
return new URL(url.toString());
//return the URL
}
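Just for illustration, the URL this produces has roughly the following shape (the host, port and IDs below are made up; 13562 is, as far as I recall, the default shuffle port, and several map attempt IDs can be appended separated by commas):

http://node01:13562/mapOutput?job=job_1526450000000_0001&reduce=0&map=attempt_1526450000000_0001_m_000003_0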
Back to copyFromHost:
try {
setupConnectionsWithRetry(host, remaining, url);
//establish an HTTP connection to the remote host; setupConnectionsWithRetry uses openConnectionWithRetry to open the link:
openConnectionWithRetry(host, remaining, url);
which in turn uses openConnection(url); let's keep digging.
The main steps of establishing the connection are:
protected synchronized void openConnection(URL url)
throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
//the connection is made with an HttpURLConnection
if (sslShuffle) { //if shuffle traffic is secured with SSL certificates
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
//cast conn to an HTTPS connection
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); //install a socket factory backed by the certificates
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
connection = conn;
}
setupConnectionsWithRetry then goes on to:
setupShuffleConnection(encHash);
//set up the shuffle connection
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
return;
}
verifyConnection(url, msgToEncode, encHash);
}
//at this point the connection has been established and verified.
if (stopped) {
abortConnect(host, remaining);
//this aborts the connection; you can step into it to see how the remaining map list and the waiting condition are handled
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
Back to copyFromHost:
input = new DataInputStream(connection.getInputStream());
//wrap the connection's input stream in a DataInputStream.
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
//as long as there are still map outputs to fetch and no task has failed
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
//copy the data out; the copyMapOutput source is as follows:
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
//get the map attempt id
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
//adjust the lengths for crypto padding if the data is encrypted
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
//reserve a MapOutput for the merge: either in memory or on disk.
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
scheduler.reportLocalError(ioe);
//report the error
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Check if we can shuffle now ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
mapOutput.getMapId() + " decomp: " + decompressedLength
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
//跨節點把Mapper的文件內容拷貝到reduce的內存或者磁盤上。
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
//tell the scheduler that the copy of one map output from this node has completed.
remaining.remove(mapId);
//this MapTask's output has now been fully shuffled
metrics.successFetch();
return null;
We will not go into the exception and failure handling that follows.
Here mapOutput is the storage that receives the MapTask's output file. Depending on the size of that output and on how much memory is available, it can be an in-memory output or an on-disk output. In-memory space has to be reserved, because there is more than one Fetcher. We take InMemoryMapOutput as the example. (A hedged sketch of the memory knobs involved follows.)
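Whether merger.reserve hands back an in-memory or an on-disk MapOutput is governed by the shuffle memory settings that MergeManagerImpl reads. A hedged sketch of the usual knobs; the property names are the ones commonly documented for Hadoop 2.x, and the values shown are, to the best of my recollection, the defaults rather than tuning advice:

import org.apache.hadoop.conf.Configuration;

public class ShuffleMemoryDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Fraction of the reducer's heap that may hold fetched map output during the copy phase.
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
    // A single map output larger than this fraction of that buffer is shuffled straight to disk.
    conf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.25f);
    // When the in-memory buffer is this full, an in-memory merge is started.
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
  }
}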
Call path:
Fetcher.run --> copyFromHost --> copyMapOutput --> merger.reserve (MergeManagerImpl.reserve) --> InMemoryMapOutput.shuffle
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
//copy the spill data for this partition over from the Mapper's node
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength, conf);
//an input stream that verifies the checksum
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
//if the map output is compressed
decompressor.reset();
//reset the decompressor
input = codec.createInputStream(input, decompressor);
//wrap the stream with the decompressor
}
try {
IOUtils.readFully(input, memory, 0, memory.length);
//read this partition's data from the Mapper side into the Reducer's in-memory buffer.
metrics.inputBytes(memory.length);
reporter.progress(); //report progress
LOG.info("Read " + memory.length + " bytes from map-output for " +
getMapId());
/**
We've gotten the amount of data we were expecting. Verify the
decompressor has nothing more to offer. This action also forces the
decompressor to read any trailing bytes that weren't critical
for decompression, which is necessary to keep the stream
*/
if (input.read() >= 0 ) {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
//return the decompressor to the pool
}
}
Once the data belonging to this partition has been copied over from the remote spill file, control returns to copyFromHost: scheduler.copySucceeded tells the scheduler about it and the MapTask's ID is removed from the remaining set, then the loop moves on to copy the next MapTask's data, until all the data belonging to this partition has been fetched.
That is the Fetcher flow on the Reducer side: it sends HTTP GET requests to the Mapper side and downloads the files. On the MapTask side there is a matching server; we will not dig into the source of that network protocol here, but feel free to explore it yourself.
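For reference, that server side is the ShuffleHandler, which runs inside each NodeManager as an auxiliary service. As far as I recall it is enabled through the following yarn-site.xml properties, shown here as Java configuration calls to stay in one language with the other examples:

import org.apache.hadoop.conf.Configuration;

public class ShuffleHandlerDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Register the MapReduce shuffle as a NodeManager auxiliary service.
    conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");
    conf.set("yarn.nodemanager.aux-services.mapreduce_shuffle.class",
        "org.apache.hadoop.mapred.ShuffleHandler");
  }
}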