中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark 系列(十)—— Spark SQL 外部數據源

發布時間:2020-07-28 16:34:35 來源:網絡 閱讀:687 作者:heibaiying 欄目:大數據

一、簡介

1.1 多數據源支持

Spark 支持以下六個核心數據源,同時 Spark 社區還提供了多達上百種數據源的讀取方式,能夠滿足絕大部分使用場景。

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

注:以下所有測試文件均可從本倉庫的resources 目錄進行下載

1.2 讀數據格式

所有讀取 API 遵循以下調用格式:

// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST")          // 讀取模式
.option("inferSchema", "true")       // 是否自動推斷 schema
.option("path", "path/to/file(s)")   // 文件路徑
.schema(someSchema)                  // 使用預定義的 schema      
.load()

讀取模式有以下三種可選項:

讀模式 描述
permissive 當遇到損壞的記錄時,將其所有字段設置為 null,并將所有損壞的記錄放在名為 _corruption t_record 的字符串列中
dropMalformed 刪除格式不正確的行
failFast 遇到格式不正確的數據時立即失敗

1.3 寫數據格式

// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE")         //寫模式
.option("dateFormat", "yyyy-MM-dd")  //日期格式
.option("path", "path/to/file(s)")
.save()

寫數據模式有以下四種可選項:

Scala/Java 描述
SaveMode.ErrorIfExists 如果給定的路徑已經存在文件,則拋出異常,這是寫數據默認的模式
SaveMode.Append 數據以追加的方式寫入
SaveMode.Overwrite 數據以覆蓋的方式寫入
SaveMode.Ignore 如果給定的路徑已經存在文件,則不做任何操作

<br/>

二、CSV

CSV 是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每個字段用逗號分隔。

2.1 讀取CSV文件

自動推斷類型讀取讀取示例:

spark.read.format("csv")
.option("header", "false")        // 文件中的第一行是否為列的名稱
.option("mode", "FAILFAST")      // 是否快速失敗
.option("inferSchema", "true")   // 是否自動推斷 schema
.load("/usr/file/csv/dept.csv")
.show()

使用預定義類型:

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//預定義數據格式
val myManualSchema = new StructType(Array(
    StructField("deptno", LongType, nullable = false),
    StructField("dname", StringType,nullable = true),
    StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()

2.2 寫入CSV文件

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具體的分隔符:

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

2.3 可選配置

為節省主文篇幅,所有讀寫配置項見文末 9.1 小節。

<br/>

三、JSON

3.1 讀取JSON文件

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默認不支持一條數據記錄跨越多行 (如下),可以通過配置 multiLinetrue 來進行更改,其默認值為 false

// 默認支持單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//默認不支持多行
{
  "DEPTNO": 10,
  "DNAME": "ACCOUNTING",
  "LOC": "NEW YORK"
}

3.2 寫入JSON文件

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 可選配置

為節省主文篇幅,所有讀寫配置項見文末 9.2 小節。

<br/>

四、Parquet

Parquet 是一個開源的面向列的數據存儲,它提供了多種存儲優化,允許讀取單獨的列非整個文件,這不僅節省了存儲空間而且提升了讀取效率,它是 Spark 是默認的文件格式。

4.1 讀取Parquet文件

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2 寫入Parquet文件

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3 可選配置

Parquet 文件有著自己的存儲規則,因此其可選配置項比較少,常用的有如下兩個:

讀寫操作 配置項 可選值 默認值 描述
Write compression or codec None,<br/>uncompressed,<br/>bzip2,<br/>deflate, gzip,<br/>lz4, or snappy None 壓縮文件格式
Read mergeSchema true, false 取決于配置項 spark.sql.parquet.mergeSchema 當為真時,Parquet 數據源將所有數據文件收集的 Schema 合并在一起,否則將從摘要文件中選擇 Schema,如果沒有可用的摘要文件,則從隨機數據文件中選擇 Schema。

更多可選配置可以參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

<br/>

五、ORC

ORC 是一種自描述的、類型感知的列文件格式,它針對大型數據的讀寫進行了優化,也是大數據中常用的文件格式。

5.1 讀取ORC文件

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2 寫入ORC文件

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")

<br/>

六、SQL Databases

Spark 同樣支持與傳統的關系型數據庫進行數據讀寫。但是 Spark 程序默認是沒有提供數據庫驅動的,所以在使用前需要將對應的數據庫驅動上傳到安裝目錄下的 jars 目錄中。下面示例使用的是 Mysql 數據庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar 上傳到 jars 目錄下。

6.1 讀取數據

讀取全表數據示例如下,這里的 help_keyword 是 mysql 內置的字典表,只有 help_keyword_idname 兩個字段。

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")            //驅動
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //數據庫地址
.option("dbtable", "help_keyword")                    //表名
.option("user", "root").option("password","root").load().show(10)

從查詢結果讀取數據:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()

//輸出
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+

也可以使用如下的寫法進行數據的過濾:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10  OR name = 'WHEN'")   //指定數據過濾條件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() 

//輸出:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+

可以使用 numPartitions 指定讀取數據的并行度:

option("numPartitions", 10)

在這里,除了可以指定分區外,還可以設置上界和下界,任何小于下界的值都會被分配在第一個分區中,任何大于上界的值都會被分配在最后一個分區中。

val colName = "help_keyword_id"   //用于判斷上下界的列
val lowerBound = 300L    //下界
val upperBound = 500L    //上界
val numPartitions = 10   //分區綜述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
                             colName,lowerBound,upperBound,numPartitions,props)

想要驗證分區內容,可以使用 mapPartitionsWithIndex 這個算子,代碼如下:

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
    val buffer = new ListBuffer[String]
    while (iterator.hasNext) {
        buffer.append(index + "分區:" + iterator.next())
    }
    buffer.toIterator
}).foreach(println)

