中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

mapPartitions的簡單介紹及使用方法

發布時間:2021-07-28 09:16:55 來源:億速云 閱讀:729 作者:chen 欄目:大數據

本篇內容介紹了“mapPartitions的簡單介紹及使用方法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!


1. mappartition簡介

首先,說到mapPartitions大家肯定想到的是map和MapPartitions的對比。大家都知道mapPartition算子是使用一個函數針對分區計算的,函數參數是一個迭代器。而map只針對每條數據調用的,所以存在訪問外部數據庫等情況時mapParititons更加高效。  
mapPartitions函數:
  /**   * Return a new RDD by applying a function to each partition of this RDD.   *   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.   */  def mapPartitions[U: ClassTag](      f: Iterator[T] => Iterator[U],      preservesPartitioning: Boolean = false): RDD[U] = withScope {    val cleanedF = sc.clean(f)    new MapPartitionsRDD(      this,      (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),      preservesPartitioning)  }
有代碼可知mapPartitions的函數參數是傳入一個迭代器,返回值是另一個迭代器。
map函數:  
  /**   * Return a new RDD by applying a function to all elements of this RDD.   */  def map[U: ClassTag](f: T => U): RDD[U] = withScope {    val cleanF = sc.clean(f)    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))  }
map函數就是將rdd的元素由T類型轉化為U類型。
綜上可知,map和foreach這類的是針對一個元素調用一次我們的函數,也即是我們的函數參數是單個元素,假如函數內部存在數據庫鏈接、文件等的創建及關閉,那么會導致處理每個元素時創建一次鏈接或者句柄,導致性能底下,很多初學者犯過這種毛病。
而foreachpartition/mapPartitions是針對每個分區調用一次我們的函數,也即是我們函數傳入的參數是整個分區數據的迭代器,這樣避免了創建過多的臨時鏈接等,提升了性能。
下面的例子都是1-20這20個數字,經過map完成a*3的轉換:
val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
結果
  
    
  
  
  
    
      
    
    3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
           

           

3. mappartitions低效用法


大家通常的做法都是申請一個迭代器buffer,將處理后的數據加入迭代器buffer,然后返回迭代器。如下面的demo。
val a = sc.parallelize(1 to 20, 2)  def terFunc(iter: Iterator[Int]) : Iterator[Int] = {     var res = List[Int]()      while (iter.hasNext)   {           val cur = iter.next;     res.::= (cur*3) ;   }    res.iterator}
val result = a.mapPartitions(terFunc)println(result.collect().mkString(","))
結果亂序了,因為我的list是無序的,可以使用LinkList:
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33

4. mappartitions高效用法

注意,3中的例子,會在mappartition執行期間,在內存中定義一個數組并且將緩存所有的數據。假如數據集比較大,內存不足,會導致內存溢出,任務失敗。對于這樣的案例,Spark的RDD不支持像mapreduce那些有上下文的寫方法。其實,浪尖有個方法是無需緩存數據的,那就是自定義一個迭代器類。如下例:  
  
    
  
  
  
    
      
    
    
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {                       def hasNext : Boolean = {                          iter.hasNext                     }                                               def next : Int= {                           val cur = iter.next                       cur*3                     }                   }                    
                  val result = a.mapPartitions(v => new CustomIterator(v))                   println(result.collect().mkString(","))              
           
結果:
   
     
   
   
   3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60

“mapPartitions的簡單介紹及使用方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

灵宝市| 广元市| 文安县| 阳谷县| 东安县| 若尔盖县| 会宁县| 福州市| 凌海市| 聂荣县| 天全县| 阿拉善左旗| 威海市| 福泉市| 望江县| 唐河县| 疏附县| 武陟县| 灯塔市| 渝北区| 塘沽区| 锦州市| 莱阳市| 登封市| 丰都县| 永州市| 宁陵县| 洮南市| 禹州市| 连城县| 泰来县| 鞍山市| 天峻县| 福泉市| 遵义县| 筠连县| 繁峙县| 镇原县| 上高县| 报价| 浦北县|