中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark操作中的aggregate、aggregateByKey怎么理解

發布時間:2021-12-17 11:30:36 來源:億速云 閱讀:208 作者:柒染 欄目:編程語言

今天就跟大家聊聊有關Spark操作中的之aggregate、aggregateByKey怎么理解,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

1. aggregate函數

將每個分區里面的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致。

seqOp操作會聚合各分區中的元素,然后combOp操作把所有分區的聚合結果再次聚合,兩個操作的初始值都是zeroValue. seqOp的操作是遍歷分區中的所有元素(T),第一個T跟zeroValue做操作,結果再作為與第二個T做操作的zeroValue,直到遍歷完整個分區。combOp操作是把各分區聚合的結果,再聚合。aggregate函數返回一個跟RDD不同類型的值。因此,需要一個操作seqOp來把分區中的元素T合并成一個U,另外一個操作combOp把所有U聚合。

例子程序:

scala> val rdd = List(1,2,3,4,5,6,7,8,9)rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)scala> rdd.par.aggregate((0,0))((acc,number) => (acc._1 + number, acc._2 + 1),(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2))res0: (Int, Int) = (45,9)scala> res0._1 / res0._2res1: Int = 5

過程大概這樣:

首先,初始值是(0,0),這個值在后面2步會用到。然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函數定義中的T,這里即是List中的元素。所以acc._1 + number,acc._2 + 1的過程如下。

1. 0+1, 0+12. 1+2, 1+13. 3+3, 2+14. 6+4, 3+15. 10+5, 4+16. 15+6, 5+17. 21+7, 6+18. 28+8, 7+19. 36+9, 8+1

結果即是(45,9)。這里演示的是單線程計算過程,實際Spark執行中是分布式計算,可能會把List分成多個分區,假如3個,p1(1,2,3,4),p2(5,6,7,8),p3(9),經過計算各分區的的結果(10,4),(26,4),(9,1),這樣,執行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9),再求平均值就簡單了。

2. aggregateByKey函數:

對PairRDD中相同的Key值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和aggregate函數類似,aggregateByKey返回值的類型不需要和RDD中value的類型一致。因為aggregateByKey是對相同Key中的值進行聚合操作,所以aggregateByKey'函數最終返回的類型還是PairRDD,對應的結果是Key和聚合后的值,而aggregate函數直接返回的是非RDD的結果。

例子程序:

import org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject AggregateByKeyOp { def main(args:Array[String]){   val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")  val sc: SparkContext = new SparkContext(sparkConf)      val data=List((1,3),(1,2),(1,4),(2,3))   val rdd=sc.parallelize(data, 2)      

//合并不同partition中的值,a,b得數據類型為zeroValue的數據類型   def combOp(a:String,b:String):String={    println("combOp: "+a+"\t"+b)    a+b   }   

//合并在同一個partition中的值,a的數據類型為zeroValue的數據類型,b的數據類型為原value的數據類型   def seqOp(a:String,b:Int):String={    println("SeqOp:"+a+"\t"+b)    a+b   }   rdd.foreach(println)   

//zeroValue:中立值,定義返回value的類型,并參與運算   

//seqOp:用來在同一個partition中合并值   

//combOp:用來在不同partiton中合并值   

val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)   sc.stop() }}

運行結果:

將數據拆分成兩個分區

//分區一數據(1,3)(1,2)//分區二數據(1,4)(2,3)

//分區一相同key的數據進行合并seq: 100 3 

//(1,3)開始和中立值進行合并 合并結果為 1003seq: 1003 2 //(1,2)再次合并 結果為 10032

//分區二相同key的數據進行合并seq: 100 4 

//(1,4) 開始和中立值進行合并 1004seq: 100 3 //(2,3) 開始和中立值進行合并 1003

將兩個分區的結果進行合并/

/key為2的,只在一個分區存在,不需要合并 (2,1003)(2,1003)

//key為1的, 在兩個分區存在,并且數據類型一致,合并comb: 10032 1004(1,100321004)

看完上述內容,你們對Spark操作中的之aggregate、aggregateByKey怎么理解有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

辽源市| 井研县| 玉溪市| 龙井市| 海口市| 浦北县| 桦川县| 文水县| 阜平县| 乃东县| 全南县| 山阴县| 昌宁县| 德清县| 平果县| 涿鹿县| 叙永县| 大港区| 镇坪县| 满洲里市| 凭祥市| 博客| 宝丰县| 清水县| 杭锦后旗| 扶沟县| 陕西省| 永登县| 乌拉特中旗| 武清区| 印江| 太仓市| 兴城市| 双城市| 祥云县| 兴文县| 淮北市| 大关县| 彭泽县| 离岛区| 商河县|