Spark聚合开窗与自定义累加器的高级应用-Spark商业应用实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归做者(秦凯新)全部,禁止转载,欢迎学习。git

1 Spark开窗函数与聚合函数

1.1 Spark开窗函数与聚合函数的区别

开窗函数与聚合函数同样,都是对行的集合组进行聚合计算。可是二者却有本质区别,待我细细给你道来,绝对让你震撼无穷。github

  • 开窗函数用于为行定义一个窗口(这里的窗口是指运算将要操做的行的集合),它是对一组值进行操做,不须要使用 GROUP BY 子句对数据进行分组,可以在同一行中同时返回基础行的列和聚合列。极端点说:能够返回全部行的同时外带开窗聚合的列。可是 基于GROUP BY 进行聚合是不行的,由于select中不容许出现非GROUP BY 字段。sql

  • 聚合函数则不一样:不容许同时返回全部列的同时外带聚合(sum,max 等)多行的列。json

2 Spark聚合开窗函数使用技巧

开窗函数的调用格式为: 函数名(列) OVER(选项)函数

  • 第一大类:[聚合开窗函数 -> 聚合函数(列) ] OVER (选项),这里的选项能够是 PARTITION BY 子句,但不但是 ORDER BY 子句,选项能够为空,表示聚合函数向开窗函数的转换。

2.1 Spark开窗函数使用技巧

  • 聚合开窗函数 OVER 关键字 : 表示把聚合函数当成聚合开窗函数而不是聚合函数。post

  • (1)SQL 标准容许将全部聚合函数用作聚合开窗函数。经过over()进行转换学习

    sparkSession.sql("select name, class, score, count(name) over() name_count from score")
    复制代码

  • (2)开窗函数的 OVER 关键字后括号中的可使用 PARTITION BY 子句来定义行 的分区来供进行聚合计算。经过over(partition by 列 ) 进行分组开窗,此处与 GROUP BY 子句不一样测试

    sparkSession.sql("select name, class, score, count(name) over(partition by class) name_count from score").show()
    复制代码

能够看到:over只是实现了聚合函数到窗函数的转换。且不用group by。开窗函数的 OVER 关键字后括号中的可使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不一样,由于GROUP BY不容许同时返回全部列的同时外带聚合(sum,max 等)多行的的列。

3 Spark排序开窗函数使用技巧

第二大类:[排序开窗函数 -> 排序函数(列) ] OVER(选项),这里的选项能够是ORDER BY 子句,也能够是OVER(PARTITION BY 子句 ORDER BY 子句),但不能够是 PARTITION BY 子句。大数据

  • 对于排序开窗函数来说,它支持的开窗函数分别为: ROW_NUMBER(行号)、 RANK(排名)、 DENSE_RANK(密集排名)和 NTILE(分组排名)。spa

    sparkSession.sql("select name, class, score, row_number() over(order by score) rank from
      score").show()
    复制代码

sparkSession.sql("select name, class, score, rank() over(order by score) rank from
    score").show()
复制代码

sparkSession.sql("select name, class, score, dense_rank() over(order by score) rank from
    score").show()
复制代码

sparkSession.sql("select name, class, score, ntile(6) over(order by score) rank from
    score").show()
复制代码

4:用户自定义聚合函数(UDAF)

4.1 弱类型 UDAF 函数经过继承 UserDefinedAggregateFunction 来实现用户自定义聚合函数。

4.2 强类型 UDAF 函数经过继承 Aggregator 来实现强类型自定义聚合函数。

4:用户自定义聚合函数(UDF)

4.1 注册自定义函数

字符串拼接:

spark.udf.register("concat_long_string", 
     (v1: Long, v2: String, split: String) => v1.toString + split + v2)
复制代码

Json抽取字段值:

spark.udf.register("get_json_object", (json: String, field: String) => {
  val jsonObject = JSONObject.fromObject(json);
  jsonObject.getString(field)})
复制代码

udaf全数据拼接:

spark.udf.register("group_concat_distinct", new GroupConcatDistinctUDAF())
复制代码

5 使用自定义函数

6 累加器高级用法

6.1自定义累加器

6.2如何使用累加器

7 总结

本节内容主要探讨了开船函数和自定义累加器的高阶高级使用案例,可能部分截图来自github公开源码,部分是个人测试案例,若有雷同某位大神私有内容,请直接留言于我,我来从新修正案例。

秦凯新 于深圳

相关文章
相关标签/搜索