您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關spark sql是如何變成執行計劃的的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
Spark SQL模塊,主要就是處理跟SQL解析相關的一些內容,說得更通俗點就是怎么把一個SQL語句解析成Dataframe或者說RDD的任務。以Spark 2.4.3為例,Spark SQL這個大模塊分為三個子模塊,如下圖所示
其中Catalyst可以說是Spark內部專門用來解析SQL的一個框架,在Hive中類似的框架是Calcite(將SQL解析成MapReduce任務)。Catalyst將SQL解析任務分成好幾個階段
而Core模塊其實就是Spark SQL主要解析的流程,當然這個過程中會去調用Catalyst的一些內容。這模塊里面比較常用的類包括SparkSession,DataSet等。
主要流程大概可以分為以下幾步:
Parser:Sql語句經過Antlr4解析,生成Unresolved Logical Plan
Analysis:analyzer與catalog進行綁定(catlog存儲元數據),生成Logical Plan;
Logical Optimizations:optimizer對Logical Plan優化,生成Optimized LogicalPlan;
Physical Planning:前面的 logical plan 不能被 Spark 執行,而這個過程是把 logical plan 轉換成多個 physical plans,然后利用代價模型(cost model)選擇最佳的 physical plan;
prepareForExecution()將 Physical Plan 轉換成 executed Physical Plan;
Code Generation:這個過程會把 SQL 查詢生成 Java 字節碼。
execute()執行可執行物理計劃,得到RDD;
-- t1 id,value,cid,did 1,1,1,1 10,1,1,2 -- t2 id,value,cid,did 10,1,1,1 10,1,1,1 SELECT sum(v) FROM ( SELECT t1.id, 1 + 2 + t1.value AS v FROM t1 JOIN t2 WHERE t1.id = t2.id AND t1.cid = 1 AND t1.did = t1.cid + 1 AND t2.id > 5 ) iteblog
調用詞法分析器 SqlBaseLexer.java 和語法分析器SqlBaseParser.java構建語法樹。生成語法樹之后,使用 AstBuilder 將語法樹轉換成 LogicalPlan,這個 LogicalPlan 也被稱為 Unresolved LogicalPlan。解析后的邏輯計劃如下:
== Parsed Logical Plan == 'Project [unresolvedalias('sum('v), None)] +- 'SubqueryAlias iteblog +- 'Project ['t1.id, ((1 + 2) + 't1.value) AS v#16] +- 'Filter ((('t1.id = 't2.id) AND ('t1.cid = 1)) AND (('t1.did = ('t1.cid + 1)) AND ('t2.id > 5))) +- 'Join Inner :- 'UnresolvedRelation [t1] +- 'UnresolvedRelation [t2]
Unresolved LogicalPlan是按照sql直接翻譯過來的,可以對照著SQL從下往上看的,t1 和 t2 兩張表被生成了 UnresolvedRelation。
在 SQL 解析階段生成了 Unresolved LogicalPlan,從上圖可以看出邏輯算子樹中包含了 UnresolvedRelation 和 unresolvedalias 等對象。Unresolved LogicalPlan 僅僅是一種數據結構,不包含任何數據信息,比如不知道數據源、數據類型,不同的列來自于哪張表等。Analyzer 階段會使用事先定義好的 Rule 以及 SessionCatalog 等信息對 Unresolved LogicalPlan 進行 transform。SessionCatalog 主要用于各種函數資源信息和元數據信息(數據庫、數據表、數據視圖、數據分區與函數等)的統一管理。而Rule 是定義在 Analyzer 里面的,如下具體如下:
lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), ResolveHints.ResolveCoalesceHints, ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution, EliminateUnions, new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: //解析表的函數 ResolveRelations :: //解析表或視圖 ResolveReferences :: //解析列 ResolveCreateNamedStruct :: ResolveDeserializer :: //解析反序列化操作類 ResolveNewInstance :: ResolveUpCast :: //解析類型轉換 ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: //解析函數 ResolveAliases :: //解析表別名 ResolveSubquery :: //解析子查詢 ResolveSubqueryColumnAliases :: ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: ResolveOutputRelation :: ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: TimeWindowing :: ResolveInlineTables(conf) :: ResolveHigherOrderFunctions(catalog) :: ResolveLambdaVariables(conf) :: ResolveTimeZone(conf) :: ResolveRandomSeed :: TypeCoercion.typeCoercionRules(conf) ++ extendedResolutionRules : _*), Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), Batch("View", Once, AliasViewChild(conf)), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, HandleNullInputsForUDF), Batch("FixNullability", Once, FixNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, CleanupAliases) )
補充表的信息,比如字段、類型,綁定select、where各種字段和表的關系。綁定之后:
== Analyzed Logical Plan == sum(v): bigint Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L] +- SubqueryAlias iteblog +- Project [id#0, ((1 + 2) + value#1) AS v#16] +- Filter (((id#0 = id#8) AND (cid#2 = 1)) AND ((did#3 = (cid#2 + 1)) AND (id#8 > 5))) +- Join Inner :- SubqueryAlias t1 : +- Relation[id#0,value#1,cid#2,did#3] parquet +- SubqueryAlias t2 +- Relation[id#8,value#9,cid#10,did#11] parquet
表的字段信息補全,文件來自parquet
跟之后的join、filter等等的字段做綁定
sum被解析成 Aggregate 函數
對 Unresolved LogicalPlan 進行相關 transform 操作得到了 Analyzed Logical Plan,這個 Analyzed Logical Plan 是可以直接轉換成 Physical Plan 然后在 Spark 中執行。但是如果直接這么弄的話,得到的 Physical Plan 很可能不是最優的,因為在實際應用中,很多低效的寫法會帶來執行效率的問題,需要進一步對Analyzed Logical Plan 進行處理,得到更優的邏輯算子樹。于是, 針對 SQL 邏輯算子樹的優化器 Optimizer 應運而生。
這個階段的優化器主要是基于規則的(Rule-based Optimizer,簡稱 RBO),而絕大部分的規則都是啟發式規則,也就是基于直觀或經驗而得出的規則,比如列裁剪(過濾掉查詢不需要使用到的列)、謂詞下推(將過濾盡可能地下沉到數據源端)、常量累加(比如 1 + 2 這種事先計算好) 以及常量替換(比如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以轉換成 SELECT * FROM table WHERE i = 5 AND j = 8)等等。
與前文介紹綁定邏輯計劃階段類似,這個階段所有的規則也是實現 Rule 抽象類,多個規則組成一個 Batch,多個 Batch 組成一個 batches,同樣也是在 RuleExecutor 中進行執行,由于前文已經介紹了 Rule 的執行過程,本節就不再贅述。
那么針對前文的 SQL 語句,這個過程都會執行哪些優化呢?這里按照 Rule 執行順序一一進行說明。
謂詞下推在 Spark SQL 是由 PushDownPredicate 實現的,這個過程主要將過濾條件盡可能地下推到底層,最好是數據源。所以針對我們上面介紹的 SQL,使用謂詞下推優化得到的邏輯計劃如下
從上圖可以看出,經過列裁剪后,t1 表只需要查詢 id 和 value 兩個字段;t2 表只需要查詢 id 字段。這樣減少了數據的傳輸,而且如果底層的文件格式為列存(比如 Parquet),可以大大提高數據的掃描速度的。
常量替換在 Spark SQL 是由 ConstantPropagation 實現的。也就是將變量替換成常量,比如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以轉換成 SELECT * FROM table WHERE i = 5 AND j = 8。這個看起來好像沒什么的,但是如果掃描的行數非常多可以減少很多的計算時間的開銷的。經過這個優化,得到的邏輯計劃如下:
優化后的邏輯計劃:
== Optimized Logical Plan == Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L] +- Project [(3 + value#1) AS v#16] +- Join Inner, (id#0 = id#8) :- Project [id#0, value#1] : +- Filter (((((isnotnull(cid#2) AND isnotnull(did#3)) AND (cid#2 = 1)) AND (did#3 = 2)) AND (id#0 > 5)) AND isnotnull(id#0)) : +- Relation[id#0,value#1,cid#2,did#3] parquet +- Project [id#8] +- Filter (isnotnull(id#8) AND (id#8 > 5)) +- Relation[id#8,value#9,cid#10,did#11] parquet
到這里,優化邏輯計劃階段就算完成了。另外,Spark 內置提供了多達70個優化 Rule
前面介紹的邏輯計劃在 Spark 里面其實并不能被執行的,為了能夠執行這個 SQL,一定需要翻譯成物理計劃,到這個階段 Spark 就知道如何執行這個 SQL 了。和前面邏輯計劃綁定和優化不一樣,這里使用的是策略(Strategy),而且前面介紹的邏輯計劃綁定和優化經過 Transformations 動作之后,樹的類型并沒有改變,也就是說:Expression 經過 Transformations 之后得到的還是 Transformations ;Logical Plan 經過 Transformations 之后得到的還是 Logical Plan。而到了這個階段,經過 Transformations 動作之后,樹的類型改變了,由 Logical Plan 轉換成 Physical Plan 了。
一個邏輯計劃(Logical Plan)經過一系列的策略處理之后,得到多個物理計劃(Physical Plans),物理計劃在 Spark 是由 SparkPlan 實現的。多個物理計劃再經過代價模型(Cost Model)得到選擇后的物理計劃(Selected Physical Plan)。
Cost Model 對應的就是基于代價的優化(Cost-based Optimizations,CBO),核心思想是計算每個物理計劃的代價,然后得到最優的物理計劃。
== Physical Plan == *(3) HashAggregate(keys=[], functions=[sum(cast(v#16 as bigint))], output=[sum(v)#18L]) +- Exchange SinglePartition, true, [id=#70] +- *(2) HashAggregate(keys=[], functions=[partial_sum(cast(v#16 as bigint))], output=[sum#21L]) +- *(2) Project [(3 + value#1) AS v#16] +- *(2) BroadcastHashJoin [id#0], [id#8], Inner, BuildRight :- *(2) Project [id#0, value#1] : +- *(2) Filter (((((isnotnull(cid#2) AND isnotnull(did#3)) AND (cid#2 = 1)) AND (did#3 = 2)) AND (id#0 > 5)) AND isnotnull(id#0)) : +- *(2) ColumnarToRow : +- FileScan parquet [id#0,value#1,cid#2,did#3] Batched: true, DataFilters: [isnotnull(cid#2), isnotnull(did#3), (cid#2 = 1), (did#3 = 2), (id#0 > 5), isnotnull(id#0)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bytedance/Documents/Own/project/practice/src/main/data/data1/t1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cid), IsNotNull(did), EqualTo(cid,1), EqualTo(did,2), GreaterThan(id,5), IsNotNull(id)], ReadSchema: struct<id:int,value:int,cid:int,did:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#64] +- *(1) Project [id#8] +- *(1) Filter (isnotnull(id#8) AND (id#8 > 5)) +- *(1) ColumnarToRow +- FileScan parquet [id#8] Batched: true, DataFilters: [isnotnull(id#8), (id#8 > 5)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bytedance/Documents/Own/project/practice/src/main/data/data1/t2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>
Join inner變成了broadcastHashJoin
感謝各位的閱讀!關于“spark sql是如何變成執行計劃的”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。