中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PySpark進階--深入剖析wordcount.py

發布時間:2020-08-16 22:04:57 來源:網絡 閱讀:2219 作者:雷頓學院 欄目:大數據

在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內部各種概念的面紗。我們再次回顧wordcount.py代碼來回答如下問題

  1. 對于大多數語言的Hello Word示例,都有main()函數, wordcount.py的main函數,或者說調用Spark的main() 在哪里

  2. 數據的讀入,各個RDD數據如何轉換

  3. map與flatMap的工作機制,以及區別

  4. reduceByKey的作用

WordCount.py 的代碼如下:

from __future__ import print_functionimport sysfrom operator import add# SparkSession:是一個對Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調用Dataset和DataFrame API# SparkSession可用于創建DataFrame,將DataFrame注冊為表,在表上執行SQL,緩存表和讀取parquet文件。from pyspark.sql import SparkSessionif __name__ == "__main__":    # Python 常用的簡單參數傳入
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)        
    # appName 為 Spark 應用設定一個應用名,改名會顯示在 Spark Web UI 上
    # 假如SparkSession 已經存在就取得已存在的SparkSession,否則創建一個新的。
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()        
    # 讀取傳入的文件內容,并寫入一個新的RDD實例lines中,此條語句所做工作有些多,不適合初學者,可以截成兩條語句以便理解。
    # map是一種轉換函數,將原來RDD的每個數據項通過map中的用戶自定義函數f映射轉變為一個新的元素。原始RDD中的數據項與新RDD中的數據項是一一對應的關系。
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])   
    # flatMap與map類似,但每個元素輸入項都可以被映射到0個或多個的輸出項,最終將結果”扁平化“后輸出 
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)                
    # collect() 在驅動程序中將數據集的所有元素作為數組返回。 這在返回足夠小的數據子集的過濾器或其他操作之后通常是有用的。由于collect 是將整個RDD匯聚到一臺機子上,所以通常需要預估返回數據集的大小以免溢出。             
    output = counts.collect()    
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
Spark 入口 SparkSession

Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,這邊不妨對照Http Session, 在此Spark就在充當Web service的角色,程序調用Spark功能的時候需要先建立一個Session。因此看到getOrCreate()就很容易理解了, 表明可以視情況新建session或利用已有的session。

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

既然將Spark 想象成一個Web server, 也就意味著可能用多個訪問在進行,為了便于監控管理, 對應用命名一個恰當的名稱是個好辦法。Web UI并不是本文的重點,有興趣的同學可以參考 ?Spark Application’s Web Console

加載數據

在建立SparkSession之后, 就是讀入數據并寫入到Dateset中。

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

為了更好的分解執行過程,是時候借助PySpark了, PySpark是python調用Spark的 API,它可以啟動一個交互式Python Shell。為了方便腳本調試,暫時切換到Linux執行

# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')>>> type(ds)
<class 'pyspark.sql.dataframe.DataFrame'>>>> print ds
DataFrame[value: string]>>> lines = ds.rdd

交互式Shell的好處是可以方便的查看變量內容和類型。此刻文件a.txt已經加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式數據集的實例。

RDD操作

RDD在內存中的結構可以參考論文, 理解RDD有兩點比較重要:

一是RDD一種只讀、只能由已存在的RDD變換而來的共享內存,然后將所有數據都加載到內存中,方便進行多次重用。

二是RDD的數據默認情況下存放在集群中不同節點的內存中,本身提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition,因為節點故障,導致數據丟了,那么RDD會自動通過自己的數據來源重新計算該partition。

為了探究RDD內部的數據內容,可以利用collect()函數, 它能夠以數組的形式,返回RDD數據集的所有元素。

>>> lines = ds.rdd>>> for i in lines.collect():...     print i... Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')

lines存儲的是Row object類型,而我們希望的是對String類型進行處理,所以需要利用map api進一步轉換RDD

>>> lines_map = lines.map(lambda x: x[0])>>> for i in lines_map.collect():...     print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.

為了統計每個單詞的出現頻率,需要對每個單詞分別統計,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來,并為每個詞設置一個計數器。比如 These出現次數是1, 我們期望的數據結構是['There', 1]。但是如何將包含字符串的RDD轉換成元素為類似 ['There', 1] 的RDD呢?

>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))>>> rdd_map = flat_map.map(lambda x: [x, 1])>>> for i in rdd_map.collect():...     print i... [u'These', 1]
[u'examples', 1]
[u'give', 1]
[u'a', 1]
[u'quick', 1]

下圖簡要的講述了flatMap 和 map的轉換過程。

PySpark進階--深入剖析wordcount.py

transfrom.png

不難看出,map api只是為所有出現的單詞初始化了計數器為1,并沒有統計相同詞,接下來這個任務由reduceByKey()來完成。在rdd_map 中,所有的詞被視為一個key,而key相同的value則執行reduceByKey內的算子操作,因為統計相同key是累加操作,所以可以直接add操作。

>>> from operator import add>>> add_map = rdd_map.reduceByKey(add)>>> for i in add_map.collect():...     print i... (u'a', 1)
(u'on', 1)
(u'of', 2)
(u'arbitrary', 1)
(u'quick', 1)
(u'the', 2)
(u'or', 1)>>> print rdd_map.count()26>>> print add_map.count()23

根據a.txt 的內容,可知只有 of 和 the 兩個單詞出現了兩次,符合預期。

總結

以上的分解步驟,可以幫我們理解RDD的操作,需要提示的是,RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。也就是說,上面所有的RDD都是通過collect()觸發的, 那么如果將上述的transformation放入一條簡練語句中, 則展現為原始wordcount.py的書寫形式。

counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)

而真正的action 則是由collect()完成。

output = counts.collect()

至此,已經完成了對wordcount.py的深入剖析,但是有意的忽略了一些更底層的執行過程,比如DAG, stage, 以及Driver程序。



作者:或然子
鏈接:https://www.jianshu.com/p/067907b23546
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

文昌市| 衡水市| 田林县| 镇江市| 卢氏县| 大余县| 晋江市| 盐城市| 斗六市| 西乌| 遂昌县| 原平市| 台东县| 孟津县| 临沭县| 民丰县| 郁南县| 灵石县| 报价| 新蔡县| 新绛县| 深圳市| 长丰县| 罗定市| 锦屏县| 新安县| 天台县| 台州市| 保德县| 柏乡县| 嘉荫县| 原平市| 敦化市| 岳阳县| 太康县| 苍山县| 平顶山市| 定安县| 麟游县| 永兴县| 梨树县|