中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache Spark窗口功能的介紹

發布時間:2021-09-14 17:53:03 來源:億速云 閱讀:196 作者:chen 欄目:大數據

這篇文章主要介紹“Apache Spark窗口功能的介紹”,在日常操作中,相信很多人在Apache Spark窗口功能的介紹問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Apache Spark窗口功能的介紹”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

創建Spark DataFrame

現在,我們創建一個示例Spark DataFrame,我們將在整個博客中使用它。 首先,讓我們加載所需的庫。

import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._

現在,我們將使用一些虛擬數據創建DataFrame,這些虛擬數據將用于討論各種窗口函數。

case class Salary(depName: String, empNo: Long, salary: Long)  val empsalary = Seq(   Salary("sales", 1, 5000),   Salary("personnel", 2, 3900),   Salary("sales", 3, 4800),   Salary("sales", 4, 4800),   Salary("personnel", 5, 3500),   Salary("develop", 7, 4200),   Salary("develop", 8, 6000),   Salary("develop", 9, 4500),   Salary("develop", 10, 5200),   Salary("develop", 11, 5200)).toDF()

這是我們的DataFrame的樣子:

Apache Spark窗口功能的介紹

窗口集合函數

讓我們看一些聚合的窗口函數,看看它們如何工作。

首先,我們需要定義窗口的規范。 假設我們要根據部門獲取匯總數據。 因此,在此示例中,我們將基于部門名稱(列:depname)定義窗口。

為聚合函數創建窗口規范

val byDepName = Window.partitionBy("depName")

在窗口上應用聚合函數

現在,在部門內(列:depname),我們可以應用各種聚合函數。 因此,讓我們嘗試查找每個部門的最高和最低工資。  在這里,我們僅選擇了所需的列(depName,max_salary和min_salary),并刪除了重復的記錄。

val agg_sal = empsalary            .withColumn("max_salary", max("salary").over(byDepName))            .withColumn("min_salary", min("salary").over(byDepName))                   agg_sal.select("depname", "max_salary", "min_salary")         .dropDuplicates()         .show()

輸出:

+---------+----------+----------+ |  depname|max_salary|min_salary| +---------+----------+----------+ |  develop|      6000|      4200| |    sales|      5000|      4800| |personnel|      3900|      3500| +---------+----------+----------+

現在讓我們看看它是如何工作的。 我們已經按部門名稱對數據進行了分區:

Apache Spark窗口功能的介紹

現在,當我們執行合計函數時,它將應用于每個分區并返回合計值(在本例中為min和max)。

Apache Spark窗口功能的介紹

注意:可用的匯總函數為最大,最小,總和,平均和計數。

窗口排名功能

在本節中,我們將討論幾種類型的排名函數。

創建用于排序功能的窗口規范

現在,我們要根據員工在部門內的薪水進行排名。 薪水最高的員工將排名第一,薪水最低的員工將排名最后。  在這里,我們將基于部門(列:depname)對數據進行分區,并且在部門內,我們將根據薪水以降序對數據進行排序。

val winSpec = Window.partitionBy("depName").orderBy("salary".desc)

對于每個部門,記錄將根據薪水以降序排序。

Apache Spark窗口功能的介紹

1.等級功能:等級

此函數將返回分區中每個記錄的等級,并跳過任何重復等級之后的后續等級:

val rank_df = empsalary.withColumn("rank", rank().over(winSpec)) rank_df.show()

輸出:

+---------+-----+------+----+ |  depName|empNo|salary|rank| +---------+-----+------+----+ |  develop|    8|  6000|   1| |  develop|   11|  5200|   2| |  develop|   10|  5200|   2| |  develop|    9|  4500|   4| |  develop|    7|  4200|   5| |    sales|    1|  5000|   1| |    sales|    4|  4800|   2| |    sales|    3|  4800|   2| |personnel|    2|  3900|   1| |personnel|    5|  3500|   2| +---------+-----+------+----+

在這里我們可以看到某些等級重復,而有些等級丟失。 例如,在開發部門中,我們有2名等級= 2的員工,而沒有等級=  3的員工,因為等級函數將為相同的值保留相同的等級,并相應地跳過下一個等級。

2.密集等級:densed_rank

此函數將返回分區中每個記錄的等級,但不會跳過任何等級。

val dense_rank_df = empsalary.withColumn("dense_rank", dense_rank().over(winSpec))  dense_rank_df.show()

輸出:

