Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide

Overview

Spark SQL 是 Spark 处理结构化数据的一个模块.与基础的 Spark RDD API 不一样, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式能够跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 不管使用哪一种 API / 语言均可以快速的计算.这种统一意味着开发人员可以在基于提供最天然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不一样的 .html

该页面全部例子使用的示例数据都包含在 Spark 的发布中, 而且可使用 spark-shellpyspark shell, 或者 sparkR shell来运行.java

SQL

Spark SQL 的功能之一是执行 SQL 查询.Spark SQL 也可以被用于从已存在的 Hive 环境中读取数据.更多关于如何配置这个特性的信息, 请参考 Hive 表 这部分. 当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回.您也可使用 命令行或者经过 JDBC/ODBC与 SQL 接口交互.python

Datasets and DataFrames

一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优势(强类型化, 可以使用强大的 lambda 函数)与Spark SQL执行引擎的优势.一个 Dataset 能够从 JVM 对象来 构造 而且使用转换功能(map, flatMap, filter, 等等). Dataset API 在Scala 和Java是可用的.Python 不支持 Dataset API.可是因为 Python 的动态特性, 许多 Dataset API 的优势已经可用了 (也就是说, 你可能经过 name 天生的row.columnName属性访问一行中的字段).这种状况和 R 类似.mysql

一个 DataFrame 是一个 Dataset 组成的指定列.它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的, 可是有不少优化. DataFrames 能够从大量的 sources 中构造出来, 好比: 结构化的文本文件, Hive中的表, 外部数据库, 或者已经存在的 RDDs. DataFrame API 能够在 Scala, Java, Python, 和 R中实现. 在 Scala 和 Java中, 一个 DataFrame 所表明的是一个多个 Row(行)的的 Dataset(数据集合). 在 the Scala API中, DataFrame仅仅是一个 Dataset[Row]类型的别名. 然而, 在 Java API中, 用户须要去使用 Dataset<Row> 去表明一个 DataFrame.git

在此文档中, 咱们将经常会引用 Scala/Java Datasets 的 Rows 做为 DataFrames.github

开始入门

起始点: SparkSession

Spark SQL中全部功能的入口点是 SparkSession 类. 要建立一个 SparkSession, 仅使用 SparkSession.builder()就能够了:sql

import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Spark 2.0 中的SparkSession 为 Hive 特性提供了内嵌的支持, 包括使用 HiveQL 编写查询的能力, 访问 Hive UDF,以及从 Hive 表中读取数据的能力.为了使用这些特性, 你不须要去有一个已存在的 Hive 设置.shell

建立 DataFrames

在一个 SparkSession中, 应用程序能够从一个 已经存在的 RDD, 从hive表, 或者从 Spark数据源中建立一个DataFrames.数据库

举个例子, 下面就是基于一个JSON文件建立一个DataFrame:express

val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

无类型的Dataset操做 (aka DataFrame 操做)

DataFrames 提供了一个特定的语法用在 ScalaJavaPython and R中机构化数据的操做.

正如上面提到的同样, Spark 2.0中, DataFrames在Scala 和 Java API中, 仅仅是多个 Rows的Dataset. 这些操做也参考了与强类型的Scala/Java Datasets中的”类型转换” 对应的”无类型转换” .

这里包括一些使用 Dataset 进行结构化数据处理的示例 :

// This import is needed to use the $-notation
import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

可以在 DataFrame 上被执行的操做类型的完整列表请参考 API 文档.

除了简单的列引用和表达式以外, DataFrame 也有丰富的函数库, 包括 string 操做, date 算术, 常见的 math 操做以及更多.可用的完整列表请参考  DataFrame 函数指南.

Running SQL Queries Programmatically

SparkSession 的 sql 函数可让应用程序以编程的方式运行 SQL 查询, 并将结果做为一个 DataFrame 返回.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

全局临时视图

Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 若是你想让一个临时视图在全部session中相互传递而且可用, 直到Spark 应用退出, 你能够创建一个全局的临时视图.全局的临时视图存在于系统数据库 global_temp中, 咱们必须加上库名去引用它, 好比. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

建立Datasets

Dataset 与 RDD 类似, 然而, 并非使用 Java 序列化或者 Kryo 编码器 来序列化用于处理或者经过网络进行传输的对象. 虽然编码器和标准的序列化都负责将一个对象序列化成字节, 编码器是动态生成的代码, 而且使用了一种容许 Spark 去执行许多像 filtering, sorting 以及 hashing 这样的操做, 不须要将字节反序列化成对象的格式.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

RDD的互操做性

Spark SQL 支持两种不一样的方法用于转换已存在的 RDD 成为 Dataset.第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema.在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可让你的代码更简洁.

第二种用于建立 Dataset 的方法是经过一个容许你构造一个 Schema 而后把它应用到一个已存在的 RDD 的编程接口.然而这种方法更繁琐, 当列和它们的类型知道运行时都是未知时它容许你去构造 Dataset.

使用反射推断Schema

Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame.Case class 定义了表的 Schema.Case class 的参数名使用反射读取而且成为了列名.Case class 也能够是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型.这个 RDD 可以被隐式转换成一个 DataFrame 而后被注册为一个表.表能够用于后续的 SQL 语句.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19)) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

以编程的方式指定Schema

当 case class 不可以在执行以前被定义(例如, records 记录的结构在一个 string 字符串中被编码了, 或者一个 text 文本 dataset 将被解析而且不一样的用户投影的字段是不同的).一个 DataFrame 可使用下面的三步以编程的方式来建立.

  1. 从原始的 RDD 建立 RDD 的 Row(行);
  2. Step 1 被建立后, 建立 Schema 表示一个 StructType 匹配 RDD 中的 Row(行)的结构.
  3. 经过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD 的 RowS(行).

例如:

import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count()countDistinct()avg()max()min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources (数据源)

Spark SQL 支持经过 DataFrame 接口对各类 data sources (数据源)进行操做. DataFrame 可使用 relational transformations (关系转换)操做, 也可用于建立 temporary view (临时视图). 将 DataFrame 注册为 temporary view (临时视图)容许您对其数据运行 SQL 查询. 本节 描述了使用 Spark Data Sources 加载和保存数据的通常方法, 而后涉及可用于 built-in data sources (内置数据源)的 specific options (特定选项).

Generic Load/Save Functions (通用 加载/保存 功能)

在最简单的形式中, 默认数据源(parquet, 除非另有配置 spark.sql.sources.default )将用于全部操做.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet") usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Manually Specifying Options (手动指定选项)

您还能够 manually specify (手动指定)将与任何你想传递给 data source 的其余选项一块儿使用的 data source . Data sources 由其 fully qualified name (彻底限定名称)(即 org.apache.spark.sql.parquet ), 可是对于 built-in sources (内置的源), 你也可使用它们的 shortnames (短名称)(jsonparquetjdbcorclibsvmcsvtext).从任何 data source type (数据源类型)加载 DataFrames 可使用此 syntax (语法)转换为其余类型.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Run SQL on files directly (直接在文件上运行 SQL)

不使用读取 API 将文件加载到 DataFrame 并进行查询, 也能够直接用 SQL 查询该文件.

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Save Modes (保存模式)

Save operations (保存操做)能够选择使用 SaveMode , 它指定如何处理现有数据若是存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)而且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出以前被删除.

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error"(default) 将 DataFrame 保存到 data source (数据源)时, 若是数据已经存在, 则会抛出异常.
SaveMode.Append "append" 将 DataFrame 保存到 data source (数据源)时, 若是 data/table 已存在, 则 DataFrame 的内容将被 append (附加)到现有数据中.
SaveMode.Overwrite "overwrite" Overwrite mode (覆盖模式)意味着将 DataFrame 保存到 data source (数据源)时, 若是 data/table 已经存在, 则预期 DataFrame 的内容将 overwritten (覆盖)现有数据.
SaveMode.Ignore "ignore" Ignore mode (忽略模式)意味着当将 DataFrame 保存到 data source (数据源)时, 若是数据已经存在, 则保存操做预期不会保存 DataFrame 的内容, 而且不更改现有数据. 这与 SQL 中的 CREATE TABLE IF NOT EXISTS 相似.

Saving to Persistent Tables (保存到持久表)

DataFrames 也可使用 saveAsTable 命令做为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不须要使用此功能. Spark 将为您建立默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不一样, saveAsTable 将 materialize (实现) DataFrame 的内容, 并建立一个指向 Hive metastore 中数据的指针. 即便您的 Spark 程序从新启动, Persistent tables (持久性表)仍然存在, 由于您保持与同一个 metastore 的链接. 能够经过使用表的名称在 SparkSession上调用 table 方法来建立 persistent tabl (持久表)的 DataFrame .

对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您能够经过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 而且表数据仍然存在. 若是未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.

从 Spark 2.1 开始, persistent datasource tables (持久性数据源表)将 per-partition metadata (每一个分区元数据)存储在 Hive metastore 中. 这带来了几个好处:

  • 因为 metastore 只能返回查询的必要 partitions (分区), 所以再也不须要将第一个查询上的全部 partitions discovering 到表中.
  • Hive DDLs 如 ALTER TABLE PARTITION ... SET LOCATION 如今可用于使用 Datasource API 建立的表.

请注意, 建立 external datasource tables (外部数据源表)(带有 path 选项)的表时, 默认状况下不会收集 partition information (分区信息). 要 sync (同步) metastore 中的分区信息, 能够调用 MSCK REPAIR TABLE .

Bucketing, Sorting and Partitioning (分桶, 排序和分区)

对于 file-based data source (基于文件的数据源), 也能够对 output (输出)进行 bucket 和 sort 或者 partition . Bucketing 和 sorting 仅适用于 persistent tables :

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

在使用 Dataset API 时, partitioning 能够同时与 save 和 saveAsTable 一块儿使用.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

能够为 single table (单个表)使用 partitioning 和 bucketing:

peopleDF
  .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

partitionBy 建立一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 所以, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 能够在固定数量的 buckets 中分配数据, 而且能够在 a number of unique values is unbounded (多个惟一值无界时)使用数据.

Parquet Files

Parquet 是许多其余数据处理系统支持的 columnar format (柱状格式). Spark SQL 支持读写 Parquet 文件, 可自动保留 schema of the original data (原始数据的模式). 当编写 Parquet 文件时, 出于兼容性缘由, 全部 columns 都将自动转换为可空.

Loading Data Programmatically (以编程的方式加载数据)

使用上面例子中的数据:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet") // Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Partition Discovery (分区发现)

Table partitioning (表分区)是在像 Hive 这样的系统中使用的常见的优化方法. 在 partitioned table (分区表)中, 数据一般存储在不一样的目录中, partitioning column values encoded (分区列值编码)在每一个 partition directory (分区目录)的路径中. Parquet data source (Parquet 数据源)如今能够自动 discover (发现)和 infer (推断)分区信息. 例如, 咱们可使用如下 directory structure (目录结构)将全部之前使用的 population data (人口数据)存储到 partitioned table (分区表)中, 其中有两个额外的列 gender 和 country 做为 partitioning columns (分区列):

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

