Spark SQL, DataFrames and Datasets Guide
- Overview
- 开始入门
- Data Sources (数据源)
- 性能调优
- 分布式 SQL 引擎
- 迁移指南
- 参考
Overview
Spark SQL 是 Spark 处理结构化数据的一个模块.与基础的 Spark RDD API 不一样, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式能够跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 不管使用哪一种 API / 语言均可以快速的计算.这种统一意味着开发人员可以在基于提供最天然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不一样的 .html
该页面全部例子使用的示例数据都包含在 Spark 的发布中, 而且可使用 spark-shell
, pyspark
shell, 或者 sparkR
shell来运行.java
SQL
Spark SQL 的功能之一是执行 SQL 查询.Spark SQL 也可以被用于从已存在的 Hive 环境中读取数据.更多关于如何配置这个特性的信息, 请参考 Hive 表 这部分. 当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回.您也可使用 命令行或者经过 JDBC/ODBC与 SQL 接口交互.python
Datasets and DataFrames
一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优势(强类型化, 可以使用强大的 lambda 函数)与Spark SQL执行引擎的优势.一个 Dataset 能够从 JVM 对象来 构造 而且使用转换功能(map, flatMap, filter, 等等). Dataset API 在Scala 和Java是可用的.Python 不支持 Dataset API.可是因为 Python 的动态特性, 许多 Dataset API 的优势已经可用了 (也就是说, 你可能经过 name 天生的row.columnName
属性访问一行中的字段).这种状况和 R 类似.mysql
一个 DataFrame 是一个 Dataset 组成的指定列.它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的, 可是有不少优化. DataFrames 能够从大量的 sources 中构造出来, 好比: 结构化的文本文件, Hive中的表, 外部数据库, 或者已经存在的 RDDs. DataFrame API 能够在 Scala, Java, Python, 和 R中实现. 在 Scala 和 Java中, 一个 DataFrame 所表明的是一个多个 Row
(行)的的 Dataset(数据集合). 在 the Scala API中, DataFrame
仅仅是一个 Dataset[Row]
类型的别名. 然而, 在 Java API中, 用户须要去使用 Dataset<Row>
去表明一个 DataFrame
.git
在此文档中, 咱们将经常会引用 Scala/Java Datasets 的 Row
s 做为 DataFrames.github
开始入门
起始点: SparkSession
Spark SQL中全部功能的入口点是 SparkSession
类. 要建立一个 SparkSession
, 仅使用 SparkSession.builder()
就能够了:sql
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
Spark 2.0 中的SparkSession
为 Hive 特性提供了内嵌的支持, 包括使用 HiveQL 编写查询的能力, 访问 Hive UDF,以及从 Hive 表中读取数据的能力.为了使用这些特性, 你不须要去有一个已存在的 Hive 设置.shell
建立 DataFrames
在一个 SparkSession
中, 应用程序能够从一个 已经存在的 RDD
, 从hive表, 或者从 Spark数据源中建立一个DataFrames.数据库
举个例子, 下面就是基于一个JSON文件建立一个DataFrame:express
val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
无类型的Dataset操做 (aka DataFrame 操做)
DataFrames 提供了一个特定的语法用在 Scala, Java, Python and R中机构化数据的操做.
正如上面提到的同样, Spark 2.0中, DataFrames在Scala 和 Java API中, 仅仅是多个 Row
s的Dataset. 这些操做也参考了与强类型的Scala/Java Datasets中的”类型转换” 对应的”无类型转换” .
这里包括一些使用 Dataset 进行结构化数据处理的示例 :
// This import is needed to use the $-notation
import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
可以在 DataFrame 上被执行的操做类型的完整列表请参考 API 文档.
除了简单的列引用和表达式以外, DataFrame 也有丰富的函数库, 包括 string 操做, date 算术, 常见的 math 操做以及更多.可用的完整列表请参考 DataFrame 函数指南.
Running SQL Queries Programmatically
SparkSession
的 sql
函数可让应用程序以编程的方式运行 SQL 查询, 并将结果做为一个 DataFrame
返回.
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
全局临时视图
Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 若是你想让一个临时视图在全部session中相互传递而且可用, 直到Spark 应用退出, 你能够创建一个全局的临时视图.全局的临时视图存在于系统数据库 global_temp
中, 咱们必须加上库名去引用它, 好比. SELECT * FROM global_temp.view1
.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
建立Datasets
Dataset 与 RDD 类似, 然而, 并非使用 Java 序列化或者 Kryo 编码器 来序列化用于处理或者经过网络进行传输的对象. 虽然编码器和标准的序列化都负责将一个对象序列化成字节, 编码器是动态生成的代码, 而且使用了一种容许 Spark 去执行许多像 filtering, sorting 以及 hashing 这样的操做, 不须要将字节反序列化成对象的格式.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
RDD的互操做性
Spark SQL 支持两种不一样的方法用于转换已存在的 RDD 成为 Dataset.第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema.在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可让你的代码更简洁.
第二种用于建立 Dataset 的方法是经过一个容许你构造一个 Schema 而后把它应用到一个已存在的 RDD 的编程接口.然而这种方法更繁琐, 当列和它们的类型知道运行时都是未知时它容许你去构造 Dataset.
使用反射推断Schema
Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame.Case class 定义了表的 Schema.Case class 的参数名使用反射读取而且成为了列名.Case class 也能够是嵌套的或者包含像 Seq
或者 Array
这样的复杂类型.这个 RDD 可以被隐式转换成一个 DataFrame 而后被注册为一个表.表能够用于后续的 SQL 语句.
// For implicit conversions from RDDs to DataFrames
import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
以编程的方式指定Schema
当 case class 不可以在执行以前被定义(例如, records 记录的结构在一个 string 字符串中被编码了, 或者一个 text 文本 dataset 将被解析而且不一样的用户投影的字段是不同的).一个 DataFrame
可使用下面的三步以编程的方式来建立.
- 从原始的 RDD 建立 RDD 的
Row
(行); - Step 1 被建立后, 建立 Schema 表示一个
StructType
匹配 RDD 中的Row
(行)的结构. - 经过
SparkSession
提供的createDataFrame
方法应用 Schema 到 RDD 的 RowS(行).
例如:
import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
Aggregations
The built-in DataFrames functions provide common aggregations such as count()
, countDistinct()
, avg()
, max()
, min()
, etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
Untyped User-Defined Aggregate Functions
Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register(