您好,登錄后才能下訂單哦!
本篇內容介紹了“Spark SQL配置及使用的方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
XY個人記
SparkSQL是spark的一個模塊,主入口是SparkSession,將SQL查詢與Spark程序無縫混合。DataFrames和SQL提供了訪問各種數據源(通過JDBC或ODBC連接)的常用方法包括Hive,Avro,Parquet,ORC,JSON和JDBC。您甚至可以跨這些來源加入數據。以相同方式連接到任何數據源。Spark SQL還支持HiveQL語法以及Hive SerDes和UDF,允許您訪問現有的Hive倉庫。
Spark SQL包括基于成本的優化器,列式存儲和代碼生成,以快速進行查詢。同時,它使用Spark引擎擴展到數千個節點和多小時查詢,該引擎提供完整的中間查詢容錯。不要擔心使用不同的引擎來獲取歷史數據。
Spark2.0之前
入口:SQLContext和HiveContext
SQLContext:主要DataFrame的構建以及DataFrame的執行,SQLContext指的是spark中SQL模塊的程序入口
HiveContext:是SQLContext的子類,專門用于與Hive的集成,比如讀取Hive的元數據,數據存儲到Hive表、Hive的窗口分析函數等
Spark2.0之后
入口:SparkSession(spark應用程序的一個整體入口),合并了SQLContext和HiveContext
SparkSQL核心抽象:DataFrame/Dataset type DataFrame = Dataset[Row] //type 給某個數據類型起個別名
SparkSQL除了支持直接的HQL語句的查詢外,還支持通過DSL語句/API進行數 據的操作,主要DataFrame API列表如下:
select:類似于HQL語句中的select,獲取需要的字段信息
where/filter:類似HQL語句中的where語句,根據給定條件過濾數據
sort/orderBy: 全局數據排序功能,類似Hive中的order by語句,按照給定字段進行全部 數據的排序
sortWithinPartitions:類似Hive的sort by語句,按照分區進行數據排序
groupBy:數據聚合操作
limit:獲取前N條數據記錄
集成步驟:
-1. namenode和datanode啟動
-2. 將hive配置文件軟連接或者復制到spark的conf目錄下面
$ ln -s /opt/modules/apache/hive-1.2.1/conf/hive-site.xml or $ cp /opt/modules/apache/hive-1.2.1/conf/hive-site.xml ./
-3. 根據hive-site.xml中不同配置項,采用不同策略操作
根據hive.metastore.uris參數
-a. 當hive.metastore.uris參數為空的時候(默認值)
將Hive元數據庫的驅動jar文件添加spark的classpath環境變量中即可完成SparkSQL到hive的集成
-b. 當hive.metastore.uris非空時候
-1. 啟動hive的metastore服務
./bin/hive --service metastore &
-2. 完成SparkSQL與Hive集成工作
-4.啟動spark-SQL($ bin/spark-sql)時候 發現報錯:
java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.util.Utils$.classForName(Utils.scala:228)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
You need to build Spark with -Phive and -Phive-thriftserver.
解決辦法:將spark源碼中sql/hive-thriftserver/target/spark-hive-thriftserver_2.11-2.0.2.jar拷貝到spark的jars目錄下
完成。(查看數據庫 spark-sql (default)> show databases; ,它操作的都是Hive)
編寫兩個簡單的SQL
spark-sql (default)> select * from emp;
也可以做兩張變的jion
spark-sql (default)> select a.*,b.* from emp a left join dept b on a.deptno = b.deptno;
可以對表進行一個緩存操作3
> cache table emp; #緩存操作 > uncache table dept; #清除緩存操作 > explain select * from emp; #執行計劃
我們可以看到相應的Storage信息,執行完清除緩存操作后下面的Stages操作消失
啟動一個Spark Shell,可以直接在shell里面編寫SQL語句
$ bin/spark-shell #可以在shell里面寫sql scala> spark.sql("show databases").show scala> spark.sql("use common").show scala> spark.sql("select * from emp a join dept b on a.deptno = b.deptno").show
用一個變量名稱接收DataFrame
比如使用registerTempTable注冊一個臨時表。注:臨時表是所有數據庫公有的不需要指定數據庫
scala> df.registerTempTable("table_regis01")
在我們的4040頁面Environment節點下的Classpath Entries節點里可以看到我們服務所依賴的jar包。http://hadoop01.com:4040/environment/
1.直接添加驅動jar到${SPARK_HOME}/jars
2. 使用參數--jars 添加本地jar包
./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/servlet-api-2.5.jar
添加多個本地jar的話,用逗號隔開
./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/*
注意:不能使用*去添加jar包,如果想要添加多個依賴jar,只能一個一個去添加
3. 使用參數--packages添加maven中的第三方jar文件
. bin/spark-shell --packages mysql:mysql-connector-java:5.1.28
可以使用逗號隔開給定多個,格式(groupId:artifactId:version)
(底層執行原理先從maven中央庫下載本地沒有的第三方jar文件到本地,jar文件會先下載到本地的/home/ijeffrey/.ivy2/jars目錄下,最后通過spark.jars來控制添加classpath中)
--exclude-packages 去掉不需要的包
--repositories maven源,指定URL連接
4. 使用SPARK_CLASSPATH環境變量給定jar文件路徑
編輯spark-env.sh文件
SPARK_CLASSPATH=/opt/modules/apache/spark-2.0.2/external_jars/* 外部jar的路徑
5. 將第三方jar文件打包到最終的jar文件中
在IDEA中添加依賴jar到最終的需要運行的spark應用的jar中
ThriftServer底層就是Hive的HiveServer2服務,下面是客戶端連接Hive Server2 方法的相關連接
https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics #hiveserver2的配置
https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2
配置:
-1. ThriftServer服務運行的Spark環境必須完成SparkSQL和Hive的集成
-2. hive-site.xml中配置hiveserver2服務的相關參數
<!-- 監聽的端口號 --> <property> <name>hive.server2.thrift.bind.port</name> <value>10000</value> </property> <!-- 監聽的主機名 --> <property> <name>hive.server2.thrift.bind.host</name> <value>hadoop01.com</value> </property>
-3. 啟動hive的元數據服務
$ ./bin/hive --service metastore &
-4. 啟動spark的thriftserver服務,也是一個SparkSubmit服務
$ sbin/start-thriftserver.sh
也可以看到相應的WEBUI界面,比之前的多了一個JDBC/ODBC Server
注意:如果需要啟動Spark ThriftServer 服務,需要關閉hiveserver2 服務
-1. 查看進程是否存在
jps -ml | grep HiveThriftServer2
-2. 查看WEB界面是否正常
有JDBC/ODBC Server這個選項就是正常的
-3. 通過spark自帶的beeline命令
./bin/beeline
-4. 通過jdbc來訪問spark的ThriftServer接口
$ bin/beeline #啟動beeline #可以使用!help查看相應的命令 beeline> !help #如connect beeline> !connect Usage: connect <url> <username> <password> [driver] #這樣可以多個用戶連接 beeline> !connect jdbc:hive2://hadoop01.com:10000 #退出 beeline> !quit
連接成功,在4040 頁面也可以看到我們連接的hive
注:如果報錯
No known driver to handle "jdbc:hive2://hadoop01.com:10000"
說明缺少了hive的驅動jar,在我們編譯好的源碼中hive-jdbc-1.2.1.spark2.jar 找到并copy到spark的jars中
向我們java連接mysql一樣,我們使用scala來連接ThriftServer
package com.jeffrey import java.sql.DriverManager object SparkJDBCThriftServerDemo { def main(args: Array[String]): Unit = { //1 添加驅動 val driver = "org.apache.hive.jdbc.HiveDriver" Class.forName(driver) //2 構建連接對象 val url = "jdbc:hive2://hadoop01.com:10000" val conn = DriverManager.getConnection(url,"ijeffrey","123456") //3 sql 語句執行 conn.prepareStatement("use common").execute() var pstmt = conn.prepareStatement("select empno,ename,sal from emp") var rs = pstmt.executeQuery() while (rs.next()){ println(s"empno = ${rs.getInt("empno")} " + s"ename=${rs.getString("ename")} " + s" sal=${rs.getDouble("sal")}") } println("---------------------------------------------------------------------------") pstmt = conn.prepareStatement("select empno,ename,sal from emp where sal > ? and ename = ?") pstmt.setDouble(1,3000) pstmt.setString(2,"KING") rs = pstmt.executeQuery() while (rs.next()){ println(s"empno = ${rs.getInt("empno")} " + s"ename=${rs.getString("ename")} " + s" sal=${rs.getDouble("sal")}") } rs.close() pstmt.close() conn.close() } }
執行結果:
1. 將案例數據上傳到HDFS上
樣例數據在${SPARK_HOME}/examples/src/main/resources/*
2. 編寫SparkSQL程序
啟動一個spark-shell進行編寫
scala> val path = "/spark/data/people.json" scala> val df = spark.read.json(path) scala> df.registerTempTable("tmp04") //通過DataFrame注冊一個臨時表 scala> spark.sql("show tables").show //通過SQL語句進行操作 scala> spark.sql("select * from tmp04").show #saveAsTable 使用之前 先要use table scala> spark.sql("select * from tmp04").write.saveAsTable("test01") #overwrite 覆蓋 append 拼接 ignore 忽略 scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01") scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01") scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")
saveAsTable("test01")默認保存到一張不存在的表中(test01不是臨時表),如果表存在的話就會報錯
SaveMode四種情況:
Append:拼接
Overwrite: 重寫
ErrorIfExists:如果表已經存在,則報錯,默認就是這一種,存在即報錯
Ignore:如果表已經存在了,則忽略這一步操作
除了spark.read.json的方式去讀取數據外,還可以使用spark.sql的方式直接讀取數據
scala> spark.sql("select * from json.`/spark/data/people.json` where age is not null").show +---+------+ |age| name| +---+------+ | 30| Andy| | 19|Justin| +---+------+ # hdfs上的路徑使用`(反票號)引起來
在IDEA中集成Hive的話,需要將hive-site.xml文件放到resources目錄下面
package com.jeffrey.sql import java.util.Properties import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} object HiveJoinMySQLDemo { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir","D:\\hadoop-2.7.3") // 1.構建SparkSession val warehouseLocation = "/user/hive/warehouse" val spark = SparkSession .builder() .master("local") //如果放到集群運行需要注釋掉 .appName("RDD 2 DataFrame") .config("spark.sql.warehouse.dir",warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql val url = "jdbc:mysql://hadoop01.com:3306/test" val table = "tb_dept" val props = new Properties() props.put("user","root") props.put("password","123456") // 1.Hive表數據導入到MySQL中 在shell中可以使用paste寫多行 spark.read.table("common.dept") .write .mode(SaveMode.Overwrite) .jdbc(url,table,props) // 2.Hive和MySQL的join操作 //2.1 讀取MySQL的數據 val df: DataFrame = spark .read .jdbc(url,table,props) df.createOrReplaceTempView("tmp_tb_dept") //2.1 數據聚合 spark.sql( """ |select a.*,b.dname,b.loc |from common.emp a |join tmp_tb_dept b on a.deptno = b.deptno """.stripMargin).createOrReplaceTempView("tmp_emp_join_dept_result") spark.sql("select * from tmp_emp_join_dept_result").show() // 對表進行緩存的方法 spark.read.table("tmp_emp_join_dept_result").cache() spark.catalog.cacheTable("tmp_emp_join_dept_result") //輸出到HDFS上 // 方法一 /*spark .read .table("tmp_emp_join_dept_result") .write.parquet("/spark/sql/hive_join_mysql")*/ // 方法二 spark .read .table("tmp_emp_join_dept_result") .write .format("parquet") .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}") //輸出到Hive中,并且是parquet格式 按照deptno分區 spark .read .table("tmp_emp_join_dept_result") .write .format("parquet") .partitionBy("deptno") .mode(SaveMode.Overwrite) .saveAsTable("hive_emp_dept") println("------------------------------------------------------------") spark.sql("show tables").show() //清空緩存 spark.catalog.uncacheTable("tmp_emp_join_dept_result") } }
可以打成jar文件放在集群上執行
bin/spark-submit \ --class com.jeffrey.sql.HiveJoinMySQLDemo \ --master yarn \ --deploy-mode client \ /opt/datas/jar/hivejoinmysql.jar bin/spark-submit \ --class com.jeffrey.sql.HiveJoinMySQLDemo \ --master yarn \ --deploy-mode cluster \ /opt/datas/logAnalyze.jar
以上即使Spark SQL的基本使用。
HIve支持的函數,SparkSQL基本都是支持的,SparkSQL支持兩種自定義函數,分別是:UDF和UDAF,兩種函數都是通過SparkSession的udf屬性進行函數的注冊使用的;SparkSQL不支持UDTF函數的 自定義使用。
☆ UDF:一條數據輸入,一條數據輸出,一對一的函數,即普通函數
☆ UDAF:多條數據輸入,一條數據輸出,多對一的函數,即聚合函數
“Spark SQL配置及使用的方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。