https://www.cnblogs.com/qiuting/p/7880500.htmlhtml
/export/servers/spark/bin/spark-shell --master spark://node01:7077,node02:7077 case class Score(name: String, clazz: Int, score: Int) val scoreDF = spark.sparkContext.makeRDD(Array( Score("a1", 1, 80), Score("a2", 1, 78), Score("a3", 1, 95), Score("a4", 2, 74), Score("a5", 2, 92), Score("a6", 3, 99), Score("a7", 3, 99), Score("a8", 3, 45), Score("a9", 3, 55), Score("a10", 3, 78), Score("a11", 3, 100)) ).toDF("name", "class", "score") scoreDF.createOrReplaceTempView("scores") scoreDF.show() +----+-----+-----+ |name|class|score| +----+-----+-----+ | a1| 1| 80| | a2| 1| 78| | a3| 1| 95| | a4| 2| 74| | a5| 2| 92| | a6| 3| 99| | a7| 3| 99| | a8| 3| 45| | a9| 3| 55| | a10| 3| 78| | a11| 3| 100| +----+-----+-----+
OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。
SQL标准容许将全部聚合函数用作聚合开窗函数。java
spark.sql("select count(name) from scores").show spark.sql("select name, class, score, count(name) over() name_count from scores").show
查询结果以下所示: +----+-----+-----+----------+ |name|class|score|name_count| +----+-----+-----+----------+ | a1| 1| 80| 11| | a2| 1| 78| 11| | a3| 1| 95| 11| | a4| 2| 74| 11| | a5| 2| 92| 11| | a6| 3| 99| 11| | a7| 3| 99| 11| | a8| 3| 45| 11| | a9| 3| 55| 11| | a10| 3| 78| 11| | a11| 3| 100| 11| +----+-----+-----+----------+
OVER 关键字后的括号中还能够添加选项用以改变进行聚合运算的窗口范围。
若是 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的全部行进行聚合运算。
开窗函数的 OVER 关键字后括号中的可使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不一样,PARTITION BY 子句建立的分区是独立于结果集的,建立的分区只是供进行聚合计算的,并且不一样的开窗函数所建立的分区也不互相影响。node
下面的 SQL 语句用于显示按照班级分组后每组的人数:
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,而且计算当前行所属的组的聚合计算结果。web
spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show
查询结果以下所示: +----+-----+-----+----------+ |name|class|score|name_count| +----+-----+-----+----------+ | a1| 1| 80| 3| | a2| 1| 78| 3| | a3| 1| 95| 3| | a6| 3| 99| 6| | a7| 3| 99| 6| | a8| 3| 45| 6| | a9| 3| 55| 6| | a10| 3| 78| 6| | a11| 3| 100| 6| | a4| 2| 74| 2| | a5| 2| 92| 2| +----+-----+-----+----------+
row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:
在排序开窗函数中使用 PARTITION BY 子句须要放置在ORDER BY 子句以前。sql
spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 5| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 10| | a11| 3| 100| 11| +----+-----+-----+----+ spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果能够并列(并列第一/并列第二),并列排名以后的排名将是并列的排名加上并列数
简单说每一个人只有一种排名,而后出现两个并列第一名的状况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,可是有两个第一名shell
spark.sql("select name, class, score, rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a10| 3| 78| 4| | a2| 1| 78| 4| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 9| | a11| 3| 100| 11| +----+-----+-----+----+ spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名以后的排名是并列排名加1
简单说每一个人只有一种排名,而后出现两个并列第一名的状况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名svg
spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 4| | a1| 1| 80| 5| | a5| 2| 92| 6| | a3| 1| 95| 7| | a6| 3| 99| 8| | a7| 3| 99| 8| | a11| 3| 100| 9| +----+-----+-----+----+ spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 5| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,而后 6 等分红 6 个组,并显示所在组的序号。函数
spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 1| | a4| 2| 74| 2| | a2| 1| 78| 2| | a10| 3| 78| 3| | a1| 1| 80| 3| | a5| 2| 92| 4| | a3| 1| 95| 4| | a6| 3| 99| 5| | a7| 3| 99| 5| | a11| 3| 100| 6| +----+-----+-----+----+ spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+