中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

第93課:SparkStreaming updateStateByKey 基本操作綜合案例實戰和內幕源碼解密

發布時間:2020-06-12 05:50:46 來源:網絡 閱讀:2301 作者:lqding1980 欄目:大數據

   Spark Streaming的DStream為我們提供了一個updateStateByKey方法,它的主要功能是可以隨著時間的流逝在Spark Streaming中為每一個key維護一份state狀態,通過更新函數對該key的狀態不斷更新。對每一個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新(對每個新出現的key,會同樣執行state的更新函數操作),但是如果通過更新函數對state更新后返回none的話,此時刻key對應的state狀態被刪除掉,需要特別說明的是state可以是任意類型的數據結構,這就為我們的計算帶來無限的想象空間;

  重點來了!!!如果要不斷的更新每個key的state,就一定會涉及到狀態的保存和容錯,這個時候就需要開啟checkpoint機制和功能,需要說明的是checkpoint可以保存一切可以存儲在文件系統上的內容,例如:程序未處理的數據及已經擁有的狀態。

  補充說明:關于流式處理對歷史狀態進行保存和更新具有重大實用意義,例如進行廣告(投放廣告和運營廣告效果評估的價值意義,熱點隨時追蹤、熱力圖)

  簡單的來說,如果我們需要進行wordcount,每個batchInterval都會計算出新的一批數據,這批數據如何更新到以前計算的結果上?updateStateByKey就能實現此功能。

函數定義如下:

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}

updateStateByKey 需要傳入一個函數,該函數有兩個參數Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對應的以前的值。返回的時一個key的最新值。


下面我們用實例演示:

package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Administrator on 2016/5/3.
 */
object UpdateStateByKeyDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")
    val ssc = new StreamingContext(conf,Seconds(20))
    //要使用updateStateByKey方法,必須設置Checkpoint。
    ssc.checkpoint("/checkpoint/")
    val socketLines = ssc.socketTextStream("spark-master",9999)

    socketLines.flatMap(_.split(",")).map(word=>(word,1))
      .updateStateByKey(
        (currValues:Seq[Int],preValue:Option[Int]) =>{
       val currValue = currValues.sum
         Some(currValue + preValue.getOrElse(0))
    }).print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

打包上傳至spark集群。


打開nc,發送測試數據

root@spark-master:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark

運行spark 程序

root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo  --master spark://spark-master:7077 ./spark.jar


查看運行結果:

-------------------------------------------
Time: 1462282180000 ms
-------------------------------------------
(scala,1)
(hive,1)
(spark,2)
(hadoop,2)
(Hbase,1)


我們在nc中再輸入一些數據

root@spark-master:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark
hadoop,spark,scala,hive
hadoop,Hbase,spark

再次查看結果:

-------------------------------------------
Time: 1462282200000 ms
-------------------------------------------
(scala,2)
(hive,2)
(spark,4)
(hadoop,4)
(Hbase,2)


可見,它將我們兩次統計結果合并了。


備注:

1、DT大數據夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數據實戰YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

通海县| 桃源县| 边坝县| 沙雅县| 资阳市| 洪江市| 龙口市| 巩义市| 响水县| 波密县| 建平县| 扶绥县| 固安县| 乐安县| 攀枝花市| 咸宁市| 青河县| 交口县| 渭源县| 张家川| 清涧县| 敦煌市| 昌吉市| 呈贡县| 盐亭县| 洪江市| 龙口市| 高尔夫| 文昌市| 河池市| 庐江县| 达日县| 新宁县| 迁西县| 内江市| 浮山县| 海丰县| 湾仔区| 五常市| 中牟县| 安陆市|