中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于案例分析Spark Streaming流計算框架的運行源碼怎么寫

發布時間:2021-12-17 14:09:45 來源:億速云 閱讀:118 作者:柒染 欄目:大數據

今天就跟大家聊聊有關基于案例分析Spark Streaming流計算框架的運行源碼怎么寫,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

第一部分案例:

package com.dt.spark.sparkstreaming

import com.robinspark.utils.ConnectionPool
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用Spark Streaming+Spark SQL來在線動態計算電商中不同類別中最熱門的商品排名,例如手機這個類別下面最熱門的三種手機、電視這個類別
  下最熱門的三種電視,該實例在實際生產環境下具有非常重大的意義;
  * 實現技術:Spark Streaming+Spark SQL,之所以Spark Streaming能夠使用MLsqlgraphx等功能是因為有foreachRDDTransform
  * 等接口,這些接口中其實是基于RDD進行操作,所以以RDD為基石,就可以直接使用Spark其它所有的功能,就像直接調用API一樣簡單。
  假設說這里的數據的格式:user item category,例如Rocky Samsung Android
  */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
    /**
      * 1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,
      例如說通過setMaster來設置程序要鏈接的Spark集群的MasterURL,如果設置
      local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如
      只有1G的內存)的初學者       *
      */
    val conf = new SparkConf() //創建SparkConf對象
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱
    conf.setMaster("spark://Master:7077") //此時,程序在Spark集群
    //conf.setMaster("local[2]")
    //設置batchDuration時間間隔來控制Job生成的頻率并且創建Spark Streaming執行的入口
    val ssc = new StreamingContext(conf, Seconds(5))


    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
    

    //格式為(category_item,1) =>(K,V)
    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
        (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))


    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
      _-_, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)


        categoryItemDF.registerTempTable("categoryItemTable")
        val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        reseltDataFram.show()

        val resultRowRDD = reseltDataFram.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty){
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,click_count) values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement();
              stmt.executeUpdate(sql);

            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }
        }
      }
    }
    }
    /**
      * StreamingContext調用start方法的內部其實是會啟動JobSchedulerStart方法,進行消息循環,在JobScheduler
      * start內部會構造JobGeneratorReceiverTacker,并且調用JobGeneratorReceiverTackerstart方法:
      *   1JobGenerator啟動后會不斷的根據batchDuration生成一個個的Job
      *   2ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到
      *   數據后會通過ReceiverSupervisor存儲到Executor并且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker
      *   內部會通過ReceivedBlockTracker來管理接受到的元數據信息
      每個BatchInterval會產生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD
      * DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個
      單獨的線程來提交Job到集群運行(其實是在線程中基于RDDAction觸發真正的作業的運行),為什么使用線程池呢?
      *   1,作業不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙;
      *   2,有可能設置了JobFAIR公平調度的方式,這個時候也需要多線程的支持;
      *
      */
    ssc.start()
    ssc.awaitTermination()
  }
}

第二部分源碼解析:

1.根據傳遞的SparkConf參數創建StreamingContext對象,在內部創建SparkContext

  /**

   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.

   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters

   * @param batchDuration the time interval at which streaming data will be divided into batches

   */

  def this(conf: SparkConf, batchDuration: Duration) = {

    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)

  }

  private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {

    new SparkContext(conf)

  }

這說明Spark Streaming也是Spark上的一個應用程序

2.創建Socket輸入流,socketTextStream方法定義如下:

/**

 * Create a input stream from TCP source hostname:port. Data is received using

 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\\n` delimited

 * lines.

 * @param hostname      Hostname to connect to for receiving data

 * @param port          Port to connect to for receiving data

 * @param storageLevel  Storage level to use for storing the received objects

 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)

 */

def socketTextStream(

    hostname: String,

    port: Int,

    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {

  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)

}

可看到代碼最后面調用socketStream,socketStream定義如下:

