中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop中MapReduce常用算法有哪些

發布時間:2021-12-08 10:55:42 來源:億速云 閱讀:201 作者:小新 欄目:云計算

這篇文章將為大家詳細講解有關Hadoop中MapReduce常用算法有哪些,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。


 

1.排序: 

    1)數據:
 

         hadoop fs -mkdir /import 
           創建一個或者多個文本,上傳 
           hadoop fs -put test.txt /import/ 

Hadoop中MapReduce常用算法有哪些

    2)代碼:

package com.cuiweiyou.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//hadoop默認排序: 
//如果k2、v2類型是Text-文本,結果是按照字典順序
//如果k2、v2類型是LongWritable-數字,結果是按照數字大小順序

public class SortTest {
	/**
	 * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
		/**
		 * 重寫map方法
		 */
		public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
			//這里v1轉為k2-數字類型,舍棄k1。null為v2
			context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
  //因為v1可能重復,這時,k2也是可能有重復的
		}
	}

	/**
	 * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		/**
		 * 重寫reduce方法
   * 在此方法執行前,有個shuffle過程,會根據k2將對應的v2歸并為v2[...] 
		 */
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Reducer<LongWritable, Context context) throws IOException, InterruptedException {
			//k2=>k3, v2[...]舍棄。null => v3
			context.write(k2, NullWritable.get());
  //此時,k3如果發生重復,根據默認算法會發生覆蓋,即最終僅保存一個k3 
		}
	}

	public static void main(String[] args) throws Exception {
		// 聲明配置信息
		Configuration conf = new Configuration();
		conf.set("fs.default.name", "hdfs://localhost:9000");
		
		// 創建作業
		Job job = new Job(conf, "SortTest");
		job.setJarByClass(SortTest.class);
		
		// 設置mr
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		
		// 設置輸出類型,和Context上下文對象write的參數類型一致
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		
		// 設置輸入輸出路徑
		FileInputFormat.setInputPaths(job, new Path("/import/"));
		FileOutputFormat.setOutputPath(job, new Path("/out"));
		
		// 執行
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


 

    3)測試:

        可以看到,不僅排序而且去重了。

Hadoop中MapReduce常用算法有哪些


2.去重: 

    需求:查取手機號有哪些。這里的思路和上面排序算法的思路是一致的,僅僅多了分割出手機號這一步驟。

    1)數據:

        創建兩個文本,手動輸入一些測試內容。每個字段用制表符隔開。日期,電話,地址,方式,數據量。
Hadoop中MapReduce常用算法有哪些

 

    2)代碼:

        (1)map和reduce:
/**
	 * 映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		/**
		 * 重寫map方法
		 */
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進行分割
			String[] tels = v1.toString().split("\t");
			//k1 => k2-第2列手機號,null => v2
			context.write(new Text(tels[1]), NullWritable.get());
		}
	}
	
	
	/************************************************************
	 *  在map后,reduce前,有個shuffle過程,會根據k2將對應的v2歸并為v2[...] 
	 ***********************************************************/
	

	/**
	 * 拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		/**
		 * 重寫reduce方法
		 */
		protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			//此時,k3如果發生重復,根據默認算法會發生覆蓋,即最終僅保存一個k3,達到去重到效果
			context.write(k2, NullWritable.get());
		}
	}


 

        (2)配置輸出:
// 設置輸出類型,和Context上下文對象write的參數類型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);


 

    3)測試:

Hadoop中MapReduce常用算法有哪些


3.過濾:

    需求:查詢在北京地區發生的上網記錄。思路同上,當寫出 k2 、 v2 時加一個判斷即可。

    1)數據:

        同上。

    2)代碼:

        (1)map和reduce:
/**
	 * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		/**
		 * 重寫map方法
		 */
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進行分割
			final String[] adds = v1.toString().split("\t");
			//地址在第3列
			//k1 => k2-地址,null => v2
			if(adds[2].equals("beijing")){
				context.write(new Text(v1.toString()), NullWritable.get());
			}
		}
	}

	/**
	 * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		/**
		 * 重寫reduce方法
		 */
		protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			context.write(k2, NullWritable.get());
		}
	}


 

        (2)配置輸出:
