中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark廣播變量分析以及如何動態更新廣播變量

發布時間:2021-12-17 09:47:17 來源:億速云 閱讀:620 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關Spark廣播變量分析以及如何動態更新廣播變量,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

今天主要介紹一下基于Spark2.4版本的廣播變量。先前的版本比如Spark2.1之前的廣播變量有兩種實現:HttpBroadcast和TorrentBroadcast,但是鑒于HttpBroadcast有各種弊端,目前已經舍棄這種實現,小編主要闡述TorrentBroadcast
廣播變量概述
廣播變量是一個只讀變量,通過它我們可以將一些共享數據集或者大變量緩存在Spark集群中的各個機器上而不用每個task都需要copy一個副本,后續計算可以重復使用,減少了數據傳輸時網絡帶寬的使用,提高效率。相比于Hadoop的分布式緩存,廣播的內容可以跨作業共享。
廣播變量要求廣播的數據不可變、不能太大但也不能太小(一般幾十M以上)、可被序列化和反序列化、并且必須在driver端聲明廣播變量,適用于廣播多個stage公用的數據,存儲級別目前是MEMORY_AND_DISK。

廣播變量存儲目前基于Spark實現的BlockManager分布式存儲系統,Spark中的shuffle數據、加載HDFS數據時切分過來的block塊都存儲在BlockManager中,不是今天的討論點,這里先不做詳述了。

廣播變量的創建方式和獲取

//創建廣播變量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

//獲取廣播變量
broadcastVar.value
 廣播變量實例化過程

1.首先調用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))

2.調用BroadcastManager的newBroadcast方法
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)

3.通過廣播工廠的newBroadcast方法進行創建

broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())

在調用BroadcastManager的newBroadcast方法時已完成對廣播工廠的初始化(initialize方法),我們只需看BroadcastFactory的實現TorrentBroadcastFactory中對TorrentBroadcast的實例化過程:

new TorrentBroadcast[T](value_, id)
4.在構建TorrentBroadcast時,將廣播的數據寫入BlockManager
1)首先會將廣播變量序列化后的對象劃分為多個block塊,存儲在driver端的BlockManager,這樣運行在driver端的task就不用創建廣播變量的副本了(具體可以查看TorrentBroadcast的writeBlocks方法) 
2)每個executor在獲取廣播變量時首先從本地的BlockManager獲取。獲取不到就會從driver或者其他的executor上獲取,獲取之后,會將獲取到的數據保存在自己的BlockManager中
3)塊的大小默認4M
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024

廣播變量初始化過程

1.首先調用broadcastVar.value
2.TorrentBroadcast中lazy變量_value進行初始化,調用readBroadcastBlock() 
3.先從緩存中讀取,對結果進行模式匹配,匹配成功的直接返回
4.讀取不到通過readBlocks()進行讀取  

從driver端或者其他的executor中讀取,將讀取的對象存儲到本地,并存于緩存中

new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)

Spark兩種廣播變量對比

正如【前言】中所說,HttpBroadcast在Spark后續的版本中已經被廢棄,但考慮到部分公司用的Spark版本較低,面試中仍有可能問到兩種實現的相關問題,這里簡單介紹一下:
HttpBroadcast會在driver端的BlockManager里面存儲廣播變量對象,并且將該廣播變量序列化寫入文件中去。  所有獲取廣播數據請求都在driver端,所以存在單點故障和網絡IO性能問題。
TorrentBroadcast會在driver端的BlockManager里面存儲廣播變量對象,并將廣播對象分割成若干序列化block塊(默認4M),存儲于BlockManager。小的block存儲位置信息,存儲于Driver端的BlockManagerMaster。數據請求并非集中于driver端,避免了單點故障和driver端網絡磁盤IO過高。

TorrentBroadcast在executor端存儲一個對象的同時會將獲取的block存儲于BlockManager,并向driver端的BlockManager匯報block的存儲信息。

