中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

大數據開發中Spark-拷問靈魂的問題有哪些

發布時間:2021-12-17 10:09:30 來源:億速云 閱讀:140 作者:柒染 欄目:大數據

大數據開發中Spark-拷問靈魂的問題有哪些,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

1.Spark計算依賴內存,如果目前只有10g內存,但是需要將500G的文件排序并輸出,需要如何操作?

 ①、把磁盤上的500G數據分割為100塊(chunks),每份5GB。(注意,要留一些系統空間!)

②、順序將每份5GB數據讀入內存,使用quick sort算法排序。 

③、把排序好的數據(也是5GB)存放回磁盤。 

④、循環100次,現在,所有的100個塊都已經各自排序了。(剩下的工作就是如何把它們合并排序!) 

⑤、從100個塊中分別讀取5G/100=0.05 G入內存(100input buffers)。 

⑥、執行100路合并,并將合并結果臨時存儲于5g基于內存的輸出緩沖區中。當緩沖區寫滿5GB時,寫入硬盤上最終文件,并清空輸出緩沖區;當100個輸入緩沖區中任何一個處理完畢時,寫入該緩沖區所對應的塊中的下一個0.05 GB,直到全部處理完成。

2.countByValue和countByKey的區別

首先從源碼角度來看:

// PairRDDFunctions.scala
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

// RDD.scala
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}

countByValue(RDD.scala)

  • 作用在普通的RDD

  • 其實現過程調用了 countByKey

countByKey(PairRDDFunctions.scala)

  • 作用在 PairRDD 上

  • 對 key 進行計數

  • 數據要收到Driver端,結果集大時,不適用

問題:

  • countByKey 可以作用在 普通的RDD上嗎

  • countByValue 可以作用在 PairRDD 上嗎

val rdd1: RDD[Int] = sc.makeRDD(1 to 10)
val rdd2: RDD[(Int, Int)] = sc.makeRDD((1 to 10).toList.zipWithIndex)

val result1 = rdd1.countByValue() //可以
val result2 = rdd1.countByKey() //語法錯誤

val result3 = rdd2.countByValue() //可以
val result4 = rdd2.countByKey() //可以

3.兩個rdd join 什么時候有shuffle什么時候沒有shuffle

其中join操作是考驗所有數據庫性能的一項重要指標,對于Spark來說,考驗join的性能就是Shuffle,Shuffle 需要經過磁盤和網絡傳輸,Shuffle數據越少性能越好,有時候可以盡量避免程序進行Shuffle ,那么什么情況下有Shuffle ,什么情況下沒有Shuffle 呢

3.1 Broadcast join

broadcast join 比較好理解,除了自己實現外,Spark SQL 已經幫我們默認來實現了,其實就是小表分發到所有Executors,控制參數是:spark.sql.autoBroadcastJoinThreshold 默認大小是10m, 即小于這個閾值即自動使用broadcast join.

3.2 Bucket join

其實rdd方式和table類似,不同的是后者要寫入Bucket表,這里主要講rdd的方式,原理就是,當兩個rdd根據相同分區方式,預先做好分區,分區結果是一致的,這樣就可以進行Bucket join, 另外這種join沒有預先的算子,需要在寫程序時候自己來開發,對于表的這種join可以看一下 字節跳動在Spark SQL上的核心優化實踐 。可以看下下面的例子

rdd1、rdd2都是Pair RDD

rdd1、rdd2的數據完全相同

一定有shuffle

rdd1 => 5個分區

rdd2 => 6個分區

rdd1 => 5個分區 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)

rdd2 => 5個分區 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)

一定沒有shuffle

rdd1 => 5個分區 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空

rdd2 => 5個分區 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空

這樣所有Shuffle的算子,如果數據提前做好了分區(partitionBy),很多情況下沒有Shuffle.

除上面兩種方式外,一般就是有Shufflejoin, 關于spark的join原理可以查看:大數據開發-Spark Join原理詳解

4..transform 是不是一定不觸發action

有個算子例外,那就是sortByKey,其底層有個抽樣算法,水塘抽樣,最后需要根據抽樣的結果,進行RangePartition的,所以從job角度來說會看到兩個job,除了觸發action的本身算子之外,記住下面的

sortByKey → 水塘抽樣→ collect

5.廣播變量是怎么設計的

我們都知道,廣播變量是把數據放到每個excutor上,也都知道廣播變量的數據一定是從driver開始出去的,什么意思呢,如果廣播表放在hive表中,那么它的存儲就是在各個block塊上,也對應多個excutor (不一樣的叫法),首先將數據拉到driver上,然后再進行廣播,廣播時候不是全部廣播,是根據excutor預先用到數據的,首先拿數據,然后通過bt協議進行傳輸,什么是bt協議呢,就是數據在分布式點對點網絡上,根據網絡距離來去拉對應的數據,下載者也是上傳者,這樣就不同每個task (excutor)都從driver上來拉數據,這樣就減少了壓力,另外在spark1.幾的時候還是task級別,現在是共同的一個鎖,整個excutor上的task共享這份數據。

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

项城市| 德阳市| 信丰县| 恭城| 磐石市| 连南| 峡江县| 视频| 柘城县| 康平县| 华阴市| 内乡县| 吉木萨尔县| 江山市| 东方市| 调兵山市| 东平县| 营山县| 斗六市| 穆棱市| 遂平县| 长沙市| 大竹县| 临武县| 青岛市| 内黄县| 合肥市| 富阳市| 呼伦贝尔市| 虞城县| 汾西县| 高陵县| 龙门县| 寿阳县| 察雅县| 闽清县| 康保县| 永城市| 观塘区| 怀宁县| 梧州市|