经过将 path/to/table 传递给 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 将自动从路径中提取 partitioning information (分区信息). 如今返回的 DataFrame 的 schema (模式)变成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意, 会自动 inferred (推断) partitioning columns (分区列)的 data types (数据类型).目前, 支持 numeric data types (数字数据类型)和 string type (字符串类型).有些用户可能不想自动推断 partitioning columns (分区列)的数据类型.对于这些用例, automatic type inference (自动类型推断)能够由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默认为 true .当禁用 type inference (类型推断)时, string type (字符串类型)将用于 partitioning columns (分区列).

从 Spark 1.6.0 开始, 默认状况下, partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例, 若是用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或 SparkSession.read.load , 则 gender 将不被视为 partitioning column (分区列).若是用户须要指定 partition discovery (分区发现)应该开始的基本路径, 则能够在数据源选项中设置 basePath.例如, 当 path/to/table/gender=male 是数据的路径而且用户将 basePath 设置为 path/to/table/gender 将是一个 partitioning column (分区列).

Schema Merging (模式合并)

像 ProtocolBuffer , Avro 和 Thrift 同样, Parquet 也支持 schema evolution (模式演进). 用户能够从一个 simple schema (简单的架构)开始, 并根据须要逐渐向 schema 添加更多的 columns (列). 以这种方式, 用户可能会使用不一样但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)如今可以自动检测这种状况并 merge (合并)全部这些文件的 schemas .

因为 schema merging (模式合并)是一个 expensive operation (相对昂贵的操做), 而且在大多数状况下不是必需的, 因此默认状况下从 1.5.0 开始. 你能够按照以下的方式启用它:

  1. 读取 Parquet 文件时, 将 data source option (数据源选项) mergeSchema 设置为 true (以下面的例子所示), 或
  2. 将 global SQL option (全局 SQL 选项) spark.sql.parquet.mergeSchema 设置为 true .
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive metastore Parquet table conversion (Hive metastore Parquet table 转换)

当读取和写入 Hive metastore Parquet 表时, Spark SQL 将尝试使用本身的 Parquet support (Parquet 支持), 而不是 Hive SerDe 来得到更好的性能. 此 behavior (行为)由 spark.sql.hive.convertMetastoreParquet 配置控制, 默认状况下 turned on (打开).

Hive/Parquet Schema Reconciliation

从 table schema processing (表格模式处理)的角度来讲, Hive 和 Parquet 之间有两个关键的区别.

  1. Hive 不区分大小写, 而 Parquet 不是
  2. Hive 认为全部 columns (列)均可觉得空, 而 Parquet 中的可空性是 significant (重要)的.

因为这个缘由, 当将 Hive metastore Parquet 表转换为 Spark SQL Parquet 表时, 咱们必须调整 metastore schema 与 Parquet schema. reconciliation 规则是:

  1. 在两个 schema 中具备 same name (相同名称)的 Fields (字段)必须具备 same data type (相同的数据类型), 而无论 nullability (可空性). reconciled field 应具备 Parquet 的数据类型, 以便 nullability (可空性)获得尊重.

  2. reconciled schema (调和模式)正好包含 Hive metastore schema 中定义的那些字段.

    • 只出如今 Parquet schema 中的任何字段将被 dropped (删除)在 reconciled schema 中.
    • 仅在 Hive metastore schema 中出现的任何字段在 reconciled schema 中做为 nullable field (可空字段)添加.

Metadata Refreshing (元数据刷新)

Spark SQL 缓存 Parquet metadata 以得到更好的性能. 当启用 Hive metastore Parquet table conversion (转换)时, 这些 converted tables (转换表)的 metadata (元数据)也被 cached (缓存). 若是这些表由 Hive 或其余外部工具更新, 则须要手动刷新以确保 consistent metadata (一致的元数据).

// spark is an existing SparkSession spark.catalog.refreshTable("my_table")

Configuration (配置)

可使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key = value 命令来完成 Parquet 的配置.

Property Name (参数名称) Default(默认) Meaning(含义)
spark.sql.parquet.binaryAsString false 一些其余 Parquet-producing systems (Parquet 生产系统), 特别是 Impala, Hive 和旧版本的 Spark SQL , 在 writing out (写出) Parquet schema 时, 不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特别是 Impala 和 Hive , 将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性.
spark.sql.parquet.cacheMetadata true 打开 Parquet schema metadata 的缓存. 能够加快查询静态数据.
spark.sql.parquet.compression.codec snappy 在编写 Parquet 文件时设置 compression codec (压缩编解码器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 设置为 true 时启用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 当设置为 false 时, Spark SQL 将使用 Hive SerDe 做为 parquet tables , 而不是内置的支持.
spark.sql.parquet.mergeSchema false

当为 true 时, Parquet data source (Parquet 数据源) merges (合并)从全部 data files (数据文件)收集的 schemas , 不然若是没有可用的 summary file , 则从 summary file 或 random data file 中挑选 schema .

spark.sql.optimizer.metadataOnly true

若是为 true , 则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的全部 columns (列)都是 partition columns (分区列)而且 query (查询)具备知足 distinct semantics (不一样语义)的 aggregate operator (聚合运算符)时, 它将适用.

JSON Datasets (JSON 数据集)

Spark SQL 能够 automatically infer (自动推断)JSON dataset 的 schema, 并将其做为 Dataset[Row] 加载. 这个 conversion (转换)能够在 Dataset[String] 上使用 SparkSession.read.json() 来完成, 或 JSON 文件.

请注意, 以 a json file 提供的文件不是典型的 JSON 文件. 每行必须包含一个 separate (单独的), self-contained valid (独立的有效的)JSON 对象. 有关更多信息, 请参阅 JSON Lines text format, also called newline-delimited JSON .

对于 regular multi-line JSON file (常规的多行 JSON 文件), 将 multiLine 选项设置为 true .

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset. import spark.implicits._ // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset[String] storing one JSON object per string val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive 表

Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。 可是,因为 Hive 具备大量依赖关系,所以这些依赖关系不包含在默认 Spark 分发中。 若是在类路径中找到 Hive 依赖项,Spark 将自动加载它们。 请注意,这些 Hive 依赖关系也必须存在于全部工做节点上,由于它们将须要访问 Hive 序列化和反序列化库 (SerDes),以访问存储在 Hive 中的数据。

经过将 hive-site.xmlcore-site.xml(用于安全配置)和 hdfs-site.xml (用于 HDFS 配置)文件放在 conf/ 中来完成配置。

当使用 Hive 时,必须用 Hive 支持实例化 SparkSession,包括链接到持续的 Hive 转移,支持 Hive serdes 和 Hive 用户定义的功能。 没有现有 Hive 部署的用户仍然能够启用 Hive 支持。 当 hive-site.xml 未配置时,上下文会自动在当前目录中建立 metastore_db,并建立由 spark.sql.warehouse.dir 配置的目录,该目录默认为Spark应用程序当前目录中的 spark-warehouse 目录 开始了 请注意,自从2.0.0以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。 而是使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置。 您可能须要向启动 Spark 应用程序的用户授予写权限。å

import java.io.File import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) // warehouseLocation points to the default location for managed databases and tables val warehouseLocation = new File("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // The items in DataFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") // Queries can then join DataFrame data with data stored in Hive. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ... 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.

指定 Hive 表的存储格式

