您好,登錄后才能下訂單哦!
Spark Streaming的DStream為我們提供了一個updateStateByKey方法,它的主要功能是可以隨著時間的流逝在Spark Streaming中為每一個key維護一份state狀態,通過更新函數對該key的狀態不斷更新。對每一個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新(對每個新出現的key,會同樣執行state的更新函數操作),但是如果通過更新函數對state更新后返回none的話,此時刻key對應的state狀態被刪除掉,需要特別說明的是state可以是任意類型的數據結構,這就為我們的計算帶來無限的想象空間;
重點來了!!!如果要不斷的更新每個key的state,就一定會涉及到狀態的保存和容錯,這個時候就需要開啟checkpoint機制和功能,需要說明的是checkpoint可以保存一切可以存儲在文件系統上的內容,例如:程序未處理的數據及已經擁有的狀態。
補充說明:關于流式處理對歷史狀態進行保存和更新具有重大實用意義,例如進行廣告(投放廣告和運營廣告效果評估的價值意義,熱點隨時追蹤、熱力圖)
簡單的來說,如果我們需要進行wordcount,每個batchInterval都會計算出新的一批數據,這批數據如何更新到以前計算的結果上?updateStateByKey就能實現此功能。
函數定義如下:
def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) }
updateStateByKey 需要傳入一個函數,該函數有兩個參數Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對應的以前的值。返回的時一個key的最新值。
下面我們用實例演示:
package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by Administrator on 2016/5/3. */ object UpdateStateByKeyDemo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("UpdateStateByKeyDemo") val ssc = new StreamingContext(conf,Seconds(20)) //要使用updateStateByKey方法,必須設置Checkpoint。 ssc.checkpoint("/checkpoint/") val socketLines = ssc.socketTextStream("spark-master",9999) socketLines.flatMap(_.split(",")).map(word=>(word,1)) .updateStateByKey( (currValues:Seq[Int],preValue:Option[Int]) =>{ val currValue = currValues.sum Some(currValue + preValue.getOrElse(0)) }).print() ssc.start() ssc.awaitTermination() ssc.stop() } }
打包上傳至spark集群。
打開nc,發送測試數據
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark
運行spark 程序
root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo --master spark://spark-master:7077 ./spark.jar
查看運行結果:
------------------------------------------- Time: 1462282180000 ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)
我們在nc中再輸入一些數據
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark
再次查看結果:
------------------------------------------- Time: 1462282200000 ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)
可見,它將我們兩次統計結果合并了。
備注:
1、DT大數據夢工廠微信公眾號DT_Spark
2、IMF晚8點大數據實戰YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。