/**

 * Create a input stream from TCP source hostname:port. Data is received using

 * a TCP socket and the receive bytes it interepreted as object using the given

 * converter.

 * @param hostname      Hostname to connect to for receiving data

 * @param port          Port to connect to for receiving data

 * @param converter     Function to convert the byte stream to objects

 * @param storageLevel  Storage level to use for storing the received objects

 * @tparam T            Type of the objects received (after converting bytes to objects)

 */

def socketStream[T: ClassTag](

    hostname: String,

    port: Int,

    converter: (InputStream) => Iterator[T],

    storageLevel: StorageLevel

  ): ReceiverInputDStream[T] = {

  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

}

實際上生成SocketInputDStream,SocketInputDStream類如下:

private[streaming]

class SocketInputDStream[T: ClassTag](

    ssc_ : StreamingContext,

    host: String,

    port: Int,

    bytesToObjects: InputStream => Iterator[T],

    storageLevel: StorageLevel

  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {

    new SocketReceiver(host, port, bytesToObjects, storageLevel)

  }

}

SocketInputDStream繼承ReceiverInputDStream。

其中實現getReceiver方法,返回SocketReceiver對象。

總結一下SocketInputDStream的繼承關系:

SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream。

DStream是生成RDD的模板,是邏輯級別,當達到Interval的時候這些模板會被BatchData實例化成為RDD和DAG。

看看DStream的源碼片段:

// RDDs generated, marked as private[streaming] so that testsuites can access it

@transient

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

看看DStream的getOrCompute方法:

/**

 * Get the RDD corresponding to the given time; either retrieve it from cache

 * or compute-and-cache it.

 */

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {

  // If RDD was already generated, then retrieve it from HashMap,

  // or else compute the RDD

  generatedRDDs.get(time).orElse {

    // Compute the RDD if time is valid (e.g. correct time in a sliding window)

    // of RDD generation, else generate nothing.

    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {

        // Disable checks for existing output directories in jobs launched by the streaming

        // scheduler, since we may need to write output to an existing directory during checkpoint

        // recovery; see SPARK-4835 for more details. We need to have this call here because

        // compute() might cause Spark jobs to be launched.

        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

          compute(time)

        }

      }

      rddOption.foreach { case newRDD =>

        // Register the generated RDD for caching and checkpointing

        if (storageLevel != StorageLevel.NONE) {

          newRDD.persist(storageLevel)

          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")

        }

        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {

          newRDD.checkpoint()

          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")

        }

        generatedRDDs.put(time, newRDD)

      }

      rddOption

    } else {

      None

    }

  }

}

主要是生成RDD,再將生成的RDD放在HashMap中。具體生成RDD過程以后剖析。

目前大致講了DStream和RDD這些核心概念在Spark Streaming中的使用。

先看看ScreamingContext的start()。start()方法啟動StreamContext,由于Spark應用程序不能有多個SparkContext對象實例,所以Spark Streaming框架在啟動時對狀態進行判斷。代碼如下:

/**

 * Start the execution of the streams.

 *

 * @throws IllegalStateException if the StreamingContext is already stopped.

 */

