中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop之Mapreduce序列化怎么實現

發布時間:2023-05-11 14:43:20 來源:億速云 閱讀:226 作者:iii 欄目:開發技術

這篇“Hadoop之Mapreduce序列化怎么實現”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Hadoop之Mapreduce序列化怎么實現”文章吧。

什么是序列化:    

序列化就是把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便于存儲到磁盤(持久化)和網絡傳輸。

什么是反序列化:

 反序列化就是將收到字節序列(或其他數據傳輸協議)或者是磁盤的持久化數據,轉換成內存中的對象。

為什么要序列化:

一般來說,“活的”對象只生存在內存里,關機斷 電就沒有了。而且“活的”對象只能由本地的進程使用,不能被發送到網絡上的另外一臺計算機。 然而序列化可以存儲“活的”對象,可以將“活的”對象發送到遠程計算機。

Java的序列化:

在Java中也是有序列化的,我們為什么不通過idea使用Java的序列化那?

因為Java的序列化框架(Serializable)是一個繁重的框架,附帶信息比較多(各種校驗信息,Header,繼承體系等),不便于在網絡中高效傳輸。所以,Hadoop自己開發了一套序列化機制(Writable)。

Hadoop序列化:

Hadoop的序列化比較精簡,只有簡單的校驗,有緊湊(高效使用存儲空間),快速(讀寫數據的額外開銷小),互操作(支持多語言的交互)的特點。

自定義序列化接口:  

在開發過程中,基本序列化類型不能滿足所有需求,比如在Hadoop框架內部傳遞一個bean對象(不是基本的數據類型(某個類)----沒有對應的Hadoop類型),那么該對象就需要實現序列化接口。

實現序列化的步驟:

先看源碼進行簡單分析:

Writable接口(好像也分析不出什么)

兩個方法:

1.write: 進行序列化

2.readFields:進行反序列化 

Hadoop之Mapreduce序列化怎么實現

 (1)  反序列化時,需要反射調用空參構造函數,所以必須有空參構造

public FlowBean() {
	super();
}

(2)  重寫接口中的兩個方法***(注意:反序列化的順序和序列化的順序完全一致)

                如數據結構中的隊列一樣先進先出,先序列化則先反序列化

(3)需要重寫toString()方法,因為需要打印出來,否則打印出來的是地址

(4)  如果需要將自定義的bean放在key中傳輸,則還需要實現Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。(比如:上一篇博客中的計算單詞出現次數中 最后呈現的單詞是按照26個英文字母的順序進行排序的)

看一個樣例源碼(字符串Text):

Hadoop之Mapreduce序列化怎么實現

看到上圖  實現接口:

WritableComparable<BinaryComparable>

跟進一下:

Hadoop之Mapreduce序列化怎么實現

 看到該接口繼承自Comparable接口(這是Java中的一個API)

序列化案例實操:

案例需求:

        統計每一個手機號耗費的總上行流量、總下行流量、總流量

(1)輸入數據:

Hadoop之Mapreduce序列化怎么實現

(2)輸入數據格式:

Hadoop之Mapreduce序列化怎么實現

(3)期望輸出數據格式

Hadoop之Mapreduce序列化怎么實現

需求分析:

         先輸入數據,輸入數據后需要進行mapper階段---》reduce階段---》輸出階段

mapper階段:

先考慮輸入kv   (k---偏移量    v是一行數據)

輸出(kv)為reduce的輸入(kv) (本樣例中使用的k是手機號--統計手機號的流量使用      v為上行流量,下行流量,總流量    則需要封裝bean類(自定義對象)  再進行序列化傳輸(為什么要進行序列化那?----因為再計算的過程中可能由于資源問題mapper和reduce不在同一臺服務器上))

輸出(kv)同樣也是(手機號,bean類)

編寫MapperReduce程序:

1.FlowBean代碼:

package com.tangxiaocong.mapreduce.writable;
 
import org.apache.hadoop.io.Writable;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
/*
*
* 定義bean類
* 需要實現writable
* 重寫序列化和反序列化方法
* 重寫空參構造
* 重寫tostring方法
*
* */
public class FlowBean  implements Writable {
   private  long upFlow;
    private  long downFlow;
    private  long sumFlow;
 
