Spark SQL:从入门到精通(五)[开窗函数]

概述

https://www.cnblogs.com/qiuting/p/7880500.htmlhtml

  • 介绍:
    开窗函数的引入是为了既显示汇集前的数据,又显示汇集后的数据。即在每一行的最后一列添加聚合函数的结果。
    开窗用于为行定义一个窗口(这里的窗口是指运算将要操做的行的集合),它对一组值进行操做,不须要使用 GROUP BY 子句对数据进行分组,可以在同一行中同时返回基础行的列和聚合列。
  • 聚合函数和开窗函数
    聚合函数是将多行变成一行,count,avg…
    开窗函数是将一行变成多行;
    聚合函数若是要显示其余的列必须将列加入到group by中
    开窗函数能够不使用group by,直接将全部信息显示出来
  • 开窗函数分类
  1. 聚合开窗函数
    聚合函数(列) OVER(选项),这里的选项能够是PARTITION BY 子句,但不能够是 ORDER BY 子句。
  2. 排序开窗函数
    排序函数(列) OVER(选项),这里的选项能够是ORDER BY 子句,也能够是 OVER(PARTITION BY 子句 ORDER BY 子句),但不能够是 PARTITION BY 子句。

准备工做

/export/servers/spark/bin/spark-shell --master spark://node01:7077,node02:7077
 
case class Score(name: String, clazz: Int, score: Int)
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.createOrReplaceTempView("scores")
scoreDF.show()
+----+-----+-----+
|name|class|score|
+----+-----+-----+
|  a1|    1|   80|
|  a2|    1|   78|
|  a3|    1|   95|
|  a4|    2|   74|
|  a5|    2|   92|
|  a6|    3|   99|
|  a7|    3|   99|
|  a8|    3|   45|
|  a9|    3|   55|
| a10|    3|   78|
| a11|    3|  100|
+----+-----+-----+

聚合开窗函数

示例1

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。
SQL标准容许将全部聚合函数用作聚合开窗函数。java

spark.sql("select count(name) from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show
查询结果以下所示:
+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|        11|
|  a2|    1|   78|        11|
|  a3|    1|   95|        11|
|  a4|    2|   74|        11|
|  a5|    2|   92|        11|
|  a6|    3|   99|        11|
|  a7|    3|   99|        11|
|  a8|    3|   45|        11|
|  a9|    3|   55|        11|
| a10|    3|   78|        11|
| a11|    3|  100|        11|
+----+-----+-----+----------+

示例2

OVER 关键字后的括号中还能够添加选项用以改变进行聚合运算的窗口范围。
若是 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的全部行进行聚合运算。
开窗函数的 OVER 关键字后括号中的可使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不一样,PARTITION BY 子句建立的分区是独立于结果集的,建立的分区只是供进行聚合计算的,并且不一样的开窗函数所建立的分区也不互相影响。node

下面的 SQL 语句用于显示按照班级分组后每组的人数:
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,而且计算当前行所属的组的聚合计算结果。web

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show
查询结果以下所示: 
+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|         3|
|  a2|    1|   78|         3|
|  a3|    1|   95|         3|
|  a6|    3|   99|         6|
|  a7|    3|   99|         6|
|  a8|    3|   45|         6|
|  a9|    3|   55|         6|
| a10|    3|   78|         6|
| a11|    3|  100|         6|
|  a4|    2|   74|         2|
|  a5|    2|   92|         2|
+----+-----+-----+----------+

排序开窗函数

ROW_NUMBER顺序排序

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:
在排序开窗函数中使用 PARTITION BY 子句须要放置在ORDER BY 子句以前。sql

示例1

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   5|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|  10|
| a11|    3|  100|  11|
+----+-----+-----+----+
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果能够并列(并列第一/并列第二),并列排名以后的排名将是并列的排名加上并列数
简单说每一个人只有一种排名,而后出现两个并列第一名的状况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,可是有两个第一名shell

示例2

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()                                                     
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
| a10|    3|   78|   4|
|  a2|    1|   78|   4|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|   9|
| a11|    3|  100|  11|
+----+-----+-----+----+
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

DENSE_RANK连续排序

dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名以后的排名是并列排名加1
简单说每一个人只有一种排名,而后出现两个并列第一名的状况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名svg

示例3

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   4|
|  a1|    1|   80|   5|
|  a5|    2|   92|   6|
|  a3|    1|   95|   7|
|  a6|    3|   99|   8|
|  a7|    3|   99|   8|
| a11|    3|  100|   9|
+----+-----+-----+----+
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   5|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

NTILE分组排名[了解]

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,而后 6 等分红 6 个组,并显示所在组的序号。函数

示例4

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   1|
|  a4|    2|   74|   2|
|  a2|    1|   78|   2|
| a10|    3|   78|   3|
|  a1|    1|   80|   3|
|  a5|    2|   92|   4|
|  a3|    1|   95|   4|
|  a6|    3|   99|   5|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
+----+-----+-----+----+
spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+