中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark Streaming + Spark SQL如何實現配置化ETL

發布時間:2021-11-15 23:47:58 來源:億速云 閱讀:187 作者:柒染 欄目:云計算

本篇文章給大家分享的是有關Spark Streaming + Spark SQL如何實現配置化ETL,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

傳統的Spark Streaming程序需要:

  • 構建StreamingContext

  • 設置checkpoint

  • 鏈接數據源

  • 各種transform

  • foreachRDD 輸出

通常而言,你可能會因為要走完上面的流程而構建了一個很大的程序,比如一個main方法里上百行代碼,雖然在開發小功能上足夠便利,但是復用度更方面是不夠的,而且不利于協作,所以需要一個更高層的開發包提供支持。

如何開發一個Spark Streaming程序

我只要在配置文件添加如下一個job配置,就可以作為標準的的Spark Streaming 程序提交運行:

{

  "test": {
    "desc": "測試",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",
        "params": [
          {
            "metadata.broker.list":"xxx",
            "auto.offset.reset":"largest",
            "topics":"xxx"
          }
        ]      },
      {
        "name": "streaming.core.compositor.spark.JSONTableCompositor",
        "params": [{"tableName":"test"}
        ]      },
      {
        "name": "streaming.core.compositor.spark.SQLCompositor",
        "params": [{"sql":"select a from test"}
        ]      },
      {
        "name": "streaming.core.compositor.RDDPrintOutputCompositor",
        "params": [
          {
          }
        ]      }
    ],
    "configParams": {
    }  }}

上面的配置相當于完成了如下的一個流程:

  1. 從Kafka消費數據

  2. 將Kafka數據轉化為表

  3. 通過SQL進行處理

  4. 打印輸出

是不是很簡單,而且還可以支持熱加載,動態添加job等

特性

該實現的特性有:

  1. 配置化

  2. 支持多Job配置

  3. 支持各種數據源模塊

  4. 支持通過SQL完成數據處理

  5. 支持多種輸出模塊

未來可擴展的支持包含:

  1. 動態添加或者刪除job更新,而不用重啟Spark Streaming

  2. 支持Storm等其他流式引擎

  3. 更好的多job互操作

配置格式說明

該實現完全基于ServiceframeworkDispatcher 完成,核心功能大概只花了三個小時。

這里我們先理出幾個概念:

  1. Spark Streaming 定義為一個App

  2. 每個Action定義為一個Job.一個App可以包含多個Job

配置文件結構設計如下:

{  "job1": {    "desc": "測試",    "strategy": "streaming.core.strategy.SparkStreamingStrategy",    "algorithm": [],    "ref": [],    "compositor": [
      {        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",        "params": [
          {            "metadata.broker.list":"xxx",            "auto.offset.reset":"largest",            "topics":"xxx"
          }
        ]
      } ,  
    ],    "configParams": {
    }
  },  "job2":{
   ........
 } 
}

一個完整的App 對應一個配置文件。每個頂層配置選項,如job1,job2分別對應一個工作流。他們最終都會運行在一個App上(Spark Streaming實例上)。

  • strategy 用來定義如何組織 compositor,algorithm, ref 的調用關系

  • algorithm作為數據來源

  • compositor 數據處理鏈路模塊。大部分情況我們都是針對該接口進行開發

  • ref 是對其他job的引用。通過配合合適的strategy,我們將多個job組織成一個新的job

  • 每個組件( compositor,algorithm, strategy) 都支持參數配置

上面主要是解析了配置文件的形態,并且ServiceframeworkDispatcher 已經給出了一套接口規范,只要照著實現就行。

模塊實現

那對應的模塊是如何實現的?本質是將上面的配置文件,通過已經實現的模塊,轉化為Spark Streaming程序。

以SQLCompositor 的具體實現為例:

class SQLCompositor[T] extends Compositor[T] {  private var _configParams: util.List[util.Map[Any, Any]] = _  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)//策略引擎ServiceFrameStrategy 會調用該方法將配置傳入進來
  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {    this._configParams = configParams
  }// 獲取配置的sql語句
  def sql = {
    _configParams(0).get("sql").toString
  }  def outputTable = {
    _configParams(0).get("outputTable").toString
  }//執行的主方法,大體是從上一個模塊獲取SQLContext(已經注冊了對應的table),//然后根據該模塊的配置,設置查詢語句,最后得到一個新的dataFrame.// middleResult里的T其實是DStream,我們會傳遞到下一個模塊,Output模塊//params參數則是方便各個模塊共享信息,這里我們將對應處理好的函數傳遞給下一個模塊
  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {    var dataFrame: DataFrame = null
    val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]
    params.put("sql",(rdd:RDD[String])=>{      val sqlContext = func(rdd)
      dataFrame = sqlContext.sql(sql)
      dataFrame
    })
    middleResult
  }
}

上面的代碼就完成了一個SQL模塊。那如果我們要完成一個自定義的.map函數呢?可類似下面的實現:

abstract class MapCompositor[T,U] extends Compositor[T]{  private var _configParams: util.List[util.Map[Any, Any]] = _  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {    this._configParams = configParams
  }  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {    val dstream = middleResult(0).asInstanceOf[DStream[String]]    val newDstream = dstream.map(f=>parseLog(f))    List(newDstream.asInstanceOf[T])
  }  def parseLog(line:String): U}class YourCompositor[T,U] extends MapCompositor[T,U]{ override def parseLog(line:String):U={
     ....your logical
  }
}

同理你可以實現filter,repartition等其他函數。

該方式提供了一套更為高層的API抽象,用戶只要關注具體實現而無需關注Spark的使用。同時也提供了一套配置化系統,方便構建數據處理流程,并且復用原有的模塊,支持使用SQL進行數據處理。

以上就是Spark Streaming + Spark SQL如何實現配置化ETL,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

潞西市| 阿拉善右旗| 香格里拉县| 宽甸| 宕昌县| 阿合奇县| 博罗县| 宜兴市| 买车| 正安县| 武川县| 芜湖县| 洛扎县| 家居| 云和县| 高密市| 酒泉市| 肥城市| 淮阳县| 峡江县| 高唐县| 安溪县| 报价| 石泉县| 梁河县| 内乡县| 连城县| 浦江县| 同仁县| 威海市| 称多县| 长葛市| 准格尔旗| 甘泉县| 随州市| 依安县| 贵定县| 阳高县| 民县| 思茅市| 那曲县|