您好,登錄后才能下訂單哦!
jar包
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> </dependency>
Hadoop 2.x以後,hadoop-core被拆分成多個獨立的包(例如hadoop-common、hadoop-hdfs、hadoop-mapreduce-client-core),不再有單一的core包了
代碼:
package org.conan.myhadoop.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; //org.apache.hadoop.mapred 老系統的包 //org.apache.hadoop.mapreduce 新系統的包 import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* * ModuleMapReduce Class * 單純的注釋 */ public class ModuleMapReduce extends Configured implements Tool { /** * * ModuleMapper Class 不僅有注釋的功效而且你鼠標放在你注釋的方法上面他會把你注釋的內容顯示出來, * */ public static class ModuleMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override public void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO } @Override public void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } /** * * ModuleReducer Class * */ public static class ModuleReducer extends Reducer<LongWritable, Text, LongWritable, Text> { @Override public void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); } @Override protected void reduce(LongWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException { // TODO } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } // Driver 驅動 // @Override //實現接口時關鍵字1.5和1.7的JDK都會報錯,只有1.6不報錯 public int run(String[] args) throws Exception { Job job = 
parseInputAndOutput(this, this.getConf(), args); // 2.set job // step 1:set input job.setInputFormatClass(TextInputFormat.class); // step 3:set mappper class job.setMapperClass(ModuleMapper.class); // step 4:set mapout key/value class job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // step 5:set shuffle(sort,combiner,group) // set sort job.setSortComparatorClass(LongWritable.Comparator.class); // set combiner(optional,default is unset)必須是Reducer的子類 job.setCombinerClass(ModuleReducer.class); // set grouping job.setGroupingComparatorClass(LongWritable.Comparator.class); // step 6 set reducer class job.setReducerClass(ModuleReducer.class); // step 7:set job output key/value class job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // step 8:set output format job.setOutputFormatClass(FileOutputFormat.class); // step 10: submit job Boolean isCompletion = job.waitForCompletion(true);// 提交job return isCompletion ? 0 : 1; } public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException { // 輸入參數的合法性 if (args.length != 2) { System.err.printf( "Usage: %s [generic options] <input> <output> \n ", tool .getClass().getSimpleName()); //%s表示輸出字符串,也就是將后面的字符串替換模式中的%s ToolRunner.printGenericCommandUsage(System.err); return null; } // 1.create job Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(ModuleMapReduce.class); // step 2:set input path Path inputPath = new Path(args[0]); FileInputFormat.addInputPath(job, inputPath); // step 9:set output path Path outputPath = new Path(args[0]); FileOutputFormat.setOutputPath(job, outputPath); return job; } public static void main(String[] args) { try { int status = ToolRunner.run(new ModuleMapReduce(), args);// 返回值即為isCompletion ? 0 : 1 System.exit(status);// System.exit(0)中斷虛擬機的運行,退出應用程序,0表示沒有異常正常退出。 } catch (Exception e) { e.printStackTrace(); } } }
倒排索引代碼
輸入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
說明:每一行為一條電話通話記錄,左邊的號碼(記為a)打給右邊的號碼(記為b),中間用空格隔開。
要求:
將以上文件以如下格式輸出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
說明:左邊為被呼叫的號碼b,右邊為所有呼叫過b的號碼a,多個號碼之間以"|"分隔。
package org.conan.myhadoop.mr; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ReverseIndex extends Configured implements Tool { enum Counter { LINESKIP, // 出錯的行 } public static class Map extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 讀取源數據 try { // 數據處理 String[] lineSplit = line.split(" "); String anum = lineSplit[0]; String bnum = lineSplit[1]; context.write(new Text(bnum), new Text(anum)); // 輸出 } catch (java.lang.ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); // 出錯hang計數器+1 return; } } } public static class Reduce extends Reducer { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String valueString; String out = ""; for (Text value : values) { valueString = value.toString(); out += valueString + "|"; System.out.println("Ruduce:key=" + key + " value=" + value); } context.write(key, new Text(out)); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf, "ReverseIndex"); // 任務名 job.setJarByClass(ReverseIndex.class); // 指定Class FileInputFormat.addInputPath(job, new Path(args[0])); // 輸入路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 輸出路徑 job.setMapperClass(Map.class); // 調用上面Map類作為Map任務代碼 job.setReducerClass(ReverseIndex.Reduce.class); // 
調用上面Reduce類作為Reduce任務代碼 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); // 指定輸出的KEY的格式 job.setOutputValueClass(Text.class); // 指定輸出的VALUE的格式 job.waitForCompletion(true); // 輸出任務完成情況 System.out.println("任務名稱:" + job.getJobName()); System.out.println("任務成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("輸入行數:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("輸出行數:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳過的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { // 判斷參數個數是否正確 // 如果無參數運行則顯示以作程序說明 if (args.length != 2) { System.err.println(""); System.err .println("Usage: ReverseIndex < input path > < output path > "); System.err .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out"); System.exit(-1); } // 記錄開始時間 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 運行任務 int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args); // 輸出任務耗時 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任務開始:" + formatter.format(start)); System.out.println("任務結束:" + formatter.format(end)); System.out.println("任務耗時:" + String.valueOf(time) + " 分鐘"); System.exit(res); } }
去重代碼
//Mapper任務 static class DDMap extends Mapper<LongWritable,Text,Text,Text>{ private static Text line = new Text(); protected void map(LongWritable k1,Text v1,Context context){ line = v1; Text text = new Text(""); try { context.write(line,text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; } //Reducer任務 static class DDReduce extends Reducer<Text,Text,Text,Text>{ protected void reduce(Text k2,Iterable<Text> v2s,Context context){ Text text = new Text(""); try { context.write(k2, text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; }
參考文章:
一個經典的MapReduce模板代碼,倒排索引(ReverseIndex)
http://blog.itpub.net/26400547/viewspace-1214945/
http://www.tuicool.com/articles/emi6Fb
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。