建立 Hive 表时,须要定义如何 从/向 文件系统 read/write 数据,即 “输入格式” 和 “输出格式”。 您还须要定义该表如何将数据反序列化为行,或将行序列化为数据,即 “serde”。 如下选项可用于指定存储格式 (“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默认状况下,咱们将以纯文本形式读取表格文件。 请注意,Hive 存储处理程序在建立表时不受支持,您可使用 Hive 端的存储处理程序建立一个表,并使用 Spark SQL 来读取它。

Property Name Meaning
fileFormat fileFormat是一种存储格式规范的包,包括 "serde","input format" 和 "output format"。 目前咱们支持6个文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 这两个选项将相应的 "InputFormat" 和 "OutputFormat" 类的名称指定为字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 这两个选项必须成对出现,若是您已经指定了 "fileFormat" 选项,则没法指定它们。
serde 此选项指定 serde 类的名称。 当指定 `fileFormat` 选项时,若是给定的 `fileFormat` 已经包含 serde 的信息,那么不要指定这个选项。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可使用这3个文件格式的这个选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 "textfile" 文件格式一块儿使用。它们定义如何将分隔的文件读入行。

使用 OPTIONS 定义的全部其余属性将被视为 Hive serde 属性。

与不一样版本的 Hive Metastore 进行交互

Spark SQL 的 Hive 支持的最重要的部分之一是与 Hive metastore 进行交互,这使得 Spark SQL 可以访问 Hive 表的元数据。 从 Spark 1.4.0 开始,使用 Spark SQL 的单一二进制构建可使用下面所述的配置来查询不一样版本的 Hive 转移。 请注意,独立于用于与转移点通讯的 Hive 版本,内部 Spark SQL 将针对 Hive 1.2.1 进行编译,并使用这些类进行内部执行(serdes,UDF,UDAF等)。

如下选项可用于配置用于检索元数据的 Hive 版本:

属性名称 默认值 含义
spark.sql.hive.metastore.version 1.2.1 Hive metastore 版本。 可用选项为 0.12.0 至 1.2.1
spark.sql.hive.metastore.jars builtin 当启用 -Phive 时,使用 Hive 1.2.1,它与 Spark 程序集捆绑在一块儿。选择此选项时,spark.sql.hive.metastore.version 必须为 1.2.1 或未定义。 行家 使用从Maven存储库下载的指定版本的Hive jar。 一般不建议在生产部署中使用此配置。 ***** 应用于实例化 HiveMetastoreClient 的 jar 的位置。该属性能够是三个选项之一:
  1. builtin当启用 -Phive 时,使用 Hive 1.2.1,它与 Spark 程序集捆绑在一块儿。选择此选项时,spark.sql.hive.metastore.version 必须为 1.2.1 或未定义。
  2. maven使用从 Maven 存储库下载的指定版本的 Hive jar。一般不建议在生产部署中使用此配置。
  3. JVM 的标准格式的 classpath。 该类路径必须包含全部 Hive 及其依赖项,包括正确版本的 Hadoop。这些罐只须要存在于 driver 程序中,但若是您正在运行在 yarn 集群模式,那么您必须确保它们与应用程序一块儿打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

使用逗号分隔的类前缀列表,应使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器来加载。 一个共享类的示例就是用来访问 Hive metastore 的 JDBC driver。 其它须要共享的类,是须要与已经共享的类进行交互的。 例如,log4j 使用的自定义 appender。

spark.sql.hive.metastore.barrierPrefixes (empty)

一个逗号分隔的类前缀列表,应该明确地为 Spark SQL 正在通讯的 Hive 的每一个版本从新加载。 例如,在一般将被共享的前缀中声明的 Hive UDF (即: org.apache.spark.*)。

JDBC 链接其它数据库

Spark SQL 还包括可使用 JDBC 从其余数据库读取数据的数据源。此功能应优于使用 JdbcRDD。 这是由于结果做为 DataFrame 返回,而且能够轻松地在 Spark SQL 中处理或与其余数据源链接。 JDBC 数据源也更容易从 Java 或 Python 使用,由于它不须要用户提供 ClassTag。(请注意,这不一样于 Spark SQL JDBC 服务器,容许其余应用程序使用 Spark SQL 运行查询)。

要开始使用,您须要在 Spark 类路径中包含特定数据库的 JDBC driver 程序。 例如,要从 Spark Shell 链接到 postgres,您将运行如下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可使用 Data Sources API 未来自远程数据库的表做为 DataFrame 或 Spark SQL 临时视图进行加载。 用户能够在数据源选项中指定 JDBC 链接属性。用户 和 密码一般做为登陆数据源的链接属性提供。 除了链接属性外,Spark 还支持如下不区分大小写的选项:

属性名称 含义
url 要链接的JDBC URL。 源特定的链接属性能够在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 应该读取的 JDBC 表。请注意,可使用在SQL查询的 FROM 子句中有效的任何内容。 例如,您可使用括号中的子查询代替完整表。
driver 用于链接到此 URL 的 JDBC driver 程序的类名。
partitionColumn, lowerBound, upperBound 若是指定了这些选项,则必须指定这些选项。 另外,必须指定 numPartitions. 他们描述如何从多个 worker 并行读取数据时将表给分区。partitionColumn 必须是有问题的表中的数字列。 请注意,lowerBound 和 upperBound 仅用于决定分区的大小,而不是用于过滤表中的行。 所以,表中的全部行将被分区并返回。此选项仅适用于读操做。
numPartitions 在表读写中能够用于并行度的最大分区数。这也肯定并发JDBC链接的最大数量。 若是要写入的分区数超过此限制,则在写入以前经过调用 coalesce(numPartitions) 将其减小到此限制。
fetchsize JDBC 抓取的大小,用于肯定每次数据往返传递的行数。 这有利于提高 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操做。
batchsize JDBC 批处理的大小,用于肯定每次数据往返传递的行数。 这有利于提高 JDBC driver 的性能。 该选项仅适用于写操做。默认值为 1000.
isolationLevel 事务隔离级别,适用于当前链接。 它能够是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 链接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。 此选项仅适用于写操做。请参考 java.sql.Connection 中的文档。
truncate 这是一个与 JDBC 相关的选项。 启用 SaveMode.Overwrite 时,此选项会致使 Spark 截断现有表,而不是删除并从新建立。 这能够更有效,而且防止表元数据(例如,索引)被移除。 可是,在某些状况下,例如当新数据具备不一样的模式时,它将没法工做。 它默认为 false。 此选项仅适用于写操做。
createTableOptions 这是一个与JDBC相关的选项。 若是指定,此选项容许在建立表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操做。
createTableColumnTypes 使用数据库列数据类型而不是默认值,建立表时。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操做。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

故障排除

  • JDBC driver 程序类必须对客户端会话和全部执行程序上的原始类加载器可见。 这是由于 Java 的 DriverManager 类执行安全检查,致使它忽略原始类加载器不可见的全部 driver 程序,当打开链接时。一个方便的方法是修改全部工做节点上的compute_classpath.sh 以包含您的 driver 程序 JAR。
  • 一些数据库,例如 H2,将全部名称转换为大写。 您须要使用大写字母来引用 Spark SQL 中的这些名称。

性能调优

对于某些工做负载,能够经过缓存内存中的数据或打开一些实验选项来提升性能。

在内存中缓存数据

Spark SQL 能够经过调用 spark.catalog.cacheTable("tableName") 或 dataFrame.cache() 来使用内存中的列格式来缓存表。 而后,Spark SQL 将只扫描所需的列,并将自动调整压缩以最小化内存使用量和 GC 压力。 您能够调用 spark.catalog.uncacheTable("tableName") 从内存中删除该表。

内存缓存的配置可使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成。

属性名称 默认 含义
spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 将根据数据的统计信息为每一个列自动选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱状缓存的大小。更大的批量大小能够提升内存利用率和压缩率,可是在缓存数据时会冒出 OOM 风险。

其余配置选项

如下选项也可用于调整查询执行的性能。这些选项可能会在未来的版本中被废弃,由于更多的优化是自动执行的。

属性名称 默认值 含义
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件时,将单个分区打包的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字节数来衡量的打开文件的估计费用能够在同一时间进行扫描。 将多个文件放入分区时使用。最好过分估计,那么具备小文件的分区将比具备较大文件的分区(首先计划的)更快。
spark.sql.broadcastTimeout 300

广播链接中的广播等待时间超时(秒)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行链接时将广播给全部工做节点的表的最大大小(以字节为单位)。 经过将此值设置为-1能够禁用广播。 请注意,目前的统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

分布式 SQL 引擎

Spark SQL 也能够充当使用其 JDBC/ODBC 或命令行界面的分布式查询引擎。 在这种模式下,最终用户或应用程序能够直接与 Spark SQL 交互运行 SQL 查询,而不须要编写任何代码。

运行 Thrift JDBC/ODBC 服务器

这里实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2 中的 HiveServer2。 您可使用 Spark 或 Hive 1.2.1 附带的直线脚本测试 JDBC 服务器。

要启动 JDBC/ODBC 服务器,请在 Spark 目录中运行如下命令:

./sbin/start-thriftserver.sh

此脚本接受全部 bin/spark-submit 命令行选项,以及 --hiveconf 选项来指定 Hive 属性。 您能够运行 ./sbin/start-thriftserver.sh --help 查看全部可用选项的完整列表。 默认状况下,服务器监听 localhost:10000. 您能够经过环境变量覆盖此行为,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

or system properties:

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

如今,您可使用 beeline 来测试 Thrift JDBC/ODBC 服务器:

./bin/beeline

使用 beeline 方式链接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://localhost:10000

Beeline 将要求您输入用户名和密码。 在非安全模式下,只需输入机器上的用户名和空白密码便可。 对于安全模式,请按照 beeline 文档 中的说明进行操做。

配置Hive是经过将 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可使用 Hive 附带的 beeline 脚本。

Thrift JDBC 服务器还支持经过 HTTP 传输发送 thrift RPC 消息。 使用如下设置启用 HTTP 模式做为系统属性或在 conf/ 中的 hive-site.xml 文件中启用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要测试,请使用 beeline 以 http 模式链接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行 Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行输入的查询的方便工具。 请注意,Spark SQL CLI 不能与 Thrift JDBC 服务器通讯。

要启动 Spark SQL CLI,请在 Spark 目录中运行如下命令:

./bin/spark-sql

配置 Hive 是经过将 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您能够运行 ./bin/spark-sql --help 获取全部可用选项的完整列表。

迁移指南

从 Spark SQL 2.1 升级到 2.2

  • Spark 2.1.1 介绍了一个新的配置 key: spark.sql.hive.caseSensitiveInferenceMode. 它的默认设置是 NEVER_INFER, 其行为与 2.1.0 保持一致. 可是,Spark 2.2.0 将此设置的默认值更改成 “INFER_AND_SAVE”,以恢复与底层文件 schema(模式)具备大小写混合的列名称的 Hive metastore 表的兼容性。使用 INFER_AND_SAVE 配置的 value, 在第一次访问 Spark 将对其还没有保存推测 schema(模式)的任何 Hive metastore 表执行 schema inference(模式推断). 请注意,对于具备数千个 partitions(分区)的表,模式推断多是很是耗时的操做。若是不兼容大小写混合的列名,您能够安全地将spark.sql.hive.caseSensitiveInferenceMode 设置为 NEVER_INFER,以免模式推断的初始开销。请注意,使用新的默认INFER_AND_SAVE 设置,模式推理的结果被保存为 metastore key 以供未来使用。所以,初始模式推断仅发生在表的第一次访问。

从 Spark SQL 2.0 升级到 2.1

  • Datasource tables(数据源表)如今存储了 Hive metastore 中的 partition metadata(分区元数据). 这意味着诸如 ALTER TABLE PARTITION ... SET LOCATION 这样的 Hive DDLs 如今使用 Datasource API 可用于建立 tables(表).
    • 遗留的数据源表能够经过 MSCK REPAIR TABLE 命令迁移到这种格式。建议迁移遗留表利用 Hive DDL 的支持和提供的计划性能。
    • 要肯定表是否已迁移,当在表上发出 DESCRIBE FORMATTED 命令时请查找 PartitionProvider: Catalog 属性.
  • Datasource tables(数据源表)的 INSERT OVERWRITE TABLE ... PARTITION ... 行为的更改。
    • 在之前的 Spark 版本中,INSERT OVERWRITE 覆盖了整个 Datasource table,即便给出一个指定的 partition. 如今只有匹配规范的 partition 被覆盖。
    • 请注意,这仍然与 Hive 表的行为不一样,Hive 表仅覆盖与新插入数据重叠的分区。

从 Spark SQL 1.6 升级到 2.0

  • SparkSession 如今是 Spark 新的切入点, 它替代了老的 SQLContext 和 HiveContext。注意 : 为了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。能够从 SparkSession 获取一个新的 catalog 接口 — 现有的访问数据库和表的 API,如 listTablescreateExternalTabledropTempViewcacheTable 都被移到该接口。

  • Dataset API 和 DataFrame API 进行了统一。在 Scala 中,DataFrame 变成了 Dataset[Row] 类型的一个别名,而 Java API 使用者必须将 DataFrame 替换成 Dataset<Row>。Dataset 类既提供了强类型转换操做(如 mapfilter 以及 groupByKey)也提供了非强类型转换操做(如 select 和 groupBy)。因为编译期的类型安全不是 Python 和 R 语言的一个特性,Dataset 的概念并不适用于这些语言的 API。相反,DataFrame仍然是最基本的编程抽象, 就相似于这些语言中单节点 data frame 的概念。

  • Dataset 和 DataFrame API 中 unionAll 已通过时而且由 union 替代。
  • Dataset 和 DataFrame API 中 explode 已通过时,做为选择,能够结合 select 或 flatMap 使用 functions.explode() 。
  • Dataset 和 DataFrame API 中 registerTempTable 已通过时而且由 createOrReplaceTempView 替代。

  • 对 Hive tables CREATE TABLE ... LOCATION 行为的更改.
    • 从 Spark 2.0 开始,CREATE TABLE ... LOCATION 与 CREATE EXTERNAL TABLE ... LOCATION 是相同的,以防止意外丢弃用户提供的 locations(位置)中的现有数据。这意味着,在用户指定位置的 Spark SQL 中建立的 Hive 表始终是 Hive 外部表。删除外部表将不会删除数据。 用户不能指定 Hive managed tables(管理表)的位置. 请注意,这与Hive行为不一样。
    • 所以,这些表上的 “DROP TABLE” 语句不会删除数据。

从 Spark SQL 1.5 升级到 1.6

  • 从 Spark 1.6 开始,默认状况下服务器在多 session(会话)模式下运行。这意味着每一个 JDBC/ODBC 链接拥有一份本身的 SQL 配置和临时函数注册。缓存表仍在并共享。若是您但愿以旧的单会话模式运行 Thrift server,请设置选项 spark.sql.hive.thriftServer.singleSession 为true。您既能够将此选项添加到 spark-defaults.conf,或者经过 --conf 将它传递给 start-thriftserver.sh
./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ... 
  • 从 1.6.1 开始,在 sparkR 中 withColumn 方法支持添加一个新列或更换 DataFrame 同名的现有列。

  • 从 Spark 1.6 开始,LongType 强制转换为 TimestampType 指望是秒,而不是微秒。这种更改是为了匹配 Hive 1.2 的行为,以便从 numeric(数值)类型进行更一致的类型转换到 TimestampType。更多详情请参阅 SPARK-11724 。

从 Spark SQL 1.4 升级到 1.5

  • 使用手动管理的内存优化执行,如今是默认启用的,以及代码生成表达式求值。这些功能既能够经过设置 spark.sql.tungsten.enabled 为 false 来禁止使用。
  • Parquet 的模式合并默认状况下再也不启用。它能够经过设置 spark.sql.parquet.mergeSchema 到 true 以从新启用。
  • 字符串在 Python 列的 columns(列)如今支持使用点(.)来限定列或访问嵌套值。例如 df['table.column.nestedField']。可是,这意味着若是你的列名中包含任何圆点,你如今必须避免使用反引号(如 table.column.with.dots.nested)。
  • 在内存中的列存储分区修剪默认是开启的。它能够经过设置 spark.sql.inMemoryColumnarStorage.partitionPruning 为 false 来禁用。
  • 无限精度的小数列再也不支持,而不是 Spark SQL 最大精度为 38 。当从 BigDecimal 对象推断模式时,如今使用(38,18)。在 DDL 没有指定精度时,则默认保留 Decimal(10, 0)
  • 时间戳如今存储在 1 微秒的精度,而不是 1 纳秒的。
  • 在 sql 语句中,floating point(浮点数)如今解析为 decimal。HiveQL 解析保持不变。
  • SQL / DataFrame 函数的规范名称如今是小写(例如 sum vs SUM)。
  • JSON 数据源不会自动加载由其余应用程序(未经过 Spark SQL 插入到数据集的文件)建立的新文件。对于 JSON 持久表(即表的元数据存储在 Hive Metastore),用户可使用 REFRESH TABLE SQL 命令或 HiveContext 的 refreshTable 方法,把那些新文件列入到表中。对于表明一个 JSON dataset 的 DataFrame,用户须要从新建立 DataFrame,同时 DataFrame 中将包括新的文件。
  • PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替换现有的同名列。

从 Spark SQL 1.3 升级到 1.4

DataFrame data reader/writer interface

基于用户反馈,咱们建立了一个新的更流畅的 API,用于读取 (SQLContext.read) 中的数据并写入数据 (DataFrame.write), 而且旧的 API 将过期(例如,SQLContext.parquetFileSQLContext.jsonFile).

针对 SQLContext.read ( ScalaJavaPython ) 和 DataFrame.write ( ScalaJavaPython ) 的更多细节,请看 API 文档.

DataFrame.groupBy 保留 grouping columns(分组的列)

根据用户的反馈, 咱们更改了 DataFrame.groupBy().agg() 的默认行为以保留 DataFrame 结果中的 grouping columns(分组列). 为了在 1.3 中保持该行为,请设置 spark.sql.retainGroupColumns 为 false.

// In 1.3.x, in order for the grouping column "department" to show up, // it must be included explicitly as part of the agg function call. df.groupBy("department").agg($"department", max("age"), sum("expense")) // In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(max("age"), sum("expense")) // Revert to 1.3 behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false")

DataFrame.withColumn 上的行为更改

以前 1.4 版本中,DataFrame.withColumn() 只支持添加列。该列将始终在 DateFrame 结果中被加入做为新的列,即便现有的列可能存在相同的名称。从 1.4 版本开始,DataFrame.withColumn() 支持添加与全部现有列的名称不一样的列或替换现有的同名列。

请注意,这一变化仅适用于 Scala API,并不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,咱们从 Spark SQL 中删除了 “Alpha” 的标签,做为一部分已经清理过的可用的 API 。从 Spark 1.3 版本以上,Spark SQL 将提供在 1.X 系列的其余版本的二进制兼容性。这种兼容性保证不包括被明确标记为不稳定的(即 DeveloperApi 类或 Experimental) API。

重命名 DataFrame 的 SchemaRDD

升级到 Spark SQL 1.3 版本时,用户会发现最大的变化是,SchemaRDD 已改名为 DataFrame。这主要是由于 DataFrames 再也不从 RDD 直接继承,而是由 RDDS 本身来实现这些功能。DataFrames 仍然能够经过调用 .rdd 方法转换为 RDDS 。

在 Scala 中,有一个从 SchemaRDD 到 DataFrame 类型别名,能够为一些状况提供源代码兼容性。它仍然建议用户更新他们的代码以使用 DataFrame来代替。Java 和 Python 用户须要更新他们的代码。

Java 和 Scala APIs 的统一

此前 Spark 1.3 有单独的Java兼容类(JavaSQLContext 和 JavaSchemaRDD),借鉴于 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已经统一。两种语言的用户可使用 SQLContext 和 DataFrame。通常来讲论文类尝试使用两种语言的共有类型(如 Array 替代了一些特定集合)。在某些状况下不通用的类型状况下,(例如,passing in closures 或 Maps)使用函数重载代替。

此外,该 Java 的特定类型的 API 已被删除。Scala 和 Java 的用户可使用存在于 org.apache.spark.sql.types 类来描述编程模式。

隔离隐式转换和删除 dsl 包(仅Scala)

许多 Spark 1.3 版本之前的代码示例都以 import sqlContext._ 开始,这提供了从 sqlContext 范围的全部功能。在 Spark 1.3 中,咱们移除了从 RDDs 到 DateFrame 再到 SQLContext 内部对象的隐式转换。用户如今应该写成 import sqlContext.implicits._.

此外,隐式转换如今只能使用方法 toDF 来增长由 Product(即 case classes or tuples)构成的 RDD,而不是自动应用。

当使用 DSL 内部的函数时(如今使用 DataFrame API 来替换), 用户习惯导入 org.apache.spark.sql.catalyst.dsl. 相反,应该使用公共的 dataframe 函数 API: import org.apache.spark.sql.functions._.

针对 DataType 删除在 org.apache.spark.sql 包中的一些类型别名(仅限于 Scala)

Spark 1.3 移除存在于基本 SQL 包的 DataType 类型别名。开发人员应改成导入类 org.apache.spark.sql.types

UDF 注册迁移到 sqlContext.udf 中 (Java & Scala)

用于注册 UDF 的函数,无论是 DataFrame DSL 仍是 SQL 中用到的,都被迁移到 SQLContext 中的 udf 对象中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF 注册保持不变。

Python DataTypes 再也不是 Singletons(单例的)

在 Python 中使用 DataTypes 时,你须要先构造它们(如:StringType()),而不是引用一个单例对象。

与 Apache Hive 的兼容

Spark SQL 在设计时就考虑到了和 Hive metastore,SerDes 以及 UDF 之间的兼容性。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本,而且Spark SQL 能够链接到不一样版本的Hive metastore(从 0.12.0 到 1.2.1,能够参考 与不一样版本的 Hive Metastore 交互

在现有的 Hive Warehouses 中部署

Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不须要修改现有的 Hive Metastore , 或者改变数据的位置和表的分区。

所支持的 Hive 特性

Spark SQL 支持绝大部分的 Hive 功能,如:

  • Hive query(查询)语句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部 Hive 操做, 包括:
    • 关系运算符 (===<><>>=<=, 等等)
    • 算术运算符 (+-*/%, 等等)
    • 逻辑运算符 (AND&&OR||, 等等)
    • 复杂类型的构造
    • 数学函数 (signlncos, 等等)
    • String 函数 (instrlengthprintf, 等等)
  • 用户定义函数 (UDF)
  • 用户定义聚合函数 (UDAF)
  • 用户定义 serialization formats (SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查询)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 全部的 Hive DDL 函数, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的 Hive Data types(数据类型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

未支持的 Hive 函数

如下是目前还不支持的 Hive 函数列表。在 Hive 部署中这些功能大部分都用不到。

主要的 Hive 功能

  • Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 还不支持 buckets.

Esoteric Hive 功能

  • UNION 类型
  • Unique join
  • Column 统计信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive

Hive 优化

有少数 Hive 优化尚未包含在 Spark 中。其中一些(好比 indexes 索引)因为 Spark SQL 的这种内存计算模型而显得不那么重要。另一些在 Spark SQL 将来的版本中会持续跟踪。

  • Block 级别的 bitmap indexes 和虚拟 columns (用于构建 indexes)
  • 自动为 join 和 groupBy 计算 reducer 个数 : 目前在 Spark SQL 中, 你须要使用 “SET spark.sql.shuffle.partitions=[num_tasks];” 来控制 post-shuffle 的并行度.
  • 仅 Meta-data 的 query: 对于只使用 metadata 就能回答的查询,Spark SQL 仍然会启动计算结果的任务.
  • Skew data flag: Spark SQL 不遵循 Hive 中 skew 数据的标记.
  • STREAMTABLE hint in join: Spark SQL 不遵循 STREAMTABLE hint.
  • 对于查询结果合并多个小文件: 若是输出的结果包括多个小文件, Hive 能够可选的合并小文件到一些大文件中去,以免溢出 HDFS metadata. Spark SQL 还不支持这样.

参考

数据类型

Spark SQL 和 DataFrames 支持下面的数据类型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementTypecontainsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataTypenullable is used to indicate if values of this fields can have null values.

Spark SQL 的全部数据类型都在包 org.apache.spark.sql.types 中. 你能够用下示例示例来访问它们.

import org.apache.spark.sql.types._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(数据类型) Scala 中的 Value 类型 访问或建立数据类型的 API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
Note(注意): containsNull 的默认值是 true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
Note(注意): valueContainsNull 的默认值是 true.
StructType org.apache.spark.sql.Row StructType(fields)
Note(注意): fields 是 StructFields 的 Seq. 全部, 两个 fields 拥有相同的名称是不被容许的.
StructField 该 field(字段)数据类型的 Scala 中的 value 类型 (例如, 数据类型为 IntegerType 的 StructField 是 Int) StructField(name, dataType, [nullable])
Note: nullable 的默认值是 true.

NaN Semantics

当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 须要作一些特殊处理. 具体以下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操做中,全部的 NaN values 将被分到同一个组中.
  • 在 join key 中 NaN 能够当作一个普通的值.
  • NaN 值在升序排序中排到最后,比任何其余数值都大.

 


Spark SQL, DataFrames and Datasets Guide

Overview

Spark SQL 是 Spark 处理结构化数据的一个模块.与基础的 Spark RDD API 不一样, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式能够跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 不管使用哪一种 API / 语言均可以快速的计算.这种统一意味着开发人员可以在基于提供最天然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不一样的 .

该页面全部例子使用的示例数据都包含在 Spark 的发布中, 而且可使用 spark-shellpyspark shell, 或者 sparkR shell来运行.

SQL

Spark SQL 的功能之一是执行 SQL 查询.Spark SQL 也可以被用于从已存在的 Hive 环境中读取数据.更多关于如何配置这个特性的信息, 请参考 Hive 表 这部分. 当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回.您也可使用 命令行或者经过 JDBC/ODBC与 SQL 接口交互.

Datasets and DataFrames

一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优势(强类型化, 可以使用强大的 lambda 函数)与Spark SQL执行引擎的优势.一个 Dataset 能够从 JVM 对象来 构造 而且使用转换功能(map, flatMap, filter, 等等). Dataset API 在Scala 和Java是可用的.Python 不支持 Dataset API.可是因为 Python 的动态特性, 许多 Dataset API 的优势已经可用了 (也就是说, 你可能经过 name 天生的row.columnName属性访问一行中的字段).这种状况和 R 类似.

一个 DataFrame 是一个 Dataset 组成的指定列.它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的, 可是有不少优化. DataFrames 能够从大量的 sources 中构造出来, 好比: 结构化的文本文件, Hive中的表, 外部数据库, 或者已经存在的 RDDs. DataFrame API 能够在 Scala, Java, Python, 和 R中实现. 在 Scala 和 Java中, 一个 DataFrame 所表明的是一个多个 Row(行)的的 Dataset(数据集合). 在 the Scala API中, DataFrame仅仅是一个 Dataset[Row]类型的别名. 然而, 在 Java API中, 用户须要去使用 Dataset<Row> 去表明一个 DataFrame.

在此文档中, 咱们将经常会引用 Scala/Java Datasets 的 Rows 做为 DataFrames.

开始入门

起始点: SparkSession

Spark SQL中全部功能的入口点是 SparkSession 类. 要建立一个 SparkSession, 仅使用 SparkSession.builder()就能够了:

import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Spark 2.0 中的SparkSession 为 Hive 特性提供了内嵌的支持, 包括使用 HiveQL 编写查询的能力, 访问 Hive UDF,以及从 Hive 表中读取数据的能力.为了使用这些特性, 你不须要去有一个已存在的 Hive 设置.

建立 DataFrames

在一个 SparkSession中, 应用程序能够从一个 已经存在的 RDD, 从hive表, 或者从 Spark数据源中建立一个DataFrames.

举个例子, 下面就是基于一个JSON文件建立一个DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

无类型的Dataset操做 (aka DataFrame 操做)

DataFrames 提供了一个特定的语法用在 ScalaJavaPython and R中机构化数据的操做.

正如上面提到的同样, Spark 2.0中, DataFrames在Scala 和 Java API中, 仅仅是多个 Rows的Dataset. 这些操做也参考了与强类型的Scala/Java Datasets中的”类型转换” 对应的”无类型转换” .

这里包括一些使用 Dataset 进行结构化数据处理的示例 :

// This import is needed to use the $-notation
import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

可以在 DataFrame 上被执行的操做类型的完整列表请参考 API 文档.

除了简单的列引用和表达式以外, DataFrame 也有丰富的函数库, 包括 string 操做, date 算术, 常见的 math 操做以及更多.可用的完整列表请参考  DataFrame 函数指南.

Running SQL Queries Programmatically

SparkSession 的 sql 函数可让应用程序以编程的方式运行 SQL 查询, 并将结果做为一个 DataFrame 返回.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

全局临时视图

Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 若是你想让一个临时视图在全部session中相互传递而且可用, 直到Spark 应用退出, 你能够创建一个全局的临时视图.全局的临时视图存在于系统数据库 global_temp中, 咱们必须加上库名去引用它, 好比. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

建立Datasets

Dataset 与 RDD 类似, 然而, 并非使用 Java 序列化或者 Kryo 编码器 来序列化用于处理或者经过网络进行传输的对象. 虽然编码器和标准的序列化都负责将一个对象序列化成字节, 编码器是动态生成的代码, 而且使用了一种容许 Spark 去执行许多像 filtering, sorting 以及 hashing 这样的操做, 不须要将字节反序列化成对象的格式.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

RDD的互操做性

Spark SQL 支持两种不一样的方法用于转换已存在的 RDD 成为 Dataset.第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema.在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可让你的代码更简洁.

第二种用于建立 Dataset 的方法是经过一个容许你构造一个 Schema 而后把它应用到一个已存在的 RDD 的编程接口.然而这种方法更繁琐, 当列和它们的类型知道运行时都是未知时它容许你去构造 Dataset.

使用反射推断Schema

Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame.Case class 定义了表的 Schema.Case class 的参数名使用反射读取而且成为了列名.Case class 也能够是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型.这个 RDD 可以被隐式转换成一个 DataFrame 而后被注册为一个表.表能够用于后续的 SQL 语句.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19)) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

以编程的方式指定Schema

当 case class 不可以在执行以前被定义(例如, records 记录的结构在一个 string 字符串中被编码了, 或者一个 text 文本 dataset 将被解析而且不一样的用户投影的字段是不同的).一个 DataFrame 可使用下面的三步以编程的方式来建立.

  1. 从原始的 RDD 建立 RDD 的 Row(行);
  2. Step 1 被建立后, 建立 Schema 表示一个 StructType 匹配 RDD 中的 Row(行)的结构.
  3. 经过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD 的 RowS(行).

例如:

import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count()countDistinct()avg()max()min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources (数据源)

Spark SQL 支持经过 DataFrame 接口对各类 data sources (数据源)进行操做. DataFrame 可使用 relational transformations (关系转换)操做, 也可用于建立 temporary view (临时视图). 将 DataFrame 注册为 temporary view (临时视图)容许您对其数据运行 SQL 查询. 本节 描述了使用 Spark Data Sources 加载和保存数据的通常方法, 而后涉及可用于 built-in data sources (内置数据源)的 specific options (特定选项).

Generic Load/Save Functions (通用 加载/保存 功能)

在最简单的形式中, 默认数据源(parquet, 除非另有配置 spark.sql.sources.default )将用于全部操做.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet") usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Manually Specifying Options (手动指定选项)

您还能够 manually specify (手动指定)将与任何你想传递给 data source 的其余选项一块儿使用的 data source . Data sources 由其 fully qualified name (彻底限定名称)(即 org.apache.spark.sql.parquet ), 可是对于 built-in sources (内置的源), 你也可使用它们的 shortnames (短名称)(jsonparquetjdbcorclibsvmcsvtext).从任何 data source type (数据源类型)加载 DataFrames 可使用此 syntax (语法)转换为其余类型.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Run SQL on files directly (直接在文件上运行 SQL)

不使用读取 API 将文件加载到 DataFrame 并进行查询, 也能够直接用 SQL 查询该文件.

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Save Modes (保存模式)

Save operations (保存操做)能够选择使用 SaveMode , 它指定如何处理现有数据若是存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)而且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出以前被删除.

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error"(default) 将 DataFrame 保存到 data source (数据源)时, 若是数据已经存在, 则会抛出异常.
SaveMode.Append "append" 将 DataFrame 保存到 data source (数据源)时, 若是 data/table 已存在, 则 DataFrame 的内容将被 append (附加)到现有数据中.
SaveMode.Overwrite "overwrite" Overwrite mode (覆盖模式)意味着将 DataFrame 保存到 data source (数据源)时, 若是 data/table 已经存在, 则预期 DataFrame 的内容将 overwritten (覆盖)现有数据.
SaveMode.Ignore "ignore" Ignore mode (忽略模式)意味着当将 DataFrame 保存到 data source (数据源)时, 若是数据已经存在, 则保存操做预期不会保存 DataFrame 的内容, 而且不更改现有数据. 这与 SQL 中的 CREATE TABLE IF NOT EXISTS 相似.

