中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第9課:Spark Streaming源碼解讀之

發布時間:2020-02-24 20:06:32 來源:網絡 閱讀:520 作者:Spark_2016 欄目:大數據

本期內容:

    1、Receiver啟動方式的設想

    2、Receiver啟動源碼徹底分析

一:Receiver啟動方式的設想 
1. Spark Streaming通過Receiver持續不斷的從外部數據源接收數據,并把數據匯報給Driver端,由此每個Batch Durations就可以根據匯報的數據生成不同的Job。 
2. Receiver是在Spark Streaming應用程序啟動時啟動的,那么我們找Receiver在哪里啟動就應該去找Spark Streaming的啟動。 
3. Receivers和InputDStreams是一一對應的,默認情況下一般只有一個Receiver.

如何啟動Receiver? 
1. 從Spark Core的角度來看,Receiver的啟動Spark Core并不知道,就相當于Linux的內核之上所有的都是應用程序,因此Receiver是通過Job的方式啟動的

2. 一般情況下,只有一個Receiver,但是可以創建不同的數據來源的InputDStream.

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]() //數組
  private val outputStreams = new ArrayBuffer[DStream[_]]()
3.  啟動Receiver的時候,啟動一個Job,這個Job里面有RDD的transformations操作和action的操作,這個Job只有一個partition.這個partition的特殊是里面只有一個成員,
這個成員就是啟動的Receiver.
4.  這樣做的問題:
a)  如果有多個InputDStream,那就要啟動多個Receiver,每個Receiver也就相當于分片partition,那我們啟動Receiver的時候理想的情況下是在不同的機器上啟動Receiver,
但是Spark Core的角度來看就是應用程序,感覺不到Receiver的特殊性,所以就會按照正常的Job啟動的方式來處理,極有可能在一個Executor上啟動多個Receiver.
這樣的話就可能導致負載不均衡。
b)  有可能啟動Receiver失敗,只要集群存在Receiver就不應該失敗。
c)  運行過程中,就默認的而言如果是一個partition的話,那啟動的時候就是一個Task,但是此Task也很可能失敗,因此以Task啟動的Receiver也會掛掉。

由此,可以得出,對于Receiver失敗的話,后果是非常嚴重的,那么Spark Streaming如何防止這些事的呢,下面就尋找Receiver的創建

這里先給出答案,后面源碼會詳細分析: 
a) Spark使用一個Job啟動一個Receiver.最大程度的保證了負載均衡。 
b) Spark Streaming指定每個Receiver運行在哪些Executor上。 
c) 如果Receiver啟動失敗,此時并不是Job失敗,在內部會重新啟動Receiver.

接下來我們通過代碼一步一步解析Receiver是如何啟動的

1、首先我們在編寫具體的應用程序的時候,都會調用StreamingContext的start方法,其實這就是job啟動的源頭,我們先來看下start方法的源碼:

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() //啟動JobScheduler的start方法,啟動子線程,一方面為了本地初始化工作,另外一方面是不要阻塞主線程。
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

2、上面調用start方法的時候,會調用JobScheduler的start()方法,在該方法里面,receiverTracker啟動了,源碼如下:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start() //啟動Receiver
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

3、我們接著看下receiverTracker的start()方法,在start方法里啟動了RPC消息通信體,為啥呢?因為receiverTracker會監控整個集群中的Receiver,Receiver轉過來要向ReceiverTrackerEndpoint匯報自己的狀態,接收的數據,包括生命周期等信息

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) { //Receiver的啟動是依據數據流的
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //匯報狀態信息
    if (!skipReceiverLaunch) launchReceivers() //發起Receiver
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

4、基于ReceiverInputDStream(是在Driver端)來獲得具體的Receivers實例,然后再把他們分不到Worker節點上。一個ReceiverInputDStream只產生一個Receiver


/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    //一個輸入數據來源只產生一個Receiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob() //啟動虛擬Job來分配Receiver到不同的executor上

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

5、其中runDummySparkJob()為了確保所有節點活著,而且避免所有的receivers集中在一個節點上。

private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

ReceiverInputDStream中的getReceiver()方法獲得receiver對象然后將它發送到worker節點上實例化receiver,然后去接收數據。 
此方法必須要在子類中實現。

/**
 * Gets the receiver object that will be sent to the worker nodes
 * to receive data. This method needs to defined by any specific implementation
 * of a ReceiverInputDStream.
 */
def getReceiver(): Receiver[T]

ReceiverInputDStream是抽象類,所以getReceiver方法必須要在繼承的子類中實現

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel) //調用SocketReceiver
  }
}

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() } //啟動線程,調用Receiver方法
    }.start()
  }

