中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何分析Spark Streaming的好處與坑

發布時間:2021-12-17 10:50:41 來源:億速云 閱讀:321 作者:柒染 欄目:大數據

如何分析Spark Streaming的好處與坑,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

前言

說人話:其實就是講Spark Streaming 的好處與坑。好處主要從一些大的方面講,坑則是從實際場景中遇到的一些小細節描述。

玫瑰篇

玫瑰篇主要是說Spark Streaming的優勢點。

玫瑰之代碼復用

這主要得益于Spark的設計,以及平臺的全面性。你寫的流處理的代碼可以很方便的適用于Spark平臺上的批處理,交互式處理。因為他們本身都是基于RDD模型的,并且Spark  Streaming的設計者也做了比較好的封裝和兼容。所以我說RDD是個很強大的框,能把各種場景都給框住,這就是高度抽象和思考后的結果。

玫瑰之機器學習

如果你使用Spark MLlib 做模型訓練。恭喜你,首先是很多算法已經支持Spark Streaming,譬如k-means 就支持流式數據更新模型。  其次,你也可以在Spark Streaming中直接將離線計算好的模型load進來,然后對新進來的數據做實時的Predict操作。

玫瑰之SQL支持

Spark Streaming 里天然就可以使用 sql/dataframe/datasets  等。而且時間窗口的使用可以極大擴展這種使用場景,譬如各種系統預警等。類似Storm則需要額外的開發與支持。

玫瑰之吞吐和實時的有效控制

Spark Streaming 可以很好的控制實時的程度(小時,分鐘,秒)。極端情況可以設置到毫秒。

玫瑰之概述

Spark Streaming 可以很好的和Spark其他組件進行交互,獲取其支持。同時Spark 生態圈的快速發展,亦能從中受益。

刺篇

刺篇就是描述Spark Streaming 的一些問題,做選型前關注這些問題可以有效的降低使用風險。

checkpoint 之刺

checkpoint  是個很好的恢復機制。但是方案比較粗暴,直接通過序列化的機制寫入到文件系統,導致代碼變更和配置變更無法生效。實際場景是升級往往比系統崩潰的頻率高太多。但是升級需要能夠無縫的銜接上一次的偏移量。所以spark  streaming在無法容忍數據有丟失的情況下,你需要自己記錄偏移量,然后從上一次進行恢復。

我們目前是重寫了相關的代碼,每次記錄偏移量,不過只有在升級的時候才會讀取自己記錄的偏移量,其他情況都是依然采用checkpoint機制。

Kafka 之刺

這個和Spark Streaming相關,也不太相關。說相關是因為Spark 對很多異常處理比較簡單。很多是和Kafka配置相關的。我舉個例子:

如果消息體太大了,超過 fetch.message.max.bytes=1m ,那么Spark  Streaming會直接拋出OffsetOutOfRangeException異常,然后停止服務。

對應的錯誤會從這行代碼拋出:

if (!iter.hasNext) { assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part)) finished = true  null.asInstanceOf[R]  }

其實就是消費的完成后 實際的消費數據量和預先估計的量不一致。

你在日志中看到的信息其實是這個代碼答應出來的:

private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =

s"Ran out of messages before reaching ending offset ${part.untilOffset} "  +

s"for topic ${part.topic} partition ${part.partition} start  ${part.fromOffset}." +

" This should not happen, and indicates that messages may have been lost"

解決辦法自然是把 fetch.message.max.bytes 設置大些。

如果你使用Spark  Streaming去追數據,從頭開始消費kafka,而Kafka因為某種原因,老數據快速的被清理掉,也會引發OffsetOutOfRangeException錯誤。并且使得Spark  Streaming程序異常的終止。

解決辦法是事先記錄kafka偏移量和時間的關系(可以隔幾秒記錄一次),然后根據時間找到一個較大的偏移量開始消費。

或者你根據目前Kafka新增數據的消費速度,給smallest獲取到的偏移量再加一個較大的值,避免出現Spark Streaming  在fetch的時候數據不存在的情況。

Kafka partition 映射 RDD partition 之刺

Kafka的分區數決定了你的并行度(我們假設你使用Direct  Approach的模式集成)。為了獲得更大的并行度,則需要進行一次repartition,而repartition  就意味著需要發生Shuffle,在流式計算里,可能會消耗掉我們寶貴的時間。

