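Once a job has been submitted to YARN, its map tasks are scheduled and run. This article walks through the source code of the map-side input path.
The entry point of a map task is the run() method of MapTask: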
MapTask.class
//---------------------------------MapTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (this.isMapTask()) {
if (this.conf.getNumReduceTasks() == 0) {
this.mapPhase = this.getProgress().addPhase("map", 1.0F);
} else {
this.mapPhase = this.getProgress().addPhase("map", 0.667F);
this.sortPhase = this.getProgress().addPhase("sort", 0.333F);
}
}
TaskReporter reporter = this.startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
// Initialize the map task
this.initialize(job, this.getJobID(), reporter, useNewApi);
if (this.jobCleanup) {
this.runJobCleanupTask(umbilical, reporter);
} else if (this.jobSetup) {
this.runJobSetupTask(umbilical, reporter);
} else if (this.taskCleanup) {
this.runTaskCleanupTask(umbilical, reporter);
} else {
// Run the map task, choosing between the new and the old API
if (useNewApi) {
this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
} else {
this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
}
this.done(umbilical, reporter);
}
}
The two methods that matter most above are this.initialize() and this.runNewMapper().
//---------------------------------Task.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {
// Create the job and task context objects
this.jobContext = new JobContextImpl(job, id, reporter);
this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
// Move the task from UNASSIGNED to RUNNING
if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
}
if (useNewApi) {
if (LOG.isDebugEnabled()) {
LOG.debug("using new api for output committer");
}
// Get the output format class configured for the job and instantiate it via reflection
this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
// Get the committer from the OutputFormat instance
this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
} else {
this.committer = this.conf.getOutputCommitter();
}
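// Get the job's output path from FileOutputFormat.
/*
It may seem odd that a mapper needs the OutputFormat's output path.
A MapReduce job can consist of mappers only, with no reducers.
In that case the mappers write the job output directly, so they need to know the output path, and this is where it is used.
*/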
Path outputPath = FileOutputFormat.getOutputPath(this.conf);
if (outputPath != null) {
if (this.committer instanceof FileOutputCommitter) {
FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
} else {
FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
}
}
this.committer.setupTask(this.taskContext);
Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
if (this.pTree != null) {
this.pTree.updateProcessTree();
this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
}
}
This method does the initialization work: it creates the context objects, obtains the OutputFormat and its committer, and sets up the output path.
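The reflective step above (a class named in the job configuration is turned into a live object) can be pictured with plain JDK reflection. This is only a minimal sketch, not Hadoop's ReflectionUtils: the RecordFormatter interface, the UpperCaseFormatter class and the "demo.formatter.class" key are all made up for illustration, and a Map stands in for the JobConf.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plug-in interface, standing in for OutputFormat.
interface RecordFormatter {
    String format(String record);
}

// Hypothetical implementation that a job would name in its configuration.
class UpperCaseFormatter implements RecordFormatter {
    public String format(String record) {
        return record.toUpperCase();
    }
}

class ReflectiveFactory {
    // Look up a class name under the given key and instantiate it through its no-arg constructor.
    static RecordFormatter newFormatter(Map<String, String> conf, String key) {
        try {
            Class<?> cls = Class.forName(conf.get(key));
            return (RecordFormatter) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate class for " + key, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // "demo.formatter.class" is a made-up key for this sketch.
        conf.put("demo.formatter.class", UpperCaseFormatter.class.getName());
        System.out.println(newFormatter(conf, "demo.formatter.class").format("hello"));
    }
}
```

The caller only ever sees the interface type, which is why the same task code can work with whichever OutputFormat, InputFormat or Mapper the job happens to configure.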
//---------------------------------MapTask.java
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(JobConf job, TaskSplitIndex splitIndex, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException {
TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
// Instantiate the Mapper implementation configured for the job via reflection
Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// Instantiate the InputFormat configured for the job via reflection; the default is TextInputFormat
InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
org.apache.hadoop.mapreduce.InputSplit split = null;
// Load the details of the split this mapper will process, given the location of the split file and the offset of this split's record within it
split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
// Create the RecordReader that reads the input data. With the default TextInputFormat, the underlying reader is LineRecordReader
org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);
job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
RecordWriter output = null;
// Create the RecordWriter: direct output when there are no reduce tasks, otherwise a collector for the reduce side
if (job.getNumReduceTasks() == 0) {
output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter);
}
MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split);
org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext);
try {
// Initialize the RecordReader
input.initialize(split, mapperContext);
// Call Mapper.run(), which starts the actual map work
mapper.run(mapperContext);
this.mapPhase.complete();
this.setPhase(Phase.SORT);
this.statusUpdate(umbilical);
// The map has finished; close the input and the output
input.close();
input = null;
((RecordWriter)output).close(mapperContext);
output = null;
} finally {
this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input);
this.closeQuietly((RecordWriter)output, mapperContext);
}
}
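As the code shows, this is the core flow of the whole map task. It does the following:
(1) Instantiates the Mapper class, whose map() method will be executed later
(2) Instantiates the InputFormat; the default is TextInputFormat
(3) Loads the split information, such as the path of the file the split belongs to and its start offset
(4) Obtains the RecordReader through the InputFormat; it is used later to read the data file
(5) Creates the RecordWriter used to output the map results
(6) Initializes the RecordReader for the split
(7) Runs the mapper's run() method
(8) When run() returns, marks the map phase complete and moves the task to the sort phase
(9) Closes the input and the output; closing the RecordWriter flushes the results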
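The flow listed above can be condensed into a toy pipeline. This is a sketch under heavy simplification: ToyReader, ToyWriter, ToyContext and ToyMapper are hypothetical stand-ins for RecordReader, RecordWriter, the map context and Mapper, and the input is an in-memory list rather than a split on a file system.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy stand-in for RecordReader: the key is the line index, the value is the line.
class ToyReader {
    private final Iterator<String> lines;
    private long key = -1;
    private String value;

    ToyReader(List<String> lines) { this.lines = lines.iterator(); }

    boolean nextKeyValue() {
        if (!lines.hasNext()) return false;
        key++;
        value = lines.next();
        return true;
    }
    long getCurrentKey() { return key; }
    String getCurrentValue() { return value; }
}

// Toy stand-in for RecordWriter: keeps the output in memory.
class ToyWriter {
    final List<String> records = new ArrayList<>();
    void write(String key, int value) { records.add(key + "=" + value); }
}

// Toy stand-in for the map context: forwards reads to the reader and writes to the writer.
class ToyContext {
    private final ToyReader reader;
    private final ToyWriter writer;
    ToyContext(ToyReader reader, ToyWriter writer) { this.reader = reader; this.writer = writer; }
    boolean nextKeyValue() { return reader.nextKeyValue(); }
    long getCurrentKey() { return reader.getCurrentKey(); }
    String getCurrentValue() { return reader.getCurrentValue(); }
    void write(String key, int value) { writer.write(key, value); }
}

// Toy mapper: emits (line, length of line) for every input record.
class ToyMapper {
    void map(long key, String value, ToyContext context) { context.write(value, value.length()); }
    void run(ToyContext context) {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    }
}

class MapPipelineSketch {
    static List<String> runMapTask(List<String> splitLines) {
        ToyMapper mapper = new ToyMapper();            // the mapper
        ToyReader input = new ToyReader(splitLines);   // reader over this task's split
        ToyWriter output = new ToyWriter();            // writer for the map output
        ToyContext context = new ToyContext(input, output);
        mapper.run(context);                           // drive the read-map-write loop
        return output.records;
    }

    public static void main(String[] args) {
        System.out.println(runMapTask(List.of("alpha", "be")));
    }
}
```

The mapper never touches the reader or writer directly; everything goes through the context, which is the same separation the real code keeps.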
The core methods are examined below.
//---------------------------------MapTask.java
private <T> T getSplitDetails(Path file, long offset) throws IOException {
// Get the FileSystem and open an input stream on the split file
FileSystem fs = file.getFileSystem(this.conf);
FSDataInputStream inFile = fs.open(file);
// Seek to the given offset, which is where this split's serialized record begins in the split file
inFile.seek(offset);
String className = StringInterner.weakIntern(Text.readString(inFile));
Class cls;
try {
cls = this.conf.getClassByName(className);
} catch (ClassNotFoundException var13) {
IOException wrap = new IOException("Split class " + className + " not found");
wrap.initCause(var13);
throw wrap;
}
SerializationFactory factory = new SerializationFactory(this.conf);
// Open the deserializer on the input stream
Deserializer<T> deserializer = factory.getDeserializer(cls);
deserializer.open(inFile);
T split = deserializer.deserialize((Object)null);
long pos = inFile.getPos();
((Counter)this.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)).increment(pos - offset);
inFile.close();
// Return the deserialized split object
return split;
}
So this method returns the split as a deserialized object that the task can read from.
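The seek-then-deserialize pattern in getSplitDetails() can be reproduced with JDK streams. The sketch below is an assumption-laden simplification: ToySplit is a hypothetical split type, a byte array stands in for the split file, and DataOutputStream.writeUTF() replaces Hadoop's Text and Writable encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a file split: path, start offset and length.
class ToySplit {
    final String path;
    final long start;
    final long length;

    ToySplit(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }

    @Override
    public String toString() { return path + ":" + start + "+" + length; }
}

class SplitFileSketch {
    // Append one serialized split (class name first, then its fields) and return the offset where it begins.
    static int writeSplit(ByteArrayOutputStream splitFile, ToySplit split) {
        int offset = splitFile.size();
        try {
            DataOutputStream out = new DataOutputStream(splitFile);
            out.writeUTF(ToySplit.class.getName());
            out.writeUTF(split.path);
            out.writeLong(split.start);
            out.writeLong(split.length);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offset;
    }

    // Jump to the offset, read the class name, then deserialize the fields that follow it.
    static ToySplit readSplit(byte[] splitFile, int offset) {
        try {
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(splitFile, offset, splitFile.length - offset));
            String className = in.readUTF();
            if (!className.equals(ToySplit.class.getName())) {
                throw new IOException("Split class " + className + " not found");
            }
            String path = in.readUTF();
            long start = in.readLong();
            long length = in.readLong();
            return new ToySplit(path, start, length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream splitFile = new ByteArrayOutputStream();
        writeSplit(splitFile, new ToySplit("/data/a.txt", 0L, 128L));
        int second = writeSplit(splitFile, new ToySplit("/data/a.txt", 128L, 64L));
        // Each task only needs its own offset to find its split in the shared file.
        System.out.println(readSplit(splitFile.toByteArray(), second));
    }
}
```

Storing the class name ahead of the fields is what lets the reader pick the right deserializer without knowing the split type in advance.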
Before looking at input.initialize(), consider which class the input object is an instance of: NewTrackingRecordReader, a static nested class of MapTask.
//---------------------------------MapTask.java
static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> {
private final org.apache.hadoop.mapreduce.RecordReader<K, V> real;
private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
private final TaskReporter reporter;
private final List<Statistics> fsStats;
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split, InputFormat<K, V> inputFormat, TaskReporter reporter, TaskAttemptContext taskContext) throws InterruptedException, IOException {
this.reporter = reporter;
this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
List<Statistics> matchedStats = null;
if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
matchedStats = Task.getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit)split).getPath(), taskContext.getConfiguration());
}
this.fsStats = matchedStats;
long bytesInPrev = this.getInputBytes(this.fsStats);
// Call createRecordReader() on the job's InputFormat to get the RecordReader; with TextInputFormat this returns a LineRecordReader
this.real = inputFormat.createRecordReader(split, taskContext);
long bytesInCurr = this.getInputBytes(this.fsStats);
this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
...........
}
The constructor calls createRecordReader() on the inputFormat object to create the RecordReader. As noted above, the default inputFormat is TextInputFormat.
//---------------------------TextInputFormat.java
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
public TextInputFormat() {
}
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(recordDelimiterBytes);
}
...........
}
As the code shows, the reader returned is LineRecordReader.
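NewTrackingRecordReader therefore acts as a wrapper: it holds the reader returned by createRecordReader() in its real field and adds counters around it. A minimal sketch of that wrapping idea follows. SimpleReader, ListReader and CountingReader are hypothetical names, and the per-record counting is an assumption about the methods elided above, suggested by the MAP_INPUT_RECORDS counter the constructor fetches.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal reader interface, standing in for RecordReader.
interface SimpleReader {
    boolean next();
    String current();
}

// The "real" reader: iterates over in-memory lines.
class ListReader implements SimpleReader {
    private final Iterator<String> lines;
    private String current;

    ListReader(List<String> lines) { this.lines = lines.iterator(); }

    public boolean next() {
        if (!lines.hasNext()) return false;
        current = lines.next();
        return true;
    }
    public String current() { return current; }
}

// Wrapper: delegates every call to the real reader and counts records on the way.
class CountingReader implements SimpleReader {
    private final SimpleReader real;
    private long records;

    CountingReader(SimpleReader real) { this.real = real; }

    public boolean next() {
        boolean hasRecord = real.next();
        if (hasRecord) records++;
        return hasRecord;
    }
    public String current() { return real.current(); }
    long getRecords() { return records; }
}

class TrackingSketch {
    static long countRecords(List<String> lines) {
        CountingReader reader = new CountingReader(new ListReader(lines));
        while (reader.next()) {
            reader.current(); // a mapper would process the record here
        }
        return reader.getRecords();
    }

    public static void main(String[] args) {
        System.out.println(countRecords(List.of("a", "b", "c")));
    }
}
```

Because the wrapper implements the same interface as the reader it wraps, the rest of the task cannot tell the difference, and the bookkeeping stays out of the reader implementations.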
Next, input.initialize():
static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> {
public void initialize(org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
long bytesInPrev = this.getInputBytes(this.fsStats);
// Call initialize() on the wrapped RecordReader to set up the input; by default this is LineRecordReader
// this.real was already assigned in the constructor above
this.real.initialize(split, context);
long bytesInCurr = this.getInputBytes(this.fsStats);
this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
}
This delegates to the wrapped RecordReader's initialize(), i.e. LineRecordReader.initialize(), shown below.
//---------------------------------------LineRecordReader.java
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit)genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
// Compute the start and end positions of the split
this.start = split.getStart();
this.end = this.start + split.getLength();
// Open an input stream on the file the split belongs to
Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
this.fileIn = fs.open(file);
// Look up the compression codec for the file, if it is compressed
CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
// Compressed input
if (null != codec) {
this.isCompressedInput = true;
this.decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, READ_MODE.BYBLOCK);
this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
this.start = cIn.getAdjustedStart();
this.end = cIn.getAdjustedEnd();
this.filePosition = cIn;
} else {
if (this.start != 0L) {
throw new IOException("Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream");
}
this.in = new SplitLineReader(codec.createInputStream(this.fileIn, this.decompressor), job, this.recordDelimiterBytes);
this.filePosition = this.fileIn;
}
} else {
// Uncompressed input: seek to the start of the split
this.fileIn.seek(this.start);
// Important: this is the object that actually reads the data
this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength());
this.filePosition = this.fileIn;
}
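// Adjust the start offset: unless the split starts at offset 0, skip its first (possibly partial) line, then assign the result to pos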
if (this.start != 0L) {
this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
}
this.pos = this.start;
}
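This method prepares the RecordReader for reading: its main job is to open an input stream on the split.
this.in is the object used to read data from here on, and this method completes its initialization.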
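The start adjustment at the end of initialize() pairs with the loop condition in nextKeyValue() (position <= end): a split that does not begin at offset 0 drops its first line, and the preceding split reads one line past its own end. The sketch below illustrates why every line is then read exactly once. It is a simplified model, assuming ASCII data held in a String, '\n' as the only line terminator, and no compression or maximum line length.

```java
import java.util.ArrayList;
import java.util.List;

class SplitLineSketch {
    // Index just past the '\n' that ends the line containing pos, or the data length at end of input.
    static int lineEnd(String data, int pos) {
        int newline = data.indexOf('\n', pos);
        return newline < 0 ? data.length() : newline + 1;
    }

    // Return the lines that the reader of split [start, start + length) is responsible for.
    static List<String> readSplit(String data, int start, int length) {
        int end = start + length;
        int pos = start;
        // A split that does not begin at offset 0 discards its first (possibly partial) line.
        if (start != 0) {
            pos = lineEnd(data, start);
        }
        List<String> lines = new ArrayList<>();
        // Keep reading while a line begins at or before end; the last line may run past the split.
        while (pos <= end && pos < data.length()) {
            int next = lineEnd(data, pos);
            String line = data.substring(pos, next);
            if (line.endsWith("\n")) {
                line = line.substring(0, line.length() - 1);
            }
            lines.add(line);
            pos = next;
        }
        return lines;
    }

    public static void main(String[] args) {
        String data = "aaa\nbbbb\ncc\n";
        int cut = 6; // an arbitrary split boundary in the middle of "bbbb"
        System.out.println(readSplit(data, 0, cut));
        System.out.println(readSplit(data, cut, data.length() - cut));
    }
}
```

With the boundary inside "bbbb", the first split still returns the whole line and the second split starts at "cc", so split boundaries never need to align with line boundaries.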
mapper.run() is simply the run() method of the Mapper class that user-written mappers extend:
//------------------------Mapper.java mapper.run(mapperContext);
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
// Loop over the keys and values and pass each pair to map()
// The context object is what matters here: as runNewMapper() shows, it wraps a MapContextImpl
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
This is a while loop that pulls each KV pair from the context object and passes it to map().
As runNewMapper() shows, this context is backed by a MapContextImpl, so let's look at that class.
//-----------------------MapContextImpl.java..
public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private RecordReader<KEYIN, VALUEIN> reader;
private InputSplit split;
// The constructor receives the RecordReader and the split
public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader, RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
public InputSplit getInputSplit() {
return this.split;
}
// The methods below delegate to the RecordReader to get the key and value
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return this.reader.getCurrentKey();
}
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return this.reader.getCurrentValue();
}
public boolean nextKeyValue() throws IOException, InterruptedException {
// Delegates to the reader
return this.reader.nextKeyValue();
}
}
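Its constructor receives the split and the RecordReader passed in from runNewMapper(). The three methods that follow are the ones mapper.run() calls to get each KV pair.
Next, this.reader.nextKeyValue(), which ends up in LineRecordReader: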
//----------------------------------LineRecordReader.java
public boolean nextKeyValue() throws IOException {
if (this.key == null) {
this.key = new LongWritable();
}
// Set the key to the current offset
this.key.set(this.pos);
if (this.value == null) {
this.value = new Text();
}
int newSize = 0;
while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
if (this.pos == 0L) {
newSize = this.skipUtfByteOrderMark();
} else {
/* Read data into value. For uncompressed input, this.in is an UncompressedSplitLineReader, set up in LineRecordReader.initialize(); it inherits from LineReader (via SplitLineReader). */
// Call LineReader.readLine() to read one line
newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
this.pos += (long)newSize;
}
if (newSize == 0 || newSize < this.maxLineLength) {
break;
}
LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));
}
if (newSize == 0) {
this.key = null;
this.value = null;
return false;
} else {
return true;
}
}
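This is where the key and value finally appear. The key is the offset of the data and the value is the line read by readLine(). If a record was read, the method returns true, and mapper.run() then fetches the pair through getCurrentKey() and getCurrentValue(). The reading itself is done by this.in.readLine(), i.e. LineReader.readLine(), examined below.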
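As a small illustration of the pairing just described, the sketch below produces (byte offset, line) records from a string. It is a simplified model rather than LineRecordReader itself: the whole input is held in memory, '\n' is the only terminator, and the class and method names are made up. The non-ASCII character in main shows that the key counts bytes, not characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class OffsetKeySketch {
    // Return one "offset<TAB>line" entry per line; the offset is the byte position where the line starts.
    static List<String> records(String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        List<String> out = new ArrayList<>();
        int pos = 0;
        while (pos < data.length) {
            int end = pos;
            // Scan to the end of the line; 0x0A never occurs inside a multi-byte UTF-8 sequence.
            while (end < data.length && data[end] != '\n') {
                end++;
            }
            String value = new String(data, pos, end - pos, StandardCharsets.UTF_8);
            out.add(pos + "\t" + value); // key = start offset, value = line without its terminator
            pos = end + 1;               // continue after the '\n'
        }
        return out;
    }

    public static void main(String[] args) {
        // "\u00e9" is encoded as two bytes in UTF-8, so the second line starts at byte 3.
        for (String record : records("\u00e9\nx")) {
            System.out.println(record);
        }
    }
}
```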
//---------------------------LineReader.java
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
return this.recordDelimiterBytes != null ? this.readCustomLine(str, maxLineLength, maxBytesToConsume) : this.readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
str.clear();
int txtLength = 0;
long bytesConsumed = 0L;
int delPosn = 0;
int ambiguousByteCount = 0;
do {
int startPosn = this.bufferPosn;
if (this.bufferPosn >= this.bufferLength) {
startPosn = this.bufferPosn = 0;
this.bufferLength = this.fillBuffer(this.in, this.buffer, ambiguousByteCount > 0);
if (this.bufferLength <= 0) {
if (ambiguousByteCount > 0) {
str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += (long)ambiguousByteCount;
}
break;
}
}
for(; this.bufferPosn < this.bufferLength; ++this.bufferPosn) {
if (this.buffer[this.bufferPosn] == this.recordDelimiterBytes[delPosn]) {
++delPosn;
if (delPosn >= this.recordDelimiterBytes.length) {
++this.bufferPosn;
break;
}
} else if (delPosn != 0) {
this.bufferPosn -= delPosn;
if (this.bufferPosn < -1) {
this.bufferPosn = -1;
}
delPosn = 0;
}
}
int readLength = this.bufferPosn - startPosn;
bytesConsumed += (long)readLength;
int appendLength = readLength - delPosn;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
bytesConsumed += (long)ambiguousByteCount;
if (appendLength >= 0 && ambiguousByteCount > 0) {
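// Append the partial-delimiter bytes held back from the previous buffer; the line data itself is appended to str (the value) just below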
str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
this.unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(this.buffer, startPosn, appendLength);
txtLength += appendLength;
}
if (this.bufferPosn >= this.bufferLength && delPosn > 0 && delPosn < this.recordDelimiterBytes.length) {
ambiguousByteCount = delPosn;
bytesConsumed -= (long)delPosn;
}
} while(delPosn < this.recordDelimiterBytes.length && bytesConsumed < (long)maxBytesToConsume);
if (bytesConsumed > 2147483647L) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
} else {
return (int)bytesConsumed;
}
}
The code above is the actual reading process. It is long, but the key point is simple: the bytes of one line are appended to str, which is this.value.
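The heart of readCustomLine() is a search for a multi-byte delimiter. The sketch below shows that search on a buffer that is entirely in memory. It is a deliberately reduced version: there are no buffer refills, so nothing like ambiguousByteCount is needed for delimiters that straddle two buffers, and there is no maxLineLength or maxBytesToConsume limit. The class and method names are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DelimiterSketch {
    // True if delim occurs in data starting at index pos (the caller guarantees it fits).
    static boolean matchesAt(byte[] data, int pos, byte[] delim) {
        for (int j = 0; j < delim.length; j++) {
            if (data[pos + j] != delim[j]) {
                return false;
            }
        }
        return true;
    }

    // Split text into records separated by the given delimiter.
    static List<String> records(String text, String delimiter) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        byte[] delim = delimiter.getBytes(StandardCharsets.UTF_8);
        if (delim.length == 0) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        List<String> out = new ArrayList<>();
        int start = 0; // where the current record begins
        int i = 0;     // scan position
        while (i + delim.length <= data.length) {
            if (matchesAt(data, i, delim)) {
                out.add(new String(data, start, i - start, StandardCharsets.UTF_8));
                i += delim.length; // skip the delimiter itself
                start = i;
            } else {
                i++; // partial matches are simply abandoned and scanning resumes one byte later
            }
        }
        if (start < data.length) {
            // Trailing bytes with no delimiter after them form the last record.
            out.add(new String(data, start, data.length - start, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        // A single '|' inside "b|c" is a partial match and stays part of the record.
        System.out.println(records("a||b|c||d", "||"));
    }
}
```

The original method does the same search incrementally with delPosn, backing up bufferPosn when a partial match fails, which is what makes it work on a stream instead of a complete array.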
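At this point, the map input path comes down to two important classes:
InputFormat -- processes the raw input and splits it; creates the RecordReader object
RecordReader -- reads the data in a split, turns it into KV pairs, and hands them to map()
Both are abstract classes: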
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public RecordReader() {
}
public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
}
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
To implement a custom InputFormat or RecordReader, extend these two classes and implement their abstract methods.