執行結果如下:help_keyword 這張表只有 600 條左右的數據,本來數據應該均勻分布在 10 個分區,但是 0 分區里面卻有 319 條數據,這是因為設置了下限,所有小于 300 的數據都會被限制在第一個分區,即 0 分區。同理所有大于 500 的數據被分配在 9 分區,即最后一個分區。

Spark 系列(十)—— Spark SQL 外部數據源

6.2 寫入數據

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()

<br/>

七、Text

Text 文件在讀寫性能方面并沒有任何優勢,且不能表達明確的數據結構,所以其使用的比較少,讀寫操作如下:

7.1 讀取Text數據

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 寫入Text數據

df.write.text("/tmp/spark/txt/dept")

<br/>

八、數據讀寫高級特性

8.1 并行讀

多個 Executors 不能同時讀取同一個文件,但它們可以同時讀取不同的文件。這意味著當您從一個包含多個文件的文件夾中讀取數據時,這些文件中的每一個都將成為 DataFrame 中的一個分區,并由可用的 Executors 并行讀取。

8.2 并行寫

寫入的文件或數據的數量取決于寫入數據時 DataFrame 擁有的分區數量。默認情況下,每個數據分區寫一個文件。

8.3 分區寫入

分區和分桶這兩個概念和 Hive 中分區表和分桶表是一致的。都是將數據按照一定規則進行拆分存儲。需要注意的是 partitionBy 指定的分區和 RDD 中分區不是一個概念:這里的分區表現為輸出目錄的子目錄,數據分別存儲在對應的子目錄中。

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出文件。

Spark 系列(十)—— Spark SQL 外部數據源

8.3 分桶寫入

分桶寫入就是將數據按照指定的列和桶數進行散列,目前分桶寫入只支持保存為表,實際上這就是 Hive 的分桶表。

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

8.5 文件大小管理

如果寫入產生小文件數量過多,這時會產生大量的元數據開銷。Spark 和 HDFS 一樣,都不能很好的處理這個問題,這被稱為“small file problem”。同時數據文件也不能過大,否則在查詢時會有不必要的性能開銷,因此要把文件大小控制在一個合理的范圍內。

在上文我們已經介紹過可以通過分區數量來控制生成文件的數量,從而間接控制文件大小。Spark 2.2 引入了一種新的方法,以更自動化的方式控制文件大小,這就是 maxRecordsPerFile 參數,它允許你通過控制寫入文件的記錄數來控制文件大小。

 // Spark 將確保文件最多包含 5000 條記錄
df.write.option(“maxRecordsPerFile”, 5000)

<br>

九、可選配置附錄

9.1 CSV讀寫可選配置

