您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
當我在測試SparkStreaming的狀態操作mapWithState算子時,當我們設置timeout(3s)的時候,3s過后數據還是不會過期,不對此key進行操作,等到30s左右才會清除過期的數據。
百度了很久,關于timeout的資料很少,更沒有解決這個問題的文章,所以說,百度也不是萬能的,有時候還是需要靠自己。
所以我就在周末研究了一下,然后將結果整理了出來,希望能幫助大家更全面的理解Spark狀態計算。
按理說Spark Streaming實時處理,數據就像流水,每個批次之間的數據都是獨立的,處理完就處理完了,不留下任何狀態。但是免不了一些有狀態的操作,例如統計從流啟動到現在,某個單詞出現了多少次,所以狀態操作就出現了。
狀態操作分為updateStateByKey和mapWithState,兩者有著很大的區別。簡單的來說,前者每次輸出的都是全量狀態,后者輸出的是增量狀態。
過期這一塊估計很多人開始都理解錯了,我剛開始理解就是數據從出現,經過多少秒之后就會過期。其實不是,這里的過期指的是空閑時間。
注釋大概是這個意思:timeout()傳入一個時間間隔參數,如果一個key在大于此間隔沒有此key的數據流入,則被認為是空閑的,就會單獨調用一次mapWithState中的func來清除這些空閑數據狀態。
使用了timeout()之后,需要使用以下代碼來在間隔內清除失效key。
stream.checkpoint(Seconds(6))
checkpoint的時候,會開啟全面掃描,才會對state中的失效key進行清理。
val conf = new SparkConf().setMaster("local[2]").setAppName("state")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint("./tmp")val streams: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .map(x => (x, 1))val result = streams.mapWithState(StateSpec.function((k: String, v: Option[Int], state: State[Int]) => { val count = state.getOption().getOrElse(0)println(k)println(v)var sum = 0if (!state.isTimingOut()) { sum = count + v.get state.update(sum)} else { println("timeout")}Option(sum) }) .timeout(Seconds(3)))// 這行代碼是觸發清除機制的關鍵// result.checkpoint(Seconds(6))result.print()ssc.start()ssc.awaitTermination()
使用上面的代碼進行測試,設置過期時間為3s。但是3s過后發現key并沒有過期,也不會被清除,大概30S之后被清除。
在9999端口輸入一個tom后,不再進行任何操作。測試結果如下:
tom Some(1) ------------------------------------------- Time: 1618228587000 ms ------------------------------------------- Some(1) tom None timeout ------------------------------------------- Time: 1618228614000 ms ------------------------------------------- Some(0)
從測試結果可以看出,從輸入到清除大概是27s。
我們現在將注釋的代碼放開,每6s進行checkpoint一次,輸入tom:
tom Some(1) ------------------------------------------- Time: 1618228497000 ms ------------------------------------------- Some(1) tom None timeout ------------------------------------------- Time: 1618228506000 ms ------------------------------------------- Some(0)
從生成到清除用了9秒,正好是過期時間 + 下一個窗口時間,觸發了checkpoint。
第一次學狀態操作的時候,就考慮如何去掉一些過期的key,通過timeout()的方法沒有完成自己想法,從網上也沒有找到解決方案,所以就暫且擱置在一邊了。后來又回過頭來考慮這個問題,然后根據自己的想法去猜想、去驗證。
每個Dstream的計算邏輯都在compute()中,這里是調用了internalStream的getOrCompute(),根據繼承關系,調用的是父類Dstream的此方法:
getOrCompute()主要功能為:計算、緩存、checkpoint。這里只需要記住幾個地方:checkpointDuration,即checkpoint間隔,和調用了checkpoint()。其實真正的計算還是調用了compute(),接著去看compute()
compute()里面也調用了getOrCompute()方法,其實和上面調用的一樣,都是Dstream的,這里主要看的是使用createFromRDD()生成的StateRDD。
這個StateRDD就是參與狀態計算的數據集合,首先看它是如何生成的:
再看看StateRDD的compute()是如何計算的:
從compute()看出,當doFullScan為true的時候,才會觸發過期key的清除,updateRecordWithData()負責全面掃描清除過期key。
這不,思路就來了,我們只要找到開啟FullScan的方法,不就可以自行觸發清除機制了嗎!
那么,我們先看看doFullScan的默認值:
默認是沒開啟的,接著通過快捷鍵看看哪些地方使用了doFullScan:
從圖中看出,有兩處代碼修改了doFullScan,我們找到這兩處代碼:
第一個基本上排除,那么就剩下第二個:checkpoint(),我們要知道的是,狀態操作必須要checkpoint。
還記得在2中的getOrCompute()嗎,當checkpointDuration不為null的時候,調用checkpoint()。
我們來看3中InternalMapWithStateDStream是如何定義這個duration的:
如圖,sideDuration是窗口時間,乘以系數10就是默認的checkpoint時長,所以當我設置窗口為3s時,checkpoint周期就是30s,30s才會清理一次過期key。
而通過checkpoint(interval)可以設置checkpoint的間隔,所以覆蓋了上面程序中默認的30s。
最后提一提,FullScan是在這個類中開啟的,所以先看看這個Record的注釋介紹:
意思就是負責存儲StateRDD的狀態KV,updateRecordWithData()負責清除過期的Record,我們來看看這個方法的實現:
removeTimedoutData就是是否開啟全面掃描,即doFullScan的值。
以上就是SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。