+---------+-----+------+-----------+ |  depName|empNo|salary|desnse_rank| +---------+-----+------+-----------+ |  develop|    8|  6000|          1| |  develop|   10|  5200|          2| |  develop|   11|  5200|          2| |  develop|    9|  4500|          3| |  develop|    7|  4200|          4| |    sales|    1|  5000|          1| |    sales|    3|  4800|          2| |    sales|    4|  4800|          2| |personnel|    2|  3900|          1| |personnel|    5|  3500|          2| +---------+-----+------+-----------+

在這里,我們可以看到某些等級是重復的,但是排名并沒有像我們使用等級功能時那樣丟失。  例如,在開發部門中,我們有2名員工,等級=2。density_rank函數將為相同的值保留相同的等級,但不會跳過下一個等級。

3.行號函數:row_number

此功能將在窗口內分配行號。 如果2行的排序列值相同,則不確定將哪個行號分配給具有相同值的每一行。

val row_num_df = empsalary.withColumn("row_number", row_number().over(winSpec))  row_num_df.show()

輸出:

+---------+-----+------+----------+ |  depName|empNo|salary|row_number| +---------+-----+------+----------+ |  develop|    8|  6000|         1| |  develop|   10|  5200|         2| |  develop|   11|  5200|         3| |  develop|    9|  4500|         4| |  develop|    7|  4200|         5| |    sales|    1|  5000|         1| |    sales|    3|  4800|         2| |    sales|    4|  4800|         3| |personnel|    2|  3900|         1| |personnel|    5|  3500|         2| +---------+-----+------+----------+

4.百分比排名函數:percent_rank

此函數將返回分區中的相對(百分數)等級。

val percent_rank_df = empsalary.withColumn("percent_rank", percent_rank().over(winSpec))  percent_rank_df.show()

輸出:

+---------+-----+------+------------+ |  depName|empNo|salary|percent_rank| +---------+-----+------+------------+ |  develop|    8|  6000|         0.0| |  develop|   10|  5200|        0.25| |  develop|   11|  5200|        0.25| |  develop|    9|  4500|        0.75| |  develop|    7|  4200|         1.0| |    sales|    1|  5000|         0.0| |    sales|    3|  4800|         0.5| |    sales|    4|  4800|         0.5| |personnel|    2|  3900|         0.0| |personnel|    5|  3500|         1.0| +---------+-----+------+------------+

5. N-tile功能:ntile

此功能可以根據窗口規格或分區將窗口進一步細分為n個組。 例如,如果需要將部門進一步劃分為三類,則可以將ntile指定為3。

val ntile_df = empsalary.withColumn("ntile", ntile(3).over(winSpec))  ntile_df.show()

輸出:

+---------+-----+------+-----+ |  depName|empNo|salary|ntile| +---------+-----+------+-----+ |  develop|    8|  6000|    1| |  develop|   10|  5200|    1| |  develop|   11|  5200|    2| |  develop|    9|  4500|    2| |  develop|    7|  4200|    3| |    sales|    1|  5000|    1| |    sales|    3|  4800|    2| |    sales|    4|  4800|    3| |personnel|    2|  3900|    1| |personnel|    5|  3500|    2| +---------+-----+------+-----+

窗口分析功能

接下來,我們將討論諸如累積分布,滯后和超前的分析功能。

1.累積分布函數:cume_dist

此函數提供窗口/分區的值的累積分布。

val winSpec = Window.partitionBy("depName").orderBy("salary")  val cume_dist_df =                empsalary.withColumn("cume_dist",cume_dist().over(winSpec))  cume_dist_df.show()

定義窗口規范并應用cume_dist函數以獲取累積分布。

輸出:

+---------+-----+------+------------------+ |  depName|empNo|salary|         cume_dist| +---------+-----+------+------------------+ |  develop|    7|  4200|               0.2| |  develop|    9|  4500|               0.4| |  develop|   10|  5200|               0.8| |  develop|   11|  5200|               0.8| |  develop|    8|  6000|               1.0| |    sales|    4|  4800|0.6666666666666666| |    sales|    3|  4800|0.6666666666666666| |    sales|    1|  5000|               1.0| |personnel|    5|  3500|               0.5 |personnel|    2|  3900|               1.0| +---------+-----+------+------------------+

2.滯后功能:滯后

此函數將在從DataFrame偏移行之前返回該值。

lag函數采用3個參數(lag(col,count = 1,默認= None)),col:定義需要在其上應用函數的列。 count:需要回顧多少行。  default:定義默認值。

