中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何用Mapreduce程序完成wordcount

發布時間:2020-06-18 16:15:24 來源:億速云 閱讀:171 作者:元一 欄目:大數據

MapReduce概述:

MapReduce采用分而治之的思想,把對大規模數據集的操作,分發給一個主節點管理下的各個分節點共同完成,然后通過整合各個節點的中間結果,得到最終結果。簡單來說,MapReduce就是“任務的分解和結果的匯總”。

在Hadoop中,用于執行MapReduce任務的機器角色有兩個:一個是JobTracker;另一個是TaskTracker。JobTracker用于調度工作的,TaskTracker是用于執行工作的。一個Hadoop集群中只有一臺JobTracker。

在分布式計算中,MapReduce框架負責處理了并行編程中分布式存儲、工作調度、負載均衡、容錯均衡、容錯處理以及網絡通信等復雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解后多任務處理的結果匯總起來。

需要注意的是,用MapReduce來處理的數據集(或任務)必須具備這樣的特點:待處理的數據集可以分解成許多小的數據集,而且每一個小數據集都可以完全并行地進行處理。

程序使用的測試文本數據

Dear River
Dear River Bear Spark 
Car Dear Car Bear Car
Dear Car River Car 
Spark Spark Dear Spark 

1編寫主要類

(1)Maper類

首先是自定義的Maper類代碼

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //fields:代表著文本一行的的數據: dear bear river
        String[] words = value.toString().split("\t");
        for (String word : words) {
            // 每個單詞出現1次,作為中間結果輸出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

?????這個Map類是一個泛型類型,它有四個形參類型,分別指定map()函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。LongWritable:輸入鍵類型,Text:輸入值類型,Text:輸出鍵類型,IntWritable:輸出值類型.
?????String[] words = value.toString().split("\t");,words 的值為Dear River Bear River
?????輸入鍵key是一個長整數偏移量,用來尋找第一行的數據和下一行的數據,輸入值是一行文本Dear River Bear River,輸出鍵是單詞Bear ,輸出值是整數1
?????Hadoop本身提供了一套可優化網絡序列化傳輸的基本類型,而不直接使用Java內嵌的類型。這些類型都在org.apache.hadoop.io包中。這里使用LongWritable類型(相當于Java的Long類型)、Text類型(相當于Java中的String類型)和IntWritable類型(相當于Java的Integer類型)。
?????map()方法的參數是輸入鍵和輸入值。以本程序為例,輸入鍵LongWritable key是一個偏移量,輸入值Text valueDear Car Bear Car ,我們首先將包含有一行輸入的Text值轉換成Java的String類型,之后使用substring()方法提取我們感興趣的列。map()方法還提供了Context實例用于輸出內容的寫入。

(2)Reducer類

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
        (River, 1)
        (River, 1)
        (River, 1)
        (Spark , 1)
        (Spark , 1)
        (Spark , 1)
        (Spark , 1)

        key: River
        value: List(1, 1, 1)
        key: Spark
        value: List(1, 1, 1,1)

    */
    public void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));// 輸出最終結果
    };
}

Reduce任務最初按照分區號從Map端抓取數據為:
(River, 1)
(River, 1)
(River, 1)
(spark, 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
經過處理后得到的結果為:
key: hello   value: List(1, 1, 1)
key: spark  value: List(1, 1, 1,1)
所以reduce()函數的形參 Iterable&lt;IntWritable&gt; values    接收到的值為List(1, 1, 1)List(1, 1, 1,1)

(3)Main函數

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountMain {
    //若在IDEA中本地執行MR程序,需要將mapred-site.xml中的mapreduce.framework.name值修改成local
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length != 2 || args == null) {
            System.out.println("please input Path!");
            System.exit(0);
        }
        //System.setProperty("HADOOP_USER_NAME","hadoop2.7");
        Configuration configuration = new Configuration();
        //configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        //調用getInstance方法,生成job實例
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
        // 打jar包
        job.setJarByClass(WordCountMain.class);

        // 通過job設置輸入/輸出格式
        // MR的默認輸入格式是TextInputFormat,所以下兩行可以注釋掉
        // job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);
        // 設置輸入/輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 設置處理Map/Reduce階段的類
        job.setMapperClass(WordCountMap.class);
        //map combine減少網路傳出量
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);

        //如果map、reduce的輸出的kv對類型一致,直接設置reduce的輸出的kv對就行;如果不一樣,需要分別設置map, reduce的        輸出的kv類型
        //job.setMapOutputKeyClass(.class)
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);

        // 設置reduce task最終輸出key/value的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交作業
        job.waitForCompletion(true);

    }
}

2本地運行

首先更改mapred-site.xml文件配置
將mapreduce.framework.name的值設置為local
如何用Mapreduce程序完成wordcount
然后本地運行:
如何用Mapreduce程序完成wordcount
查看結果:
如何用Mapreduce程序完成wordcount

3集群運行

方式一:

首先打包
如何用Mapreduce程序完成wordcount
更改配置文件,改成yarn模式
如何用Mapreduce程序完成wordcount
添加本地jar包位置:

 Configuration configuration = new Configuration();
 configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target");

如何用Mapreduce程序完成wordcount
設置允許跨平臺遠程調用:

configuration.set("mapreduce.app-submission.cross-platform","true");

如何用Mapreduce程序完成wordcount
修改輸入參數:
如何用Mapreduce程序完成wordcount
運行結果:
如何用Mapreduce程序完成wordcount

方式二:

將maven項目打包,在服務器端用命令運行mr程序

hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar
com.kaikeba.hadoop.wordcount.WordCountMain /tttt.txt  /wordcount11
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

平遥县| 巴南区| 巨鹿县| 上饶县| 勐海县| 广州市| 济源市| 伊春市| 大连市| 称多县| 上林县| 肥东县| 汉源县| 浦江县| 定州市| 义马市| 灵丘县| 上饶市| 重庆市| 穆棱市| 进贤县| 略阳县| 喀喇| 沧州市| 万安县| 广州市| 鹤庆县| 弥渡县| 房山区| 邯郸县| 温宿县| 赫章县| 噶尔县| 通城县| 泰顺县| 喀喇| 乐至县| 襄樊市| 榆社县| 靖州| 慈利县|