def start(): Unit = synchronized {

  state match {

    case INITIALIZED =>

      startSite.set(DStream.getCreationSite())

      StreamingContext.ACTIVATION_LOCK.synchronized {

        StreamingContext.assertNoOtherContextIsActive()

        try {

          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties

          // like call sites and job groups can be reset without affecting those of the

          // current thread.

          ThreadUtils.runInNewThread("streaming-start") {

            sparkContext.setCallSite(startSite.get)

            sparkContext.clearJobGroup()

            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")

            //啟動JobScheduler

            scheduler.start()

          }

          state = StreamingContextState.ACTIVE

        } catch {

          case NonFatal(e) =>

            logError("Error starting the context, marking it as stopped", e)

            scheduler.stop(false)

            state = StreamingContextState.STOPPED

            throw e

        }

        StreamingContext.setActiveContext(this)

      }

      shutdownHookRef = ShutdownHookManager.addShutdownHook(

        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

      // Registering Streaming Metrics at the start of the StreamingContext

      assert(env.metricsSystem != null)

      env.metricsSystem.registerSource(streamingSource)

      uiTab.foreach(_.attach())

      logInfo("StreamingContext started")

    case ACTIVE =>

      logWarning("StreamingContext has already been started")

    case STOPPED =>

      throw new IllegalStateException("StreamingContext has already been stopped")

  }

}

初始狀態時,會啟動JobScheduler。

來看下JobScheduler的啟動過程start()。其中啟動了EventLoop、StreamListenerBus、ReceiverTracker和jobGenerator等多項工作。

def start(): Unit = synchronized {

  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")

  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {

    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)

  }

  // 啟動消息循環處理線程。用于處理JobScheduler的各種事件。

  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates

  for {

    inputDStream <- ssc.graph.getInputStreams

    rateController <- inputDStream.rateController

  } ssc.addStreamingListener(rateController)

  // 啟動監聽器。用于更新Spark UI中StreamTab的內容。

  listenerBus.start(ssc.sparkContext)

  receiverTracker = new ReceiverTracker(ssc)

  // 生成InputInfoTracker。用于管理所有的輸入的流,以及他們輸入的數據統計。這些信息將通過 StreamingListener監聽。

  inputInfoTracker = new InputInfoTracker(ssc)

  // 啟動ReceiverTracker。用于處理數據接收、數據緩存、Block生成。

  receiverTracker.start()

  // 啟動JobGenerator。用于DStreamGraph初始化、DStream與RDD的轉換、生成Job、提交執行等工作。

  jobGenerator.start()

  logInfo("Started JobScheduler")

}

JobScheduler中的消息處理函數processEvent,處理三類消息:Job已開始,Job已完成,錯誤報告。

private def processEvent(event: JobSchedulerEvent) {

  try {

    event match {

      case JobStarted(job, startTime) => handleJobStart(job, startTime)

      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)

      case ErrorReported(m, e) => handleError(m, e)

    }

  } catch {

    case e: Throwable =>

      reportError("Error in job scheduler", e)

  }

}

我們再粗略地分析一下JobScheduler.start()中啟動的工作。

先看JobScheduler.start()啟動的第一項工作EventLoop。EventLoop用于處理JobScheduler的各種事件。

EventLoop中有事件隊列:

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

還有一個線程處理隊列中的事件:

 private val eventThread = new Thread(name) {

    setDaemon(true)

    override def run(): Unit = {

      try {

        while (!stopped.get) {

          val event = eventQueue.take()

          try {

            onReceive(event)

          } catch {

            case NonFatal(e) => {

              try {

                onError(e)

              } catch {

                case NonFatal(e) => logError("Unexpected error in " + name, e)

              }

            }

          }

        }

      } catch {

        case ie: InterruptedException => // exit even if eventQueue is not empty

        case NonFatal(e) => logError("Unexpected error in " + name, e)

      }

    }

  }

這個線程中的onReceive、onError,在JobScheduler中的EventLoop實例化時已定義。
JobScheduler.start()啟動的第二項工作StreamListenerBus。用于異步傳遞StreamingListenerEvents到注冊的StreamingListeners。用于更新Spark UI中StreamTab的內容。

  以下代碼用于傳遞各種事件:

  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {

    event match {

      case receiverStarted: StreamingListenerReceiverStarted =>

        listener.onReceiverStarted(receiverStarted)

      case receiverError: StreamingListenerReceiverError =>

        listener.onReceiverError(receiverError)

      case receiverStopped: StreamingListenerReceiverStopped =>

        listener.onReceiverStopped(receiverStopped)

      case batchSubmitted: StreamingListenerBatchSubmitted =>

        listener.onBatchSubmitted(batchSubmitted)

      case batchStarted: StreamingListenerBatchStarted =>

        listener.onBatchStarted(batchStarted)

      case batchCompleted: StreamingListenerBatchCompleted =>

        listener.onBatchCompleted(batchCompleted)

      case outputOperationStarted: StreamingListenerOutputOperationStarted =>

        listener.onOutputOperationStarted(outputOperationStarted)

      case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>

        listener.onOutputOperationCompleted(outputOperationCompleted)

      case _ =>

    }

  }