Saving to Persistent Tables (保存到持久表)

DataFrames 也可使用 saveAsTable 命令做为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不须要使用此功能. Spark 将为您建立默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不一样, saveAsTable 将 materialize (实现) DataFrame 的内容, 并建立一个指向 Hive metastore 中数据的指针. 即便您的 Spark 程序从新启动, Persistent tables (持久性表)仍然存在, 由于您保持与同一个 metastore 的链接. 能够经过使用表的名称在 SparkSession上调用 table 方法来建立 persistent tabl (持久表)的 DataFrame .

对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您能够经过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 而且表数据仍然存在. 若是未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.

从 Spark 2.1 开始, persistent datasource tables (持久性数据源表)将 per-partition metadata (每一个分区元数据)存储在 Hive metastore 中. 这带来了几个好处:

  • 因为 metastore 只能返回查询的必要 partitions (分区), 所以再也不须要将第一个查询上的全部 partitions discovering 到表中.
  • Hive DDLs 如 ALTER TABLE PARTITION ... SET LOCATION 如今可用于使用 Datasource API 建立的表.

请注意, 建立 external datasource tables (外部数据源表)(带有 path 选项)的表时, 默认状况下不会收集 partition information (分区信息). 要 sync (同步) metastore 中的分区信息, 能够调用 MSCK REPAIR TABLE .