請求數據的時候會先獲取block的所有存儲位置信息,并且是隨機的在所有存儲了該executor的BlockManager去獲取,避免了數據請求服務集中于一點。

總之就是HttpBroadcast導致獲取廣播變量的請求集中于driver端,容易引起driver端單點故障,網絡IO過高影響性能等問題,而TorrentBroadcast獲取廣播變量的請求服務即可以請求到driver端也可以在executor,避免了上述問題,當然這只是主要的優化點。

動態更新廣播變量
通過上面的介紹,大家都知道廣播變量是只讀的,那么在Spark流式處理中如何進行動態更新廣播變量?

既然無法更新,那么只能動態生成,應用場景有實時風控中根據業務情況調整規則庫、實時日志ETL服務中獲取最新的日志格式以及字段變更等。

@volatile private var instance: Broadcast[Array[Int]] = null

//獲取廣播變量單例對象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
 if (instance == null) {
   synchronized {
     if (instance == null) {
       instance = sc.broadcast(fetchLastestData())
     }
   }
 }
 instance
}

//加載要廣播的數據,并更新廣播變量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
 if (instance != null) {
   //刪除緩存在executors上的廣播副本,并可選擇是否在刪除完成后進行block等待
   //底層可選擇是否將driver端的廣播副本也刪除
   instance.unpersist(blocking)
   
   instance = sc.broadcast(fetchLastestData())
 }
}

def fetchLastestData() = {
 //動態獲取需要更新的數據
 //這里是偽代碼
 Array(1, 2, 3)
}
val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

...
...

stream.foreachRDD { rdd =>
 val current_time = dataFormat.format(new Date())
 val new_time = current_time.substring(14, 16).toLong
 //每10分鐘更新一次
 if (new_time % 10 == 0) {
   updateBroadCastVar(rdd.sparkContext, true)
 }

 rdd.foreachPartition { records =>
   instance.value
   ...
 }
}
注意:上述是給出了一個實現思路的偽代碼,實際生產中還需要進行一定的優化。
此外,這種方式有一定的弊端,就是廣播的數據因為是周期性更新,所以存在一定的滯后性。廣播的周期不能太短,要考慮外部存儲要廣播數據的存儲系統的壓力。具體的還要看具體的業務場景,如果對實時性要求不是特別高的話,可以采取這種,當然也可以參考Flink是如何實現動態廣播的。  

Spark流式程序中為何使用單例模式

1.廣播變量是只讀的,使用單例模式可以減少Spark流式程序中每次job生成執行,頻繁創建廣播變量帶來的開銷

2.廣播變量單例模式也需要做同步處理。在FIFO調度模式下,基本不會發生并發問題。但是如果你改變了調度模式,如采用公平調度模式,同時設置Spark流式程序并行執行的job數大于1,如設置參數spark.streaming.concurrentJobs=4,則必須加上同步代碼

3.在多個輸出流共享廣播變量的情況下,同時配置了公平調度模式,也會產生并發問題。建議在foreachRDD或者transform中使用局部變量進行廣播,避免在公平調度模式下不同job之間產生影響。

除了廣播變量,累加器也是一樣。在Spark流式組件如Spark Streaming底層,每個輸出流都會產生一個job,形成一個job集合提交到線程池里并發執行。

以上就是Spark廣播變量分析以及如何動態更新廣播變量,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

凌海市| 迁西县| 白沙| 东丰县| 汤原县| 奉化市| 运城市| 曲水县| 德化县| 苗栗县| 闽侯县| 定结县| 塔河县| 台前县| 民勤县| 怀来县| 盐边县| 高雄县| 于田县| 乌拉特后旗| 益阳市| 南阳市| 揭东县| 信宜市| 岳阳县| 磐石市| 皮山县| 白朗县| 佛山市| 抚州市| 伊金霍洛旗| 霍林郭勒市| 科尔| 安化县| 南宫市| 哈巴河县| 渝北区| 舒城县| 葵青区| 长兴县| 建水县|