中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MapReduce編寫實現wordcount詞頻統計

發布時間:2020-07-29 12:58:10 來源:網絡 閱讀:722 作者:nineteens 欄目:編程語言

  p>首先編寫WordCountDriver:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;

  import org.apache.hadoop.fs.Path;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Job;

  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  /**

  * 這個程序相當于一個yarn集群的客戶端,

  * 需要在此封裝我們的mr程序的相關運行參數,指定jar包,

  * 最后提交給yarn

  * */

  public class WordcountDriver

  {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException

  {

  Configuration conf=new Configuration();

  /*其實如果在本地運行MR程序其實不用配置下面的代碼程序,在MR默認下就是本地運行*/

  /**下面這段代碼配置的是在本地模式下運行MR程序*/

  /**是否運行為本地模式,就是看這個參數值是否為local,默認就是local;*/

  //conf.set("mapreduce.framework.name", "local"); //在本地運行MR程序

  //本地模式運行MR程序時,輸入輸出的數據可以在本地,也可以在hdfs上

  //到底在哪里,就看以下兩行配置用哪一行了,默認是“file:///”

  /**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系統

  //conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盤

  /**運行集群模式,就是把程序提交到yarn中去運行

  * 要想運行為集群模式,以下3個參數要指定為集群上的值

  * */

  conf.set("mapreduce.framework.name", "yarn");

  conf.set("yarn.resourcemanager.hostname", "hadoop1");

  conf.set("fs.defaultFS", "hdfs://hadoop1:9000");

  Job job = Job.getInstance(conf);

  /**要想在Windows的Eclipse上運行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/

  /**job.setJar("c:/wc.jar");*/

  //job.setJar("/home/hadoop/wc.jar"); //這種是將程序打包成jar包,放到指定的位置,缺乏靈活性,不建議使用;

  //指定本程序的jar包所在的本地路徑

  job.setJarByClass(WordcountDriver.class);

  //指定本業務job要使用的mapper/reducer業務類

  job.setMapperClass(WordcountMapper.class);

  job.setReducerClass(WordcountReducerr.class);

  //指定mapper輸出數據的kv類型;

  job.setMapOutputKeyClass(Text.class);

  job.setMapOutputValueClass(IntWritable.class);

  //指定最終輸出的數據的kv類型

  job.setOutputKeyClass(Text.class);

  job.setOutputValueClass(IntWritable.class);

  //指定需要使用的combiner,以及用哪一個類作為combiner的邏輯

  /*job.setCombinerClass(WordcountCombiner.class);*/

  job.setCombinerClass(WordcountReducerr.class);

  /**因為combiner的工作原理通reducecer的作用是一樣的,所以直接反射調用reducerr類其實作用是一樣的*/

  /**此處為之后為測試添加的*/

  //如果不設置InputFormat,它默認使用的是TextInputFormat.class

  /**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);

  CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

  CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

  */

  //指定job的輸入原始文件所在目錄

  //FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此處添加的路徑為HDFS文件系統的路徑;

  FileInputFormat.setInputPaths(job, new Path(args[0])); //傳一個路徑參數

  //指定job的輸出結果所在目錄

  FileOutputFormat.setOutputPath(job, new Path(args[1])); //傳一個參數進來作為輸出的路徑參數

  //將job中配置的相關參數,以及job所用的Java類所在的jar包,提交給yarn去運行;

  /*job.submit(); */

  boolean res = job.waitForCompletion(true);

  System.exit(res?0:1);

  }

  }

  其次編寫WordCountMapper:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.LongWritable;

  import org.apache.hadoop.io.Text;

  //這是一個簡單的MapReduce例子,進行單詞數量的統計操作;

  import org.apache.hadoop.mapreduce.Mapper;

  /**

  * KEYIN:默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long類型,但是在Hadoop中有更精簡的序列化接口,因此采用LongWritable類型;

  * VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String類型的,同上用Text(org.apache.hadoop.io.Text)類型;

  * KEYOUT:是用戶自定義邏輯處理完成之后輸出數據中的key,在此處是單詞,為String類型,同上用Text類型;

  * VALUEOUT:是用戶自定義邏輯處理完成之后輸出數據中的value,在此處是單詞數量,為Integer類型,同上用IntWritable類型;

  * */

  public class WordcountMapper extends Mapper

  {

  /**

  * map階段的業務邏輯就寫在自定義的map()方法中,

  * maptask會對每一行輸入數據調用一次我們自定義的map()方法;

  * */

  @Override //覆寫Mapper中的方法;

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

  {

  //將maptask傳給我們的文本內容先轉換成String類型

  String line = value.toString();

  //根據空格將這一行切分成單詞;

  String[] words = line.split(" ");

  //將單詞輸出為<單詞,1>

  for(String word:words)

  {

  //將單詞作為key,將次數1作為value,以便于后續的數據分發,可以根據單詞分發,以便于相同單詞會分到相同的reduce task中;

  context.write(new Text(word),new IntWritable(1)); //進行類型轉換一下;

  }無錫×××醫院 https://yyk.familydoctor.com.cn/20612/

  }

  最后編寫WordCountReduceer:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import java.util.Iterator;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Reducer;

  /**

  * KEYIN,VALUEIN應該對應mapper中的輸出的KEYOUT,VALUEOUT類型;

  * KEYOUT是單詞

  * VALUEOUT是總次數*/

  public class WordcountReducerr extends Reducer

  {

  /**

  * 例如:

  *

  * 輸入參數key,是一組相同單詞kv對的key

  * */

  @Override

  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

  {

  int count= 0;

  /* //采用迭代器的方式進行統計單詞的數量;

  Iterator iterator = values.iterator();

  while(iterator.hasNext())

  {

  count+=iterator.next().get(); //獲取key對應的value值

  }

  */

  //下面的for循環和上面注釋中的效果是一樣的;

  for(IntWritable value:values)

  {

  count+=value.get();

  }

  //輸出統計結果

  context.write(key, new IntWritable(count));

  /**

  * 默認情況下reduce task會將輸出結果放到一個文件中(最好是HDFS文件系統上的一個文件)

  * */

  }

  }

  然而還可以編寫一個Combiner類:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Reducer;

  /*

  * 此處的這個combiner其實不用自己編寫,因為combiner的工作原理同reducer的原理是一樣

  * 的,故可以直接反射調用WordcountReducer類即可

  * */

  public class WordcountCombiner extends Reducer

  {

  @Override

  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

  {

  }


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

蓬溪县| 大邑县| 沅陵县| 镇平县| 丰原市| 泽普县| 文登市| 汾西县| 会理县| 莒南县| 兴文县| 霸州市| 亳州市| 南郑县| 灵丘县| 徐闻县| 明溪县| 益阳市| 谢通门县| 普宁市| 广灵县| 鲜城| 岱山县| 临泽县| 长顺县| 丰都县| 安丘市| 铜川市| 洛阳市| 西乌珠穆沁旗| 怀化市| 开原市| 延边| 陆河县| 沿河| 宾阳县| 邮箱| 恩平市| 廊坊市| 新建县| 眉山市|