您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關Spark2.x中共享變量的累加器是怎樣的,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
為什么要定義累加器?
在 Spark 應用程序中,我們經常會有這樣的需求,如要需要統計符合某種特性數據的總數,這種需求都需要用到計數器。如果一個變量不被聲明為一個累加器,那么它將在被改變時不會在 driver 端進行全局匯總,即在分布式運行時每個 task 運行的只是原始變量的 一個副本,并不能改變原始變量的值,但是當這個變量被聲明為累加器后,該變量就會有分布式計數的功能。
定義了一個累加器sum,而不是普通變量,實例實例代碼如下:
package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-20 19:36 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study */object AccumlatorTest { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest") val sc=new SparkContext(sparkConf) /*定義一個共享變量:累加器*/ val sum=sc.accumulator(0) /*輸入數據*/ val rdd1=sc.parallelize(List(1,2,3,4,5)) /*求和 ,然后各個元素加1*/ val rdd2=rdd1.map(x=>{ sum+=x x }) /*這里是個action操作 沒有這個操作,程序不會執行*/ rdd2.collect() println("求和:"+sum) sc.stop() }}
運行結果如下,sum=15,符合我們的期望值:
結合上面的代碼說一下累加器的執行過程:
1).Accumulator需要在Driver進行定義和并初始化,并進行注冊,同時Accumulator首先需要在Driver進行序列化,然后發送到Executor端;另外,Driver接收到Task任務完成的狀態更新后,會去更新Value的值,然后在Action操作執行后就可以獲取到Accumulator的值了。
2).Executor接收到Task之后會進行反序列化操作,反序列化得到RDD和function,同時在反序列化的同時也去反序列化Accumulator,同時也會向TaskContext完成注冊,完成任務計算之后,隨著Task結果一起返回給Driver端進行處理。
這里有執行過程圖可以參考下:
累加器特性:
1.累加器也是也具有懶加載屬性,只有在action操作執行時,才會強制觸發計算求值;
2.累加器的值只可以在Driver端定義初始化,在Executor端更新,不能在Executor端進行定義初始化,不能在Executor端通過[.value]獲取值,任何工作節點上的Task都不能訪問累加器的值;
3.閉包里的執行器代碼可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。
特別提醒:
累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最后的值,在Excutor端更新。
上述就是小編為大家分享的Spark2.x中共享變量的累加器是怎樣的了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。