Bucketing, Sorting and Partitioning (分桶, 排序和分区)

对于 file-based data source (基于文件的数据源), 也能够对 output (输出)进行 bucket 和 sort 或者 partition . Bucketing 和 sorting 仅适用于 persistent tables :

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

在使用 Dataset API 时, partitioning 能够同时与 save 和 saveAsTable 一块儿使用.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

能够为 single table (单个表)使用 partitioning 和 bucketing:

peopleDF
  .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

partitionBy 建立一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 所以, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 能够在固定数量的 buckets 中分配数据, 而且能够在 a number of unique values is unbounded (多个惟一值无界时)使用数据.

Parquet Files

Parquet 是许多其余数据处理系统支持的 columnar format (柱状格式). Spark SQL 支持读写 Parquet 文件, 可自动保留 schema of the original data (原始数据的模式). 当编写 Parquet 文件时, 出于兼容性缘由, 全部 columns 都将自动转换为可空.

Loading Data Programmatically (以编程的方式加载数据)

使用上面例子中的数据:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet") // Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Partition Discovery (分区发现)

Table partitioning (表分区)是在像 Hive 这样的系统中使用的常见的优化方法. 在 partitioned table (分区表)中, 数据一般存储在不一样的目录中, partitioning column values encoded (分区列值编码)在每一个 partition directory (分区目录)的路径中. Parquet data source (Parquet 数据源)如今能够自动 discover (发现)和 infer (推断)分区信息. 例如, 咱们可使用如下 directory structure (目录结构)将全部之前使用的 population data (人口数据)存储到 partitioned table (分区表)中, 其中有两个额外的列 gender 和 country 做为 partitioning columns (分区列):

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