看JobScheduler.start()啟動的第三項工作ReceiverTracker。ReceiverTracker的start()中,內部實例化ReceiverTrackerEndpoint這個Rpc消息通信體。

def start(): Unit = synchronized {

  if (isTrackerStarted) {

    throw new SparkException("ReceiverTracker already started")

  }

  if (!receiverInputStreams.isEmpty) {

    endpoint = ssc.env.rpcEnv.setupEndpoint(

      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

    if (!skipReceiverLaunch) launchReceivers()

    logInfo("ReceiverTracker started")

    trackerState = Started

  }

}

在ReceiverTracker啟動的過程中會調用其launchReceivers方法:

/**

 * Get the receivers from the ReceiverInputDStreams, distributes them to the

 * worker nodes as a parallel collection, and runs them.

 */

private def launchReceivers(): Unit = {

  val receivers = receiverInputStreams.map(nis => {

    val rcvr = nis.getReceiver()

    rcvr.setReceiverId(nis.id)

    rcvr

  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")

  endpoint.send(StartAllReceivers(receivers))

}

其中調用了runDummySparkJob方法來啟動Spark Streaming的框架第一個Job,其中collect這個action操作會觸發Spark Job的執行。這個方法是為了確保每個Slave都注冊上,避免所有Receiver都在一個節點,使后面的計算能負載均衡

/**

 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the

 * receivers to be scheduled on the same node.

 *

 * TODO Should poll the executor number and wait for executors according to

 * "spark.scheduler.minRegisteredResourcesRatio" and

 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.

 */

private def runDummySparkJob(): Unit = {

  if (!ssc.sparkContext.isLocal) {

    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()

  }

  assert(getExecutors.nonEmpty)

}

ReceiverTracker.launchReceivers()還調用了endpoint.send(StartAllReceivers(receivers))方法,Rpc消息通信體發送StartAllReceivers消息。

ReceiverTrackerEndpoint它自己接收到消息后,先根據調度策略獲得Recevier在哪個Executor上運行,然后在調用startReceiver(receiver, executors)方法,來啟動Receiver。

override def receive: PartialFunction[Any, Unit] = {

  // Local messages

  case StartAllReceivers(receivers) =>

    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)

    for (receiver <- receivers) {

      val executors = scheduledLocations(receiver.streamId)

      updateReceiverScheduledExecutors(receiver.streamId, executors)

      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation

      startReceiver(receiver, executors)

    }

在startReceiver方法中,ssc.sparkContext.submitJob提交Job的時候傳入startReceiverFunc這個方法,因為startReceiverFunc該方法是在Executor上執行的。而在startReceiverFunc方法中是實例化ReceiverSupervisorImpl對象,該對象是對Receiver進行管理和監控。這個Job是Spark Streaming框架為我們啟動的第二個Job,且一直運行。因為supervisor.awaitTermination()該方法會阻塞等待退出。

/**

 * Start a receiver along with its scheduled executors

 */

private def startReceiver(

    receiver: Receiver[_],

    scheduledLocations: Seq[TaskLocation]): Unit = {

  def shouldStartReceiver: Boolean = {

    // It's okay to start when trackerState is Initialized or Started

    !(isTrackerStopping || isTrackerStopped)

  }

  val receiverId = receiver.streamId

  if (!shouldStartReceiver) {

    onReceiverJobFinish(receiverId)

    return

  }

  val checkpointDirOption = Option(ssc.checkpointDir)

  val serializableHadoopConf =

    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node

  val startReceiverFunc: Iterator[Receiver[_]] => Unit =

    (iterator: Iterator[Receiver[_]]) => {

      if (!iterator.hasNext) {

        throw new SparkException(

          "Could not start receiver as object not found.")

      }

      if (TaskContext.get().attemptNumber() == 0) {

        val receiver = iterator.next()

        assert(iterator.hasNext == false)

        //實例化Receiver監控者

        val supervisor = new ReceiverSupervisorImpl(

          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)

        supervisor.start()

        supervisor.awaitTermination()

      } else {

        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.

      }

    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job

  val receiverRDD: RDD[Receiver[_]] =

    if (scheduledLocations.isEmpty) {

      ssc.sc.makeRDD(Seq(receiver), 1)

    } else {

      val preferredLocations = scheduledLocations.map(_.toString).distinct

      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))

    }

  receiverRDD.setName(s"Receiver $receiverId")

  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")

  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](

    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

  // We will keep restarting the receiver job until ReceiverTracker is stopped

  future.onComplete {

    case Success(_) =>

      if (!shouldStartReceiver) {

        onReceiverJobFinish(receiverId)

      } else {

        logInfo(s"Restarting Receiver $receiverId")

        self.send(RestartReceiver(receiver))

      }

    case Failure(e) =>

      if (!shouldStartReceiver) {

        onReceiverJobFinish(receiverId)

      } else {

        logError("Receiver has been stopped. Try to restart it.", e)

        logInfo(s"Restarting Receiver $receiverId")

        self.send(RestartReceiver(receiver))

      }

  }(submitJobThreadPool)

  logInfo(s"Receiver ${receiver.streamId} started")

}

接下來看下ReceiverSupervisorImpl的啟動過程,先啟動所有注冊上的BlockGenerator對象,然后向ReceiverTrackerEndpoint發送RegisterReceiver消息,再調用receiver的onStart方法。

/** Start the supervisor */

def start() {

  onStart()

  startReceiver()

}

其中的onStart():

override protected def onStart() {

  registeredBlockGenerators.foreach { _.start() }

}

其中的startReceiver():

/** Start receiver */

def startReceiver(): Unit = synchronized {

  try {

    if (onReceiverStart()) {

      logInfo("Starting receiver")

      receiverState = Started

      receiver.onStart()

      logInfo("Called receiver onStart")

    } else {

      // The driver refused us

      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)

    }

  } catch {

    case NonFatal(t) =>

      stop("Error starting receiver " + streamId, Some(t))

  }

}

override protected def onReceiverStart(): Boolean = {

  val msg = RegisterReceiver(

    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)

  trackerEndpoint.askWithRetry[Boolean](msg)

}

其中在Driver運行的ReceiverTrackerEndpoint對象接收到RegisterReceiver消息后,將streamId, typ, host, executorId, receiverEndpoint封裝為ReceiverTrackingInfo保存到內存對象receiverTrackingInfos這個HashMap中。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

  // Remote messages

  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>

    val successful =

      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)

    context.reply(successful)

  case AddBlock(receivedBlockInfo) =>

    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {

      walBatchingThreadPool.execute(new Runnable {

        override def run(): Unit = Utils.tryLogNonFatalError {

          if (active) {

            context.reply(addBlock(receivedBlockInfo))

          } else {

            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")

          }

        }

      })

    } else {

      context.reply(addBlock(receivedBlockInfo))

    }

