Spark SQL表达式内部可用函数与相关源码

Spark SQL表达式内部可用函数与相关源码

版权声明:本文为博主原创文章,未经博主容许不得转载。html

手动码字不易,请你们尊重劳动成果,谢谢java

做者:http://blog.csdn.net/wang_wbqsql

虽然标题写着是Spark SQL可用函数,可是我并不想直接把它们贴出来。代码是最好的老师,所以我要教你们本身从源码中得到这些函数。由于随着Spark的版本更新,内置函数会愈来愈多,单纯把一个版本的函数列出来可能会误导他人。因此本文所介绍的内容是各版本通用的。express

代码中的表达式函数

使用代码中的表达式函数

我这里所指的表达式代码中的函数即sincosisnull这种能够在表达式中编写的函数:apache

df.select(sin($"a").as("sin_a"), cos($"a").as("cos_a")).filter(!isnull($"sin_a"))

获取当前使用版本的表达式函数集合

这个类型的函数是定义在org.apache.spark.sql.functions伴生对象中。api

在使用时,只用import org.apache.spark.sql.functions._便可使用其中的全部表达式函数。在须要使用这种类型的函数时,只须要打开这个类便可查找你所须要的函数。session

好比sin函数的定义为:app

/** * @param e angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
  def sin(e: Column): Column = withExpr { Sin(e.expr) }

  /** * @param columnName angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
  def sin(columnName: String): Column = sin(Column(columnName))

有两个重载版本,第一个重载版本sin(e: Column)须要填入一个Column对象,第二个重载版本sin(columnName: String)就能够直接填入列名字符串了。函数

获取Column对象

若是咱们须要把列名转换成Column对象,咱们可使用如下方法:this

一、使用这个列所在的DataFrame对象获取,即调用df("列名")或者df.col("列名")来精确获取某个DataFrame中的列,使用这种方式能够用来处理两个DataFrame进行join合并后出现列名重复的状况:

/** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
  def apply(colName: String): Column = col(colName)

  /** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
  def col(colName: String): Column = colName match {
    case "*" =>
      Column(ResolvedStar(queryExecution.analyzed.output))
    case _ =>
      if (sqlContext.conf.supportQuotedRegexColumnName) {
        colRegex(colName)
      } else {
        val expr = resolve(colName)
        Column(expr)
      }
  }

二、使用org.apache.spark.sql.functions中定义的col或者column方法来获取列,若是一个DataFrame里有列名重复,则会抛异常:

/** * Returns a [[Column]] based on the given column name. * * @group normal_funcs * @since 1.3.0 */
  def col(colName: String): Column = Column(colName)

  /** * Returns a [[Column]] based on the given column name. Alias of [[col]]. * * @group normal_funcs * @since 1.3.0 */
  def column(colName: String): Column = Column(colName)

三、经过声明import sqlContext.implicits._来使用$"a"获取表示a列的Column对象:

import sparkSession.sqlContext.implicits._

表达式字符串中的函数

使用表达式字符串

使用表达式字符串有如下几种方式:

一、使用DataFrame中的selectExpr方法
二、使用使用org.apache.spark.sql.functions中定义的expr方法包裹表达式字符串

// 下面两个表达式是等价的:
   // 最后的abs为表达式字符串
   ds.selectExpr("colA", "colB as newName", "abs(colC)")
   ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))

三、使用DataFrame中的def filter(conditionExpr: String)def where(conditionExpr: String)方法

// 其中 '>' 可看成表达式字符串
   peopleDs.filter("age > 15")
   peopleDs.where("age > 15")

四、将一个DataFrame注册到一个表名上。以后就可使用sparkSession.sql或者sqlContext.sql对该表进行处理。Spark 2.0以后能够运行全部99个TPC-DS查询。

Spark 2.0以前使用dataFrame.registerTempTable
Spark 2.0以后使用dataFrame.createOrReplaceTempView

如下例子中没有涉及表达式字符串

// Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age| name|
    // +----+-------+
    // |null|Michael|
    // | 30| Andy|
    // | 19| Justin|
    // +----+-------+

    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age| name|
    // +----+-------+
    // |null|Michael|
    // | 30| Andy|
    // | 19| Justin|
    // +----+-------+

获取当前使用版本的字符串函数集合

表达式字符串函数的定义是在org.apache.spark.sql.catalyst.analysis.FunctionRegistry伴生对象中,其中expressions变量定义了全部可用的表达式。

举例说expression[Substring]("substr")声明,即声明了你能够在表达式字符串中使用substr函数。具体使用方法能够进到前面的Substring类中来,如下就是Substring类的声明:

/** * A function that takes a substring of its first argument starting at a given position. * Defined for String and Binary types. * * NOTE: that this is not zero based, but 1-based index. The first character in str has index 1. */
@ExpressionDescription(
  usage = "_FUNC_(str, pos[, len]) - Returns the substring of `str` that starts at `pos` and is of length `len`, or the slice of byte array that starts at `pos` and is of length `len`.",
  examples = """ Examples: > SELECT _FUNC_('Spark SQL', 5); k SQL > SELECT _FUNC_('Spark SQL', -3); SQL > SELECT _FUNC_('Spark SQL', 5, 1); k """)
case class Substring(str: Expression, pos: Expression, len: Expression)

经过阅读类上的介绍咱们就能够了解其用法。

从Spark 2.3.0开始,官方文档给出了可用函数列表: http://spark.apache.org/docs/latest/api/sql/index.html 即对应着这里所声明的函数。

可是要注意这些函数只适用于Spark 2.3.0及之后版本,里面介绍的部分函数可能在较低Spark版本中不可用

自定义函数

Spark SQL提供了可自定义字符串函数的功能,定义的该字符串函数只能在定义该函数的sparkSession内使用,不然会报解析失败:

/** * A collection of methods for registering user-defined functions (UDF). * * The following example registers a Scala closure as UDF: * {{{ * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) * }}} * * The following example registers a UDF in Java: * {{{ * sparkSession.udf().register("myUDF", * (Integer arg1, String arg2) -> arg2 + arg1, * DataTypes.StringType); * }}} * * @note The user-defined functions must be deterministic. Due to optimization, * duplicate invocations may be eliminated or the function may even be invoked more times than * it is present in the query. * * @since 2.0.0 */
  def udf: UDFRegistration = sessionState.udfRegistration
相关文章
相关标签/搜索