// 設置輸出類型,和Context上下文對象write的參數類型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);


 

    3)測試:

Hadoop中MapReduce常用算法有哪些


4.TopN: 

    這個算法非常經典,面試必問。實現這個效果的算法也很多。下面是個簡單的示例。
    需求:找到流量最大值;找出前5個最大值。

    1)數據:

        同上。

    2)代碼1-最大值:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

		//首先創建一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808
		long temp = Long.MIN_VALUE;
		
		//找出最大值
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進行分割
			final String[] flows = v1.toString().split("\t");
			//將文本轉數值
			final long val = Long.parseLong(flows[4]);
			//如果v1比臨時變量大,則保存v1的值
			if(temp<val){
				temp = val;
			}
		}
		
		/** ---此方法在全部的map任務結束后執行一次。這時僅輸出臨時變量到最大值--- **/
		protected void cleanup(Context context) throws IOException ,InterruptedException {
			context.write(new LongWritable(temp), NullWritable.get());
			System.out.println("文件讀取完畢");
		}
	}
	
	//reduce
	public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		//臨時變量
		Long temp = Long.MIN_VALUE;

		//因為一個文件得到一個最大值,再次將這些值比對,得到最大的
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			
			long long1 = Long.parseLong(k2.toString());
			//如果k2比臨時變量大,則保存k2的值
			if(temp<long1){
				temp = long1;
			}
		}
		
		/** !!!此方法在全部的reduce任務結束后執行一次。這時僅輸出臨時變量到最大值!!! **/
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(temp), NullWritable.get());
		}
	}


 

        (2)配置輸出:
// 設置輸出類型
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);


 

    3)測試1:

Hadoop中MapReduce常用算法有哪些

    4)代碼2-TopN:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

		//首先創建一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808
		long temp = Long.MIN_VALUE;
		//Top5存儲空間
		long[] tops;
		
		/** 次方法在run中調用,在全部map之前執行一次 **/
		protected void setup(Context context) {
			//初始化數組長度為5
			tops = new long[5];  
		}
		
		//找出最大值
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進行分割
			final String[] flows = v1.toString().split("\t");
			//將文本轉數值
			final long val = Long.parseLong(flows[4]);
			//保存在0索引
			tops[0] = val;
			//排序后最大值在最后一個索引,這樣從后到前依次減小
			Arrays.sort(tops);
		}
		
		/** ---此方法在全部到map任務結束后執行一次。這時僅輸出臨時變量到最大值--- **/
		protected void cleanup(Context context) throws IOException ,InterruptedException {
			//保存前5條數據
			for( int i = 0; i < tops.length; i++) {  
				context.write(new LongWritable(tops[i]), NullWritable.get());  
			}
		}
	}
	
	//reduce
	public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		//臨時變量
		Long temp = Long.MIN_VALUE;
		//Top5存儲空間
		long[] tops;

		/** 次方法在run中調用,在全部map之前執行一次 **/
		protected void setup(Context context) {
			//初始化長度為5
			tops = new long[5];  
		}
		
		//因為每個文件都得到5個值,再次將這些值比對,得到最大的
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			
			long top = Long.parseLong(k2.toString());
			//
			tops[0] = top;
			//
			Arrays.sort(tops);
		}
		
		/** ---此方法在全部到reduce任務結束后執行一次。輸出前5個最大值--- **/
		protected void cleanup(Context context) throws IOException, InterruptedException {
			//保存前5條數據
			for( int i = 0; i < tops.length; i++) {  
				context.write(new LongWritable(tops[i]), NullWritable.get());  
			} 
		}
	}


 

        (2)配置輸出:
// 設置輸出類型
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);


 

    5)測試2:

Hadoop中MapReduce常用算法有哪些


5.單表關聯: 

    本例中的單表實際就是一個文本文件。 

    1)數據:

