中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop序列化怎么實現

發布時間:2021-12-10 09:50:09 來源:億速云 閱讀:140 作者:iii 欄目:云計算

這篇文章主要講解了“Hadoop序列化怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop序列化怎么實現”吧!

Hadoop I/O

Data Integrity

Hdfs: % hadoop fs -cat hdfs://namenode/data/a.txt

LocalFS: % hadoop fs -cat file:///tmp/a.txt

generate crc check sum file

%hadoop fs -copyToLocal -crc /data/a.txt file:///data/a.txt

check sum file: .a.txt.crc is a hidden file.

Ref: CRC-32,循環冗余校驗算法,error-detecting.

io.bytes.per.checksum is deprecated, it's dfs.bytes-per-checksum, default is 512, Must not be larger than dfs.stream-buffer-size,which is the size of buffer to stream files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.

Data Compression

常用算法

讀書時,hadoop支持四種壓縮算法,如果調解空間和效率的話,-1 ~ -9,代表從最優速度到最優空間. 壓縮算法支持在org.apache.hadoop.io.compress.*.

  1. deflate (.deflate), 就是常用的gzip, package ..DefaultCodec

  2. Gzip (.gz),在deflate格式加了文件頭和尾. 壓縮速度(適中),解壓速度(適中),壓縮效率(適中),package ..GzipCodec, both of java and native

  3. bzip2 (.bz2), 壓縮速度(最差),< 解壓速度(最差),壓縮效率 (最好),特點是支持可切分(splitable),對map-red非常友好。,package ..BZip2Codec,java only

  4. LZO (.lzo), 壓縮速度(最快),解壓速度(最快),壓縮效率(最差),,package com.hadoop.compressiojn.lzo.lzopCodec, native only

如果禁用原生庫,使用hadoop.native.lib.

如果使用原生庫,可能對象創建的成本較高,所以可以使用CodecPool,重復使用這些對象。

對于一個非常大的數據文件,存儲如下方案:

  1. 使用支持切分的bzip2

  2. 手動切分,并使壓縮后的part接近于block size.

  3. 使用Sequence File, 它支持壓縮和切分

  4. 使用Avro數據文件,它也支持壓縮和切分,而且增加了很多編程語言的可讀寫性。

如果Map-Red的output自動壓縮:

conf.setBoolean ("mared.output.compress",true);
conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class);

如果Map-Red的中間結果的自動壓縮:

//or conf.setCompressMapOutput(true);
conf.setBoolean ("mared.compress.map.output",true);

//or conf.setMapOutputComressorClass(GzipCodec.class)
conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);

序列化(Serialization/Deserialization)

Writable and WritableComparable

// core class for hadoop
public interface Writable{
       void write(DataOutput out) throw IOException;
       void readFields(DataInput in) throw IOException;
}

public interface Comparable<T>{
       int compareTo(T o);
}

//core class for map-reduce shuffle
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

// Sample
public class MyWritableComparable implements WritableComparable {
       // Some data
       private int counter;
       private long timestamp;
       
       public void write(DataOutput out) throws IOException {
         out.writeInt(counter);
         out.writeLong(timestamp);
       }
       
       public void readFields(DataInput in) throws IOException {
         counter = in.readInt();
         timestamp = in.readLong();
       }
       
       public int compareTo(MyWritableComparable o) {
         int thisValue = this.value;
         int thatValue = o.value;
         return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
       }

       public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + counter;
         result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
         return result
       }
}

//optimize for stream comparasion
public interface RawComparator<T> extends Comparator<T>{
      // s1 start position, l1, length of bytes
      public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2);
}

public class WritableComparator implements RawComparator{
}

Comparator RawComparator WritableComparator

WritableComparator 提供了原始compator的compare反序列化對象的實現,性能較差。不過它作為RawComparator實例的工廠:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

// 注冊一個經過優化的比較算子。Register an optimized comparator for a WritableComparable implementation.

static void define(Class c, WritableComparator comparator);

// 獲得一個WritableComparable的比較算子. Get a comparator for a WritableComparable implementation.

static WritableComparator get(Class<? extends WritableComparable> c);

public MyWritableComparator extends WritableComparator{
    static{
        define(MyWritableComparable.class, new MyWritableComparator());
    }
    public MyWritableComparator {
        super(MyWritableComparable.class);
    }

    @Override
    public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2){
    }
}

 

注: 要使static initializer被調用,除非有該類的實例被創建,或某靜態方法或成員被訪問。或者直接強制,代碼如:

Class.forName("package.yourclass"); 它會強制初始化靜態initializer.

Java Primitive Data Type wrapped by Writable

Extends from WritableComparable
  • BooleanWritable, 1

  • ByteWritable, 1,

  • BytesWritable,

  • IntWritable,4

  • VIntWritable,1~5

  • FloatWritable,4,

  • LongWritable,8,

  • VLongWritable,1~9

  • DoubleWritable,8

  • NullWritable,Immutable singletone.

  • Text,4~

  • MD5Hash,

  • ObjectWritable,

  • GenericWritable

Extends from Writable only
  • ArrayWritable

  • TwoDArrayWritable

  • AbstractMapWritable

  •      MapWritable

  •      SortedMapWritable

[Text]

值得一提的是Text的序列化方式是Zero-compressed encoding,這個看過一些資料,其實是一種編碼方式,意圖是省略掉高位0所占用的空間,對于小數,它能節省空間,對于大數會額外占用空間。相比壓縮,它能比較快速。其實類似于VIntWritable, VLongWritable的編碼方式。

