中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hadoop如何自定義格式化輸出

發布時間:2021-12-09 16:25:34 來源:億速云 閱讀:184 作者:小新 欄目:大數據

這篇文章給大家分享的是有關hadoop如何自定義格式化輸出的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CustomizeOutputFormat {
	static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class);
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(CustomizeOutputFormat.class);
		job.setMapperClass(CustMapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		//此處這只自定義的格式化輸出
		job.setOutputFormatClass(CustOutputFormat.class);
		String jobName = "Customize outputformat test!";
		job.setJobName(jobName);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		boolean b = job.waitForCompletion(true);
		if(b) {
			LOG.info("Job "+ jobName +" is done.");
			
		}else {
			LOG.info("Job "+ jobName +"is going wrong,now exit.");
			System.exit(0);
		}
		
	}
}
class CustMapper extends Mapper<LongWritable, Text, Text, Text>{
	String[] textIn = null;
	Text outkey = new Text();
	Text outvalue = new Text();
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		/**
		 * 假設文件的內容為如下:
		 * boys	girls
		 * firends goodbye
		 * down up
		 * fly to
		 * neibors that
		 * 
		 */
		textIn = value.toString().split("\t");
		outkey.set(textIn[0]);
		outvalue.set(textIn[1]);
		context.write(outkey, outvalue);		
	}	
}
//自定義OutoutFormat
class CustOutputFormat extends FileOutputFormat<Text, Text>{
	@Override
	public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
		//獲得configration
		Configuration conf = context.getConfiguration();
		//獲得FileSystem
		FileSystem fs =  FileSystem.newInstance(conf);
		//獲得輸出路徑
		Path path = CustOutputFormat.getOutputPath(context);
		URI uri = path.toUri();
		//創建兩個文件,得到寫入流
		FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a"));
		FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b"));	
		//創建自定義RecordWriter  傳入 兩個流
		CustRecordWriter rw = new CustRecordWriter(foa,fob);
		return rw;
		
	}
	
	
	class CustRecordWriter extends RecordWriter<Text, Text>{
		 FSDataOutputStream foa = null;
		 FSDataOutputStream fob = null;
		CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){
			this.foa = foa;
			this.fob = fob;
		}
		@Override
		public void write(Text key, Text value) throws IOException, InterruptedException {
			String mText  = key.toString();
			//根據可以長度的不同分別輸入到不同的文件
			if(mText.length()>=5) {
				foa.writeUTF(mText+"\t"+value.toString()+"\n");
			}else {
				fob.writeUTF(mText+"\t"+value.toString()+"\n");
			}
		}
		@Override
		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
			//最后將兩個寫入流關閉
			if(foa!=null) {
				foa.close();
			}
			if(fob!=null) {
				fob.close();
			}
		}
	}
	
}
//使用MultipleInputs,c處理多個來源的文件
package hgs.multipuleinput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import hgs.custsort.SortBean;
import hgs.custsort.SortDriver;
import hgs.custsort.SortMapper;
import hgs.custsort.SortReducer;
public class MultipuleInputDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SortDriver.class);
		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);
		job.setOutputKeyClass(SortBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class);
		MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class);
		//FileInputFormat.setInputPaths(job, new Path("/sort"));
		FileOutputFormat.setOutputPath(job, new Path("/sortresult"));
		System.exit(job.waitForCompletion(true)==true?0:1);
	}
}

感謝各位的閱讀!關于“hadoop如何自定義格式化輸出”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

丹巴县| 郸城县| 汨罗市| 德清县| 中牟县| 石台县| 合作市| 临桂县| 泰宁县| 沙湾县| 筠连县| 开原市| 深水埗区| 巴楚县| 色达县| 四平市| 武穴市| 定西市| 扎赉特旗| 大英县| 普兰店市| 莱西市| 周至县| 扬中市| 枞阳县| 武义县| 拜泉县| 延寿县| 桓仁| 辽宁省| 阿荣旗| 措美县| 曲沃县| 略阳县| 临武县| 克什克腾旗| 梁山县| 南乐县| 黄大仙区| 美姑县| 四会市|