中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop2.6.0學習筆記(四)TextInputFormat及RecordReader解析

發布時間:2020-07-27 10:15:00 來源:網絡 閱讀:2942 作者:luchunli1985 欄目:大數據

魯春利的工作筆記,誰說程序員不能有文藝范?


 

一個最簡單的MapReduce程序

package com.lucl.hadoop.mapreduce;

public class MiniMRDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new MiniMRDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MiniMRDriver.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
   
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

查看MapReduce任務的數據

[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
視頻網站        15      1527
信息安全        20      3156
站點統計        24      6960
搜索引擎        28      3659
站點統計        3       1938
綜合門戶        15      1938
搜索引擎        21      9531
搜索引擎        63      11058
[hadoop@nnode code]$

打包運行該MapReduce程序

[hadoop@nnode code]$ hadoop jar MiniMR.jar /data/HTTP_SITE_FLOW.log /201511302119
15/11/30 21:19:46 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/30 21:19:48 INFO input.FileInputFormat: Total input paths to process : 1
15/11/30 21:19:48 INFO mapreduce.JobSubmitter: number of splits:1
15/11/30 21:19:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448889273221_0001
15/11/30 21:19:50 INFO impl.YarnClientImpl: Submitted application application_1448889273221_0001
15/11/30 21:19:50 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448889273221_0001/
15/11/30 21:19:50 INFO mapreduce.Job: Running job: job_1448889273221_0001
15/11/30 21:20:26 INFO mapreduce.Job: Job job_1448889273221_0001 running in uber mode : false
15/11/30 21:20:26 INFO mapreduce.Job:  map 0% reduce 0%
15/11/30 21:20:59 INFO mapreduce.Job:  map 100% reduce 0%
15/11/30 21:21:30 INFO mapreduce.Job:  map 100% reduce 100%
15/11/30 21:21:31 INFO mapreduce.Job: Job job_1448889273221_0001 completed successfully
15/11/30 21:21:31 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=254
                FILE: Number of bytes written=213863
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=277
                HDFS: Number of bytes written=194
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=30256
                Total time spent by all reduces in occupied slots (ms)=27787
                Total time spent by all map tasks (ms)=30256
                Total time spent by all reduce tasks (ms)=27787
                Total vcore-seconds taken by all map tasks=30256
                Total vcore-seconds taken by all reduce tasks=27787
                Total megabyte-seconds taken by all map tasks=30982144
                Total megabyte-seconds taken by all reduce tasks=28453888
        Map-Reduce Framework
                Map input records=8
                Map output records=8
                Map output bytes=232
                Map output materialized bytes=254
                Input split bytes=103
                Combine input records=0
                Combine output records=0
                Reduce input groups=8
                Reduce shuffle bytes=254
                Reduce input records=8
                Reduce output records=8
                Spilled Records=16
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=182
                CPU time spent (ms)=2000
                Physical memory (bytes) snapshot=305459200
                Virtual memory (bytes) snapshot=1697824768
                Total committed heap usage (bytes)=136450048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=174
        File Output Format Counters 
                Bytes Written=194
[hadoop@nnode code]$

查看輸出結果

[hadoop@nnode code]$ hdfs dfs -ls /201511302119
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-30 21:21 /201511302119/_SUCCESS
-rw-r--r--   2 hadoop hadoop        194 2015-11-30 21:21 /201511302119/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511302119/part-r-00000
0       視頻網站        15      1527
22      信息安全        20      3156
44      站點統計        24      6960
66      搜索引擎        28      3659
88      站點統計        3       1938
109     綜合門戶        15      1938
131     搜索引擎        21      9531
153     搜索引擎        63      11058
[hadoop@nnode code]$


在這里沒有指定Mapper類、Reducer類,并通過FileInputFormat和FileOutputFormat指定了輸入數據及輸出結果存儲路徑,執行后把行偏移量和行內容保存到了指定的輸出路徑下。


FileInputFormat的默認實現為TextInputFormat,專門用來處理文本數據,以回車換行符作為一行的分割標記,其中key為該行的行偏移量,value為這一行內容。

類定義如下:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, 
                                                  TaskAttemptContext context) {
    // 略
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // 是否可切片
  }
}