val winSpec = Window.partitionBy("depName").orderBy("salary")  val lag_df =            empsalary.withColumn("lag", lag("salary", 2).over(winSpec))  lag_df.show()

輸出:

+---------+-----+------+----+ |  depName|empNo|salary| lag| +---------+-----+------+----+ |  develop|    7|  4200|null| |  develop|    9|  4500|null| |  develop|   10|  5200|4200| |  develop|   11|  5200|4500| |  develop|    8|  6000|5200| |    sales|    4|  4800|null| |    sales|    3|  4800|null| |    sales|    1|  5000|4800| |personnel|    5|  3500|null| |personnel|    2|  3900|null| +---------+-----+------+----+

例如,讓我們在當前行之前查找薪水2行。

  • 對于depname = develop,salary = 4500。沒有這樣的行,該行在該行之前2行。 因此它將為空。

Apache Spark窗口功能的介紹
  • 對于部門名稱=發展,薪水= 6000(以藍色突出顯示)。 如果我們提前兩排,我們將獲得5200的薪水(以綠色突出顯示)。

Apache Spark窗口功能的介紹

3.導聯功能:導聯

此函數將返回DataFrame的偏移行之后的值。

val winSpec = Window.partitionBy("depName").orderBy("salary")  val lead_df =            empsalary.withColumn("lead", lead("salary", 2).over(winSpec))  lead_df.show()

lead函數采用3個參數(lead(col,count = 1,默認= None))col:定義需要在其上應用函數的列。  count:對于當前行,我們需要向前/向后查找多少行。 default:定義默認值。

輸出:

+---------+-----+------+----+ |  depName|empNo|salary| lag| +---------+-----+------+----+ |  develop|    7|  4200|5200| |  develop|    9|  4500|5200| |  develop|   10|  5200|6000| |  develop|   11|  5200|null| |  develop|    8|  6000|null| |    sales|    3|  4800|5000| |    sales|    4|  4800|null| |    sales|    1|  5000|null| |personnel|    5|  3500|null| |personnel|    2|  3900|null| +---------+-----+------+----+

讓我們嘗試從當前行中查找前進/后兩行的薪水。

  • 對于depname =開發人員,薪水= 4500(以藍色突出顯示)。 如果我們在前進/后退兩行,我們將獲得5200的薪水(以綠色突出顯示)。

Apache Spark窗口功能的介紹
  • 對于depname =人員,薪水=3500。在此分區中,沒有此行向前2行/在該行之后。 因此我們將獲得空值。

Apache Spark窗口功能的介紹
Apache Spark窗口功能的介紹

自定義窗口定義

默認情況下,窗口的邊界由分區列定義,我們可以通過窗口規范指定順序。 例如,對于開發部門,窗口的開始是工資的最小值,窗口的結束是工資的最大值。

但是,如果我們想更改窗口的邊界怎么辦? 以下功能可用于定義每個分區內的窗口。

1. rangeBetween

使用rangeBetween函數,我們可以顯式定義邊界。例如,從當前薪水開始,將其定義為100,然后將其定義為300,并查看其含義。  從100開始表示窗口將從100個單位開始,從當前值開始以300個值結束(包括開始值和結束值)。

val winSpec = Window.partitionBy("depName")           .orderBy("salary")           .rangeBetween(100L, 300L)

定義窗口規格

起始值和結束值后的L表示該值是Scala Long類型。

val range_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))  range_between_df.show()

應用自定義窗口規范

輸出:

+---------+-----+------+----------+ |  depName|empNo|salary|max_salary| +---------+-----+------+----------+ |  develop|    7|  4200|      4500| |  develop|    9|  4500|      null| |  develop|   10|  5200|      null| |  develop|   11|  5200|      null| |  develop|    8|  6000|      null| |    sales|    3|  4800|      5000| |    sales|    4|  4800|      5000| |    sales|    1|  5000|      null| |personnel|    5|  3500|      null| |personnel|    2|  3900|      null| +---------+-----+------+----------+

現在,讓我們嘗試了解輸出。

  • 對于depname = developer,salary = 4200,窗口的開始將是(當前值+開始),即4200 + 100  =4300。窗口的結束將是(當前值+結束),即4200 + 300 = 4500。

由于只有一個薪水值在4300到4500之間,包括開發部門的4500,所以我們將4500作為max_salary作為4200(上面的檢查輸出)。