经过将 path/to/table 传递给 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 将自动从路径中提取 partitioning information (分区信息). 如今返回的 DataFrame 的 schema (模式)变成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意, 会自动 inferred (推断) partitioning columns (分区列)的 data types (数据类型).目前, 支持 numeric data types (数字数据类型)和 string type (字符串类型).有些用户可能不想自动推断 partitioning columns (分区列)的数据类型.对于这些用例, automatic type inference (自动类型推断)能够由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默认为 true .当禁用 type inference (类型推断)时, string type (字符串类型)将用于 partitioning columns (分区列).

从 Spark 1.6.0 开始, 默认状况下, partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例, 若是用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或 SparkSession.read.load , 则 gender 将不被视为 partitioning column (分区列).若是用户须要指定 partition discovery (分区发现)应该开始的基本路径, 则能够在数据源选项中设置 basePath.例如, 当 path/to/table/gender=male 是数据的路径而且用户将 basePath 设置为 path/to/table/gender 将是一个 partitioning column (分区列).

Schema Merging (模式合并)

像 ProtocolBuffer , Avro 和 Thrift 同样, Parquet 也支持 schema evolution (模式演进). 用户能够从一个 simple schema (简单的架构)开始, 并根据须要逐渐向 schema 添加更多的 columns (列). 以这种方式, 用户可能会使用不一样但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)如今可以自动检测这种状况并 merge (合并)全部这些文件的 schemas .

因为 schema merging (模式合并)是一个 expensive operation (相对昂贵的操做), 而且在大多数状况下不是必需的, 因此默认状况下从 1.5.0 开始. 你能够按照以下的方式启用它:

  1. 读取 Parquet 文件时, 将 data source option (数据源选项) mergeSchema 设置为 true (以下面的例子所示), 或
  2. 将 global SQL option (全局 SQL 选项) spark.sql.parquet.mergeSchema 设置为 true .
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive metastore Parquet table conversion (Hive metastore Parquet table 转换)

当读取和写入 Hive metastore Parquet 表时, Spark SQL 将尝试使用本身的 Parquet support (Parquet 支持), 而不是 Hive SerDe 来得到更好的性能. 此 behavior (行为)由 spark.sql.hive.convertMetastoreParquet 配置控制, 默认状况下 turned on (打开).

Hive/Parquet Schema Reconciliation

从 table schema processing (表格模式处理)的角度来讲, Hive 和 Parquet 之间有两个关键的区别.

  1. Hive 不区分大小写, 而 Parquet 不是
  2. Hive 认为全部 columns (列)均可觉得空, 而 Parquet 中的可空性是 significant (重要)的.

因为这个缘由, 当将 Hive metastore Parquet 表转换为 Spark SQL Parquet 表时, 咱们必须调整 metastore schema 与 Parquet schema. reconciliation 规则是:

  1. 在两个 schema 中具备 same name (相同名称)的 Fields (字段)必须具备 same data type (相同的数据类型), 而无论 nullability (可空性). reconciled field 应具备 Parquet 的数据类型, 以便 nullability (可空性)获得尊重.

  2. reconciled schema (调和模式)正好包含 Hive metastore schema 中定义的那些字段.

    • 只出如今 Parquet schema 中的任何字段将被 dropped (删除)在 reconciled schema 中.
    • 仅在 Hive metastore schema 中出现的任何字段在 reconciled schema 中做为 nullable field (可空字段)添加.

Metadata Refreshing (元数据刷新)

Spark SQL 缓存 Parquet metadata 以得到更好的性能. 当启用 Hive metastore Parquet table conversion (转换)时, 这些 converted tables (转换表)的 metadata (元数据)也被 cached (缓存). 若是这些表由 Hive 或其余外部工具更新, 则须要手动刷新以确保 consistent metadata (一致的元数据).

// spark is an existing SparkSession spark.catalog.refreshTable("my_table")

Configuration (配置)

可使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key = value 命令来完成 Parquet 的配置.

Property Name (参数名称) Default(默认) Meaning(含义)
spark.sql.parquet.binaryAsString false 一些其余 Parquet-producing systems (Parquet 生产系统), 特别是 Impala, Hive 和旧版本的 Spark SQL , 在 writing out (写出) Parquet schema 时, 不区分 binary data (二进制数据)和 strings (字符串). 该 flag 告诉 Spark SQL 将 binary data (二进制数据)解释为 string (字符串)以提供与这些系统的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特别是 Impala 和 Hive , 将 Timestamp 存入INT96 . 该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性.
spark.sql.parquet.cacheMetadata true 打开 Parquet schema metadata 的缓存. 能够加快查询静态数据.
spark.sql.parquet.compression.codec snappy 在编写 Parquet 文件时设置 compression codec (压缩编解码器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 设置为 true 时启用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 当设置为 false 时, Spark SQL 将使用 Hive SerDe 做为 parquet tables , 而不是内置的支持.
spark.sql.parquet.mergeSchema false

当为 true 时, Parquet data source (Parquet 数据源) merges (合并)从全部 data files (数据文件)收集的 schemas , 不然若是没有可用的 summary file , 则从 summary file 或 random data file 中挑选 schema .

spark.sql.optimizer.metadataOnly true

若是为 true , 则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns (分区列)而不是 table scans (表扫描). 当 scanned (扫描)的全部 columns (列)都是 partition columns (分区列)而且 query (查询)具备知足 distinct semantics (不一样语义)的 aggregate operator (聚合运算符)时, 它将适用.

JSON Datasets (JSON 数据集)

Spark SQL 能够 automatically infer (自动推断)JSON dataset 的 schema, 并将其做为 Dataset[Row] 加载. 这个 conversion (转换)能够在 Dataset[String] 上使用 SparkSession.read.json() 来完成, 或 JSON 文件.

请注意, 以 a json file 提供的文件不是典型的 JSON 文件. 每行必须包含一个 separate (单独的), self-contained valid (独立的有效的)JSON 对象. 有关更多信息, 请参阅 JSON Lines text format, also called newline-delimited JSON .

对于 regular multi-line JSON file (常规的多行 JSON 文件), 将 multiLine 选项设置为 true .

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset. import spark.implicits._ // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset[String] storing one JSON object per string val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive 表

Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。 可是,因为 Hive 具备大量依赖关系,所以这些依赖关系不包含在默认 Spark 分发中。 若是在类路径中找到 Hive 依赖项,Spark 将自动加载它们。 请注意,这些 Hive 依赖关系也必须存在于全部工做节点上,由于它们将须要访问 Hive 序列化和反序列化库 (SerDes),以访问存储在 Hive 中的数据。

经过将 hive-site.xmlcore-site.xml(用于安全配置)和 hdfs-site.xml (用于 HDFS 配置)文件放在 conf/ 中来完成配置。

当使用 Hive 时,必须用 Hive 支持实例化 SparkSession,包括链接到持续的 Hive 转移,支持 Hive serdes 和 Hive 用户定义的功能。 没有现有 Hive 部署的用户仍然能够启用 Hive 支持。 当 hive-site.xml 未配置时,上下文会自动在当前目录中建立 metastore_db,并建立由 spark.sql.warehouse.dir 配置的目录,该目录默认为Spark应用程序当前目录中的 spark-warehouse 目录 开始了 请注意,自从2.0.0以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。 而是使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置。 您可能须要向启动 Spark 应用程序的用户授予写权限。å

import java.io.File import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) // warehouseLocation points to the default location for managed databases and tables val warehouseLocation = new File("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // The items in DataFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") // Queries can then join DataFrame data with data stored in Hive. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ... 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.

指定 Hive 表的存储格式

