您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關MaxCompute Spark開發的示例分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
MaxCompute Spark是MaxCompute提供的兼容開源的Spark計算服務,它在統一的計算資源和數據集權限體系之上,提供Spark計算框架,支持用戶以熟悉的開發使用方式提交運行Spark作業,以滿足更豐富的數據處理分析場景。
下面將重點介紹MaxCompute Spark能夠支撐的應用場景,同時說明開發的依賴條件和環境準備,重點對Spark作業開發、提交到MaxCompute集群執行、診斷進行介紹。
MaxCompute Spark是阿里云提供的Spark on MaxCompute的解決方案,能夠讓Spark應用運行在托管的MaxCompute計算環境中。為了能夠在MaxCompute環境中安全地運行Spark作業,MaxCompute提供了以下SDK和MaxCompute Spark定制發布包。
SDK定位于開源應用接入MaxCompute SDK:
提供了集成所需的API說明以及相關功能Demo,用戶可以基于項目提供的Spark-1.x以及Spark-2.x的example項目構建自己的應用,并且提交到MaxCompute集群上。
MaxCompute Spark客戶端發布包:
集成了MaxCompute認證功功能,作為客戶端工具,用于通過Spark-submit方式提交作業到MaxCompute項目中運行,目前提供了面向Spark1.x和Spark2.x的2個發布包:spark-1.6.3和spark-2.3.0 SDK在開發時,可以通過配置Maven依賴進行引用。Spark客戶端需要根據開發的Spark版本,提前下載。如,需要開發Spark1.x應用,應下載spark-1.6.3版本客戶端;如需開發Spark2.x應用,應下載spark-2.3.0客戶端。
MaxCompute Spark發布包:集成了MaxCompute認證功功能,作為客戶端工具,用于通過Spark-submit方式提交作業到MaxCompute項目中運行,目前提供了面向Spark1.x和Spark2.x的2個發布包:
spark-1.6.3
spark-2.3.0
請根據需要開發的Spark版本,選擇合適的版本下載并解壓Maxcompute Spark發布包。
JAVA_HOME設置
# 盡量使用JDK 1.7+ 1.8+ 最佳
export JAVA_HOME=/path/to/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
SPARK_HOME設置
export SPARK_HOME=/path/to/spark_extracted_package
export PATH=$SPARK_HOME/bin:$PATH
在 $SPARK_HOME/conf
路徑下存在spark-defaults.conf.template文件,這個可以作為spark-defaults.conf的模版,需要在該文件中設置MaxCompute相關的賬號信息后,才可以提交Spark任務到MaxCompute。默認配置內容如下,將空白部分根據實際的賬號信息填上即可,其余的配置可以保持不變。
# MaxCompute賬號信息
spark.hadoop.odps.project.name =
spark.hadoop.odps.access.id =
spark.hadoop.odps.access.key =
# 以下配置保持不變
spark.sql.catalogImplementation=odps
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper
spark.hadoop.odps.end.point = http://service.cn.maxcompute.aliyun.com/api
spark.hadoop.odps.runtime.end.point = http://service.cn.maxcompute.aliyun-inc.com/api
若作業需要訪問MaxCompute表,需要依賴odps-spark-datasource模塊,本節介紹如何把該依賴編譯安裝到本地maven倉庫;若無需訪問可直接跳過。
git clone代碼,github地址: https://github.com/aliyun/aliyun-cupid-sdk/tree/3.3.2-public
#git clone git@github.com:aliyun/aliyun-cupid-sdk.git
編譯模塊
#cd ${path to aliyun-cupid-sdk}
#git checkout 3.3.2-public
// 編譯并安裝cupid-sdk
#cd ${path to aliyun-cupid-sdk}/core/cupid-sdk/
#mvn clean install -DskipTests
// 編譯并安裝datasource。依賴cupid-sdk
// for spark-2.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-2.x/datasource
# mvn clean install -DskipTests
// for spark-1.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-1.x/datasource
#mvn clean install -DskipTests
添加依賴
<!-- Spark-1.x請依賴此模塊 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.10</artifactId>
<version>3.3.2-public</version>
</dependency>
<!-- Spark-2.x請依賴此模塊 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.11</artifactId>
<version>3.3.2-public</version>
</dependency>
若作業需要訪問OSS,直接添加以下依賴即可
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>3.3.2-public</version>
</dependency>
MaxCompute產品提供了兩個應用構建的模版,用戶可以基于此模版進行開發,最后統一構建整個項目后用生成的應用包即可直接提交到MaxCompute集群上運行Spark應用。
MaxCompute Spark提供兩個應用構建模版,用戶可以基于此模版進行開發,最后統一構建整個項目后用生成的應用包即可直接提交到MaxCompute集群上運行Spark應用。首先需要把代碼clone下來
#git clone git@github.com:aliyun/aliyun-cupid-sdk.git
#cd aliyun-cupid-sdk
#checkout 3.3.2-public
#cd archetypes
// for Spark-1.x
sh Create-AliSpark-1.x-APP.sh spark-1.x-demo /tmp
// for Spark-2.x
Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp
以上命令會在/tmp目錄下創建名為 spark-1.x-demo(spark-2.x-demo)的maven project,執行以下命令進行編譯和提交作業:
#cd /tmp/spark-2.x/demo
#mvn clean package
// 提交作業
$SPARK_HOME/bin/spark-submit \
--master yarn-cluster \
--class SparkPi \
/tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar
# Usage: sh Create-AliSpark-2.x-APP.sh <app_name> <target_path>
sh Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp/
cd /tmp/spark-2.x-demo
mvn clean package
# 冒煙測試
# 1 利用編譯出來的shaded jar包
# 2 按照文檔所示下載MaxCompute Spark客戶端
# 3 參考文檔”置環境變量”指引,填寫MaxCompute項目相關配置項
# 執行spark-submit命令 如下
$SPARK_HOME/bin/spark-submit \
--master yarn-cluster \
--class SparkPi \
/tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar
pom.xml 須知
請注意 用戶構建Spark應用的時候,由于是用MaxCompute提供的Spark客戶端去提交應用,故需要注意一些依賴scope的定義
spark-core spark-sql等所有spark社區發布的包,用provided scope
odps-spark-datasource 用默認的compile scope
<!-- spark相關依賴, provided -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- datasource依賴, 用于訪問MaxCompute表 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>3.3.2-public</version>
</dependency>
案例說明
WordCount
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.WordCount \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
Spark-SQL on MaxCompute Table
詳細代碼
提交方式
# 運行可能會報Table Not Found的異常,因為用戶的MaxCompute Project中沒有代碼中指定的表
# 可以參考代碼中的各種接口,實現對應Table的SparkSQL應用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.sparksql.SparkSQL \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
GraphX PageRank
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.graphx.PageRank \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
Mllib Kmeans-ON-OSS
詳細代碼
提交方式
# 代碼中的OSS賬號信息相關需要填上,再編譯提交
conf.set("spark.hadoop.fs.oss.accessKeyId", "***")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "***")
conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
OSS UnstructuredData
詳細代碼
提交方式
# 代碼中的OSS賬號信息相關需要填上,再編譯提交
conf.set("spark.hadoop.fs.oss.accessKeyId", "***")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "***")
conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar
pom.xml 須知
請注意 用戶構建Spark應用的時候,由于是用MaxCompute提供的Spark客戶端去提交應用,故需要注意一些依賴scope的定義
spark-core spark-sql等所有spark社區發布的包,用provided scope
odps-spark-datasource 用默認的compile scope
<!-- spark相關依賴, provided -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<scope>provided</scope>
</dependency>
<!-- datasource依賴, 用于訪問MaxCompute表 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>3.3.2-public</version>
</dependency>
案例說明
WordCount
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.WordCount \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
Spark-SQL 操作MaxCompute表
詳細代碼
提交方式
# 運行可能會報Table Not Found的異常,因為用戶的MaxCompute Project中沒有代碼中指定的表
# 可以參考代碼中的各種接口,實現對應Table的SparkSQL應用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.sparksql.SparkSQL \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
GraphX PageRank
詳細代碼
提交方式
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.graphx.PageRank \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
Mllib Kmeans-ON-OSS
KmeansModelSaveToOss
詳細代碼
提交方式
# 代碼中的OSS賬號信息相關需要填上,再編譯提交
val spark = SparkSession
.builder()
.config("spark.hadoop.fs.oss.accessKeyId", "***")
.config("spark.hadoop.fs.oss.accessKeySecret", "***")
.config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
.appName("KmeansModelSaveToOss")
.getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
OSS UnstructuredData
SparkUnstructuredDataCompute
詳細代碼
提交方式
# 代碼中的OSS賬號信息相關需要填上,再編譯提交
val spark = SparkSession
.builder()
.config("spark.hadoop.fs.oss.accessKeyId", "***")
.config("spark.hadoop.fs.oss.accessKeySecret", "***")
.config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
.appName("SparkUnstructuredDataCompute")
.getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
需要文件
若需要訪問MaxCompute表,則需要參考第三節(訪問MaxCompute表所需依賴)編譯datasource包
SparkSQL應用示例(spark1.6)
from pyspark import SparkContext, SparkConf
from pyspark.sql import OdpsContext
if __name__ == '__main__':
conf = SparkConf().setAppName("odps_pyspark")
sc = SparkContext(conf=conf)
sql_context = OdpsContext(sc)
df = sql_context.sql("select id, value from cupid_wordcount")
df.printSchema()
df.show(200)
df_2 = sql_context.sql("select id, value from cupid_partition_table1 where pt1 = 'part1'")
df_2.show(200)
#Create Drop Table
sql_context.sql("create table TestCtas as select * from cupid_wordcount").show()
sql_context.sql("drop table TestCtas").show()
提交運行:
./bin/spark-submit \
--jars ${path to odps-spark-datasource_2.10-3.3.2-public.jar} \
example.py
SparkSQL應用示例(spark2.3)
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.appName("spark sql").getOrCreate()
df = spark.sql("select id, value from cupid_wordcount")
df.printSchema()
df.show(10, 200)
df_2 = spark.sql("SELECT product,category,revenue FROM (SELECT product,category,revenue, dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank FROM productRevenue) tmp WHERE rank <= 2");
df_2.printSchema()
df_2.show(10, 200)
df_3 = spark.sql("select id, value from cupid_partition_table1 where pt1 = 'part1'")
df_3.show(10, 200)
#Create Drop Table
spark.sql("create table TestCtas as select * from cupid_wordcount").show()
spark.sql("drop table TestCtas").show()
提交運行:
spark-submit --master yarn-cluster \
--jars ${path to odps-spark-datasource_2.11-3.3.2-public.jar \
example.py
對于用戶使用Spark on MaxCompute對VPC環境內的RDS、Redis、ECS主機部署的服務等,受限于VPC的訪問限制,暫時還無法訪問,即將在近期支持。
case1. 作業無需訪問MaxCompute表和OSS
用戶jar包可直接運行,參照第二節準備開發環境和修改配置。注意,對于spark或hadoop的依賴必須設成provided。
case2. 作業需要訪問MaxCompute表
參考第三節編譯datasource并安裝到本地maven倉庫,在pom中添加依賴后重新打包即可。
case3. 作業需要訪問OSS
參考第四節在pom中添加依賴后重新打包即可。
目前MaxCompute Spark支持以下幾種運行方式:local模式,cluster模式,和在DataWorks中執行模式。
local模式主要是讓用戶能夠方便的調試應用代碼,使用方式跟社區相同,我們添加了用tunnel讀寫ODPS表的功能。用戶可以在ide和命令行中使用該模式,需要添加配置spark.master=local[N],其中N表示執行該模式所需要的cpu資源。此外,local模式下的讀寫表是通過讀寫tunnel完成的,需要在Spark-defaults.conf中增加tunnel配置項(請根據MaxCompute項目所在的region及網絡環境填寫對應的Tunnel Endpoint地址):tunnel_end_point=http://dt.cn-beijing.maxcompute.aliyun.com。命令行執行該模式的方式如下:
1.bin/spark-submit --master local[4] \
--class com.aliyun.odps.spark.examples.SparkPi \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
在Cluster模式中,用戶需要指定自定義程序入口Main,Main結束(Success or Fail)spark job就會結束。使用場景適合于離線作業,可以與阿里云DataWorks產品結合進行作業調度。命令行提交方式如下:
1.bin/spark-submit --master yarn-cluster \
–class SparkPi \
${ProjectRoot}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar
用戶可以在DataWorks中運行MaxCompute Spark離線作業(cluster模式),以方便與其他類型執行節點集成和調度。
第二步:在創建的業務流程中,從數據開發組件中選擇ODPS Spark節點。
spark.hadoop.odps.cupid.webproxy.endpoint
(取值填寫項目所在region的endpoint,如http://service.cn.maxcompute.aliyun-inc.com/api)、spark.hadoop.odps.moye.trackurl.host(取值填寫:http://jobview.odps.aliyun.com)提交作業后,需要根據作業日志來檢查作業是否正常提交并執行,MaxCompute對于Spark作業提供了Logview工具以及Spark Web-UI來幫助開發者進行作業診斷。
例如,通過Spark-submit方式(dataworks執行spark任務時也會產生相應日志)提交作業,在作業日志中會打印以下關鍵內容:
通過日志輸出的logview在瀏覽器中可以查看CUPID類型的任務執行的基本信息。
單擊TempRoot的StdOut按鈕可以查看SparkPi的輸出結果:
日志中打印出上述的TrackingUrl,表示您的作業已經提交到MaxCompute集群,這個TrackingUrl非常關鍵,它既是SparkWebUI,也是HistoryServer的Url。在瀏覽器中打開這個Url,可以追蹤Spark作業的運行情況。
單擊driver的stdout即可以查看Spark作業的輸出內容。
上述就是小編為大家分享的MaxCompute Spark開發的示例分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。