版权声明:本文为博主原创文章,未经博主容许不得转载。html
手动码字不易,请你们尊重劳动成果,谢谢java
虽然标题写着是Spark SQL可用函数,可是我并不想直接把它们贴出来。代码是最好的老师,所以我要教你们本身从源码中得到这些函数。由于随着Spark的版本更新,内置函数会愈来愈多,单纯把一个版本的函数列出来可能会误导他人。因此本文所介绍的内容是各版本通用的。express
我这里所指的表达式代码中的函数即sin
、cos
、isnull
这种能够在表达式中编写的函数:apache
df.select(sin($"a").as("sin_a"), cos($"a").as("cos_a")).filter(!isnull($"sin_a"))
这个类型的函数是定义在org.apache.spark.sql.functions伴生对象中。api
在使用时,只用import org.apache.spark.sql.functions._便可使用其中的全部表达式函数。在须要使用这种类型的函数时,只须要打开这个类便可查找你所须要的函数。session
好比sin
函数的定义为:app
/** * @param e angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
def sin(e: Column): Column = withExpr { Sin(e.expr) }
/** * @param columnName angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
def sin(columnName: String): Column = sin(Column(columnName))
有两个重载版本,第一个重载版本sin(e: Column)
须要填入一个Column
对象,第二个重载版本sin(columnName: String)
就能够直接填入列名字符串了。函数
若是咱们须要把列名转换成Column
对象,咱们可使用如下方法:this
一、使用这个列所在的DataFrame
对象获取,即调用df("列名")
或者df.col("列名")
来精确获取某个DataFrame中的列,使用这种方式能够用来处理两个DataFrame进行join
合并后出现列名重复的状况:
/** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
def apply(colName: String): Column = col(colName)
/** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
def col(colName: String): Column = colName match {
case "*" =>
Column(ResolvedStar(queryExecution.analyzed.output))
case _ =>
if (sqlContext.conf.supportQuotedRegexColumnName) {
colRegex(colName)
} else {
val expr = resolve(colName)
Column(expr)
}
}
二、使用org.apache.spark.sql.functions中定义的col
或者column
方法来获取列,若是一个DataFrame里有列名重复,则会抛异常:
/** * Returns a [[Column]] based on the given column name. * * @group normal_funcs * @since 1.3.0 */
def col(colName: String): Column = Column(colName)
/** * Returns a [[Column]] based on the given column name. Alias of [[col]]. * * @group normal_funcs * @since 1.3.0 */
def column(colName: String): Column = Column(colName)
三、经过声明import sqlContext.implicits._
来使用$"a"
获取表示a
列的Column对象:
import sparkSession.sqlContext.implicits._
使用表达式字符串有如下几种方式:
一、使用DataFrame
中的selectExpr
方法
二、使用使用org.apache.spark.sql.functions中定义的expr
方法包裹表达式字符串
// 下面两个表达式是等价的:
// 最后的abs为表达式字符串
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
三、使用DataFrame
中的def filter(conditionExpr: String)
或def where(conditionExpr: String)
方法
// 其中 '>' 可看成表达式字符串
peopleDs.filter("age > 15")
peopleDs.where("age > 15")
四、将一个DataFrame注册到一个表名上。以后就可使用sparkSession.sql或者sqlContext.sql对该表进行处理。Spark 2.0以后能够运行全部99个TPC-DS查询。
Spark 2.0以前使用dataFrame.registerTempTable
Spark 2.0以后使用dataFrame.createOrReplaceTempView
如下例子中没有涉及表达式字符串
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
表达式字符串函数的定义是在org.apache.spark.sql.catalyst.analysis.FunctionRegistry伴生对象中,其中expressions变量定义了全部可用的表达式。
举例说expression[Substring]("substr")
声明,即声明了你能够在表达式字符串中使用substr函数。具体使用方法能够进到前面的Substring类中来,如下就是Substring类的声明:
/** * A function that takes a substring of its first argument starting at a given position. * Defined for String and Binary types. * * NOTE: that this is not zero based, but 1-based index. The first character in str has index 1. */
@ExpressionDescription(
usage = "_FUNC_(str, pos[, len]) - Returns the substring of `str` that starts at `pos` and is of length `len`, or the slice of byte array that starts at `pos` and is of length `len`.",
examples = """ Examples: > SELECT _FUNC_('Spark SQL', 5); k SQL > SELECT _FUNC_('Spark SQL', -3); SQL > SELECT _FUNC_('Spark SQL', 5, 1); k """)
case class Substring(str: Expression, pos: Expression, len: Expression)
经过阅读类上的介绍咱们就能够了解其用法。
从Spark 2.3.0开始,官方文档给出了可用函数列表: http://spark.apache.org/docs/latest/api/sql/index.html 即对应着这里所声明的函数。
可是要注意这些函数只适用于Spark 2.3.0及之后版本,里面介绍的部分函数可能在较低Spark版本中不可用。
Spark SQL提供了可自定义字符串函数的功能,定义的该字符串函数只能在定义该函数的sparkSession
内使用,不然会报解析失败:
/** * A collection of methods for registering user-defined functions (UDF). * * The following example registers a Scala closure as UDF: * {{{ * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) * }}} * * The following example registers a UDF in Java: * {{{ * sparkSession.udf().register("myUDF", * (Integer arg1, String arg2) -> arg2 + arg1, * DataTypes.StringType); * }}} * * @note The user-defined functions must be deterministic. Due to optimization, * duplicate invocations may be eliminated or the function may even be invoked more times than * it is present in the query. * * @since 2.0.0 */
def udf: UDFRegistration = sessionState.udfRegistration