中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop中如何分區

發布時間:2021-12-09 15:44:33 來源:億速云 閱讀:166 作者:小新 欄目:云計算

小編給大家分享一下Hadoop中如何分區,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class KpiApp {
	public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
	public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		existsFile(conf);
		Job job = new Job(conf, KpiApp.class.getName());
		//打成Jar在Linux運行
		job.setJarByClass(KpiApp.class);
		
		//1.1
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2
		job.setMapperClass(MyMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//1.3 自定義分區
		job.setPartitionerClass(KpiPartition.class);
		job.setNumReduceTasks(2);
		
		//1.4 排序分組
		//1.5 聚合
		
		//2.1
		
		//2.2
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//2.3
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.waitForCompletion(true);
	}
	private static void existsFile(Configuration conf) throws IOException,
			URISyntaxException {
		FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);
		if(fs.exists(new Path(OUTPUT_PATH))){
			fs.delete(new Path(OUTPUT_PATH), true);
		}
	}
	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String string = value.toString();
			String[] split = string.split("\t");
			String phone = split[1];
			Text key2 = new Text();
			key2.set(phone);
			
			KpiWritable v2= new KpiWritable();
			v2.set(split[6],split[7],split[8],split[9]);
			context.write(key2, v2);
		}
		
	}
	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{

		@Override
		protected void reduce(Text key2, Iterable<KpiWritable> values,Context context)
				throws IOException, InterruptedException {
				long upPackNum = 0L;
				long downPackNum = 0L;
				long upPayLoad = 0L;
				long downPayLoad = 0L;
				for(KpiWritable writable : values){
					upPackNum += writable.upPackNum;
					downPackNum += writable.downPackNum;
					upPayLoad += writable.upPayLoad;
					downPayLoad += writable.downPayLoad;
				}
				KpiWritable value3 = new KpiWritable();
				value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));
				context.write(key2, value3);
		}
	}
}
class KpiWritable implements Writable{
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	public void set(String string, String string2, String string3,
			String string4) {
		this.upPackNum = Long.parseLong(string);
		this.downPackNum = Long.parseLong(string2);
		this.upPayLoad = Long.parseLong(string3);
		this.downPayLoad = Long.parseLong(string4);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public String toString() {
		return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}
class KpiPartition extends Partitioner<Text, KpiWritable>{

	@Override
	public int getPartition(Text key, KpiWritable value, int numPartitions) {
		String string = key.toString();
		return string.length()==11?0:1;
	}
}

  Paritioner是Hashpartitioner的基類,如果需要定制Partitioner也需要繼承該類。

  HashPartitioner是MapReduce的默認Partitioner。

以上是“Hadoop中如何分區”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

炎陵县| 甘南县| 金华市| 福泉市| 土默特右旗| 凭祥市| 浮梁县| 衡南县| 余庆县| 色达县| 东兴市| 灌南县| 正宁县| 永丰县| 镇沅| 洛宁县| 许昌县| 北京市| 南江县| 仪征市| 滕州市| 香格里拉县| 大石桥市| 砚山县| 甘孜| 东宁县| 清镇市| 吐鲁番市| 阿巴嘎旗| 盖州市| 霞浦县| 赣榆县| 太和县| 房产| 浪卡子县| 渝北区| 于都县| 韩城市| 沅江市| 迁西县| 揭西县|