中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop的多文件輸出及自定義文件名方法是什么

發布時間:2021-12-10 11:18:59 來源:億速云 閱讀:163 作者:iii 欄目:云計算

本篇內容介紹了“Hadoop的多文件輸出及自定義文件名方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

    首先是輸出格式的類,也就是job.setOutputFormatClass(……)參數列表中的類:

public class MoreFileOutputFormat extends Multiple<Text,Text>
{
  @Override
  protected String generateFileNameForKeyValue(Text key, Text value,Configuration conf) 
  {
      return "Your name";
  }
}

    這里,繼承Multiple類后必須重寫generateFileNameForKeyValue()方法,這個方法返回的字符串作為輸出文件的文件名。內容有各位自己根據需要編寫。同時,key和value的值也根據自己的需要更換。

    接下來是Multiple模板類的代碼:

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class Multiple<K extends WritableComparable<?>, V extends Writable>
  extends FileOutputFormat<K, V> 
{
   // 接口類,需要在調用程序中實現generateFileNameForKeyValue來獲取文件名
   private MultiRecordWriter writer = null;
   public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
     throws IOException, InterruptedException 
   {
        if (writer == null) 
        {
             writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
   }
    
   /**
    * get task output path
    * 
    * @param conf
    * @return
    * @throws IOException
    */
   private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException
   {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) 
        {
             workPath = ((FileOutputCommitter) committer).getWorkPath();
        } 
        else 
        {
             Path outputPath = super.getOutputPath(conf);
             if (outputPath == null) 
             {
                  throw new IOException("Undefined job output-path");
             }
             workPath = outputPath;
        }
        return workPath;
   }
    
   //繼承后重寫以獲得文件名
   protected abstract String generateFileNameForKeyValue(K key, V value,Configuration conf);
    
   //實現記錄寫入器RecordWriter類 (內部類)
   public class MultiRecordWriter extends RecordWriter<K, V> 
   {
        /** RecordWriter的緩存 */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        
        /** 輸出目錄 */
        private Path workPath = null;
        public MultiRecordWriter(TaskAttemptContext job, Path workPath) 
        {
             super();
             this.job = job;
             this.workPath = workPath;
             recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }
          
        @Override
        public void close(TaskAttemptContext context) throws IOException,
          InterruptedException 
        {
             Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
             while (values.hasNext()) 
             {
                  values.next().close(context);
             }
             this.recordWriters.clear();
        }
          
        @Override
        public void write(K key, V value) throws IOException,
          InterruptedException 
        {
             // 得到輸出文件名
             String baseName = generateFileNameForKeyValue(key, value,job.getConfiguration());
             // 如果recordWriters里沒有文件名,那么就建立。否則就直接寫值。
             RecordWriter<K, V> rw = this.recordWriters.get(baseName);
             if (rw == null) 
             {
                  rw = getBaseRecordWriter(job, baseName);
                  this.recordWriters.put(baseName, rw);
             }
             rw.write(key, value);
        }
          
        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job,
          String baseName) throws IOException, InterruptedException 
        {
             Configuration conf = job.getConfiguration();
             // 查看是否使用解碼器
             boolean isCompressed = getCompressOutput(job);
             RecordWriter<K, V> recordWriter = null;
             if (isCompressed) 
             {
                  Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                    job, GzipCodec.class);
                  CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                  Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                  FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                  // 這里我使用的自定義的OutputFormat
                  recordWriter = new MyRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)));
             } 
             else 
             {
                  Path file;
                  System.out.println("workPath = " + workPath + ", basename = " + baseName);
                  file = new Path(workPath, baseName);
                  FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                  // 這里我使用的自定義的OutputFormat
                  recordWriter = new MyRecordWriter<K, V>(fileOut);
             }
             return recordWriter;
        }
   }
}

    現在來實現Multiple的內部類MultiRecordWriter中的MyRecordWriter類以實現自己想要的輸出方式:

public class MyRecordWriter<K, V> extends RecordWriter<K,V>
{
   private static final String utf8 = "UTF-8";//定義字符編碼格式     
   protected DataOutputStream out;  
       
   public MyRecordWriter(DataOutputStream out) 
   {
        this.out = out;  
   }
       
   private void writeObject(Object o) throws IOException 
   {
        if (o instanceof Text)
        {
             Text to = (Text) o;
             out.write(to.getBytes(), 0, to.getLength());
        }
        else
        {
               //輸出成字節流。如果不是文本類的,請更改此處
             out.write(o.toString().getBytes(utf8));
        }
   }
     
   /** 
    * 將mapreduce的key,value以自定義格式寫入到輸出流中 
    */
   public synchronized void write(K key, V value) throws IOException
   {
        writeObject(value);
   }  
     
   public synchronized void close(TaskAttemptContext context) throws IOException
   {
        out.close();
   } 
}

    這個類中還有其它集中方法,不過筆者不需要那些方法,所以把它們都刪除了,但最初的文件也刪除了- -,所以現在找不到了。請大家見諒。

    現在,只需在main()或者run()函數中將job的輸出格式設置成MoreFileOutputFormat類就行了,如下:

job.setOutputFormatClass(MoreFileOutputFormatClass);

“Hadoop的多文件輸出及自定義文件名方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

丰顺县| 梅河口市| 浪卡子县| 关岭| 平利县| 望都县| 嘉义县| 嘉黎县| 鄯善县| 河源市| 绍兴县| 西宁市| 佛冈县| 佛教| 高唐县| 九龙城区| 延吉市| 邯郸县| 静宁县| 保德县| 鹤山市| 长寿区| 喀什市| 扎兰屯市| 龙陵县| 西城区| 喜德县| 伊吾县| 游戏| 陆河县| 永城市| 德昌县| 镇康县| 鱼台县| 大渡口区| 安塞县| 左贡县| 黄梅县| 永吉县| 股票| 庆元县|