中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

【總結】Hadoop中的MultipleOutputs實踐

發布時間:2020-07-20 09:09:13 來源:網絡 閱讀:1577 作者:巧克力黒 欄目:大數據

本例子采用hadoop1.1.2版本,附件中有例子的數據文件

采用氣象數據作為處理數據


1、MultipleOutputs例子,具體解釋在代碼中有注釋

package StationPatitioner;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * hadoop Version 1.1.2
 * MultipleOutputs例子
 * @author 巧克力黑
 *
 */
public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool {
	enum Counter 
	{
		LINESKIP,	//出錯的行
	}
	static class StationMapper extends MapReduceBase implements Mapper<LongWritable , Text, Text , Text>{
		private NcdcRecordParser parser = new NcdcRecordParser();
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			try {
				parser.parse(value);
				output.collect(new Text(parser.getStationid()), value);
			} catch (Exception e) {
				reporter.getCounter(Counter.LINESKIP).increment(1);	//出錯令計數器+1
			}
			
		}
	}

	static class MultipleOutputReducer extends MapReduceBase implements Reducer<Text, Text, NullWritable, Text>{
		private MultipleOutputs multipleOutputs;
		@Override
		public void configure(JobConf jobconf) {
			multipleOutputs = new MultipleOutputs(jobconf);//初始化一個MultipleOutputs
		}
		
		@Override
		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<NullWritable, Text> output, Reporter reporter)
				throws IOException {
			//得到OutputCollector
			OutputCollector collector = multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter);
			while(values.hasNext()){
				collector.collect(NullWritable.get(), values.next());//MultipleOutputs用OutputCollector輸出數據
			}
		}
		
		@Override
		public void close() throws IOException {
			multipleOutputs.close();
		}
	}
	
	@Override
	public int run(String[] as) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");//windows下用戶與linux用戶不一直,采用此方法避免報Permission相關錯誤
		JobConf conf = new JobConf();
		
		conf.setMapperClass(StationMapper.class);
		conf.setReducerClass(MultipleOutputReducer.class);
		conf.setMapOutputKeyClass(Text.class);
		conf.setOutputKeyClass(NullWritable.class);
		conf.setOutputFormat(NullOutputFormat.class);
	    FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));//input路徑
	    FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));//output路徑
		
		MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class);
		
		JobClient.runJob(conf);
		return 0;
	}
	
	public static void main(String[] args) throws Exception{
		int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args);
		System.exit(exitCode);
	}
	
}


2、解析氣象數據的類

package StationPatitioner;

import org.apache.hadoop.io.Text;

public class NcdcRecordParser {
	private static final int MISSING_TEMPERATURE = 9999;

	private String year;
	private int airTemperature;
	private String quality;
	private String stationid;

	public void parse(String record) {
		stationid = record.substring(0, 5);
		year = record.substring(15, 19);
		String airTemperatureString;
		// Remove leading plus sign as parseInt doesn't like them
		if (record.charAt(87) == '+') {
			airTemperatureString = record.substring(88, 92);
		} else {
			airTemperatureString = record.substring(87, 92);
		}
		airTemperature = Integer.parseInt(airTemperatureString);
		quality = record.substring(92, 93);
	}
	
	public String getStationid(){
		return stationid;
	}

	public void parse(Text record) {
		parse(record.toString());
	}

	public boolean isValidTemperature() {
		return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
	}

	public String getYear() {
		return year;
	}

	public int getAirTemperature() {
		return airTemperature;
	}
}


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

理塘县| 鲁山县| 东阳市| 桃园县| 仙居县| 辰溪县| 淮阳县| 绵竹市| 通城县| 日照市| 黄骅市| 灵丘县| 资阳市| 峡江县| 毕节市| 万年县| 瓮安县| 彰武县| 德昌县| 晋城| 莲花县| 阿瓦提县| 佛学| 巨野县| 稻城县| 陇南市| 承德县| 三门峡市| 云龙县| 凯里市| 富顺县| 岑巩县| 青海省| 靖边县| 冷水江市| 沁阳市| 横峰县| 稷山县| 昌宁县| 英德市| 辛集市|