/** Register a receiver */

private def registerReceiver(

    streamId: Int,

    typ: String,

    host: String,

    executorId: String,

    receiverEndpoint: RpcEndpointRef,

    senderAddress: RpcAddress

  ): Boolean = {

  if (!receiverInputStreamIds.contains(streamId)) {

    throw new SparkException("Register received for unexpected id " + streamId)

  }

  if (isTrackerStopping || isTrackerStopped) {

    return false

  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations

  val acceptableExecutors = if (scheduledLocations.nonEmpty) {

      // This receiver is registering and it's scheduled by

      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.

      scheduledLocations.get

    } else {

      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling

      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.

      scheduleReceiver(streamId)

    }

  def isAcceptable: Boolean = acceptableExecutors.exists {

    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId

    case loc: TaskLocation => loc.host == host

  }

  if (!isAcceptable) {

    // Refuse it since it's scheduled to a wrong executor

    false

  } else {

    val name = s"${typ}-${streamId}"

    val receiverTrackingInfo = ReceiverTrackingInfo(

      streamId,

      ReceiverState.ACTIVE,

      scheduledLocations = None,

      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),

      name = Some(name),

      endpoint = Some(receiverEndpoint))

    receiverTrackingInfos.put(streamId, receiverTrackingInfo)

    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))

    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)

    true

  }

}