在receive()方法中啟動socket接收數據

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port) //根據我們應用程序傳入的host和post創建socket對象
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream()) //接收數據
      while(!isStopped && iterator.hasNext) {
        store(iterator.next) //接收后的數據進行存儲
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
6、ReceiverTrackerEndpoint源碼如下:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) // receivers就是要啟動的receiver,getExecutors獲得集群中的Executors的列表
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors) //循環receivers,每次將一個receiver傳入過去。
    }
  case RestartReceiver(receiver) =>
    // Old scheduled executors minus the ones that are not active any more
    val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
    val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // Try global scheduling again
        oldScheduledExecutors
      } else {
        val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
        // Clear "scheduledLocations" to indicate we are going to do local scheduling
        val newReceiverInfo = oldReceiverInfo.copy(
          state = ReceiverState.INACTIVE, scheduledLocations = None)
        receiverTrackingInfos(receiver.streamId) = newReceiverInfo
        schedulingPolicy.rescheduleReceiver(
          receiver.streamId,
          receiver.preferredLocation,
          receiverTrackingInfos,
          getExecutors)
      }
    // Assume there is one receiver restarting at one time, so we don't need to update
    // receiverTrackingInfos
    startReceiver(receiver, scheduledLocations)
  case c: CleanupOldBlocks =>
    receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
  case UpdateReceiverRateLimit(streamUID, newRate) =>
    for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
      eP.send(UpdateRateLimit(newRate))
    }
  // Remote messages
  case ReportError(streamId, message, error) =>
    reportError(streamId, message, error)
}
從注釋中可以看到,Spark Streaming指定receiver在那些Executors運行,而不是基于Spark Core中的Task來指定。
通過StartAllReceivers將消息發送給ReceiverTrackerEndpoint

在for循環中為每個receiver分配相應的executor。并調用startReceiver方法:

Receiver是以job的方式啟動的!!! 這里你可能會有疑惑,沒有RDD和來的Job呢?首先,在startReceiver方法中,會將Receiver封裝成RDD

receiverRDD: RDD[Receiver[_]] =
  (scheduledLocations.isEmpty) {
    ssc..makeRDD((receiver))
  } {
    preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc..makeRDD((receiver -> preferredLocations))
  }

封裝成RDD后,將RDD提交到集群中運行

future = ssc.sparkContext.submitJob[Receiver[_]](
  receiverRDDstartReceiverFunc()(__) => ())

task被發送到executor中,從RDD中取出“Receiver”然后對它執行startReceiverFunc:

// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl( //Receiver注冊
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start() //啟動Receiver
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

在函數中創建了一個ReceiverSupervisorImpl對象。它用來管理具體的Receiver。

首先它會將Receiver注冊到ReceiverTracker中
override protected def onReceiverStart(): Boolean = {
 val msg = RegisterReceiver(
   streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
 trackerEndpoint.askWithRetry[Boolean](msg)
}

如果注冊成功,通過supervisor.start()來啟動Receiver


/** Start the supervisor */
def start() {
  onStart()
  startReceiver() //啟動Receiver
}
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")

回到receiverTracker的startReceiver方法中,只要Receiver對應的Job結束了(無論是正常還是異常結束),而ReceiverTracker還沒有停止。
它將會向ReceiverTrackerEndpoint發送一個ReStartReceiver的方法。

// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")

重新為Receiver選擇一個executor,并再次運行Receiver。直到ReceiverTracker啟動為止。

 
在ReceiverTracker的receive方法中startReceiver方法第一個參數就是receiver,從實現的可以看出for循環不斷取出receiver,然后調用startReceiver。由此就可以得出一個Job只啟動一個Receiver. 
如果Receiver啟動失敗,此時并不會認為是作業失敗,會重新發消息給ReceiverTrackerEndpoint重新啟動Receiver,這樣也就確保了Receivers一定會被啟動,這樣就不會像Task啟動Receiver的話如果失敗受重試次數的影響。

簡單的流程圖:

(版本定制)第9課:Spark Streaming源碼解讀之


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

临江市| 康平县| 西峡县| 五原县| 鱼台县| 阳江市| 四会市| 崇明县| 拜城县| 洛阳市| 永宁县| 抚顺市| 应城市| 叙永县| 常宁市| 芒康县| 保德县| 巴中市| 泾阳县| 高尔夫| 天峨县| 海南省| 文山县| 东乌珠穆沁旗| 方城县| 台安县| 黄浦区| 八宿县| 旺苍县| 兴国县| 大城县| 建水县| 伽师县| 个旧市| 新昌县| 新绛县| 凉山| 枝江市| 新源县| 沙洋县| 关岭|