讀\寫操作 配置項 可選值 默認值 描述
Both seq 任意字符 ,(逗號) 分隔符
Both header true, false false 文件中的第一行是否為列的名稱。
Read escape 任意字符 \ 轉義字符
Read inferSchema true, false false 是否自動推斷列類型
Read ignoreLeadingWhiteSpace true, false false 是否跳過值前面的空格
Both ignoreTrailingWhiteSpace true, false false 是否跳過值后面的空格
Both nullValue 任意字符 “” 聲明文件中哪個字符表示空值
Both nanValue 任意字符 NaN 聲明哪個值表示 NaN 或者缺省值
Both positiveInf 任意字符 Inf 正無窮
Both negativeInf 任意字符 -Inf 負無窮
Both compression or codec None,<br/>uncompressed,<br/>bzip2, deflate,<br/>gzip, lz4, or<br/>snappy none 文件壓縮格式
Both dateFormat 任何能轉換為 Java 的 <br/>SimpleDataFormat 的字符串 yyyy-MM-dd 日期格式
Both timestampFormat 任何能轉換為 Java 的 <br/>SimpleDataFormat 的字符串 yyyy-MMdd’T’HH:mm:ss.SSSZZ 時間戳格式
Read maxColumns 任意整數 20480 聲明文件中的最大列數
Read maxCharsPerColumn 任意整數 1000000 聲明一個列中的最大字符數。
Read escapeQuotes true, false true 是否應該轉義行中的引號。
Read maxMalformedLogPerPartition 任意整數 10 聲明每個分區中最多允許多少條格式錯誤的數據,超過這個值后格式錯誤的數據將不會被讀取
Write quoteAll true, false false 指定是否應該將所有值都括在引號中,而不只是轉義具有引號字符的值。
Read multiLine true, false false 是否允許每條完整記錄跨域多行

9.2 JSON讀寫可選配置

讀\寫操作 配置項 可選值 默認值
Both compression or codec None,<br/>uncompressed,<br/>bzip2, deflate,<br/>gzip, lz4, or<br/>snappy none
Both dateFormat 任何能轉換為 Java 的 SimpleDataFormat 的字符串 yyyy-MM-dd
Both timestampFormat 任何能轉換為 Java 的 SimpleDataFormat 的字符串 yyyy-MMdd’T’HH:mm:ss.SSSZZ
Read primitiveAsString true, false false
Read allowComments true, false false
Read allowUnquotedFieldNames true, false false
Read allowSingleQuotes true, false true
Read allowNumericLeadingZeros true, false false
Read allowBackslashEscapingAnyCharacter true, false false
Read columnNameOfCorruptRecord true, false Value of spark.sql.column&NameOf
Read multiLine true, false false

9.3 數據庫讀寫可選配置

屬性名稱 含義
url 數據庫地址
dbtable 表名稱
driver 數據庫驅動
partitionColumn,<br/>lowerBound, upperBoun 分區總數,上界,下界
numPartitions 可用于表讀寫并行性的最大分區數。如果要寫的分區數量超過這個限制,那么可以調用 coalesce(numpartition) 重置分區數。
fetchsize 每次往返要獲取多少行數據。此選項僅適用于讀取數據。
batchsize 每次往返插入多少行數據,這個選項只適用于寫入數據。默認值是 1000。
isolationLevel 事務隔離級別:可以是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即標準事務隔離級別。<br/>默認值是 READ_UNCOMMITTED。這個選項只適用于數據讀取。
createTableOptions 寫入數據時自定義創建表的相關配置
createTableColumnTypes 寫入數據時自定義創建列的列類型

數據庫讀寫更多配置可以參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

參考資料

  1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
  2. https://spark.apache.org/docs/latest/sql-data-sources.html

更多大數據系列文章可以參見 GitHub 開源項目大數據入門指南

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

达孜县| 建瓯市| 微山县| 彰化市| 长顺县| 夏津县| 北安市| 龙胜| 定边县| 林口县| 宝兴县| 邢台市| 平乐县| 阜城县| 鄂托克旗| 远安县| 宽城| 应用必备| 绥化市| 闽侯县| 梨树县| 台州市| 同江市| 青阳县| 开鲁县| 喀什市| 余庆县| 东莞市| 湖北省| 太保市| 慈利县| 临漳县| 仁化县| 彰武县| 南华县| 海阳市| 图木舒克市| 云安县| 清水县| 连云港市| 灵山县|