為了能夠避免Shuffle,并且提高Spark Streaming處理的并行度,我們重寫了  DirectKafkaInputDStream,KafkaRDD,KafkaUtils等類,實現了一個Kafka partition 可以映射為多個RDD  partition的功能。譬如你有M個Kafka partitions,則可映射成 M*N個 RDD partitions。 其中N 為>1  的正整數。

我們期望官方能夠實現將一個Kafka的partitions 映射為多個Spark  的partitions,避免發生Shuffle而導致多次的數據移動。

textFileStream

其實使用textFileStream  的人應該也不少。因為可以很方便的監控HDFS上某個文件夾下的文件,并且進行計算。這里我們遇到的一個問題是,如果底層比如是壓縮文件,遇到有順壞的文件,你是跳不過去的,直接會讓Spark  Streaming 異常退出。 官方并沒有提供合適的方式讓你跳過損壞的文件。

以NewHadoopRDD為例,里面有這么幾行代碼,獲取一條新的數據:

override def getNext(): (K, V) = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) } (key, value) }

通過reader 獲取下一條記錄的時候,譬如是一個損壞的gzip文件,可能就會拋出異常,而這個異常是用戶catch不到的,直接讓Spark  Streaming程序掛掉了。

而在 HadoopRDD類中,對應的實現如下:

override def getNext(): (K, V) = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) } (key, value) }

這里好歹做了個EOFException。然而,如果是一個壓縮文件,解壓的時候就直接產生錯誤了,一般而言是  IOException,而不是EOFException了,這個時候也就歇菜了。

個人認為應該添加一些配置,允許用戶可以選擇如何對待這種有損壞或者無法解壓的文件。

因為現階段我們并沒有維護一個Spark的私有版本,所以是通過重寫FileInputDStream,NewHadoopRDD 等相關類來修正該問題。

Shuffle 之刺

Shuffle (尤其是每個周期數據量很大的情況)是Spark Streaming 不可避免的疼痛,尤其是數據量極大的情況,因為Spark  Streaming對處理的時間是有限制的。我們有一個場景,是五分鐘一個周期,我們僅僅是做了一個repartion,耗時就達到2.1分鐘(包括到  Kafka取數據)。現階段Spark 的Shuffle實現都需要落磁盤,并且Shuffle Write 和 Shuffle Read  階段是完全分開,后者必須等到前者都完成才能開始工作。我認為Spark Streaming有必要單獨開發一個更快速,完全基于內存的Shuffle方案。

內存之刺

在Spark Streaming中,你也會遇到在Spark中常見的問題,典型如Executor Lost 相關的問題(shuffle fetch  失敗,Task失敗重試等)。這就意味著發生了內存不足或者數據傾斜的問題。這個目前你需要考慮如下幾個點以期獲得解決方案:

相同資源下,增加partition數可以減少內存問題。 原因如下:通過增加partition數,每個task要處理的數據少了,同一時間內,所有正在  運行的task要處理的數量少了很多,所有Executor占用的內存也變小了。這可以緩解數據傾斜以及內存不足的壓力。

關注shuffle read 階段的并行數。例如reduce,group  之類的函數,其實他們都有第二個參數,并行度(partition數),只是大家一般都不設置。不過出了問題再設置一下,也不錯。

給一個Executor 核數設置的太多,也就意味著同一時刻,在該Executor  的內存壓力會更大,GC也會更頻繁。我一般會控制在3個左右。然后通過提高Executor數量來保持資源的總量不變。

監控之刺

Spark Streaming 的UI 上的Executors Tab缺少一個監控,就是Worker內存GC詳情。雖然我們可以將這些信息導入到  第三方監控中,然而終究是不如在 Spark UI上展現更加方便。 為此我們也將該功能列入研發計劃。

看完上述內容,你們掌握如何分析Spark Streaming的好處與坑的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

石渠县| 方山县| 徐汇区| 龙川县| 桂平市| 酒泉市| 自治县| 曲松县| 宁乡县| 嘉兴市| 棋牌| 陆河县| 武安市| 吴旗县| 尚志市| 应用必备| 永和县| 咸宁市| 石河子市| 双鸭山市| 黎川县| 平果县| 江陵县| 庆阳市| 博乐市| 称多县| 改则县| 武乡县| 伊吾县| 绥德县| 沁阳市| 武城县| 阳高县| 观塘区| 莱州市| 昌图县| 阿克苏市| 和平县| 明星| 防城港市| 嘉义市|