中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行spark python編程

發布時間:2021-12-02 17:33:32 來源:億速云 閱讀:166 作者:柒染 欄目:云計算

本篇文章給大家分享的是有關如何進行spark python編程,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

spark應用程序結構

  Spark應用程序可分兩部分:driver部分和executor部分初始化SparkContext和主體程序

A:driver部分

      driver部分主要是對SparkContext進行配置、初始化以及關閉。初始化SparkContext是為了構建Spark應用程序的運行環境,在初始化SparkContext,要先導入一些Spark的類和隱式轉換;在executor部分運行完畢后,需要將SparkContext關閉。

B:executor部分

      Spark應用程序的executor部分是對數據的處理,數據分三種:

  • 原生數據,包含輸入的數據和輸出的數據

    • 生成Scala標量數據,如count(返回RDD中元素的個數)、reduce、fold/aggregate;返回幾個標量,如take(返回前幾個元素)。

    • 生成Scala集合數據集,如collect(把RDD中的所有元素倒入 Scala集合類型)、lookup(查找對應key的所有值)。

    • 生成hadoop數據集,如saveAsTextFile、saveAsSequenceFile

    • scala集合數據集,如Array(1,2,3,4,5),Spark使用parallelize方法轉換成RDD。

    • hadoop數據集,Spark支持存儲在hadoop上的文件和hadoop支持的其他文件系統,如本地文件、HBase、SequenceFile和Hadoop的輸入格式。例如Spark使用txtFile方法可以將本地文件或HDFS文件轉換成RDD。

    • 對于輸入原生數據,Spark目前提供了兩種:

    • 對于輸出數據,Spark除了支持以上兩種數據,還支持scala標量

  • RDD,Spark進行并行運算的基本單位,其細節參見RDD 細解。RDD提供了四種算子:

    • 窄依賴算子

    • 寬依賴算子,寬依賴會涉及shuffle類,在DAG圖解析時以此為邊界產生Stage,如圖所示。

    • 輸入輸出一對一的算子,且結果RDD的分區結構不變,主要是map、flatMap;

    • 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union、coalesce;

    • 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample。

    • 對單個RDD基于key進行重組和reduce,如groupByKey、reduceByKey;

    • 對兩個RDD基于key進行join和重組,如join、cogroup。

    • 輸入算子,將原生數據轉換成RDD,如parallelize、txtFile等

    • 轉換算子,最主要的算子,是Spark生成DAG圖的對象,轉換算子并不立即執行,在觸發行動算子后再提交給driver處理,生成DAG圖 -->  Stage --> Task  --> Worker執行。按轉化算子在DAG圖中作用,可以分成兩種:

    • 緩存算子,對于要多次使用的RDD,可以緩沖加快運行速度,對重要數據可以采用多備份緩存。

    • 行動算子,將運算結果RDD轉換成原生數據,如count、reduce、collect、saveAsTextFile等。

  • 共享變量,在Spark運行時,一個函數傳遞給RDD內的patition操作時,該函數所用到的變量在每個運算節點上都復制并維護了一份,并且各個節點之間不會相互影響。但是在Spark Application中,可能需要共享一些變量,提供Task或驅動程序使用。Spark提供了兩種共享變量:

    • 廣播變量,可以緩存到各個節點的共享變量,通常為只讀,使用方法:

    • >>> from pyspark.context import SparkContext                    >>> sc = SparkContext('local', 'test')                           >>> b = sc.broadcast([1, 2, 3, 4, 5])                                    >>> b.value[1, 2, 3, 4, 5]                                                        >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    • 累計器,只支持加法操作的變量,可以實現計數器和變量求和。用戶可以調用SparkContext.accumulator(v)創建一個初始值為v的累加器,而運行在集群上的Task可以使用“+=”操作,但這些任務卻不能讀取;只有驅動程序才能獲取累加器的值。使用方法:

如何進行spark python編程

python編程

實驗項目

sogou日志數據分析

實驗數據來源:sogou精簡版數據下載地址

數據格式說明:

訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL

其中,用戶ID是根據用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不同查詢對應同一個用戶ID。

以上數據格式是官方說明,實際上該數據集中排名和順序號之間不是\t分割,而是空格分割。

一個session內查詢次數最多的用戶的session與相應的查詢次數

import sys  
from pyspark import SparkContext  
  
if __name__ == "__main__":  
    if len(sys.argv) != 2:  
        print >> sys.stderr, "Usage: SogouC <file>"  
        exit(-1)  
    sc = SparkContext(appName="SogouC")  
    sgRDD = sc.textFile(sys.argv[1])  
    print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10)  
    sc.stop()

虛擬集群中任意節點運行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt

運行結果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]

以上就是如何進行spark python編程,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

潞城市| 元谋县| 乌苏市| 永川市| 肥乡县| 类乌齐县| 察隅县| 上犹县| 定襄县| 天柱县| 泾阳县| 南京市| 阿坝县| 凤山市| 连州市| 兰州市| 靖宇县| 滨海县| 游戏| 固阳县| 上犹县| 睢宁县| 阳东县| 云浮市| 永泰县| 富裕县| 克什克腾旗| 库尔勒市| 大城县| 山西省| 巧家县| 合江县| 宝鸡市| 上思县| 文安县| 张家界市| 康定县| 广平县| 揭阳市| 香港| 赤水市|