中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何分析Spark SQL中的Parquet

發布時間:2021-12-16 21:03:39 來源:億速云 閱讀:173 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何分析Spark SQL中的Parquet,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

Parquet是一種列式存儲格式,很多種處理引擎都支持這種存儲格式,也是sparksql的默認存儲格式。Spark SQL支持靈活的讀和寫Parquet文件,并且對parquet文件的schema可以自動解析。當Spark SQL需要寫成Parquet文件時,處于兼容的原因所有的列都被自動轉化為了nullable。

讀寫Parquet文件

// Encoders for most common types are automatically provided by importing spark.implicits._import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")

// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFrameval parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes => "Name: " + attributes(0)).show()// +------------+// |       value|// +------------+// |Name: Justin|// +------------+

分區發現

分區表時很多系統支持的,比如hive,對于一個分區表,往往是采用表中的某一或多個列去作為分區的依據,分區是以文件目錄的形式體現。所有內置的文件源(Text/CSV/JSON/ORC/Parquet)都支持自動的發現和推測分區信息。例如,我們想取兩個分區列,gender和country,先按照性別分區,再按照國家分區:

path└── to   └── table       ├── gender=male       │   ├── ...       │   │       │   ├── country=US       │   │   └── data.parquet       │   ├── country=CN       │   │   └── data.parquet       │   └── ...       └── gender=female           ├── ...           │           ├── country=US           │   └── data.parquet           ├── country=CN           │   └── data.parquet           └── ...

SparkSession.read.parquet 或者 SparkSession.read.load讀取的目錄為path/to/table的時候,會自動從路徑下抽取分區信息。返回DataFrame的表結構為:

root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)

細細分析一下你也會發現分區列的數據類型也是自動推斷的。當前支持的數據類型有,數字類型,date,timestamp和string類型。有時候用戶可能不希望自動推斷分區列的類型,這時候只需要將spark.sql.sources.partitionColumnTypeInference.enabled配置為false即可。如果分區列的類型推斷這個參數設置為了false,那么分區列的類型會被認為是string。

從spark 1.6開始,分區發現默認情況只會發現給定路徑下的分區。比如,上面的分區表,假如你講路徑path/to/table/gender=male傳遞給SparkSession.read.parquet 或者 SparkSession.read.load 那么gender不會被認為是分區列。如果想檢測到該分區,傳給spark的路徑應該是其父路徑也即是path/to/table/,這樣gender就會被認為是分區列。

schema合并

跟protocol buffer,avro,thrift一樣,parquet也支持schema演變升級。用戶可以在剛開始的時候創建簡單的schema,然后根據需要隨時擴展新的列。

Parquet 數據源支持自動檢測新作列并且會合并schema。

由于合并schema是一個相當耗費性能的操作,而且很多情況下都是不必要的,所以從spark 1.5開始就默認關閉掉該功能。有兩種配置開啟方式:

通過數據源option設置mergeSchema為true。在全局sql配置中設置spark.sql.parquet.mergeSchema 為true.// This is used to implicitly convert an RDD to a DataFrame.import spark.implicits._

// Create a simple DataFrame, store into a partition directoryval squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnval cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned tableval mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root//  |-- value: int (nullable = true)//  |-- square: int (nullable = true)//  |-- cube: int (nullable = true)//  |-- key: int (nullable = true)

hive metastore Parquet表轉換

當讀寫hive metastore parquet格式表的時候,Spark SQL為了較好的性能會使用自己默認的parquet格式而不是采用hive SerDe。該行為是通過參數spark.sql.hive.convertMetastoreParquet空值,默認是true。

Hive和parquet兼容性

從表schema處理角度講hive和parquet有兩個主要的區別

  1. hive是大小寫敏感的,但是parquet不是。

  2. hive會講所有列視為nullable,但是nullability在parquet里有獨特的意義。

由于上面的原因,在將hive metastore parquet轉化為spark parquet表的時候,需要處理兼容一下hive的schema和parquet的schema。兼容處理的原則是:

  1. 有相同名字的字段必須要有相同的數據類型,忽略nullability。兼容處理的字段應該保持parquet側的數據類型,這樣就可以處理到nullability類型了。

  2. 兼容處理的schema應直接包含在hive元數據里的schema信息:

    1. 任何僅僅出現在parquet schema的字段將會被刪除

    2. 任何僅僅出現在hive 元數據里的字段將會被視為nullable。

元數據刷新

Spark SQL為了更好的性能會緩存parquet的元數據。當spark 讀取hive表的時候,schema一旦從hive轉化為spark sql的,就會被spark sql緩存,如果此時表的schema被hive或者其他外部工具更新,必須要手動的去刷新元數據,才能保證元數據的一致性。

spark.catalog.refreshTable("my_table")

配置

parquet的相關的參數可以通過setconf或者set key=value的形式配置。

  • spark.sql.parquet.binaryAsString 默認值是false。一些parquet生產系統,尤其是impala,hive和老版本的spark sql,不區分binary和string類型。該參數告訴spark 講binary數據當作字符串處理。

  • spark.sql.parquet.int96AsTimestamp 默認是true。有些parquet生產系統,尤其是parquet和hive,將timestamp翻譯成INT96.該參數會提示Spark SQL講INT96翻譯成timestamp。

  • spark.sql.parquet.compression.codec 默認是snappy。當寫parquet文件的時候設置壓縮格式。如果在option或者properties里配置了compression或者parquet.compression優先級依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支持的配置類型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安裝ZstandardCodec,brotli需要安裝BrotliCodec。

  • spark.sql.parquet.filterPushdown 默認是true。設置為true代表開啟parquet下推執行優化。

  • spark.sql.hive.convertMetastoreParquet 默認是true。假如設置為false,spark sql會讀取hive parquet表的時候使用Hive SerDe,替代內置的。

  • spark.sql.parquet.mergeSchema 默認是false。當設置為true的時候,parquet數據源會合并讀取所有的parquet文件的schema,否則會從summary文件或者假如沒有summary文件的話隨機的選一些數據文件來合并schema。

  • spark.sql.parquet.writeLegacyFormat 默認是false。如果設置為true 數據會以spark 1.4和更早的版本的格式寫入。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出,該格式是其他系統例如hive,impala等使用的。如果是false,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。如果spark sql要以parquet輸出并且結果會被不支持新格式的其他系統使用的話,需要設置為true。

看完上述內容,你們對如何分析Spark SQL中的Parquet有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄平县| 灵川县| 贡觉县| 荣昌县| 准格尔旗| 香河县| 奎屯市| 凤山县| 察隅县| 自贡市| 岱山县| 台中市| 天门市| 昂仁县| 万载县| 曲周县| 无棣县| 隆安县| 通海县| 阿克苏市| 溧水县| 齐齐哈尔市| 特克斯县| 桦川县| 澜沧| 彭水| 卓尼县| 仁寿县| 龙南县| 嘉禾县| 泗水县| 中牟县| 喀喇沁旗| 安溪县| 台东县| 潼关县| 荣昌县| 大同市| 博罗县| 偏关县| 南丹县|