中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

pyspark?dataframe列的合并與拆分方法是什么

發布時間:2023-05-11 16:33:34 來源:億速云 閱讀:113 作者:iii 欄目:開發技術

這篇文章主要介紹了pyspark dataframe列的合并與拆分方法是什么的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇pyspark dataframe列的合并與拆分方法是什么文章都會有所收獲,下面我們一起來看看吧。

    pyspark dataframe列的合并與拆分

    使用Spark SQL在對數據進行處理的過程中,可能會遇到對一列數據拆分為多列,或者把多列數據合并為一列。

    這里記錄一下目前想到的對DataFrame列數據進行合并和拆分的幾種方法。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .master("local") \
        .appName("dataframe_split") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
     
    sc = spark.sparkContext
    df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True)
    df.show(3)

    原始數據如下所示

    pyspark?dataframe列的合并與拆分方法是什么

    dataframe列數據的分割

    from pyspark.sql.functions import split, explode, concat, concat_ws
    df_split = df.withColumn("s", split(df['score'], " "))
    df_split.show()

     pyspark?dataframe列的合并與拆分方法是什么

    dataframe列數據的拆分

    zipWithIndex:給每個元素生成一個索引 

    排序首先基于分區索引,然后是每個分區內的項目順序.因此,第一個分區中的第一個item索引為0,最后一個分區中的最后一個item的索引最大.當RDD包含多個分區時此方法需要觸發spark作業.

    pyspark?dataframe列的合并與拆分方法是什么

    first_row = df.first()
    numAttrs = len(first_row['score'].split(" "))
    print("新增列的個數", numAttrs)
    attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
    print("列名:", attrs)
    for name, index in attrs:
        df_split = df_split.withColumn(name, df_split['s'].getItem(index))
    df_split.show()

     pyspark?dataframe列的合并與拆分方法是什么

    dataframe將一行分成多行

    df_explode = df.withColumn("e", explode(split(df['score'], " ")))
    df_explode.show()

    pyspark?dataframe列的合并與拆分方法是什么

    dataframe列數據的合并

    列的合并有兩個函數:一個不添加分隔符concat(),一個添加分隔符concat_ws()

    concat

    df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \
                                                           df_split['score_1'], df_split['score_2'], df_split['score_3']))
    df_concat.show()

     pyspark?dataframe列的合并與拆分方法是什么

    caoncat_ws

    df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \
                                                           df_split['score_1'], df_split['score_2'], df_split['score_3']))
    df_ws.show()

    pyspark?dataframe列的合并與拆分方法是什么

    dataframe多行轉多列

    pivot: 旋轉當前[[dataframe]]列并執行指定的聚合 

    #DataFrame 數據格式:每個用戶對每部電影的評分 userID 用戶ID,movieID 電影ID,rating評分
    df=spark.sparkContext.parallelize([[15,399,2], \
                                       [15,1401,5], \
                                       [15,1608,4], \
                                       [15,20,4], \
                                       [18,100,3], \
                                       [18,1401,3], \
                                       [18,399,1]])\
                        .toDF(["userID","movieID","rating"])
    #pivot 多行轉多列
    resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
    #結果
    resultDF.show()

    pyspark dataframe常用操作

    總體原則

    pyspark中,dataframe與sql的耗時會經引擎優化,效率高于rdd,因此盡可能使用dataframe或者sql。執行效率之外,dataframe優于rdd的另一個好處是:dataframe的各個量有語義信息,便于后期維護。比如rdd[0][1][1]這種很難維護,但是,df.info.school.grade就容易理解。

    在使用dataframe過程中,應盡量避免使用udf,因為序列化數據原本在JVM中,現在spark在worker上啟動一個Python進程,需要將全體數據序列化成python可解釋的格式,計算昂貴。

    列相關

    根據已有列生成新列

    from pyspark.sql.functions import length, col, lit, size
    df.withColumn("length_col", length(col("existing_str_col"))) # 將existing_str_col的長度生成新列
    df.withColumn("constant_col", lit("hello")) # 生成一列常量
    df.withColumn("size_col", size(col("existing_array_col"))) # 將existing_array_col的元素個數生成新列

    從已有列選擇部分列

    from pyspark.sql.functions import col
    df = df.select(col("col_1").cast("string"), col("col_2").alias("col_2_")) # 選擇col_1列和col_2列,并將col_1列轉換為string格式,將col_2列重命名為col_2_,此時不再存在col_2

    將幾列連接起來形成新列

    from pyspark.sql.functions import concat_ws
     
    df = df.withColumn("concat_col", concat_ws("_", df.col_1, df.col_2)) # 使用_將col_1和col_2連接起來,構成新列concat_col

    將string列分割成list

    from pyspark.sql.functions import split
     
    df = df.withColumn("split_col", split(df.col, "-")) #按照-將df中的col列分割,此時split_col時一個list,后續或者配合filter(length(...))使用

    統計列均值

    from pyspark.sql.functions import mean
     
    col_mean = df.select(mean(col)).collect()[0][0]

    行相關

    從全體行中選擇部分行(一般調試時使用)

    print(df.take(5)) #交互式的pyspark shell中,等價于df.show(5)

    統計行數量

    print(df.count()) #統計行數量

    從全體行中篩選出部分行

    from pyspark.sql.functions import col
    df = df.filter(col("col_1")==col("col_2")) #保留col_1等于col_2的行

    刪除帶null的行

    df.na.drop("all") # 只有當所有列都為空時,刪除該行
    df.na.drop("any") # 任意列為空時,刪除該行
    df.na.drop("all", colsubset=["col_1","col_2"]) # 當col_1和col_2都為空時,刪除該行

    去除重復行

    df = df.distinct() # 刪除所有列值相同的重復行
    df = df.dropDuplicates(["date", "count"]) # 刪除date, count兩列值相同的行

    一行拆分成多行

    from pyspark.sql.functions import explode, split
     
    df = df.withColumn("sub_str", explode(split(df["str_col"], "_"))) # 將str_col按-拆分成list,list中的每一個元素成為sub_str,與原行中的其他列一起組成新的行

    填補行中的空值

    df.na.fill({"col_name":fill_content}) # 用fill_content填補col_name列的空值

    行前加入遞增(不一定連續)唯一序號

    from pyspark.sql.functions import monotonically_increasing_id
     
    df = df.withColumn("id", monotonically_increasing_id())

    兩個dataframe

    兩個dataframe根據某列拼接

    df_3 = df_1.join(df_2, df_1.col_1==df_2.col_2) # inner join, 只有當df_1中的col_1列值等于df_2中的col_2時,才會拼接
    df_4 = df_1.join(df_2, df_1.col_1==df_2.col_2, "left") # left join, 當df_1中的col_1值不存在于df_2中時,仍會拼接,憑借值填充null

    兩個dataframe合并

    df3 = df1.union(df2)

    聚合操作

    groupBy
    from pyspark.sql.functions import concat_ws, split, explode, collect_list, struct
     
    concat_df = concat_df.groupBy("sample_id", "sample_date").agg(collect_list('feature').alias("feature_list")) # 將同sample_id, sample_date的行聚合成組,feature字段拼成一個list,構成新列feature_list。agg配合groupBy使用,效果等于select。此時concat_df只有兩列:sample_id和feature_list。
    concat_tuple_df = concat_df.groupBy("sample_id", "sample_date").agg(collect_list(struct("feature", "owner")).alias("tuple")) # 將同sample_id, sample_date的行聚合成組, (feature, owner)兩個字段拼成一個單位,組內所有單位拼成一個list,構成新列tuple

    窗口函數

    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
     
    windowSpec = Window.partitionBy(df.id, df.date).orderBy(col("price").desc(), col("discount").asc()) # 相同id,date的行被聚成組,組內按照price降序,discount升序進行排列
    df = df.withColumn("rank", row_number().over(windowSpec)) #為排序之后的組進行組內編號
    df = df.filter(df.rank<=1) # 取組內top-1行

    讀寫成csv

    from pyspark.sql import SparkSession
    from pyspark import SparkContext
     
    sc = SparkContext(appName="test_rw")
    sc_session = SparkSession(sc)
    df.write.mode("overwrite").options(header="true").csv(output_path)
    df = sc_session.csv.read(input_path, header=True)

    dataframe轉SQL

    from pyspark import SparkContext
    from pyspark.sql import SparkSession
     
    sc = SparkContext(appName='get_sample')
    sc_session = SparkSession(sc)
     
    sample_df.createOrReplaceTempView("item_sample_df")
    sample_df = sc_session.sql(
            '''
                select sample_id
                    ,label
                    ,type_ as type
                    ,split(item_id, "_")[2] as owner
                    ,ftime
                from item_sample_df
            ''')

    自定義函數UDF(如非必要,勿用)

    from pysprak.sql.functions import udf, col
    from pyspark.sql.types import StringType, ArrayType, StructField, StructType
     
     
    def simple_func(v1, v2):
        pass
        # return str
     
    simple_udf = udf(my_func, StringType())
     
    df = df.withColumn("new", simple_udf(df["col_1"], df["col_2"]))
     
     
     
    # 復雜type
     
    def get_entity_func():
        pass
        # return str_list_1, str_list_2
     
    entity_schema = StructType([
                        StructField("location", ArrayType(StringType()), True),
                        StructField("nondigit", ArrayType(StringType()), True)
                    ])
     
    get_entity_udf = udf(get_entity_func, entity_schema)

    dataframe與rdd互相轉換

    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, FloatType
     
     
    sc = SparkContext(appName="rdd2df")
    sc_session = SparkSession(sc)
     
    rdd = df.rdd # df轉rdd, 注意每列仍帶header,要map(lambda line: [line.id, line.price])才可以轉換成不帶header
     
    schema = StructType([
                        StructField("id", StringType(), True),
                        StructField("price", FloatType(), True)
                        ])
    df = sc_session.createDataFrame(rdd, schema) # rdd轉df

    關于“pyspark dataframe列的合并與拆分方法是什么”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“pyspark dataframe列的合并與拆分方法是什么”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    丹寨县| 屯留县| 潢川县| 精河县| 信宜市| 石楼县| 尼玛县| 莱阳市| 威远县| 石首市| 富平县| 武川县| 黄石市| 凭祥市| 和田县| 乡宁县| 新余市| 昭平县| 湘潭县| 自治县| 清河县| 穆棱市| 彩票| 祁连县| 泾源县| 普安县| 峨边| 建水县| 区。| 荔波县| 巨野县| 邵武市| 河北区| 陇西县| 沂水县| 余江县| 太仆寺旗| 陆良县| 于都县| 久治县| 柳林县|