中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hadoop中wordcount 、wordmean的示例代碼

發布時間:2021-12-09 15:24:36 來源:億速云 閱讀:181 作者:小新 欄目:云計算

小編給大家分享一下hadoop中wordcount 、wordmean的示例代碼,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

注意

  1. hadoop src 可以在 hadoop官網上下載。

  2. 分析中/**開頭的為源碼自帶,//開頭的為作者心得。

  3. hadoop安裝環境下的/share/dc/hadoop/api下有自帶的api鏈接,可以方便大家查看,學習。

wordcount:

package hadoop1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
    	//split into words by blank just like "//s" in  Regular Expression
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        //format :<word,1>,      context.write() is the format in map&&reduce to output
        context.write(word, one);
      }
    }
  }
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      //foreach value(a kind of word),
      //do sum : <word1,1>,......,<word1,1>------><word1,n>, <word2,1>,......,<word2,1>------><word2,n>
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      //format <word1,sum1>,<word2,sum2>......
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    //get in/out path, format:otherArgs ={input1,input2......,output}
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    //singleton
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    
    //combiner is just like a reducer on single node to reduce the net pressure,       
    //not all the task suit for combiner.
    //so <key,1>,......,<key,1>------><key,n>
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    
    //mapper's output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    //add new path for multinput (input1,input2...)
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    //0 for normal exit,else not normal
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



wordmean:

package hadoop2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;

public class WordMean extends Configured implements Tool {

  private double mean = 0;
  
  private final static Text COUNT = new Text("count");
  private final static Text LENGTH = new Text("length");
  private final static LongWritable ONE = new LongWritable(1);

  /**
   * Maps words from line of text into 2 key-value pairs; one key-value pair for
   * counting the word, another for counting its length.
   */
  public static class WordMeanMapper extends
      Mapper<Object, Text, Text, LongWritable> {

    private LongWritable wordLen = new LongWritable();

    /**
     * Emits 2 key-value pairs for counting the word and its length. Outputs are
     * (Text, LongWritable).
     * 
     * @param value
     *          This will be a line of text coming in from our input file.
     */
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String string = itr.nextToken();
        //map into format:word1,word2......,wordn.
        //for each token(word) split by blank, 
        //set two kinds of<key,value>------><"count",1>and<"length",string.length>
        this.wordLen.set(string.length());
        context.write(LENGTH, this.wordLen);
        context.write(COUNT, ONE);
      }
    }
  }

  /**
   * Performs integer summation of all the values for each key.
   */
  public static class WordMeanReducer extends
      Reducer<Text, LongWritable, Text, LongWritable> {
// LongWritable is just like Long in java, 
//to implement hadoop's own type is for Serialization and Anti serialization
    private LongWritable sum = new LongWritable();

    /**
     * Sums all the individual values within the iterator and writes them to the
     * same key.
     * 
     * @param key
     *          This will be one of 2 constants: LENGTH_STR or COUNT_STR.
     * @param values
     *          This will be an iterator of all the values associated with that
     *          key.
     */
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
//for two constants:"count" and "length",
//calculate the sum for each constant. :<count,1>,.....,<count,1> -------> <count,n>&&<length,3>,......,<length,4>-----><length,m>
      int theSum = 0;
      for (LongWritable val : values) {
        theSum += val.get();
      }
      sum.set(theSum);
      context.write(key, sum);
    }
  }

  /**
   * Reads the output file and parses the summation of lengths, and the word
   * count, to perform a quick calculation of the mean.
   * 
   * @param path
   *          The path to find the output file in. Set in main to the output
   *          directory.
   * @throws IOException
   *           If it cannot access the output directory, we throw an exception.
   */
  private double readAndCalcMean(Path path, Configuration conf)
      throws IOException {
	
	  //read from reducers' output 
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(path, "part-r-00000");

    if (!fs.exists(file))
      throw new IOException("Output not found!");

    BufferedReader br = null;

    // average = total sum(m in reduce) / number of elements(n in reduce);
    try {
    	//BufferedReader is a Decorator to InputStreamReader, 
/for add the method .readLine() and so on.
      br = new BufferedReader(new InputStreamReader(fs.open(file), Charsets.UTF_8));

      long count = 0;
      long length = 0;

      String line;
      while ((line = br.readLine()) != null) {
    	 
        StringTokenizer st = new StringTokenizer(line);
        
        // grab type ------to spilt "count" and "length"
        String type = st.nextToken();
     
        // differentiate
        if (type.equals(COUNT.toString())) {
          String countLit = st.nextToken();
          count = Long.parseLong(countLit);
          System.out.println("The count is: " + count );//~ add by author :output total word count n
        } else if (type.equals(LENGTH.toString())) {
          String lengthLit = st.nextToken();
          length = Long.parseLong(lengthLit);
          System.out.println("The  length  is: " +  length  );//~ add by author :output total word length m
        }
      }

      double theMean = (((double) length) / ((double) count));
      System.out.println("The mean is: " + theMean);
      return theMean;
    } finally {
      if (br != null) {
        br.close();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordMean(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordmean <in> <out>");
      return 0;
    }

    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "word mean");
    job.setJarByClass(WordMean.class);
    job.setMapperClass(WordMeanMapper.class);
    job.setCombinerClass(WordMeanReducer.class);
    job.setReducerClass(WordMeanReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputpath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputpath);
    boolean result = job.waitForCompletion(true);
    mean = readAndCalcMean(outputpath, conf);

    return (result ? 0 : 1);
  }

  /**
   * Only valuable after run() called.
   * 
   * @return Returns the mean value.
   */
  public double getMean() {
    return mean;
  }
}

看完了這篇文章,相信你對“hadoop中wordcount 、wordmean的示例代碼”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

台湾省| 鄂尔多斯市| 建瓯市| 望奎县| 林口县| 沂南县| 滨海县| 河间市| 务川| 西峡县| 图片| 阳谷县| 达孜县| 肇州县| 德钦县| 姜堰市| 泗水县| 喜德县| 大厂| 南皮县| 大港区| 长垣县| 扶沟县| 武鸣县| 宾川县| 弥勒县| 永州市| 封丘县| 博野县| 额敏县| 崇左市| 江山市| 尼木县| 弋阳县| 安平县| 咸丰县| 阿坝| 株洲县| 申扎县| 长汀县| 玉林市|