您好,登錄后才能下訂單哦!
這篇文章給大家介紹基于Hive的文件格式的RCFile及其應用是怎樣的,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
Hadoop 作為MR 的開源實現,一直以動態運行解析文件格式并獲得比MPP數據庫快上幾倍的裝載速度為優勢。不過,MPP數據庫社區也一直批評Hadoop由于文件格式并非為特定目的而建,因此序列化和反序列化的成本過高。
目前 hadoop 中流行的文件格式有如下幾種:
SequenceFile是Hadoop API 提供的一種二進制文件,它將數據以<key,value>的形式序列化到文件中。這種二進制文件內部使用Hadoop 的標準的Writable 接口實現序列化和反序列化。它與Hadoop API中的MapFile 是互相兼容的。Hive 中的SequenceFile 繼承自Hadoop API 的SequenceFile,不過它的key為空,使用value 存放實際的值, 這樣是為了避免MR 在運行map 階段的排序過程。如果你用Java API 編寫SequenceFile,并讓Hive 讀取的話,請確保使用value字段存放數據,否則你需要自定義讀取這種SequenceFile 的InputFormat class 和OutputFormat class。
RCFile是Hive推出的一種專門面向列的數據格式。 它遵循“先按列劃分,再垂直劃分”的設計理念。當查詢過程中,針對它并不關心的列時,它會在IO上跳過這些列。需要說明的是,RCFile在map階段從遠端拷貝仍然是拷貝整個數據塊,并且拷貝到本地目錄后RCFile并不是真正直接跳過不需要的列,并跳到需要讀取的列, 而是通過掃描每一個row group的頭部定義來實現的,但是在整個HDFS Block 級別的頭部并沒有定義每個列從哪個row group起始到哪個row group結束。所以在讀取所有列的情況下,RCFile的性能反而沒有SequenceFile高。
HDFS塊內行存儲的例子
HDFS塊內列存儲的例子
HDFS塊內RCFile方式存儲的例子
Avro是一種用于支持數據密集型的二進制文件格式。它的文件格式更為緊湊,若要讀取大量數據時,Avro能夠提供更好的序列化和反序列化性能。并且Avro數據文件天生是帶Schema定義的,所以它不需要開發者在API 級別實現自己的Writable對象。最近多個Hadoop 子項目都支持Avro 數據格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。
除上面提到的3種二進制格式之外,文本格式的數據也是Hadoop中經常碰到的。如TextFile 、XML和JSON。 文本格式除了會占用更多磁盤資源外,對它的解析開銷一般會比二進制格式高幾十倍以上,尤其是XML 和JSON,它們的解析開銷比Textfile 還要大,因此強烈不建議在生產系統中使用這些格式進行儲存。 如果需要輸出這些格式,請在客戶端做相應的轉換操作。 文本格式經常會用于日志收集,數據庫導入,Hive默認配置也是使用文本格式,而且常常容易忘了壓縮,所以請確保使用了正確的格式。另外文本格式的一個缺點是它不具備類型和模式,比如銷售金額、利潤這類數值數據或者日期時間類型的數據,如果使用文本格式保存,由于它們本身的字符串類型的長短不一,或者含有負數,導致MR沒有辦法排序,所以往往需要將它們預處理成含有模式的二進制格式,這又導致了不必要的預處理步驟的開銷和儲存資源的浪費。
Hadoop實際上支持任意文件格式,只要能夠實現對應的RecordWriter和RecordReader即可。其中數據庫格式也是會經常儲存在Hadoop中,比如Hbase,Mysql,Cassandra,MongoDB。 這些格式一般是為了避免大量的數據移動和快速裝載的需求而用的。他們的序列化和反序列化都是由這些數據庫格式的客戶端完成,并且文件的儲存位置和數據布局(Data Layout)不由Hadoop控制,他們的文件切分也不是按HDFS的塊大小(blocksize)進行切割。
Facebook曾在2010 ICDE(IEEE International Conference on Data Engineering)會議上介紹了數據倉庫Hive。Hive存儲海量數據在Hadoop系統中,提供了一套類數據庫的數據存儲和處理機制。它采用類SQL語言對數據進行自動化管理和處理,經過語句解析和轉換,最終生成基于Hadoop的MapReduce任務,通過執行這些任務完成數據處理。下圖顯示了Hive數據倉庫的系統結構。
Facebook在數據倉庫上遇到的存儲可擴展性的挑戰是獨一無二的。他們在基于Hive的數據倉庫中存儲了超過300PB的數據,并且以每日新增600TB的速度增長。去年這個數據倉庫所存儲的數據量增長了3倍。考慮到這個增長趨勢,存儲效率問題是facebook數據倉庫基礎設施方面目前乃至將來一段時間內最需要關注的。facebook工程師發表的RCFile: A Fast and Spaceefficient Data Placement Structure in MapReducebased Warehouse Systems一文,介紹了一種高效的數據存儲結構——RCFile(Record Columnar File),并將其應用于Facebook的數據倉庫Hive中。與傳統數據庫的數據存儲結構相比,RCFile更有效地滿足了基于MapReduce的數據倉庫的四個關鍵需求,即Fast data loading、Fast query processing、Highly efficient storage space utilization和Strong adaptivity to highly dynamic workload patterns。RCFile 廣泛應用于Facebook公司的數據分析系統Hive中。首先,RCFile具備相當于行存儲的數據加載速度和負載適應能力;其次,RCFile的讀優化可以在掃描表格時避免不必要的列讀取,測試顯示在多數情況下,它比其他結構擁有更好的性能;再次,RCFile使用列維度的壓縮,因此能夠有效提升存儲空間利用率。
為了提高存儲空間利用率,Facebook各產品線應用產生的數據從2010年起均采用RCFile結構存儲,按行存儲(SequenceFile/TextFile)結構保存的數據集也轉存為RCFile格式。此外,Yahoo公司也在Pig數據分析系統中集成了RCFile,RCFile正在用于另一個基于Hadoop的數據管理系統Howl(http://wiki.apache.org/pig/Howl)。而且,根據Hive開發社區的交流,RCFile也成功整合加入其他基于MapReduce的數據分析平臺。有理由相信,作為數據存儲標準的RCFile,將繼續在MapReduce環境下的大規模數據分析中扮演重要角色。
facebook 的數據倉庫中數據被加載到表里面時首先使用的存儲格式是Facebook自己開發的Record-Columnar File Format(RCFile)。RCFile是一種“允許按行查詢,提供了列存儲的壓縮效率”的混合列存儲格式。它的核心思想是首先把Hive表水平切分成多個行組(row groups),然后組內按照列垂直切分,這樣列與列的數據在磁盤上就是連續的存儲塊了。
當一個行組內的所有列寫到磁盤時,RCFile就會以列為單位對數據使用類似zlib/lzo的算法進行壓縮。當讀取列數據的時候使用惰性解壓策略( lazy decompression),也就是說用戶的某個查詢如果只是涉及到一個表中的部分列的時候,RCFile會跳過不需要的列的解壓縮和反序列化的過程。通過在facebook的數據倉庫中選取有代表性的例子實驗,RCFile能夠提供5倍的壓縮比。
隨著數據倉庫中存儲的數據量持續增長,FB組內的工程師開始研究提高壓縮效率的技術和方法。研究的焦點集中在列級別的編碼方法,例如行程長度編碼(run-length encoding)、詞典編碼(dictionary encoding)、參考幀編碼(frame of reference encoding)、能夠在通用壓縮過程之前更好的在列級別降低邏輯冗余的數值編碼方法。FB也嘗試過新的列類型(例如JSON是在Facebook內部廣泛使用的格式,把JSON格式的數據按照結構化的方式存儲既可以滿足高效查詢的需求,同時也降低了JSON元數據存儲的冗余)。FB的實驗表明列級別的編碼如果使用得當的話能夠顯著提高RCFile的壓縮比。
與此同時,Hortonworks也在嘗試類似的思路去改進Hive的存儲格式。Hortonworks的工程團隊設計和實現了ORCFile(包括存儲格式和讀寫接口),這幫助Facebook的數據倉庫設計和實現新的存儲格式提供了一個很好的開始。
關于 ORCFile 的介紹請見這里:http://yanbohappy.sinaapp.com/?p=478
關于性能評測,筆者這里暫時沒有條件,貼一張某次 hive 技術峰會演講嘉賓的截圖:
上面說了這么多,想必你已經知道 RCFile 主要用于提升 hive 的查詢效率,那如何生成這種格式的文件呢?
例如:
insert overwrite table http_RCTable partition(dt='2013-09-30') select p_id,tm,idate,phone from tmp_testp where dt='2013-09-30';
目前為止,mapreduce 并沒有提供內置 API 對 RCFile 進行支持,倒是 pig、hive、hcatalog 等 hadoop生態圈里的其他項目進行了支持,究其原因是因為 RCFile 相比 textfile 等其它文件格式,對于 mapreduce 的應用場景來說沒有顯著的優勢。
為了避免重復造輪子,下面的生成 RCFile 的 mapreduce 代碼調用了 hive 和 hcatalog 的相關類,注意你在測試下面的代碼時,你的 hadoop、hive、hcatalog 版本要一致,否則。。。你懂的。。。
比如我用的 hive-0.10.0+198-1.cdh5.4.0,那么就應該下載對應的版本:http://archive.cloudera.com/cdh5/cdh/4/
PS:下面的代碼已經測試通過,木有問題。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat; import org.apache.hcatalog.rcfile.RCFileMapReduceOutputFormat; public class TextToRCFile extends Configured implements Tool{ public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable>{ private byte[] fieldData; private int numCols; private BytesRefArrayWritable bytes; @Override protected void setup(Context context) throws IOException, InterruptedException { numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0); bytes = new BytesRefArrayWritable(numCols); } public void map(Object key, Text line, Context context ) throws IOException, InterruptedException { bytes.clear(); String[] cols = line.toString().split("\\|"); System.out.println("SIZE : "+cols.length); for (int i=0; i<numCols; i++){ fieldData = cols[i].getBytes("UTF-8"); BytesRefWritable cu = null; cu = new BytesRefWritable(fieldData, 0, fieldData.length); bytes.set(i, cu); } context.write(NullWritable.get(), bytes); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length < 2){ System.out.println("Usage: " + "hadoop jar RCFileLoader.jar <main class> " + "-tableName <tableName> -numCols <numberOfColumns> -input <input path> " + "-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>"); System.out.println("For test"); System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " + "-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " + "-output RCFileLoaderTest/RCFile_test1"); System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " + "-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " + "-output RCFileLoaderTest/RCFile_test2"); return 2; } /* For test */ String tableName = ""; int numCols = 0; String inputPath = ""; String outputPath = ""; int rowGroupSize = 16 *1024*1024; int ioBufferSize = 128*1024; for (int i=0; i<otherArgs.length - 1; i++){ if("-tableName".equals(otherArgs[i])){ tableName = otherArgs[i+1]; }else if ("-numCols".equals(otherArgs[i])){ numCols = Integer.parseInt(otherArgs[i+1]); }else if ("-input".equals(otherArgs[i])){ inputPath = otherArgs[i+1]; }else if("-output".equals(otherArgs[i])){ outputPath = otherArgs[i+1]; }else if("-rowGroupSize".equals(otherArgs[i])){ rowGroupSize = Integer.parseInt(otherArgs[i+1]); }else if("-ioBufferSize".equals(otherArgs[i])){ ioBufferSize = Integer.parseInt(otherArgs[i+1]); } } conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize); conf.setInt("io.file.buffer.size", ioBufferSize); Job job = new Job(conf, "RCFile loader: loading table " + tableName + " with " + numCols + " columns"); job.setJarByClass(TextToRCFile.class); job.setMapperClass(Map.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(BytesRefArrayWritable.class); // job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(inputPath)); job.setOutputFormatClass(RCFileMapReduceOutputFormat.class); RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols); RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath)); RCFileMapReduceOutputFormat.setCompressOutput(job, false); System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath); System.out.println("number of columns:" + job.getConfiguration().get("hive.io.rcfile.column.number.conf")); System.out.println("RCFile row group size:" + job.getConfiguration().get("hive.io.rcfile.record.buffer.size")); System.out.println("io bufer size:" + job.getConfiguration().get("io.file.buffer.size")); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new TextToRCFile(), args); System.exit(res); } }
關于基于Hive的文件格式的RCFile及其應用是怎樣的就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。