中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

mapreduce 模板代碼

發布時間:2020-06-27 01:52:39 來源:網絡 閱讀:722 作者:jethai 欄目:大數據



jai包

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>

2.x以后就拆成一些零散的包了,沒有core包了



代碼:

package org.conan.myhadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
//org.apache.hadoop.mapred 老系統的包
//org.apache.hadoop.mapreduce 新系統的包 
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * ModuleMapReduce Class
 * 單純的注釋 
 */
public class ModuleMapReduce extends Configured implements Tool {

    /**
     * 
     * ModuleMapper Class 不僅有注釋的功效而且你鼠標放在你注釋的方法上面他會把你注釋的內容顯示出來,
     * 
     */
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, LongWritable, Text>

    {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {

            super.setup(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO

        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {

            super.cleanup(context);
        }

    }

    /**
     * 
     * ModuleReducer Class
     * 
     */
    public static class ModuleReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            super.setup(context);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> value,
                Context context) throws IOException, InterruptedException {
            // TODO

        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

    }

    // Driver 驅動
    // @Override //實現接口時關鍵字1.5和1.7的JDK都會報錯,只有1.6不報錯
    public int run(String[] args) throws Exception {
        Job job = parseInputAndOutput(this, this.getConf(), args);
        // 2.set job

        // step 1:set input
        job.setInputFormatClass(TextInputFormat.class);

        // step 3:set mappper class
        job.setMapperClass(ModuleMapper.class);
        // step 4:set mapout key/value class
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // step 5:set shuffle(sort,combiner,group)
        // set sort
        job.setSortComparatorClass(LongWritable.Comparator.class);
        // set combiner(optional,default is unset)必須是Reducer的子類
        job.setCombinerClass(ModuleReducer.class);
        // set grouping
        job.setGroupingComparatorClass(LongWritable.Comparator.class);
        // step 6 set reducer class
        job.setReducerClass(ModuleReducer.class);
        // step 7:set job output key/value class
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // step 8:set output format
        job.setOutputFormatClass(FileOutputFormat.class);

        // step 10: submit job
        Boolean isCompletion = job.waitForCompletion(true);// 提交job
        return isCompletion ? 0 : 1;
    }

    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
            throws IOException {
        // 輸入參數的合法性
        if (args.length != 2) {
            System.err.printf(
                    "Usage: %s [generic options] <input> <output> \n ", tool
                            .getClass().getSimpleName());
      //%s表示輸出字符串,也就是將后面的字符串替換模式中的%s
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }

        // 1.create job

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(ModuleMapReduce.class);
        // step 2:set input path
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);
        // step 9:set output path
        Path outputPath = new Path(args[0]);
        FileOutputFormat.setOutputPath(job, outputPath);

        return job;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new ModuleMapReduce(), args);// 返回值即為isCompletion ? 0 : 1
            System.exit(status);// System.exit(0)中斷虛擬機的運行,退出應用程序,0表示沒有異常正常退出。
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


倒排索引代碼


輸入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789    112
15699807656 110
011-678987 112
說明:每一行為一條電話通話記錄,左邊的號碼(記為a)打給右邊的號碼(記為b號碼),中間用空格隔開

要求:
將以上文件以如下格式輸出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
說明:左邊為被呼叫的號碼b,右邊為呼叫b的號碼a以"|"分割

package org.conan.myhadoop.mr;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReverseIndex extends Configured implements Tool {

    enum Counter {
        LINESKIP, // 出錯的行
    }

    public static class Map extends Mapper {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // 讀取源數據
            try {
                // 數據處理
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];
                String bnum = lineSplit[1];
                context.write(new Text(bnum), new Text(anum)); // 輸出

            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // 出錯hang計數器+1
                return;
            }
        }
    }

    public static class Reduce extends Reducer {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueString;
            String out = "";
            for (Text value : values) {
                valueString = value.toString();
                out += valueString + "|";
                System.out.println("Ruduce:key=" + key + "  value=" + value);
            }
            context.write(key, new Text(out));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = new Job(conf, "ReverseIndex"); // 任務名
        job.setJarByClass(ReverseIndex.class); // 指定Class

        FileInputFormat.addInputPath(job, new Path(args[0])); // 輸入路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 輸出路徑

        job.setMapperClass(Map.class); // 調用上面Map類作為Map任務代碼
        job.setReducerClass(ReverseIndex.Reduce.class); // 調用上面Reduce類作為Reduce任務代碼

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class); // 指定輸出的KEY的格式
        job.setOutputValueClass(Text.class); // 指定輸出的VALUE的格式

        job.waitForCompletion(true);

        // 輸出任務完成情況
        System.out.println("任務名稱:" + job.getJobName());
        System.out.println("任務成功:" + (job.isSuccessful() ? "是" : "否"));
        System.out.println("輸入行數:"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_INPUT_RECORDS").getValue());
        System.out.println("輸出行數:"
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("跳過的行:"
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // 判斷參數個數是否正確
        // 如果無參數運行則顯示以作程序說明
        if (args.length != 2) {
            System.err.println("");
            System.err
                    .println("Usage: ReverseIndex < input path > < output path > ");
            System.err
                    .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");

            System.exit(-1);
        }
        // 記錄開始時間
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();
        // 運行任務
        int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);

        // 輸出任務耗時
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("任務開始:" + formatter.format(start));
        System.out.println("任務結束:" + formatter.format(end));
        System.out.println("任務耗時:" + String.valueOf(time) + " 分鐘");

        System.exit(res);
   }
    
}


去重代碼

 //Mapper任務
      static class DDMap extends Mapper<LongWritable,Text,Text,Text>{
       private static Text line = new Text();
       protected void map(LongWritable k1,Text v1,Context context){
        line = v1;
        Text text = new Text("");
         try {
          context.write(line,text);
         } catch (IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
         } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
         }
       };
      }

    //Reducer任務
      static class DDReduce extends Reducer<Text,Text,Text,Text>{
       protected void reduce(Text k2,Iterable<Text> v2s,Context context){
        Text text = new Text("");
        try {
         context.write(k2, text);
        } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        } catch (InterruptedException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
       };
      }


參考文章;

一個經典的MapReduce模板代碼,倒排索引(ReverseIndex)

http://blog.itpub.net/26400547/viewspace-1214945/

詳解MapReduce實現數據去重與倒排索引應用場景案例

http://www.tuicool.com/articles/emi6Fb



向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武定县| 忻城县| 大洼县| 四会市| 宁强县| 鹤峰县| 锦州市| 普兰县| 松阳县| 丹棱县| 四子王旗| 左云县| 五华县| 甘孜| 林州市| 大厂| 忻州市| 沂源县| 娄底市| 崇信县| 中西区| 天门市| 来宾市| 凤山县| 丹江口市| 南澳县| 黄山市| 灌南县| 平顺县| 洛川县| 垫江县| 闽侯县| 收藏| 同心县| 陕西省| 丹巴县| 依安县| 洛隆县| 广灵县| 平度市| 屏山县|