中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

DStream與RDD關系是什么

發布時間:2021-12-16 15:23:09 來源:億速云 閱讀:234 作者:iii 欄目:云計算

本篇內容主要講解“DStream與RDD關系是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“DStream與RDD關系是什么”吧!

RDD是怎么生成的?RDD依靠什么生成?RDD生成的依據是什么?Spark Streaming中RDD的執行是否和Spark Core中的RDD執行有所不同?運行之后我們對RDD怎么處理?

RDD本身也是基本的對象,例如說BatchInterval為1秒,那么每一秒都會產生RDD,內存中不能完全容納該對象。每個BatchInterval的作業執行完后,怎么對已有的RDD進行管理。

ForEachDStream不一定會觸發Job的執行,和Job的執行沒有關系。

Job的產生是由Spark Streaming框架造成的。

foreachRDD是Spark Streaming的后門,可以直接對RDD進行操作。

DStream就是RDD的模板,后面的DStream與前面的DStream有依賴。

val lines = jsc.socketTextStream("127.0.0.1", 9999)這里產生了SocketInputDStream。

lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()這里由SocketInputDStream轉換為FlatMappedDStream,再轉換為MappedDStream,再轉換為ShuffledDStream,再轉換為ForEachDStream。

對于DStream類,源碼中是這樣解釋的。

* DStreams internally is characterized by a few basic properties:
*  - A list of other DStreams that the DStream depends on
*  - A time interval at which the DStream generates an RDD
*  - A function that is used to generate an RDD after each time interval

大致意思是:

1.DStream依賴于其他DStream。

2.每隔BatchDuration,DStream生成一個RDD

3.每隔BatchDuration,DStream內部函數會生成RDD

DStream是從后往前依賴,因為DStream代表Spark Streaming業務邏輯,RDD是從后往前依賴的,DStream是lazy級別的。DStream的依賴關系必須和RDD的依賴關系保持高度一致。

DStream類中generatedRDDs存儲著不同時間對應的RDD實例。每一個DStream實例都有自己的generatedRDDs。實際運算的時候,由于是從后往前推,計算只作用于最后一個DStream。

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

DStream與RDD關系是什么

generatedRDDs是如何獲取的。DStream的getOrCompute方法,先根據時間判斷HashMap中是否已存在該時間對應的RDD,如果沒有則調用compute得到RDD,并放入到HashMap中。

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}

拿DStream的子類ReceiverInputDStream來說明compute方法,內部調用了createBlockRDD這個方法。

/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

createBlockRDD會返回BlockRDD,由于ReceiverInputDStream沒有父依賴,所以自己生成RDD。

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}

再拿DStream的子類MappedDStream來說,這里的compute方法,是調用父RDD的getOrCompute方法獲得RDD,再使用map操作。

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

從上面兩個DStream的子類,可以說明第一個DStream,即InputDStream的comput方法是自己獲取數據并計算的,而其他的DStream是依賴父DStream的,調用父DStream的getOrCompute方法,然后進行計算。

以上說明了對DStream的操作最后作用于對RDD的操作。

接著看下DStream的另一個子類ForEachDStream,發現其compute方法沒有任何操作,但是重寫了generateJob方法。

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

從Job生成入手,JobGenerator的generateJobs方法,內部調用的DStreamGraph的generateJobs方法。

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    //根據特定的時間獲取具體的數據
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    //調用DStreamGraph的generateJobs生成Job
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

DStreamGraph的generateJobs方法調用了OutputStream的generateJob方法,OutputStream就是ForEachDStream。

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

到此,相信大家對“DStream與RDD關系是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

巴东县| 桐柏县| 科技| 衡山县| 襄城县| 凌海市| 邹平县| 鹤岗市| 汪清县| 北流市| 右玉县| 营山县| 健康| 上栗县| 三门峡市| 怀来县| 瓮安县| 建昌县| 密山市| 郎溪县| 永平县| 山西省| 神池县| 高清| 阳城县| 探索| 杨浦区| 白朗县| 德令哈市| 始兴县| 建湖县| 上犹县| 鹿泉市| 筠连县| 沁源县| 咸阳市| 缙云县| 红原县| 泊头市| 武汉市| 南宁市|