中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

周期性清除Spark Streaming流狀態的方法是什么

發布時間:2021-12-16 21:23:22 來源:億速云 閱讀:172 作者:柒染 欄目:大數據

本篇文章為大家展示了周期性清除Spark Streaming流狀態的方法是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

在Spark Streaming程序中,我們經常需要使用有狀態的流來統計一些累積性的指標,比如各個商品的PV。簡單的代碼描述如下,使用mapWithState()算子:

 val productPvStream = stream.mapPartitions(records => {
   var result = new ListBuffer[(String, Int)]
     for (record <- records) {
       result += Tuple2(record.key(), 1)
     }
   result.iterator
 }).reduceByKey(_ + _).mapWithState(
   StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
     val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
     state.update(sum)
     (productId, sum)
 })).stateSnapshots()

現在的問題是,PV并不是一直累加的,而是每天歸零,重新統計數據。要達到在凌晨0點清除狀態的目的,有以下兩種方法。

編寫腳本重啟Streaming程序

用crontab、Azkaban等在凌晨0點調度執行下面的Shell腳本:

stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`

if [ ${cnt} -eq 1 ]; then
 pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
 kill -9 ${pid}
 sleep 20
 cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
 if [ ${cnt} -eq 0 ]; then
   nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
 fi
fi

這種方式最簡單,也不需要對程序本身做任何改動。但隨著同時運行的Streaming任務越來越多,就會顯得越來越累贅了。

給StreamingContext設置超時

在程序啟動之前,先計算出當前時間點距離第二天凌晨0點的毫秒數:

def msTillTomorrow = {
 val now = new Date()
 val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
 tomorrow.getTime - now.getTime
}

然后將Streaming程序的主要邏輯寫在while(true)循環中,并且不像平常一樣調用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

while (true) {
   val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
   ssc.checkpoint(CHECKPOINT_DIR)

   // ...處理邏輯...

   ssc.start()
   ssc.awaitTerminationOrTimeout(msTillTomorrow)
   ssc.stop(false, true)
   Thread.sleep(BATCH_INTERVAL * 1000)
 }

在經過msTillTomorrow毫秒之后,StreamingContext就會超時,再調用其stop()方法(注意兩個參數,stopSparkContext表示是否停止關聯的SparkContext,stopGracefully表示是否優雅停止),就可以停止并重啟StreamingContext。

兩種方法都是仍然采用Spark Streaming的機制進行狀態計算的。如果其他條件允許的話,我們還可以拋棄mapWithState(),直接借助外部存儲自己維護狀態。比如將Redis的Key設計為product_pv:[product_id]:[date],然后在Spark Streaming的每個批次中使用incrby指令,就能方便地統計PV了,不必考慮定時的問題。

上述內容就是周期性清除Spark Streaming流狀態的方法是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

望谟县| 太谷县| 洪洞县| 西昌市| 通河县| 鹤峰县| 永登县| 易门县| 邵东县| 永和县| 南安市| 屏东县| 福鼎市| 昆明市| 庆元县| 奉节县| 长白| 辽阳市| 呈贡县| 宁夏| 阜南县| 象州县| 油尖旺区| 辽阳市| 法库县| 抚远县| 邮箱| 梅河口市| 漳州市| 漠河县| 曲周县| 呼图壁县| 简阳市| 永宁县| 汉源县| 达孜县| 如皋市| 永善县| 广丰县| 扎囊县| 阿荣旗|