In this post I'd like to share how to customize a Hadoop MapReduce InputFormat to control how input files are split. I hope you get something out of it; let's dig in together!
We have already implemented secondary sort by cookieId and time. Now a new question comes up: what if the analysis has to be done per cookieId, and per the combination of cookieId & time? In that case the best approach is a custom InputFormat that lets MapReduce read all records of one cookieId in a single pass and then cut them into sessions by time. The logic, in pseudocode (a Java sketch follows the pseudocode):
for OneSplit in MyInputFormat.getSplit()   // OneSplit contains every record of one cookieId
    for session in OneSplit                // a session is a time-based sub-division of OneSplit
        for line in session                // line is one record in the session, i.e. one raw log line
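To make that concrete, here is a minimal sketch of a mapper that receives all of one cookieId's lines as a single value (the way the custom InputFormat below delivers them) and cuts them into sessions by time. The tab-separated field layout (cookieId, time, url, ...) and the session rule (new session whenever the time value changes) are illustrative assumptions, not part of the original code:

package MyInputFormat;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical session-splitting mapper: one input value = all lines of one cookieId.
public class SessionMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        int sessionId = 0;
        String previousTime = null;
        for (String line : value.toString().split("\n")) {
            String[] fields = line.split("\t");    // assumed layout: cookieId, time, url, ...
            if (fields.length < 3) {
                continue;                          // skip blank or malformed lines
            }
            String cookieId = fields[0];
            String time = fields[1];
            // Assumed session rule: start a new session whenever the time value changes.
            if (previousTime != null && !time.equals(previousTime)) {
                sessionId++;
            }
            previousTime = time;
            context.write(new Text(cookieId + "_" + sessionId), new Text(line));
        }
    }
}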
1. How it works:
InputFormat is a very common concept in MapReduce. What exactly does it do while a job runs?
In the old org.apache.hadoop.mapred API, InputFormat is an interface containing two methods:
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
            Reporter reporter) throws IOException;
}
These two methods do the following work:
getSplits cuts the input data into splits; the number of splits determines the number of map tasks, and a split defaults to the HDFS block size (64 MB on older Hadoop versions).
getRecordReader parses each split into records and then turns each record into a <K, V> pair.
(The newer org.apache.hadoop.mapreduce API, which the sample code below uses, models InputFormat as an abstract class with the equivalent methods getSplits(JobContext) and createRecordReader(InputSplit, TaskAttemptContext).)
In other words, InputFormat performs the following transformation:
InputFile --> splits --> <K,V>
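The split size, and therefore the number of map tasks, can also be bounded from the driver. A minimal sketch, assuming the new-API FileInputFormat helpers and purely illustrative 32 MB / 128 MB limits:

import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Sketch: bound split sizes from the driver to influence how many map tasks run.
public class SplitSizeDemo {
    public static Job buildJob() throws IOException {
        Job job = Job.getInstance();
        FileInputFormat.setMinInputSplitSize(job, 32L * 1024 * 1024);   // 32 MB lower bound
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  // 128 MB upper bound
        return job;
    }
}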
So which InputFormats does Hadoop ship with?
TextInputFormat is the most commonly used one; its <K, V> pair is <byte offset of the line, content of the line>.
However, the built-in ways of turning an InputFile into <K, V> pairs do not always match our needs. When that happens we need a custom InputFormat, so that the framework parses the InputFile into <K, V> pairs exactly the way we intend.
Before writing a custom InputFormat, it helps to understand the following abstract classes, interfaces, and how they relate:
InputFormat (interface), FileInputFormat (abstract class), TextInputFormat (class), RecordReader (interface), LineRecordReader (class):
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader calls LineRecordReader
LineRecordReader implements RecordReader
The InputFormat interface has already been described above.
FileInputFormat implements the getSplits method of InputFormat and leaves getRecordReader and isSplitable to concrete subclasses such as TextInputFormat. isSplitable rarely needs to change, so a custom InputFormat usually only has to implement getRecordReader. The heart of that method is a class modeled on LineRecordReader (LineRecordReader is what actually "parses each split into records, then turns each record into a <K, V> pair"), and such a class implements the RecordReader interface:
public interface RecordReader<K, V> {
    boolean next(K key, V value) throws IOException;
    K createKey();
    V createValue();
    long getPos() throws IOException;
    void close() throws IOException;
    float getProgress() throws IOException;
}
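For intuition, a map task consumes a RecordReader with a loop like the following. This is a simplified sketch modeled on the old-API MapRunner, not the literal Hadoop source; a real task would call map() on each pair instead of counting:

import java.io.IOException;
import org.apache.hadoop.mapred.RecordReader;

// Simplified sketch of how <K, V> pairs are pulled out of a RecordReader.
public class RecordReaderLoopSketch {
    public static <K, V> long countRecords(RecordReader<K, V> reader) throws IOException {
        K key = reader.createKey();
        V value = reader.createValue();
        long count = 0;
        while (reader.next(key, value)) {   // next() fills key/value, returns false at end of split
            count++;                        // a real map task would call map(key, value, ...) here
        }
        reader.close();
        return count;
    }
}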
So the core of a custom InputFormat is a custom RecordReader implementation modeled on LineRecordReader, and the core of that class is overriding the RecordReader methods listed above.
2. Code:
package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

    @SuppressWarnings("deprecation")
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new TrackRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Compressed files cannot be split at arbitrary byte offsets,
        // so only allow splitting when no compression codec applies.
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as byte offsets in the file and values as one multi-line record
 * delimited by the custom separator.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private NewLineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;
    // Record separator: everything up to (and including) "END\n" is one record.
    private byte[] separator = "END\n".getBytes();

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            in = new NewLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                // Instead of --start (as in the stock LineRecordReader),
                // back up by the full separator length.
                this.start -= separator.length;
                fileIn.seek(start);
            }
            in = new NewLineReader(fileIn, job);
        }
        if (skipFirstLine) {
            // Skip the first (partial) record and re-establish "start".
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        }
        return true;
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    // The source text is truncated here: the NewLineReader helper class
    // (a LineReader-style reader that consumes input up to the "END\n"
    // separator and strips it from the returned record) is not included.
}
package MyInputFormat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

    public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("key:\t " + key);
            System.out.println("value:\t " + value);
            System.out.println("-------------------------");
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Path outPath = new Path("/hive/11");
        FileSystem.get(conf).delete(outPath, true);
        Job job = new Job(conf, "TestMyInputFormat");
        job.setInputFormatClass(TrackInputFormat.class);
        job.setJarByClass(TestMyInputFormat.class);
        job.setMapperClass(TestMyInputFormat.MapperClass.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
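As a side note, newer Hadoop releases (1.1+ and 2.x) let the stock TextInputFormat use a custom record delimiter via the textinputformat.record.delimiter property, so a similar per-record grouping can often be had without a custom InputFormat. A minimal sketch of that alternative (class name and job setup are placeholders; the rest of the driver stays as above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Alternative sketch: have the built-in TextInputFormat treat "END\n" as the
// record delimiter instead of writing a custom InputFormat.
public class DelimiterDemo {
    public static Job buildJob() throws Exception {
        Configuration conf = new Configuration();
        conf.set("textinputformat.record.delimiter", "END\n");
        Job job = Job.getInstance(conf, "DelimiterDemo");
        job.setInputFormatClass(TextInputFormat.class);
        return job;
    }
}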
3. Test data:
cookieId    time    url         cookieOverFlag
1           a       1_hao123
1           a       1_baidu
1           b       1_google    2END
2           c       2_google
2           c       2_hao123
2           c       2_google    1END
3           a       3_baidu
3           a       3_sougou
3           b       3_soso      2END
4. Output:
key:     0
value:   1  a  1_hao123
1  a  1_baidu
1  b  1_google  2
-------------------------
key:     47
value:   2  c  2_google
2  c  2_hao123
2  c  2_google  1
-------------------------
key:     96
value:   3  a  3_baidu
3  a  3_sougou
3  b  3_soso  2
-------------------------
That wraps up this look at how to customize a Hadoop MapReduce InputFormat to split input files. Hopefully you now have a clearer picture; if you want to learn more, feel free to follow the Yisu Cloud industry news channel. Thanks for reading!