在Job任務中可以通過public void setInputFormatClass(Class<? extends InputFormat> cls)方法設定希望使用的InputFormat格式。

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) 
                        throws IOException, InterruptedException;
                               
    public abstract RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, InterruptedException;
}

文件在HDFS上是以Block塊的形式存儲的,而在MapReduce計算中則是以劃分的切片(split后稱為split分片或chunk)進行讀取的,每個split的就對應一個mapper task,split的數量決定了mappertask的數量。

注意:MapReduce是由Mapper和Reducer組成的,MapperTask由split決定,那么Reducer由什么來決定呢?后面會逐漸通過示例代碼進行說明


List<InputSplit> getSplits(JobContext context)負責將一個大數據邏輯分成多片。比如數據庫表有100條數據,按照主鍵ID升序存儲,假設每20條分成一片,這個List的大小就是5,然后每個InputSplit記錄兩個參數,第一個為這個分片的起始ID,第二個為這個分片數據的大小(這里是20)。InputSplit并沒有真正存儲數據,只是提供了一個如何將數據分片的方法。

RecordReader<K, V) createRecordReader(InputSplit split, TaskAttemptContext context)根據InputSplit定義的分片方法,返回一個能夠讀取分片記錄的RecordReader。


InputSplit類定義

public abstract class InputSplit {
    // Split分片的大小,用來實現輸入的split的排序
    public abstract long getLength() throws IOException, InterruptedException;
    // 用來獲取存儲分片的位置列表
    public abstract String[] getLocations() throws IOException, InterruptedException;
}


RecordReader類定義

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public abstract void initialize(InputSplit split,TaskAttemptContext context
                                  ) throws IOException, InterruptedException;
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
    public abstract float getProgress() throws IOException, InterruptedException;
    public abstract void close() throws IOException;
}

InputSplit描述了數據塊的切分方式,RecordReader類則是實際用來加載split分片數據,并把數據轉換為適合Mapper類里面map()方法處理的<key, value>形式。

RecordReader實例是由輸入格式定義的,默認的輸入格式為TextInputFormat,提供了一個LineRecordReader,把每一行的行偏移量作為key,把內容作為value。RecordReader會在輸入塊上被反復調用,直到整個輸入塊被處理完畢,每一次調用RecordReader都會調用Mapper類的map()函數。


TextInputFormat并沒有getSplits的實現,而是其父類FileInputFormat進行了實現。

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
    // Generate the list of files and make them into FileSplits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // 1. 通過JobContext中獲取List<FileStatus>;
        // 2. 遍歷文件屬性數據
        //    2.1. 如果是空文件,則初始化一個無主機信息的FileSplits實例;
        //    2.2. 非空文件,判斷是否分片,默認是分片的
        //         如果不分片則每個文件作為一個FileSplit
        //         計算分片大小splitSize
        
        // getFormatMinSplitSize()返回固定值1
        // getMinSplitSize(job)通過Configuration獲取,配置參數為(mapred-default.xml):
        // mapreduce.input.fileinputformat.split.minsize默認值為0
        // minSize的值為1
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // 實際調用context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
        // 通過Configuration獲取,配置參數為(mapred-default.xml無該參數):
        // mapreduce.input.fileinputformat.split.maxsize
        // 未配置該參數,取Long.MAX_VALUE,maxSize的值為Long.MAX_VALUE
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();     // 在HDFS上的絕對路徑
          long length = file.getLen();    // 文件的實際大小
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              // 這里取的是Block塊的大小,在2.6里面默認是134217728(即128M)
              long blockSize = file.getBlockSize();
              // 獲取切片大小,computeSplitSize(blockSize, minSize, maxSize)實際調用:
              //          1                Long.MAX_VALUE   128M
              // Math.max(minSize, Math.min(maxSize,        blockSize));
              // split的大小剛好等于block塊的大小,為128M
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              long bytesRemaining = length;   // 取文件的實際大小 
              // 如果文件的實際大小/splitSize > 1.1(即實際大小大于128M * 1.1)
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                // getBlockIndex判斷is the offset inside this block?
                // 第一次length-bytesRemaining的值為0,取block塊的第一個復本
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;    // 依次減去分片的大小,對剩余長度再次分片
              }
              
              /**
              * 加入有一個300M的文件,設置bytesRemaining = length = 300M;
              * 1、判定bytesRemaining / splitSize = 300 / 128 > 1.1
              *  makeSplie-->FileSplit(path, length - bytesRemaining = 0, splitSize=128M)
              *  bytesRemaining -= splitSize => bytesRemaining = 172M
              * 2、判定bytesRemaining / splitSize = 172 / 128 > 1.1
              *  makeSplie-->FileSplit(path, length - bytesRemaining = 128, splitSize=128M)
              *  bytesRemaining -= splitSize => bytesRemaining = 44M
              * 3、判定bytesRemaining / splitSize = 44 / 128 < 1.1
              *  while循環結束。
              */
    
              // 多次分片后,最后的數據長度仍不為0但又不足一個分片大小
              if (bytesRemaining != 0) {   
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
                // 在這里把最后的44M又make了一個分片
                // makeSplie-->FileSplit(path, length - bytesRemaining = 256, splitSize=44)
              }
            } else { // not splitable,就取實際大小
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        
        return splits;
    }
}

