Window Functions in Spark SQL

The Databricks blog gives the following overview of window functions:

Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions. The available ranking functions and analytic functions are summarized in the table below. For aggregate functions, users can use any existing aggregate function as a window function.

There are three kinds of window functions:

  1. ranking functions
  2. analytic functions
  3. aggregate functions

Ranking and analytic functions are summarized in the table below; any existing aggregate function (sum, avg, max, min, ...) can be used as a window function (an example follows the first query below).

|Function Type|SQL|DataFrame API|
|--|--|--|
|Ranking|rank|rank|
|Ranking|dense_rank|denseRank|
|Ranking|percent_rank|percentRank|
|Ranking|ntile|ntile|
|Ranking|row_number|rowNumber|
|Analytic|cume_dist|cumeDist|
|Analytic|first_value|firstValue|
|Analytic|last_value|lastValue|
|Analytic|lag|lag|
|Analytic|lead|lead|

Let's start with an example.

Sample data in /root/score.json: student name, lesson, and score.

{"name":"A","lesson":"Math","score":100}
{"name":"B","lesson":"Math","score":100}
{"name":"C","lesson":"Math","score":99}
{"name":"D","lesson":"Math","score":98}
{"name":"A","lesson":"E","score":100}
{"name":"B","lesson":"E","score":99}
{"name":"C","lesson":"E","score":99}
{"name":"D","lesson":"E","score":98}
./spark-shell --master local # start spark-shell locally
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.hive.HiveContext


    sc.setLogLevel("WARN") // log level; optional
    val hiveContext = new HiveContext(sc) // window functions require HiveContext in Spark 1.x

    val df = hiveContext.read.json("file:///root/score.json")
    case class Score(name: String, lesson: String, score: Int) // not actually used below

    df.registerTempTable("score") // register a temporary table so it can be queried with SQL

    // the SQL statement
    val stat = """
      |select name, lesson, score,
      |  ntile(2) over (partition by lesson order by score desc) as ntile_2,
      |  ntile(3) over (partition by lesson order by score desc) as ntile_3,
      |  row_number() over (partition by lesson order by score desc) as row_number,
      |  rank() over (partition by lesson order by score desc) as rank,
      |  dense_rank() over (partition by lesson order by score desc) as dense_rank,
      |  percent_rank() over (partition by lesson order by score desc) as percent_rank
      |from score
      |order by lesson, name, score
    """.stripMargin

    hiveContext.sql(stat).show // run the query and print the result
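
As noted above, any existing aggregate function can be used as a window function. For instance, sum over a window with a running frame yields a per-lesson running total. A minimal sketch, assuming the same spark-shell session and the score table registered above:

    // an ordinary aggregate (sum) used as a window function:
    // running total of scores within each lesson, highest score first
    hiveContext.sql("""
      |select name, lesson, score,
      |  sum(score) over (partition by lesson order by score desc
      |                   rows between unbounded preceding and current row) as running_total
      |from score
    """.stripMargin).show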

/**
 * The same functionality, implemented with the DataFrame API.
 */
    val window_spec = Window.partitionBy("lesson").orderBy(df("score").desc) // window spec shared by all the window functions below

    df.select(df("name"), df("lesson"), df("score"),
      ntile(2).over(window_spec).as("ntile_2"),
      ntile(3).over(window_spec).as("ntile_3"),
      row_number().over(window_spec).as("row_number"),
      rank().over(window_spec).as("rank"),
      dense_rank().over(window_spec).as("dense_rank"),
      percent_rank().over(window_spec).as("percent_rank")
    ).orderBy("lesson", "name", "score").show
  • The output is exactly the same, as shown in the table below.
|name|lesson|score|ntile_2|ntile_3|row_number|rank|dense_rank|percent_rank|
|--|--|--|--|--|--|--|--|--|
|A|E|100|1|1|1|1|1|0.0|
|B|E|99|1|1|2|2|2|0.3333333333333333|
|C|E|99|2|2|3|2|2|0.3333333333333333|
|D|E|98|2|3|4|4|3|1.0|
|A|Math|100|1|1|1|1|1|0.0|
|B|Math|100|1|1|2|1|1|0.0|
|C|Math|99|2|2|3|3|2|0.6666666666666666|
|D|Math|98|2|3|4|4|3|1.0|
  • rank: rows with equal values tie for the same rank, so rank values may be non-consecutive (a gap follows each group of ties).
  • dense_rank: rows with equal values also tie for the same rank, but the rank values are always consecutive (no gaps).
  • row_number is a plain row number, like the row number in Excel; it never repeats or skips because of tied values.
  • percent_rank = (rank - 1) / (count(score) - 1) within the same partition; see the sketch after this list.
  • ntile(n) splits the ordered rows of each partition into n roughly equal, contiguous buckets and returns the index of the bucket, starting from 1.
  • The Python DataFrame API examples in the official blog post show that the Scala syntax is almost identical to the Python one. The blog post is linked in the references at the bottom.
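
A minimal sketch that recomputes percent_rank from this definition and compares it with the built-in (same session as above; manual_percent_rank is just an illustrative column name):

    // verify: percent_rank = (rank - 1) / (partition row count - 1)
    val w_rank = Window.partitionBy("lesson").orderBy(df("score").desc)
    val w_all  = Window.partitionBy("lesson") // no ordering: the whole partition
    df.select(
      df("name"), df("lesson"), df("score"),
      percent_rank().over(w_rank).as("percent_rank"),
      ((rank().over(w_rank) - 1).cast("double") /
        (count(df("score")).over(w_all) - 1)).as("manual_percent_rank")
    ).orderBy("lesson", "name", "score").show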

In the example above, every row of each group took part in the window computation. Now consider the following scenarios:

  1. How far is each score from the highest, lowest, and average score of its lesson? Each row is compared against aggregates over its whole group. The rows taking part in the window computation are absolute: every row of the window the current row belongs to.
  2. After sorting each lesson's scores from high to low, how far is each score behind the one ranked immediately above it? Each row depends on the row before it. The rows taking part in the window computation are relative to the current row.
    // how far each score is from the lesson's highest, lowest, and average score
    // after sorting each lesson's scores descending, how far each score is behind the previous one
    val window_clause = Window.partitionBy(df("lesson")).orderBy(df("score").desc)
    val window_spec2 = window_clause.rangeBetween(-Int.MaxValue, Int.MaxValue) // absolute frame: the whole partition
    val window_spec3 = window_clause.rowsBetween(-1, 0) // relative frame; -1 = one row before the current row
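    // note: rangeBetween offsets apply to the value of the ORDER BY column (score),
    // so (-Int.MaxValue, Int.MaxValue) covers every row of the partition;
    // rowsBetween offsets count physical rows, so (-1, 0) is
    // "the previous row through the current row"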
    df.select(
      df("name"),
      df("lesson"),
      df("score"),
      // first("score") over the [-1, 0] frame is the previous row's score
      // (for the first row of a partition, it is the row's own score),
      // so this computes: current score minus previous score
      (df("score") - first("score").over(window_spec3)).as("score-last_score"),

      // how far each score is from the lesson's min, max, and average score
      (min(df("score")).over(window_spec2)).as("min_score"),
      (df("score") - min(df("score")).over(window_spec2)).as("score-min"),
      (max(df("score")).over(window_spec2)).as("max_score"),
      (df("score") - max(df("score")).over(window_spec2)).as("score-max"),
      (avg(df("score")).over(window_spec2)).as("avg_score"),
      (df("score") - avg(df("score")).over(window_spec2)).as("score-avg")
    ).
      orderBy("lesson", "name", "score").
      show
|name|lesson|score|score-last_score|min_score|score-min|max_score|score-max|avg_score|score-avg|
|--|--|--|--|--|--|--|--|--|--|
|A|E|100|0|98|2|100|0|99.0|1.0|
|B|E|99|-1|98|1|100|-1|99.0|0.0|
|C|E|99|0|98|1|100|-1|99.0|0.0|
|D|E|98|-1|98|0|100|-2|99.0|-1.0|
|A|Math|100|0|98|2|100|0|99.25|0.75|
|B|Math|100|0|98|2|100|0|99.25|0.75|
|C|Math|99|-1|98|1|100|-1|99.25|-0.25|
|D|Math|98|-1|98|0|100|-2|99.25|-1.25|
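
The "difference from the previous row" column can also be written with lag, one of the analytic functions listed in the table at the top. A minimal sketch (same session; unlike the first()-based version above, lag returns null for the first row of each partition rather than 0):

    // lag("score", 1) is the previous row's score within the window
    df.select(
      df("name"), df("lesson"), df("score"),
      (df("score") - lag("score", 1).over(window_clause)).as("score-prev_score")
    ).orderBy("lesson", "name", "score").show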

To be continued:

  • a closer look at the analytic functions
  • a walkthrough of the source code

References:

  1. percent_rank
  2. Databricks blog