您好,登錄后才能下訂單哦!
大數據開發中Spar的Stage,Executor,Driver該如何理解,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
對于Spark新手來說,首先對于Spark的運行機制不了解,往往跟你交流的時候,互相都不知道在說什么,比如部署模式和運行模式,可能都混為一談,對于有一定開發經驗的老手,即使知道運行機制,可能在表述上,對Spark的各種術語也不是很懂,因此理解Spark術語,是Spark開發者之間溝通的必要之路,下面從Spark的運行機制開始,到WordCount案例來理解Spark中的各種術語。
首先拿官網的一張圖,來說明,其是分布式集群上spark應用程序的一般執行框架。主要由sparkcontext(spark上下文)、cluster manager(資源管理器)和?executor(單個節點的執行進程)。其中cluster manager負責整個集群的統一資源管理。executor是應用執行的主要進程,內部含有多個task線程以及內存空間。
Spark的主要運行流程如下:
應用程序在使用spark-submit提交后,根據提交時的參數設置(deploy mode)在相應位置初始化sparkcontext,即spark的運行環境,并創建DAG Scheduler和Task Scheduer,Driver根據應用程序執行代碼,將整個程序根據action算子劃分成多個job,每個job內部構建DAG圖,DAG Scheduler將DAG圖劃分為多個stage,同時每個stage內部劃分為多個task,DAG Scheduler將taskset傳給Task Scheduer,Task Scheduer負責集群上task的調度。至于stage和task的關系以及是如何劃分的我們后面再詳細講。
Driver根據sparkcontext中的資源需求向resource manager申請資源,包括executor數及內存資源。
資源管理器收到請求后在滿足條件的work node節點上創建executor進程。
Executor創建完成后會向driver反向注冊,以便driver可以分配task給他執行。
當程序執行完后,driver向resource manager注銷所申請的資源。
從運行機制上,我們來繼續解釋下面的名詞術語,
driver就是我們編寫的spark應用程序,用來創建sparkcontext或者sparksession,driver會和cluster mananer通信,并分配task到executor上執行
負責整個程序的資源調度,目前的主要調度器有:
YARN
Spark Standalone
Mesos
Executors其實是一個獨立的JVM進程,在每個工作節點上會起一個,主要用來執行task,一個executor內,可以同時并行的執行多個task。
Job是用戶程序一個完整的處理流程,是邏輯的叫法。
一個Job可以包含多個Stage,Stage之間是串行的,State的觸發是由一些shuffle,reduceBy,save動作產生的
一個Stage可以包含多個task,比如sc.textFile("/xxxx").map().filter(),其中map和filter就分別是一個task。每個task的輸出就是下一個task的輸出。
partition是spark里面數據源的一部分,一個完整的數據源會被spark切分成多個partition以方便spark可以發送到多個executor上去并行執行任務。
RDD是分布式彈性數據集,在spark里面一個數據源就可以看成是一個大的RDD,RDD由多個partition組成,spark加載的數據就會被存在RDD里面,當然在RDD內部其實是切成多個partition了。
那么問題來了一個spark job是如何執行的?
(1)我們寫好的spark程序,也稱驅動程序,會向Cluster Manager提交一個job
(2)Cluster Manager會檢查數據本地行并尋找一個最合適的節點來調度任務
(3)job會被拆分成不同stage,每個stage又會被拆分成多個task
(4)驅動程序發送task到executor上執行任務
(5)驅動程序會跟蹤每個task的執行情況,并更新到master node節點上,這一點我們可以在spark master UI上進行查看
(6)job完成,所有節點的數據會被最終再次聚合到master節點上,包含了平均耗時,最大耗時,中位數等等指標。
部署模式 就是說的,Cluster Manager,一般有Standalone, Yarn ,而運行模式說的是Drvier的運行機器,是集群還是提交任務的機器,分別對應Cluster和Client模式,區別在于運行結果,日志,穩定性等。
Job:Job是由Action觸發的,因此一個Job包含一個Action和N個Transform操作;
Stage:Stage是由于shuffle操作而進行劃分的Task集合,Stage的劃分是根據其寬窄依賴關系;
Task:最小執行單元,因為每個Task只是負責一個分區的數據
處理,因此一般有多少個分區就有多少個Task,這一類的Task其實是在不同的分區上執行一樣的動作;
下面是一段WordCount程序
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("yarn").setAppName("WordCount") val sc = new SparkContext(conf) val lines1: RDD[String] = sc.textFile("data/spark/wc.txt") val lines2: RDD[String] = sc.textFile("data/spark/wc2.txt") val j1 = lines1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) val j2 = lines2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) j1.join(j2).collect() sc.stop() } }
Yarn模式在生產環境用的較多,因此從Yarn的部署模式來看,代碼上只有一個action操作collect,所以只有一個Job, Job又由于Shuffle的原因被劃分為3個stage, 分別是flatMap 和 map 和 reduceBykey 算一個Stage0, 另外的line2又算一個,Stage1, 而Stage3 是前面兩個結果join,然后collect, 且stage3依賴于 stage1 和 stage0, 但stage0 和 stage1 是并行的,在實際的生產環境下,要去看依賴stage的依賴圖,可以明顯看到依賴的關系。
關于大數據開發中Spar的Stage,Executor,Driver該如何理解問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。