- 如何選擇變長和定長數值呢?

1. 定長適合分布非常均勻的數值(如hash),變長適合分布非常不均勻的數值。

2. 變長可以節省空間,而且可以在VIntWritable 和VLongWritable之間轉換。

- Text和String的區別

1。String是char序列,Text是UTF-8的byte序列.

UTF-8類不能對字符串大于32767的進行utf-8編碼。

(Indexing)索引:對于ASCII來說, Text和String是一樣的, 對于Unicode就不同了。String類的長度是其所含char編碼單元的長度,然而Text是UTF-8的字節碼的長度。CodePointAt表示一個真正的Unicode字符,它可以是2char,4bytes的unicode。

Iteration(迭代): 將Text轉換ByteBuffer,然后反復調用bytesToCodePoint()靜態方法,可以取到整型的Unicode.

Mutable(易變性): 可以set,類似writable 和StringBuffer,getLength()返回有效字串長度,getbytes().length,返回空間大小。

[BytesWritable]

這是二進制數組的封裝,類似于windows下的BSTR,都是前面一個整型表示字節長度,后面是字節的二進制流。

它也是mutable,getLength() != getBytes().length

[NullWritable]

NullWritable是Writable的一個特殊類型。它的序列化長度為0,其實只是一個占位符,既不讀入,也不寫出。只是存在于程序體中。

Immutable,是一個singleton。

[ObjectWritable] 

ObjectWritable是Java的Array, String, 以及Primitive類型的通用封裝 (注:不包含Integer)。它的序列化則使用java的類型序列化,寫入類型信息等,比較占用空間。

通過兩個特殊的構造:

public ObjectWritable(Object instance);

public ObjectWritable(Class declaredClass,Object instance);

舉例子:

ObjectWritable objectw = new ObjectWritable(int.class,5);

[GenericWritable]

首先這是一個抽象類,需要被具象化才能使用。

觀察下面這個實列,它以一種Union方式,顯示的代理一個Writable實例,解決了Reduce函數的參數聲明問題。

public class MyGenericWritable extends GenericWritable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            IntWritable.class,
            Text.class
             //add as many different Writable class as you want
        };
    }


    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }

    @Override
    public String toString() {
        return "MyGenericWritable [getTypes()=" + Arrays.toString(getTypes()) + "]";
    }

    // override hashcode();
}

public class Reduce extends Reducer<Text, MyGenericWritable, Text, Text> {
    public void reduce(Text key, Iterable<MyGenericWritable> values, Context context) throws IOException, InterruptedException {
}
[ArrayWritable /TwoDArrayWritable]

ArrayWritable aw = new ArrayWriable(Text.class);

[MapWritable / SortedMapWritable]

實現了java.util.Map<Writable,Writable> 和SortedMap...

它的serialize, 使用先寫map<classname,id>,然后后邊每個類的類型,以id來替代,節省空間。這些都在父類AbstractMapWritable中實現。

集合小結:

1. 如果是單類型的列表,使用ArrayWritable就足夠了

2。如果是把不同類型的Writable存儲在一個列表中:

-- 可以使用GenerickWritable,把元素封裝在一個ArrayWritable,這個貌似只能同一類型。

    public class MyGenericWritable extends GenericWritable {

    private static Class<? extends Writable>[] CLASSES = null;

    static {
        CLASSES = (Class<? extends Writable>[]) new Class[] {
            ArrayWritable.class,
             //add as many different Writable class as you want
        };
    }


    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }

-- 可以使用寫一個仿照MapWritable的ListWritable

    //注意實現hashcode,equals,toString, comparTo (if possible)

    //hashcode尤其重要,HashPartitioner通常用hashcode來選擇reduce分區,所以為你的類寫一個比較好的hashcode非常必要。

    public class ListWritable extends ArrayList<Writable> implements Writable {

    }

/**
 * @author cloudera
 *
 */
public class ListWritable extends ArrayList<Writable> implements Writable {
	private List<Writable> list = new ArrayList<Writable>();
	
	public void set(Writable writable){
		list.add(writable);
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		int nsize = in.readInt();
		Configuration conf = new Configuration();
		Text className = new Text();
		while(nsize-->0){
	
			Class theClass = null;
			try {
				className.readFields(in);
				theClass = Class.forName(className.toString());
			} catch (ClassNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

			Writable w = (Writable)ReflectionUtils.newInstance(theClass,conf);
			w.readFields(in);
			
			add(w);
			
		}
	}

	@Override
	public void write(DataOutput out) throws IOException {
		Writable w = null;
		out.writeInt(size());
		for(int i = 0;i<size();i++){
			w = get(i);
			new Text(w.getClass().getName()).write(out);
			w.write(out);
		}
	}

}

感謝各位的閱讀,以上就是“Hadoop序列化怎么實現”的內容了,經過本文的學習后,相信大家對Hadoop序列化怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

政和县| 冀州市| 淮阳县| 抚顺县| 遂平县| 龙泉市| 明光市| 泽州县| 西盟| 广东省| 高唐县| 昔阳县| 张北县| 海口市| 崇文区| 内黄县| 梧州市| 泽普县| 义乌市| 庆云县| 冕宁县| 昌宁县| 保德县| 普陀区| 贵溪市| 桂阳县| 鄄城县| 灵丘县| 涞源县| 藁城市| 甘洛县| 孝感市| 浮山县| 娱乐| 绥阳县| 武汉市| 德惠市| 洛川县| 垦利县| 延边| 福州市|