中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark Streaming流計算框架如何運行

發布時間:2021-12-16 16:38:33 來源:億速云 閱讀:130 作者:iii 欄目:云計算

這篇文章主要講解了“Spark Streaming流計算框架如何運行”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Spark Streaming流計算框架如何運行”吧!

先貼案例

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // 每5秒收割一次數據
    val lines = ssc.socketTextStream("localhost", 9999) // 監聽 本地9999 socket 端口
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flat map 后 reduce
    words.print() // 打印結果
    ssc.start() // 啟動
    ssc.awaitTermination()
    ssc.stop(true)
  }
}

再來回溯下觸發過程。

定時器定時觸發執行某個方法。這里是 longTime => eventLoop.post(GenerateJobs(new Time(longTime))),將一個 GenerateJobs 類型的事件消息發送到 eventLoop的 隊列中。

// JobGenerator.scala line 58
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

另一方便,eventLoop一直循環取出隊列中的事件消息,當取出 GenerateJobs類型的事件消息時。會調用onReceive(event) 。

// EventLoop.scala line 48
    onReceive(event)

此時的onReceive(event)在 JobGenerator實例化 eventLoop時已經override了。

// JobGenerator.scala line 87
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

調用generatorJobs(time)

// JobGenerator.scala line 181
      case GenerateJobs(time) => generateJobs(time)

graph.generateJobs

// JobGenerator.scala line 248
graph.generateJobs(time)

通過outputStream.generateJob 還原出RDD的整個依賴,并創建出Job。這個outputStream就是ForEachDStream。

// DStreamGraph.scala line 115
        val jobOption = outputStream.generateJob(time)
在本案例中,按照 SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream 的依賴關系
調用parent.getOrCompute,此getOrCompute只在DStream中有定義,所有子類都沒重寫過此方法。
在此方法中,會調用當前DStream的compute方法,而compute中又調用了parent.getOrCompute,同時將當前的DStream的func加入到串聯的RDD之后。

一直循環,直到inputStream,本例中為SocketInputDStream的compute被執行,實際上執行的是ReceiverInputDStream.compute,創建出BlockRDD。

至此整個RDD被還原出來。作為參數傳入Job的構造中。 

至此Job創建成功,但是此Job為Spark Core中的Job,而且也并沒有被提交到spark集群中。

獲取給定時間對應的輸入數據的信息,此時得到的都是元數據,即輸入數據的元數據。

再創建成JobSet,并提交JobSet

// JobGenerator.scala line 251
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

jobScheduler.submitJobSet

交由jobExecutor線程池來處理,這里顯然可以推測出,JobHandler一定是一個Runnable或者Callable接口的實現。

另外jobExecutor默認的線程數量是1,從并發性考慮,建議與outputStreams的數量保持一致:DStreamGraph.outputStreams.size

// JobScheduler.scala line 122
  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
JobHandler中封裝的run方法
  1. 發送JobStarted事件消息,用于監控

  2. job.run,真正的Job提交,注意,這里的Job提交是指提交Streaming的Job到Spark 集群,類似普通Spark程序將RDD提交給Spark集群運行

// JobScheduler.scala line 202
    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }

job.run

// Job.scala line 38
  def run() {
    _result = Try(func())
  }

執行func(),而此時的func就是在ForEachDStream中封裝Job的第二個參數。

在本例中,即為

() => foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(t=>t.split(" "))).map(_.map[U](t=>(t,1))).combineByKey[C](t=>t, (t1,t2)=>t1+t2, (t1,t2)=>t1+t2,partitioner, true),time)

至于如何推導出此RDD,可參考前文。

讀者們,至此,是否有很熟悉的感覺,很明顯,上面的代碼就是一個函數,函數沒有參數,方法體中,執行的代碼中,從new BlockRDD開始,就是我們普通的Spark的程序:新建RDD,然后一連串transform,最后將結果交給foreachFunc 處理。

由此,SparkStreaming最終是轉變為普通的Spark Application來提交給Spark 集群來執行。是否也可以理解Spark Streaming其實就是Spark 的一個應用程序。而已。

感謝各位的閱讀,以上就是“Spark Streaming流計算框架如何運行”的內容了,經過本文的學習后,相信大家對Spark Streaming流計算框架如何運行這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

公安县| 讷河市| 枣强县| 溆浦县| 清河县| 迁安市| 泽库县| 米泉市| 汕尾市| 潜山县| 大城县| 玛曲县| 泸溪县| 乌什县| 宜州市| 西峡县| 榆中县| 永胜县| 尚义县| 泸溪县| 垫江县| 阜城县| 巴彦县| 道孚县| 疏附县| 叶城县| 大理市| 米泉市| 乐至县| 东兴市| 广州市| 峡江县| 迁安市| 南丹县| 金寨县| 新民市| 磴口县| 英吉沙县| 清镇市| 漳平市| 定州市|