Apache Spark窗口功能的介紹
  • 同樣,對于depname = develop,salary = 4500,窗口將為(開始:4500 + 100 = 4600,結束:4500 +  300 = 4800)。 但是開發部門沒有薪水值在4600到4800之間,因此最大值不會為空(上面的檢查輸出)。

Apache Spark窗口功能的介紹

這里有一些特殊的邊界值可以使用。

  • Window.currentRow:指定一行中的當前值。

  • Window.unboundedPreceding:這可以用于使窗口無限制地開始。

  • Window.unbounded以下:此方法可用于使窗口具有無限的末端。

例如,我們需要從員工工資中找到最高工資,該最高工資大于300。  因此,我們將起始值定義為300L,將結束值定義為Window.unboundedFollowing:

val winSpec = Window.partitionBy("depName").orderBy("salary")              .rangeBetween(300L, Window.unboundedFollowing)  val range_unbounded_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))  range_unbounded_df.show()

輸出:

+---------+-----+------+----------+ |  depName|empNo|salary|max_salary| +---------+-----+------+----------+ |  develop|    7|  4200|      6000| |  develop|    9|  4500|      6000| |  develop|   10|  5200|      6000| |  develop|   11|  5200|      6000| |  develop|    8|  6000|      null| |    sales|    3|  4800|      null| |    sales|    4|  4800|      null| |    sales|    1|  5000|      null| |personnel|    5|  3500|      3900| |personnel|    2|  3900|      null| +---------+-----+------+----------+

因此,對于depname =人員,薪水=3500。窗口將是(開始:3500 + 300 = 3800,結束:無邊界)。  因此,此范圍內的最大值為3900(檢查上面的輸出)。

同樣,對于depname = sales,salary = 4800,窗口將為(開始:4800 + 300、5100,結束:無邊界)。  由于銷售部門的值不大于5100,因此結果為空。

2.rowsBetween

通過rangeBetween,我們使用排序列的值定義了窗口的開始和結束。 但是,我們也可以使用相對行位置定義窗口的開始和結束。

例如,我們要創建一個窗口,其中窗口的開始是當前行之前的一行,結束是當前行之后的一行。

定義自定義窗口規范

val winSpec = Window.partitionBy("depName")             .orderBy("salary").rowsBetween(-1, 1)

應用自定義窗口規范

val rows_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))  rows_between_df.show()

輸出:

+---------+-----+------+----------+ |  depName|empNo|salary|max_salary| +---------+-----+------+----------+ |  develop|    7|  4200|      4500| |  develop|    9|  4500|      5200| |  develop|   10|  5200|      5200| |  develop|   11|  5200|      6000| |  develop|    8|  6000|      6000| |    sales|    3|  4800|      4800| |    sales|    4|  4800|      5000| |    sales|    1|  5000|      5000| |personnel|    5|  3500|      3900| |personnel|    2|  3900|      3900| +---------+-----+------+----------+

現在,讓我們嘗試了解輸出。

  • 對于depname =開發,salary = 4500,將定義一個窗口,該窗口在當前行之前和之后一行(以綠色突出顯示)。  因此窗口內的薪水為(4200、4500、5200),最高為5200(上面的檢查輸出)。

Apache Spark窗口功能的介紹
  • 同樣,對于depname = sales,salary = 5000,將在當前行的前后定義一個窗口。  由于此行之后沒有行,因此該窗口將只有2行(以綠色突出顯示),其薪水分別為(4800,5000)和max為5000(上面的檢查輸出)。

Apache Spark窗口功能的介紹

我們還可以像以前使用rangeBetween一樣使用特殊邊界Window.unboundedPreceding,Window.unboundedFollowing和Window.currentRow。

注意:rowsBetween不需要排序,但是我使用它來使每次運行的結果保持一致。

到此,關于“Apache Spark窗口功能的介紹”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

鹰潭市| 周至县| 渭南市| 广昌县| 遂平县| 陵水| 罗城| 拜城县| 建平县| 福鼎市| 罗江县| 迁安市| 横峰县| 峨眉山市| 陵川县| 佛冈县| 杭锦旗| 荔浦县| 马龙县| 赞皇县| 武川县| 二连浩特市| 杨浦区| 济源市| 莆田市| 五大连池市| 哈尔滨市| 浠水县| 涞源县| 普陀区| 彰化县| 石棉县| 灵川县| 黑河市| 岳池县| 怀宁县| 黄平县| 临江市| 雅安市| 长白| 邵武市|