說明:List<FileStatus>中FileStatus可能為LocatedFileStatus(a FileStatus that includes a file's block locations)。


LineRecordReader提供對文本數據的讀取解析,并依次調用Mapper的map()函數傳入<key, value>。

個人理解TextInputFormat通過Split將文件邏輯上進行分片,對于每一個分片分別new一個LineRecordReader進行解析處理,解析后的買一行調用一次map()函數,而map task仍是一個。

public class LineRecordReader extends RecordReader<LongWritable, Text> {
    public void initialize(InputSplit genericSplit,TaskAttemptContext context)
     throws IOException {
         // 1. 接收split(FileSplit對象)分片,并通過分片解析出:
         //     分片起始位置:start = split.getStart();
         //     結束位置:end = start + split.getLength();
         //     文件位置:在HDFS上的絕對路徑final Path file = split.getPath();
         // 2. 獲取文件的輸入流
         //     通過FileSystem獲取文件,并獲取輸入流 fileIn = fs.open(file);
         // 3. 判定是否為壓縮文件,并獲取壓縮格式
         //     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
         // 4. 計算行偏移量(原始解釋如下)
         //     If this is not the first split, we always throw away first record
         //     because we always (except the last split) read one extra line in
         //     next() method.
        if (start != 0) {
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;       
    }
    
    public boolean nextKeyValue() throws IOException {
         if (key == null) {    // key-->這里為map task中map()函數的key
          key = new LongWritable();
         }
         key.set(pos);         // 取的是行偏移量
         if (value == null) {
          value = new Text();
         }
         // 判定split是否已經讀取解析完成,如果未完成的話就讀取一行數據
         // 通過org.apache.hadoop.util.LineReader的readCustomLine或readDefaultLine讀取
         //   如果指定了行分隔符則調用readCustomLine;
         //   否則默認通過回車換行作為分隔符調用readDefaultLine
         newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
         pos += newSize;        // 偏移量加上個讀取的行的長度,作為下一行的偏移量
    }
    
    /**
     * nextKeyValue是一個對split分片依次讀入迭代的過程,
     * 每次讀一行,并從這一行中解析出key和value,并分別賦值,
     * 傳入到map函數時將該<key, value>值傳入(具體是怎么調用map函數的,后續分析)。
     */
    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }
    
    /**
    * Get the progress within the split
    */
    public float getProgress() throws IOException {
        if (start == end) {
          return 0.0f;
        } else {
          return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
        }
    }
    
    // 關閉打開的從hdfs的輸入流對象
    public synchronized void close() throws IOException {
        try {
          if (in != null) {
            in.close();
          }
        } finally {
          if (decompressor != null) {
            CodecPool.returnDecompressor(decompressor);
          }
        }
    }
}



向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

十堰市| 马边| 登封市| 桃源县| 孙吴县| 凉城县| 友谊县| 额敏县| 东阿县| 红河县| 林西县| 崇明县| 钟山县| 昌宁县| 基隆市| 滦南县| 铁力市| 葫芦岛市| 化州市| 辉县市| 武鸣县| 濮阳县| 仙游县| 漠河县| 论坛| 宝鸡市| 蒲江县| 长岭县| 金乡县| 阳谷县| 怀安县| 阿坝县| 蚌埠市| 紫阳县| 句容市| 伊吾县| 肇东市| 松潘县| 岐山县| 佳木斯市| 福鼎市|