Hadoop中MapReduce常用算法有哪些

    2)代碼:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		//拆分原始數據
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按制表符拆分記錄
			String[] splits = v1.toString().split("\t");
			//一條k2v2記錄:把孫輩作為k2;祖輩加下劃線區分,作為v2
			context.write(new Text(splits[0]), new Text("_"+splits[1]));
			//一條k2v2記錄:把祖輩作為k2;孫輩作為v2。就是把原兩個單詞調換位置保存
			context.write(new Text(splits[1]), new Text(splits[0]));
		}
			
			/**
				張三		_張三爸爸
				張三爸爸	張三
				
				張三爸爸	_張三爺爺
				張三爺爺	張三爸爸
			**/
	}
	
	//reduce
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {
		//拆分k2v2[...]數據
		protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException ,InterruptedException {
			String grandchild = "";	//孫輩
			String grandfather = "";	//祖輩
			
			/**
			 	張三爸爸		[_張三爺爺,張三]
			**/
			
			//從迭代中遍歷v2[...]
			for (Text man : v2) {
				String p = man.toString();
				//如果單詞是以下劃線開始的
				if(p.startsWith("_")){
					//從索引1開始截取字符串,保存到祖輩變量
					grandfather = p.substring(1);
				}
				//如果單詞沒有下劃線起始
				else{
					//直接賦值給孫輩變量
					grandchild = p;
				}
			}
			
			//在得到有效數據的情況下
			if( grandchild!="" && grandfather!=""){
				//寫出得到的結果。
				context.write(new Text(grandchild), new Text(grandfather));
			}
			
			/**
				k3=張三,v3=張三爺爺
			**/
		}
	}


 

        (2)配置輸出:
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);


 

    3)測試:

Hadoop中MapReduce常用算法有哪些


6.雙表關聯: 

    本例中仍簡單采用兩個文本文件。

    1)數據:

Hadoop中MapReduce常用算法有哪些

    2)代碼:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		//拆分原始數據
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//拆分記錄
			String[] splited = v1.toString().split("\t");
			//如果第一列是數字(使用正則判斷),就是地址表
			if(splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")){
				String addreId = splited[0];
				String address = splited[1];
  //k2,v2-加兩條下劃線作為前綴標識為地址
				context.write(new Text(addreId), new Text("__"+address));
			}
			//否則就是人員表
			else{
				String personId = splited[1];
				String persName = splited[0];
  //k2,v2-加兩條橫線作為前綴標識為人員
				context.write(new Text(personId), new Text("--"+persName));
			}
			/**
			 1	__北京
			 1	--張三
			**/
		}
	}
	
	//reduce
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {
		//拆分k2v2[...]數據
		protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException ,InterruptedException {
			String address = "";	//地址
			String person = "";		//人員
			/**
				1, [__北京,--張三]
			**/
			//迭代的是address或者person
			for (Text text : v2) {
				String tmp = text.toString();
				
				if(tmp.startsWith("__")){
					//如果是__開頭的是address
					address = tmp.substring(2);	//從索引2開始截取字符串
				}
				if(tmp.startsWith("--")){
					//如果是--開頭的是person
					person = tmp.substring(2);
				}
			}
			context.write(new Text(person), new Text(address));
		}
		/**
		 k3=張三,v3=北京
		**/
	

 

        (2)配置輸出:
// 設置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

    3)測試:

Hadoop中MapReduce常用算法有哪些

關于“Hadoop中MapReduce常用算法有哪些”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

台安县| 灵山县| 天镇县| 尉犁县| 平和县| 东兰县| 襄汾县| 齐河县| 饶阳县| 利川市| 阳西县| 临泉县| 同仁县| 洛宁县| 晋宁县| 象山县| 嘉峪关市| 峡江县| 岗巴县| 南安市| 潼关县| 金湖县| 隆安县| 荥阳市| 瑞金市| 治县。| 射洪县| 铜梁县| 德格县| 辰溪县| 三门峡市| 安乡县| 甘南县| 金华市| 密山市| 安多县| 年辖:市辖区| 乌审旗| 淮滨县| 临猗县| 临桂县|