Receiver的啟動,以ssc.socketTextStream("localhost", 9999)為例,創建的是SocketReceiver對象。內部啟動一個線程來連接Socket Server,讀取socket數據并存儲。

private[streaming]

class SocketReceiver[T: ClassTag](

    host: String,

    port: Int,

    bytesToObjects: InputStream => Iterator[T],

    storageLevel: StorageLevel

  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {

    // Start the thread that receives data over a connection

    new Thread("Socket Receiver") {

      setDaemon(true)

      override def run() { receive() }

    }.start()

  }

  def onStop() {

    // There is nothing much to do as the thread calling receive()

    // is designed to stop by itself isStopped() returns false

  }

  /** Create a socket connection and receive data until receiver is stopped */

  def receive() {

    var socket: Socket = null

    try {

      logInfo("Connecting to " + host + ":" + port)

      socket = new Socket(host, port)

      logInfo("Connected to " + host + ":" + port)

      val iterator = bytesToObjects(socket.getInputStream())

      while(!isStopped && iterator.hasNext) {

        store(iterator.next)

      }

      if (!isStopped()) {

        restart("Socket data stream had no more data")

      } else {

        logInfo("Stopped receiving")

      }

    } catch {

      case e: java.net.ConnectException =>

        restart("Error connecting to " + host + ":" + port, e)

      case NonFatal(e) =>

        logWarning("Error receiving data", e)

        restart("Error receiving data", e)

    } finally {

      if (socket != null) {

        socket.close()

        logInfo("Closed socket to " + host + ":" + port)

      }

    }

  }

}

接下來看JobScheduler.start()中啟動的第四項工作JobGenerator。

JobGenerator有成員RecurringTimer,用于啟動消息系統和定時器。按照batchInterval時間間隔定期發送GenerateJobs消息。

//根據創建StreamContext時傳入的batchInterval,定時發送GenerateJobs消息

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,

  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

JobGenerator的start()方法:

/** Start generation of jobs */

def start(): Unit = synchronized {

  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.

  // See SPARK-10125

  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {

    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {

      jobScheduler.reportError("Error in job generator", e)

    }

  }

  // 啟動消息循環處理線程

  eventLoop.start()

  if (ssc.isCheckpointPresent) {

    restart()

  } else {

    // 開啟定時生成Job的定時器

    startFirstTime()

  }

}

JobGenerator.start()中的startFirstTime()的定義:

/** Starts the generator for the first time */

private def startFirstTime() {

  val startTime = new Time(timer.getStartTime())

  graph.start(startTime - graph.batchDuration)

  timer.start(startTime.milliseconds)

  logInfo("Started JobGenerator at " + startTime)

}

JobGenerator.start()中的processEvent()的定義:

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) {

  logDebug("Got event " + event)

  event match {

    case GenerateJobs(time) => generateJobs(time)

    case ClearMetadata(time) => clearMetadata(time)

    case DoCheckpoint(time, clearCheckpointDataLater) =>

      doCheckpoint(time, clearCheckpointDataLater)

    case ClearCheckpointData(time) => clearCheckpointData(time)

  }

}

其中generateJobs的定義:

/** Generate jobs and perform checkpoint for the given `time`.  */

private def generateJobs(time: Time) {

  // Set the SparkEnv in this thread, so that job generation code can access the environment

  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager

  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.

  SparkEnv.set(ssc.env)

  Try {

    // 根據特定的時間獲取具體的數據

    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

    //調用DStreamGraph的generateJobs生成Job

    graph.generateJobs(time) // generate jobs using allocated block

  } match {

    case Success(jobs) =>

      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

    case Failure(e) =>

      jobScheduler.reportError("Error generating jobs for time " + time, e)

  }

  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

/** Perform checkpoint for the give `time`. */

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

    logInfo("Checkpointing graph for time " + time)

    ssc.graph.updateCheckpointData(time)

    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

  }

}

