中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第8課:Spark Streaming源碼解讀之

發布時間:2020-03-01 17:59:42 來源:網絡 閱讀:531 作者:Spark_2016 欄目:大數據

本篇博客將詳細探討DStream模板下的RDD是如何被創建,然后被執行的。在開始敘述之前,先來思考幾個問題,本篇文章也就是基于此問題構建的。 
1. RDD是誰產生的? 
2. 如何產生RDD? 
帶著這兩個問題開啟我們的探索之旅。

DStream是RDD的模板,每隔一個Batch Interval會根據DStream模板生成一個對應的RDD,然后將RDD存儲到DStream中的generatedRDDs數據結構中,下面是存儲結構格式。

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

1、簡單的WordCount程序

object WordCount {  def main(args:Array[String]): Unit ={
    val sparkConf = new SparkConf().setMaster("Master:7077").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(10)) // Timer觸發頻率

    val lines = ssc.socketTextStream("Master",9999) //接收數據
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
首先我們先看看print方法,具體的代碼如下:
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
 def foreachFunc: (RDD[T], Time) => Unit = {
   (rdd: RDD[T], time: Time) => {
     val firstNum = rdd.take(num + 1)
     // scalastyle:off println
     println("-------------------------------------------")
     println("Time: " + time)
     println("-------------------------------------------")
     firstNum.take(num).foreach(println)
     if (firstNum.length > num) println("...")
     println()
     // scalastyle:on println
   }
 }
 foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

首先定義了一個函數,該函數用來從RDD中取出前幾條數據,并打印出結果與時間等,后面會調用foreachRDD函數。

private def foreachRDD(
   foreachFunc: (RDD[T], Time) => Unit,
   displayInnerRDDOps: Boolean): Unit = {
   new ForEachDStream(this,context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
private[streaming] def register(): DStream[T] = {
 ssc.graph.addOutputStream(this)
 this
}

def addOutputStream(outputStream: DStream[_]) {
 this.synchronized {
   outputStream.setGraph(this)
   outputStreams += outputStream
 }

在foreachRDD中new出了一個ForEachDStream對象,并將這個注冊給DStreamGraph,ForEachDStream對象也就是DStreamGraph中的outputStreams。

當每到達一個BatchInterval時候,就會調用DStreamingGraph中的generateJobs.

def generateJobs(time: Time): Seq[Job] = {
 logDebug("Generating jobs for time " + time)
 val jobs = this.synchronized {
   outputStreams.flatMap { outputStream =>
     val jobOption = outputStream.generateJob(time)
     jobOption.foreach(_.setCallSite(outputStream.creationSite))
     jobOption
   }
 }
 logDebug("Generated " + jobs.length + " jobs for time " + time)
 jobs
}

這里就會調用outputStream的generateJob方法


private[streaming] def generateJob(time: Time): Option[Job] = {
 getOrCompute(time) match {
   case Some(rdd) => {
     val jobFunc = () => {
       val emptyFunc = { (iterator: Iterator[T]) => {} }
       context.sparkContext.runJob(rdd, emptyFunc)
     }
     Some(new Job(time, jobFunc))
   }
   case None => None
 }
}

這里會調用getOrCompute(time)來產生新RDD,并將其存入到generatedRDDs中,整理的過程如下圖:
(版本定制)第8課:Spark Streaming源碼解讀之


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

淳化县| 鲁山县| 罗平县| 武邑县| 道真| 社旗县| 堆龙德庆县| 梅河口市| 平舆县| 奇台县| 故城县| 兴城市| 扎鲁特旗| 连城县| 井冈山市| 汝南县| 儋州市| 宜君县| 浠水县| 阿荣旗| 太谷县| 陵水| 全州县| 兰溪市| 武川县| 久治县| 台前县| 宁武县| 伽师县| 临朐县| 大庆市| 荣昌县| 朝阳县| 湖北省| 墨竹工卡县| 扬州市| 玉林市| 佛坪县| 松桃| 河津市| 桦甸市|