中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark driver端得到executor返回值的方法

發布時間:2021-11-12 17:16:14 來源:億速云 閱讀:221 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關Spark driver端得到executor返回值的方法,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

有人說spark的代碼不優雅,這個浪尖就忍不了了。實際上,說spark代碼不優雅的主要是對scala不熟悉,spark代碼我覺得還是很贊的,最值得閱讀的大數據框架之一。

今天這篇文章不是為了爭辯Spark 代碼優雅與否,主要是講一下理解了spark源碼之后我們能使用的一些小技巧吧。

spark 使用的時候,總有些需求比較另類吧,比如有球友問過這樣一個需求:

浪尖,我想要在driver端獲取executor執行task返回的結果,比如task是個規則引擎,我想知道每條規則命中了幾條數據,請問這個怎么做呢?

這個是不是很騷氣,也很常見,按理說你輸出之后,在mysql里跑條sql就行了,但是這個往往顯的比較麻煩。而且有時候,在 driver可能還要用到這些數據呢?具體該怎么做呢?

大部分的想法估計是collect方法,那么用collect如何實現呢?大家自己可以考慮一下,我只能告訴你不簡單,不如輸出到數據庫里,然后driver端寫sql分析一下。

還有一種考慮就是使用自定義累加器。這樣就可以在executor端將結果累加然后在driver端使用,不過具體實現也是很麻煩。大家也可以自己琢磨一下下~

那么,浪尖就給大家介紹一個比較常用也比較騷的操作吧。

其實,這種操作我們最先想到的應該是count函數,因為他就是將task的返回值返回到driver端,然后進行聚合的。我們可以從idea count函數點擊進去,可以看到

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

也即是sparkcontext的runJob方法。

Utils.getIteratorSize _這個方法主要是計算每個iterator的元素個數,也即是每個分區的元素個數,返回值就是元素個數:

/**   * Counts the number of elements of an iterator using a while loop rather than calling   * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower   * in the current version of Scala.   */  def getIteratorSize[T](iterator: Iterator[T]): Long = {    var count = 0L    while (iterator.hasNext) {      count += 1L      iterator.next()    }    count  }

然后就是runJob返回的是一個數組,每個數組的元素就是我們task執行函數的返回值,然后調用sum就得到我們的統計值了。

那么我們完全可以借助這個思路實現我們開頭的目標。浪尖在這里直接上案例了:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkRunJob {
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
   conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")    conf.set(ConfigurationOptions.ES_PORT, "9200")    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")    conf.set("es.write.rest.error.handlers", "ignoreConflict")    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
   val sc = new SparkContext(conf)    import org.elasticsearch.spark._
   val rdd = sc.esJsonRDD("posts").repartition(10)
   rdd.count()    val func = (itr : Iterator[(String,String)]) => {      var count = 0      itr.foreach(each=>{        count += 1      })      (TaskContext.getPartitionId(),count)    }
   val res = sc.runJob(rdd,func)
   res.foreach(println)
   sc.stop()  }}

例子中driver端獲取的就是每個task處理的數據量。

關于Spark driver端得到executor返回值的方法就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

鄄城县| 东乌| 平塘县| 大名县| 瓦房店市| 手机| 恩平市| 临清市| 肇州县| 禹城市| 广宗县| 吉林市| 永福县| 曲沃县| 中宁县| 福泉市| 东莞市| 鹤壁市| 鄂伦春自治旗| 永丰县| 张家口市| 临夏县| 墨脱县| 福鼎市| 昆明市| 平顺县| 铜陵市| 城口县| 福贡县| 铜川市| 广西| 中阳县| 巴彦淖尔市| 黔西| 尼勒克县| 淄博市| 大冶市| 增城市| 南溪县| 平潭县| 洪湖市|