中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark結構化流處理機制之容錯機制的示例分析

發布時間:2021-12-16 11:24:47 來源:億速云 閱讀:176 作者:小新 欄目:云計算

這篇文章給大家分享的是有關Spark結構化流處理機制之容錯機制的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

容錯機制

  端到端的有且僅有一次保證,是結構化流設計的關鍵目標之一.

  結構化流設計了 Structured Streaming sources,sinks等等,來跟蹤確切的處理進度,并讓其重啟或重運行來處理任何故障

  streaming source是類似kafka的偏移量(offsets)來跟蹤流的讀取位置.執行引擎使用檢查點(checkpoint)和預寫日志(write ahead logs)來記錄每個執行其的偏移范圍值

  streaming sinks 是設計用來保證處理的冪等性

  這樣,依靠可回放的數據源(streaming source)和處理冪等(streaming sinks),結構流來做到任何故障下的端到端的有且僅有一次保證

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

其中,spark是SparkSession,lines是DataFrame,DataFrame就是Dataset[Row]。

DataSet

看看Dataset的觸發因子的代碼實現,比如foreach操作:

def foreach(f: T => Unit): Unit = withNewRDDExecutionId {

    rdd.foreach(f)

  }



 private def withNewRDDExecutionId[U](body: => U): U = {

    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {

      rddQueryExecution.executedPlan.foreach { plan =>

        plan.resetMetrics()

      }

      body

    }

  }

接著看:

 def withNewExecutionId[T](

      sparkSession: SparkSession,

      queryExecution: QueryExecution,

      name: Option[String] = None)(body: => T): T = {

    val sc = sparkSession.sparkContext

    val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)

    val executionId = SQLExecution.nextExecutionId

    sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

    executionIdToQueryExecution.put(executionId, queryExecution)

    try {     

      withSQLConfPropagated(sparkSession) {       

        try {         

          body

        } catch {         

        } finally {         

        }

      }

    } finally {

      executionIdToQueryExecution.remove(executionId)

      sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)

    }

  }

執行的真正代碼就是 queryExecution: QueryExecution。 

@transient private lazy val rddQueryExecution: QueryExecution = {

    val deserialized = CatalystSerde.deserialize[T](logicalPlan)

    sparkSession.sessionState.executePlan(deserialized)

  }

看到了看到了,是sessionState.executePlan執行logicalPlan而得到了QueryExecution

這里的sessionState.executePlan其實就是創建了一個QueryExecution對象。然后執行QueryExecution的executedPlan方法得到SparkPlan這個物理計劃。怎么生成的呢?

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {

    SparkSession.setActiveSession(sparkSession)   

    planner.plan(ReturnAnswer(optimizedPlan.clone())).next()

  }

通過planner.plan方法生成。

planner是SparkPlanner。在BaseSessionStateBuilder類中定義。

protected def planner: SparkPlanner = {

    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {

      override def extraPlanningStrategies: Seq[Strategy] =

        super.extraPlanningStrategies ++ customPlanningStrategies

    }

  }

SparkPlanner類

SparkPlanner對LogicalPlan執行各種策略,返回對應的SparkPlan。比如對于流應用來說,有這樣的策略:DataSourceV2Strategy。

典型的幾個邏輯計劃到物理計劃的映射關系如下:

StreamingDataSourceV2Relation-》ContinuousScanExec

StreamingDataSourceV2Relation-》MicroBatchScanExec

前一種對應與Offset沒有endOffset的情況,后一種對應于有endOffset的情況。前一種是沒有結束的連續流,后一種是有區間的微批處理流。

前一種的時延可以達到1ms,后一種的時延只能達到100ms。

【代碼】:

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>

      val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]

      val scanExec = MicroBatchScanExec(

        r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)

      val withProjection = if (scanExec.supportsColumnar) {

        scanExec

      } else {

        // Add a Project here to make sure we produce unsafe rows.

        ProjectExec(r.output, scanExec)

      }

      withProjection :: Nil

    case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>

      val continuousStream = r.stream.asInstanceOf[ContinuousStream]

      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)

      val withProjection = if (scanExec.supportsColumnar) {

        scanExec

      } else {

        // Add a Project here to make sure we produce unsafe rows.

        ProjectExec(r.output, scanExec)

      }

      withProjection :: Nil

感謝各位的閱讀!關于“Spark結構化流處理機制之容錯機制的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

来宾市| 尼勒克县| 包头市| 西乌珠穆沁旗| 翁牛特旗| 襄垣县| 安乡县| 明光市| 潮州市| 芜湖县| 麻城市| 江城| 长沙市| 博乐市| 南郑县| 安多县| 舟山市| 湖州市| 措美县| 潢川县| 鄂托克前旗| 霍州市| 闵行区| 玛沁县| 永寿县| 阿勒泰市| 景宁| 临夏县| 韩城市| 南召县| 乌拉特中旗| 偏关县| 温泉县| 封丘县| 东台市| 新津县| 堆龙德庆县| 兴文县| 唐山市| 西平县| 郸城县|