DStreamGraph的generateJobs方法,調用輸出流的generateJob方法來生成Jobs集合。

// 輸出流:具體Action的輸出操作

private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {

  logDebug("Generating jobs for time " + time)

  val jobs = this.synchronized {

    outputStreams.flatMap { outputStream =>

      val jobOption = outputStream.generateJob(time)

      jobOption.foreach(_.setCallSite(outputStream.creationSite))

      jobOption

    }

  }

  logDebug("Generated " + jobs.length + " jobs for time " + time)

  jobs

}

來看下DStream的generateJob方法,調用getOrCompute方法來獲取當Interval的時候,DStreamGraph會被BatchData實例化成為RDD,如果有RDD則封裝jobFunc方法,里面包含context.sparkContext.runJob(rdd, emptyFunc),然后返回封裝后的Job。

/**

 * Generate a SparkStreaming job for the given time. This is an internal method that

 * should not be called directly. This default implementation creates a job

 * that materializes the corresponding RDD. Subclasses of DStream may override this

 * to generate their own jobs.

 */

private[streaming] def generateJob(time: Time): Option[Job] = {

  getOrCompute(time) match {

    case Some(rdd) => {

      val jobFunc = () => {

        val emptyFunc = { (iterator: Iterator[T]) => {} }

        context.sparkContext.runJob(rdd, emptyFunc)

      }

      Some(new Job(time, jobFunc))

    }

    case None => None

  }

}

接下來看JobScheduler的submitJobSet方法,向線程池中提交JobHandler。而JobHandler實現了Runnable 接口,最終調用了job.run()這個方法。看一下Job類的定義,其中run方法調用的func為構造Job時傳入的jobFunc,其包含了context.sparkContext.runJob(rdd, emptyFunc)操作,最終導致Job的提交。

def submitJobSet(jobSet: JobSet) {

  if (jobSet.jobs.isEmpty) {

    logInfo("No jobs added for time " + jobSet.time)

  } else {

    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

    jobSets.put(jobSet.time, jobSet)

    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

    logInfo("Added jobs for time " + jobSet.time)

  }

}

private class JobHandler(job: Job) extends Runnable with Logging {

    import JobScheduler._

    def run() {

      try {

        val formattedTime = UIUtils.formatBatchTime(

          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)

        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(

          s"""Streaming job from $batchLinkText""")

        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)

        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because

        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

        // it's possible that when `post` is called, `eventLoop` happens to null.

        var _eventLoop = eventLoop

        if (_eventLoop != null) {

          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))

          // Disable checks for existing output directories in jobs launched by the streaming

          // scheduler, since we may need to write output to an existing directory during checkpoint

          // recovery; see SPARK-4835 for more details.

          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

            job.run()

          }

          _eventLoop = eventLoop

          if (_eventLoop != null) {

            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

          }

        } else {

          // JobScheduler has been stopped.

        }

      } finally {

        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)

        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)

      }

    }

  }

}

看完上述內容,你們對基于案例分析Spark Streaming流計算框架的運行源碼怎么寫有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

天门市| 新余市| 峨眉山市| 河北区| 潞西市| 贡嘎县| 广汉市| 奉节县| 延津县| 巴彦县| 策勒县| 淮阳县| 涿鹿县| 卢氏县| 游戏| 芮城县| 乐至县| 渭南市| 丹阳市| 绵竹市| 黔江区| 攀枝花市| 白水县| 西宁市| 高台县| 罗江县| 东安县| 弋阳县| 会宁县| 原阳县| 外汇| 保山市| 安阳县| 长子县| 罗城| 苍溪县| 大安市| 南开区| 连城县| 鹤庆县| 吉安县|