    public long getUpFlow() {
        return upFlow;
    }
 
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
 
    public long getDownFlow() {
        return downFlow;
    }
 
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
 
    public long getSumFlow() {
        return sumFlow;
    }
 
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.downFlow+this.upFlow;
    }
    //生成空參構造函數由于反射  快捷鍵alt   + insert
 
    public FlowBean() {
    }
 
    @Override
    public void write(DataOutput out) throws IOException {
        //序列化方法
        //  向緩沖流中寫入Long類型的數據
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
 
    @Override
    public void readFields(DataInput in) throws IOException {
//反序列化方法
        //讀取緩沖區中數據
        this.upFlow= in.readLong();
        this.downFlow= in.readLong();
        this.sumFlow= in.readLong();
    }
 
    @Override
    public String toString() {
        return upFlow + "\t"+downFlow +"\t"+ sumFlow ;
    }
}

2.Mapper代碼:

package com.tangxiaocong.mapreduce.writable;
 
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
import java.io.IOException;
 
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
 
    private Text outK=new Text();
    private  FlowBean outV=new FlowBean();  //調用的無參構造函數
 
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        //1 獲取一行
        //1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
 
        String s = value.toString();// 將數據轉換成string
        //2 進行切割
 
        String[] split = s.split("\t"); //將數據按寫入形式進行切割
        //3 抓取想要的數據
        //根據角標獲取  手機號  上行流量  下行流量
 
 
        String phone = split[1];
        String up = split[split.length - 3];//  不能正序 因為有的屬性是沒有字段的
        String down = split[split.length - 2];
//     封裝輸出的kv
 
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));//  up為string類型
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();          //
 
        //寫出
        context.write(outK,outV);
    }
}

3. reduce代碼:

package com.tangxiaocong.mapreduce.writable;
 
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
import java.io.IOException;
 
public class FlowReducer extends Reducer  <Text,FlowBean,Text,FlowBean>{
   private FlowBean outv=new FlowBean();
 
 
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
 
         long totalUp=0;
         long totaldown=0;
         
        //分析   傳入TEXT  為手機號  后邊為集合(Bean類的對象的集合)輸出還是一個一個bean類  (每個手機號的總和)
        for (FlowBean value : values) {  //傳入的參數是同一個key的
            totalUp+=value.getUpFlow();
            totaldown+=value.getDownFlow();
        }
        //  現在求出的是每個手機號的總的上行流量  下行流量
            //封裝  key不需要
        //outv
    outv.setUpFlow(totalUp);
    outv.setDownFlow(totaldown);
    outv.setSumFlow();
    //寫出
        context.write(key,outv);
    }
}

4.driver代碼:

package com.tangxiaocong.mapreduce.writable;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import java.io.IOException;
 
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //獲取JOB
        Configuration entries = new Configuration();
        Job job = Job.getInstance(entries);
 
        job.setJarByClass(FlowDriver.class);
        //關聯mapper  和reduce
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
 
        //設置mapper  輸出的key 和value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
 
        // 設置最終的數據輸出的key和value 類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
 
        //設置數據的輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output3"));
        //提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
 
    }
 
}

最后運行  

出現了bug  經過兩個小時的調試  找出答案   是在driver類中設置mapper類輸出kv類型出現差錯沒有配型成功 

現在是運作正確的

Hadoop之Mapreduce序列化怎么實現

Hadoop之Mapreduce序列化怎么實現

以上就是關于“Hadoop之Mapreduce序列化怎么實現”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

拜泉县| 达拉特旗| 汶上县| 富阳市| 达孜县| 饶平县| 霍山县| 宣武区| 英德市| 太原市| 怀集县| 同仁县| 右玉县| 铜鼓县| 仁怀市| 夏河县| 靖宇县| 惠来县| 湖南省| 民权县| 汝城县| 南充市| 开封县| 东至县| 正镶白旗| 晋中市| 稻城县| 阆中市| 遂溪县| 平遥县| 贵溪市| 青神县| 凯里市| 苏尼特左旗| 乡城县| 筠连县| 乌拉特中旗| 咸阳市| 沁阳市| 古丈县| 柳州市|