建立 Hive 表时,须要定义如何 从/向 文件系统 read/write 数据,即 “输入格式” 和 “输出格式”。 您还须要定义该表如何将数据反序列化为行,或将行序列化为数据,即 “serde”。 如下选项可用于指定存储格式 (“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默认状况下,咱们将以纯文本形式读取表格文件。 请注意,Hive 存储处理程序在建立表时不受支持,您可使用 Hive 端的存储处理程序建立一个表,并使用 Spark SQL 来读取它。

Property Name Meaning
fileFormat fileFormat是一种存储格式规范的包,包括 "serde","input format" 和 "output format"。 目前咱们支持6个文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 这两个选项将相应的 "InputFormat" 和 "OutputFormat" 类的名称指定为字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 这两个选项必须成对出现,若是您已经指定了 "fileFormat" 选项,则没法指定它们。
serde 此选项指定 serde 类的名称。 当指定 `fileFormat` 选项时,若是给定的 `fileFormat` 已经包含 serde 的信息,那么不要指定这个选项。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可使用这3个文件格式的这个选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 "textfile" 文件格式一块儿使用。它们定义如何将分隔的文件读入行。

使用 OPTIONS 定义的全部其余属性将被视为 Hive serde 属性。

与不一样版本的 Hive Metastore 进行交互

Spark SQL 的 Hive 支持的最重要的部分之一是与 Hive metastore 进行交互,这使得 Spark SQL 可以访问 Hive 表的元数据。 从 Spark 1.4.0 开始,使用 Spark SQL 的单一二进制构建可使用下面所述的配置来查询不一样版本的 Hive 转移。 请注意,独立于用于与转移点通讯的 Hive 版本,内部 Spark SQL 将针对 Hive 1.2.1 进行编译,并使用这些类进行内部执行(serdes,UDF,UDAF等)。

如下选项可用于配置用于检索元数据的 Hive 版本:

属性名称 默认值 含义
spark.sql.hive.metastore.version 1.2.1 Hive metastore 版本。 可用选项为 0.12.0 至 1.2.1
spark.sql.hive.metastore.jars builtin 当启用 -Phive 时,使用 Hive 1.2.1,它与 Spark 程序集捆绑在一块儿。选择此选项时,spark.sql.hive.metastore.version 必须为 1.2.1 或未定义。 行家 使用从Maven存储库下载的指定版本的Hive jar。 一般不建议在生产部署中使用此配置。 ***** 应用于实例化 HiveMetastoreClient 的 jar 的位置。该属性能够是三个选项之一:
  1. builtin当启用 -Phive 时,使用 Hive 1.2.1,它与 Spark 程序集捆绑在一块儿。选择此选项时,spark.sql.hive.metastore.version 必须为 1.2.1 或未定义。
  2. maven使用从 Maven 存储库下载的指定版本的 Hive jar。一般不建议在生产部署中使用此配置。
  3. JVM 的标准格式的 classpath。 该类路径必须包含全部 Hive 及其依赖项,包括正确版本的 Hadoop。这些罐只须要存在于 driver 程序中,但若是您正在运行在 yarn 集群模式,那么您必须确保它们与应用程序一块儿打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

使用逗号分隔的类前缀列表,应使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器来加载。 一个共享类的示例就是用来访问 Hive metastore 的 JDBC driver。 其它须要共享的类,是须要与已经共享的类进行交互的。 例如,log4j 使用的自定义 appender。

spark.sql.hive.metastore.barrierPrefixes (empty)

一个逗号分隔的类前缀列表,应该明确地为 Spark SQL 正在通讯的 Hive 的每一个版本从新加载。 例如,在一般将被共享的前缀中声明的 Hive UDF (即: org.apache.spark.*)。

JDBC 链接其它数据库

Spark SQL 还包括可使用 JDBC 从其余数据库读取数据的数据源。此功能应优于使用 JdbcRDD。 这是由于结果做为 DataFrame 返回,而且能够轻松地在 Spark SQL 中处理或与其余数据源链接。 JDBC 数据源也更容易从 Java 或 Python 使用,由于它不须要用户提供 ClassTag。(请注意,这不一样于 Spark SQL JDBC 服务器,容许其余应用程序使用 Spark SQL 运行查询)。

要开始使用,您须要在 Spark 类路径中包含特定数据库的 JDBC driver 程序。 例如,要从 Spark Shell 链接到 postgres,您将运行如下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可使用 Data Sources API 未来自远程数据库的表做为 DataFrame 或 Spark SQL 临时视图进行加载。 用户能够在数据源选项中指定 JDBC 链接属性。用户 和 密码一般做为登陆数据源的链接属性提供。 除了链接属性外,Spark 还支持如下不区分大小写的选项:

属性名称 含义
url 要链接的JDBC URL。 源特定的链接属性能够在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 应该读取的 JDBC 表。请注意,可使用在SQL查询的 FROM 子句中有效的任何内容。 例如,您可使用括号中的子查询代替完整表。
driver 用于链接到此 URL 的 JDBC driver 程序的类名。
partitionColumn, lowerBound, upperBound 若是指定了这些选项,则必须指定这些选项。 另外,必须指定 numPartitions. 他们描述如何从多个 worker 并行读取数据时将表给分区。partitionColumn 必须是有问题的表中的数字列。 请注意,lowerBound 和 upperBound 仅用于决定分区的大小,而不是用于过滤表中的行。 所以,表中的全部行将被分区并返回。此选项仅适用于读操做。
numPartitions 在表读写中能够用于并行度的最大分区数。这也肯定并发JDBC链接的最大数量。 若是要写入的分区数超过此限制,则在写入以前经过调用 coalesce(numPartitions) 将其减小到此限制。
fetchsize JDBC 抓取的大小,用于肯定每次数据往返传递的行数。 这有利于提高 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操做。
batchsize JDBC 批处理的大小,用于肯定每次数据往返传递的行数。 这有利于提高 JDBC driver 的性能。 该选项仅适用于写操做。默认值为 1000.
isolationLevel 事务隔离级别,适用于当前链接。 它能够是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 链接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。 此选项仅适用于写操做。请参考 java.sql.Connection 中的文档。
truncate 这是一个与 JDBC 相关的选项。 启用 SaveMode.Overwrite 时,此选项会致使 Spark 截断现有表,而不是删除并从新建立。 这能够更有效,而且防止表元数据(例如,索引)被移除。 可是,在某些状况下,例如当新数据具备不一样的模式时,它将没法工做。 它默认为 false。 此选项仅适用于写操做。
createTableOptions 这是一个与JDBC相关的选项。 若是指定,此选项容许在建立表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操做。
createTableColumnTypes 使用数据库列数据类型而不是默认值,建立表时。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操做。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

故障排除

  • JDBC driver 程序类必须对客户端会话和全部执行程序上的原始类加载器可见。 这是由于 Java 的 DriverManager 类执行安全检查,致使它忽略原始类加载器不可见的全部 driver 程序,当打开链接时。一个方便的方法是修改全部工做节点上的compute_classpath.sh 以包含您的 driver 程序 JAR。
  • 一些数据库,例如 H2,将全部名称转换为大写。 您须要使用大写字母来引用 Spark SQL 中的这些名称。

性能调优

对于某些工做负载,能够经过缓存内存中的数据或打开一些实验选项来提升性能。

在内存中缓存数据

Spark SQL 能够经过调用 spark.catalog.cacheTable("tableName") 或 dataFrame.cache() 来使用内存中的列格式来缓存表。 而后,Spark SQL 将只扫描所需的列,并将自动调整压缩以最小化内存使用量和 GC 压力。 您能够调用 spark.catalog.uncacheTable("tableName") 从内存中删除该表。

内存缓存的配置可使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成。

属性名称 默认 含义
spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 将根据数据的统计信息为每一个列自动选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱状缓存的大小。更大的批量大小能够提升内存利用率和压缩率,可是在缓存数据时会冒出 OOM 风险。

其余配置选项

如下选项也可用于调整查询执行的性能。这些选项可能会在未来的版本中被废弃,由于更多的优化是自动执行的。

属性名称 默认值 含义
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件时,将单个分区打包的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字节数来衡量的打开文件的估计费用能够在同一时间进行扫描。 将多个文件放入分区时使用。最好过分估计,那么具备小文件的分区将比具备较大文件的分区(首先计划的)更快。
spark.sql.broadcastTimeout 300

广播链接中的广播等待时间超时(秒)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行链接时将广播给全部工做节点的表的最大大小(以字节为单位)。 经过将此值设置为-1能够禁用广播。 请注意,目前的统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

分布式 SQL 引擎

Spark SQL 也能够充当使用其 JDBC/ODBC 或命令行界面的分布式查询引擎。 在这种模式下,最终用户或应用程序能够直接与 Spark SQL 交互运行 SQL 查询,而不须要编写任何代码。

运行 Thrift JDBC/ODBC 服务器

这里实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2 中的 HiveServer2。 您可使用 Spark 或 Hive 1.2.1 附带的直线脚本测试 JDBC 服务器。

要启动 JDBC/ODBC 服务器,请在 Spark 目录中运行如下命令:

./sbin/start-thriftserver.sh

此脚本接受全部 bin/spark-submit 命令行选项,以及 --hiveconf 选项来指定 Hive 属性。 您能够运行 ./sbin/start-thriftserver.sh --help 查看全部可用选项的完整列表。 默认状况下,服务器监听 localhost:10000. 您能够经过环境变量覆盖此行为,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

or system properties:

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

如今,您可使用 beeline 来测试 Thrift JDBC/ODBC 服务器:

./bin/beeline

使用 beeline 方式链接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://localhost:10000

Beeline 将要求您输入用户名和密码。 在非安全模式下,只需输入机器上的用户名和空白密码便可。 对于安全模式,请按照 beeline 文档 中的说明进行操做。

配置Hive是经过将 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可使用 Hive 附带的 beeline 脚本。

Thrift JDBC 服务器还支持经过 HTTP 传输发送 thrift RPC 消息。 使用如下设置启用 HTTP 模式做为系统属性或在 conf/ 中的 hive-site.xml 文件中启用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要测试,请使用 beeline 以 http 模式链接到 JDBC/ODBC 服务器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行 Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行输入的查询的方便工具。 请注意,Spark SQL CLI 不能与 Thrift JDBC 服务器通讯。

要启动 Spark SQL CLI,请在 Spark 目录中运行如下命令:

./bin/spark-sql

配置 Hive 是经过将 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您能够运行 ./bin/spark-sql --help 获取全部可用选项的完整列表。

迁移指南

从 Spark SQL 2.1 升级到 2.2

  • Spark 2.1.1 介绍了一个新的配置 key: spark.sql.hive.caseSensitiveInferenceMode. 它的默认设置是 NEVER_INFER, 其行为与 2.1.0 保持一致. 可是,Spark 2.2.0 将此设置的默认值更改成 “INFER_AND_SAVE”,以恢复与底层文件 schema(模式)具备大小写混合的列名称的 Hive metastore 表的兼容性。使用 INFER_AND_SAVE 配置的 value, 在第一次访问 Spark 将对其还没有保存推测 schema(模式)的任何 Hive metastore 表执行 schema inference(模式推断). 请注意,对于具备数千个 partitions(分区)的表,模式推断多是很是耗时的操做。若是不兼容大小写混合的列名,您能够安全地将spark.sql.hive.caseSensitiveInferenceMode 设置为 NEVER_INFER,以免模式推断的初始开销。请注意,使用新的默认INFER_AND_SAVE 设置,模式推理的结果被保存为 metastore key 以供未来使用。所以,初始模式推断仅发生在表的第一次访问。

从 Spark SQL 2.0 升级到 2.1

  • Datasource tables(数据源表)如今存储了 Hive metastore 中的 partition metadata(分区元数据). 这意味着诸如 ALTER TABLE PARTITION ... SET LOCATION 这样的 Hive DDLs 如今使用 Datasource API 可用于建立 tables(表).
    • 遗留的数据源表能够经过 MSCK REPAIR TABLE 命令迁移到这种格式。建议迁移遗留表利用 Hive DDL 的支持和提供的计划性能。
    • 要肯定表是否已迁移,当在表上发出 DESCRIBE FORMATTED 命令时请查找 PartitionProvider: Catalog 属性.
  • Datasource tables(数据源表)的 INSERT OVERWRITE TABLE ... PARTITION ... 行为的更改。
    • 在之前的 Spark 版本中,INSERT OVERWRITE 覆盖了整个 Datasource table,即便给出一个指定的 partition. 如今只有匹配规范的 partition 被覆盖。
    • 请注意,这仍然与 Hive 表的行为不一样,Hive 表仅覆盖与新插入数据重叠的分区。

从 Spark SQL 1.6 升级到 2.0

  • SparkSession 如今是 Spark 新的切入点, 它替代了老的 SQLContext 和 HiveContext。注意 : 为了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。能够从 SparkSession 获取一个新的 catalog 接口 — 现有的访问数据库和表的 API,如 listTablescreateExternalTabledropTempViewcacheTable 都被移到该接口。

  • Dataset API 和 DataFrame API 进行了统一。在 Scala 中,DataFrame 变成了 Dataset[Row] 类型的一个别名,而 Java API 使用者必须将 DataFrame 替换成 Dataset<Row>。Dataset 类既提供了强类型转换操做(如 mapfilter 以及 groupByKey)也提供了非强类型转换操做(如 select 和 groupBy)。因为编译期的类型安全不是 Python 和 R 语言的一个特性,Dataset 的概念并不适用于这些语言的 API。相反,DataFrame仍然是最基本的编程抽象, 就相似于这些语言中单节点 data frame 的概念。

  • Dataset 和 DataFrame API 中 unionAll 已通过时而且由 union 替代。
  • Dataset 和 DataFrame API 中 explode 已通过时,做为选择,能够结合 select 或 flatMap 使用 functions.explode() 。
  • Dataset 和 DataFrame API 中 registerTempTable 已通过时而且由 createOrReplaceTempView 替代。

  • 对 Hive tables CREATE TABLE ... LOCATION 行为的更改.
    • 从 Spark 2.0 开始,CREATE TABLE ... LOCATION 与 CREATE EXTERNAL TABLE ... LOCATION 是相同的,以防止意外丢弃用户提供的 locations(位置)中的现有数据。这意味着,在用户指定位置的 Spark SQL 中建立的 Hive 表始终是 Hive 外部表。删除外部表将不会删除数据。 用户不能指定 Hive managed tables(管理表)的位置. 请注意,这与Hive行为不一样。
    • 所以,这些表上的 “DROP TABLE” 语句不会删除数据。

从 Spark SQL 1.5 升级到 1.6

  • 从 Spark 1.6 开始,默认状况下服务器在多 session(会话)模式下运行。这意味着每一个 JDBC/ODBC 链接拥有一份本身的 SQL 配置和临时函数注册。缓存表仍在并共享。若是您但愿以旧的单会话模式运行 Thrift server,请设置选项 spark.sql.hive.thriftServer.singleSession 为true。您既能够将此选项添加到 spark-defaults.conf,或者经过 --conf 将它传递给 start-thriftserver.sh
./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ... 
  • 从 1.6.1 开始,在 sparkR 中 withColumn 方法支持添加一个新列或更换 DataFrame 同名的现有列。

  • 从 Spark 1.6 开始,LongType 强制转换为 TimestampType 指望是秒,而不是微秒。这种更改是为了匹配 Hive 1.2 的行为,以便从 numeric(数值)类型进行更一致的类型转换到 TimestampType。更多详情请参阅 SPARK-11724 。

从 Spark SQL 1.4 升级到 1.5

  • 使用手动管理的内存优化执行,如今是默认启用的,以及代码生成表达式求值。这些功能既能够经过设置 spark.sql.tungsten.enabled 为 false 来禁止使用。
  • Parquet 的模式合并默认状况下再也不启用。它能够经过设置 spark.sql.parquet.mergeSchema 到 true 以从新启用。
  • 字符串在 Python 列的 columns(列)如今支持使用点(.)来限定列或访问嵌套值。例如 df['table.column.nestedField']。可是,这意味着若是你的列名中包含任何圆点,你如今必须避免使用反引号(如 table.column.with.dots.nested)。
  • 在内存中的列存储分区修剪默认是开启的。它能够经过设置 spark.sql.inMemoryColumnarStorage.partitionPruning 为 false 来禁用。
  • 无限精度的小数列再也不支持,而不是 Spark SQL 最大精度为 38 。当从 BigDecimal 对象推断模式时,如今使用(38,18)。在 DDL 没有指定精度时,则默认保留 Decimal(10, 0)
  • 时间戳如今存储在 1 微秒的精度,而不是 1 纳秒的。
  • 在 sql 语句中,floating point(浮点数)如今解析为 decimal。HiveQL 解析保持不变。
  • SQL / DataFrame 函数的规范名称如今是小写(例如 sum vs SUM)。
  • JSON 数据源不会自动加载由其余应用程序(未经过 Spark SQL 插入到数据集的文件)建立的新文件。对于 JSON 持久表(即表的元数据存储在 Hive Metastore),用户可使用 REFRESH TABLE SQL 命令或 HiveContext 的 refreshTable 方法,把那些新文件列入到表中。对于表明一个 JSON dataset 的 DataFrame,用户须要从新建立 DataFrame,同时 DataFrame 中将包括新的文件。
  • PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替换现有的同名列。

从 Spark SQL 1.3 升级到 1.4

DataFrame data reader/writer interface

基于用户反馈,咱们建立了一个新的更流畅的 API,用于读取 (SQLContext.read) 中的数据并写入数据 (DataFrame.write), 而且旧的 API 将过期(例如,SQLContext.parquetFileSQLContext.jsonFile).

针对 SQLContext.read ( ScalaJavaPython ) 和 DataFrame.write ( ScalaJavaPython ) 的更多细节,请看 API 文档.

DataFrame.groupBy 保留 grouping columns(分组的列)

根据用户的反馈, 咱们更改了 DataFrame.groupBy().agg() 的默认行为以保留 DataFrame 结果中的 grouping columns(分组列). 为了在 1.3 中保持该行为,请设置 spark.sql.retainGroupColumns 为 false.

// In 1.3.x, in order for the grouping column "department" to show up, // it must be included explicitly as part of the agg function call. df.groupBy("department").agg($"department", max("age"), sum("expense")) // In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(max("age"), sum("expense")) // Revert to 1.3 behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false")

DataFrame.withColumn 上的行为更改

以前 1.4 版本中,DataFrame.withColumn() 只支持添加列。该列将始终在 DateFrame 结果中被加入做为新的列,即便现有的列可能存在相同的名称。从 1.4 版本开始,DataFrame.withColumn() 支持添加与全部现有列的名称不一样的列或替换现有的同名列。

请注意,这一变化仅适用于 Scala API,并不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,咱们从 Spark SQL 中删除了 “Alpha” 的标签,做为一部分已经清理过的可用的 API 。从 Spark 1.3 版本以上,Spark SQL 将提供在 1.X 系列的其余版本的二进制兼容性。这种兼容性保证不包括被明确标记为不稳定的(即 DeveloperApi 类或 Experimental) API。

重命名 DataFrame 的 SchemaRDD

升级到 Spark SQL 1.3 版本时,用户会发现最大的变化是,SchemaRDD 已改名为 DataFrame。这主要是由于 DataFrames 再也不从 RDD 直接继承,而是由 RDDS 本身来实现这些功能。DataFrames 仍然能够经过调用 .rdd 方法转换为 RDDS 。

在 Scala 中,有一个从 SchemaRDD 到 DataFrame 类型别名,能够为一些状况提供源代码兼容性。它仍然建议用户更新他们的代码以使用 DataFrame来代替。Java 和 Python 用户须要更新他们的代码。

Java 和 Scala APIs 的统一

此前 Spark 1.3 有单独的Java兼容类(JavaSQLContext 和 JavaSchemaRDD),借鉴于 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已经统一。两种语言的用户可使用 SQLContext 和 DataFrame。通常来讲论文类尝试使用两种语言的共有类型(如 Array 替代了一些特定集合)。在某些状况下不通用的类型状况下,(例如,passing in closures 或 Maps)使用函数重载代替。

此外,该 Java 的特定类型的 API 已被删除。Scala 和 Java 的用户可使用存在于 org.apache.spark.sql.types 类来描述编程模式。

隔离隐式转换和删除 dsl 包(仅Scala)

许多 Spark 1.3 版本之前的代码示例都以 import sqlContext._ 开始,这提供了从 sqlContext 范围的全部功能。在 Spark 1.3 中,咱们移除了从 RDDs 到 DateFrame 再到 SQLContext 内部对象的隐式转换。用户如今应该写成 import sqlContext.implicits._.

此外,隐式转换如今只能使用方法 toDF 来增长由 Product(即 case classes or tuples)构成的 RDD,而不是自动应用。

当使用 DSL 内部的函数时(如今使用 DataFrame API 来替换), 用户习惯导入 org.apache.spark.sql.catalyst.dsl. 相反,应该使用公共的 dataframe 函数 API: import org.apache.spark.sql.functions._.

针对 DataType 删除在 org.apache.spark.sql 包中的一些类型别名(仅限于 Scala)

Spark 1.3 移除存在于基本 SQL 包的 DataType 类型别名。开发人员应改成导入类 org.apache.spark.sql.types

UDF 注册迁移到 sqlContext.udf 中 (Java & Scala)

用于注册 UDF 的函数,无论是 DataFrame DSL 仍是 SQL 中用到的,都被迁移到 SQLContext 中的 udf 对象中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF 注册保持不变。

Python DataTypes 再也不是 Singletons(单例的)

在 Python 中使用 DataTypes 时,你须要先构造它们(如:StringType()),而不是引用一个单例对象。

与 Apache Hive 的兼容

Spark SQL 在设计时就考虑到了和 Hive metastore,SerDes 以及 UDF 之间的兼容性。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本,而且Spark SQL 能够链接到不一样版本的Hive metastore(从 0.12.0 到 1.2.1,能够参考 与不一样版本的 Hive Metastore 交互

在现有的 Hive Warehouses 中部署

Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不须要修改现有的 Hive Metastore , 或者改变数据的位置和表的分区。

所支持的 Hive 特性

Spark SQL 支持绝大部分的 Hive 功能,如:

  • Hive query(查询)语句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部 Hive 操做, 包括:
    • 关系运算符 (===<><>>=<=, 等等)
    • 算术运算符 (+-*/%, 等等)
    • 逻辑运算符 (AND&&OR||, 等等)
    • 复杂类型的构造
    • 数学函数 (signlncos, 等等)
    • String 函数 (instrlengthprintf, 等等)
  • 用户定义函数 (UDF)
  • 用户定义聚合函数 (UDAF)
  • 用户定义 serialization formats (SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查询)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 全部的 Hive DDL 函数, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的 Hive Data types(数据类型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

未支持的 Hive 函数

如下是目前还不支持的 Hive 函数列表。在 Hive 部署中这些功能大部分都用不到。

主要的 Hive 功能

  • Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 还不支持 buckets.

Esoteric Hive 功能

  • UNION 类型
  • Unique join
  • Column 统计信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive

Hive 优化

有少数 Hive 优化尚未包含在 Spark 中。其中一些(好比 indexes 索引)因为 Spark SQL 的这种内存计算模型而显得不那么重要。另一些在 Spark SQL 将来的版本中会持续跟踪。

  • Block 级别的 bitmap indexes 和虚拟 columns (用于构建 indexes)
  • 自动为 join 和 groupBy 计算 reducer 个数 : 目前在 Spark SQL 中, 你须要使用 “SET spark.sql.shuffle.partitions=[num_tasks];” 来控制 post-shuffle 的并行度.
  • 仅 Meta-data 的 query: 对于只使用 metadata 就能回答的查询,Spark SQL 仍然会启动计算结果的任务.
  • Skew data flag: Spark SQL 不遵循 Hive 中 skew 数据的标记.
  • STREAMTABLE hint in join: Spark SQL 不遵循 STREAMTABLE hint.
  • 对于查询结果合并多个小文件: 若是输出的结果包括多个小文件, Hive 能够可选的合并小文件到一些大文件中去,以免溢出 HDFS metadata. Spark SQL 还不支持这样.

参考

数据类型

Spark SQL 和 DataFrames 支持下面的数据类型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementTypecontainsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataTypenullable is used to indicate if values of this fields can have null values.

Spark SQL 的全部数据类型都在包 org.apache.spark.sql.types 中. 你能够用下示例示例来访问它们.

import org.apache.spark.sql.types._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(数据类型) Scala 中的 Value 类型 访问或建立数据类型的 API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
Note(注意): containsNull 的默认值是 true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
Note(注意): valueContainsNull 的默认值是 true.
StructType org.apache.spark.sql.Row StructType(fields)
Note(注意): fields 是 StructFields 的 Seq. 全部, 两个 fields 拥有相同的名称是不被容许的.
StructField 该 field(字段)数据类型的 Scala 中的 value 类型 (例如, 数据类型为 IntegerType 的 StructField 是 Int) StructField(name, dataType, [nullable])
Note: nullable 的默认值是 true.

NaN Semantics

当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 须要作一些特殊处理. 具体以下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操做中,全部的 NaN values 将被分到同一个组中.
  • 在 join key 中 NaN 能够当作一个普通的值.
  • NaN 值在升序排序中排到最后,比任何其余数值都大.


相关文章
相关标签/搜索