#文档说明html
#概述 Spark SQL 是 Spark 用于处理结构化数据的一个模块。不一样于基础的 Spark RDD API,Spark SQL 提供的接口提供了更多关于数据和执行的计算任务的结构信息。Spark SQL 内部使用这些额外的信息来执行一些额外的优化操做。有几种方式能够与 Spark SQL 进行交互,其中包括 SQL 和 Dataset API。当计算一个结果时 Spark SQL 使用的执行引擎是同样的, 它跟你使用哪一种 API 或编程语言来表达计算无关。这种统一意味着开发人员能够很容易地在不一样的 API 之间来回切换,基于哪一种 API 可以提供一种最天然的方式来表达一个给定的变换 (transformation)。java
本文中全部的示例程序都使用 Spark 发行版本中自带的样本数据,而且能够在 spark-shell、pyspark shell 以及 sparkR shell 中运行。python
##SQLmysql
Spark SQL 的用法之一是执行 SQL 查询,它也能够从现有的 Hive 中读取数据,想要了解更多关于如何配置这个特性的细节, 请参考 Hive表 这节。若是从其它编程语言内部运行 SQL,查询结果将做为一个 Dataset/DataFrame 返回。你还可使用命令行或者经过 JDBC/ODBC 来与 SQL 接口进行交互。sql
##Dataset和DataFrameshell
Dataset 是一个分布式数据集,它是 Spark 1.6 版本中新增的一个接口, 它结合了 RDD(强类型,可使用强大的 lambda 表达式函数)和 Spark SQL 的优化执行引擎的好处。Dataset 能够从 JVM 对象构造获得,随后可使用函数式的变换(map,flatMap,filter 等)进行操做。Dataset API 目前支持 Scala 和 Java 语言,还不支持 Python, 不过因为 Python 语言的动态性, Dataset API 的许多好处早就已经可用了(例如,你可使用 row.columnName 来访问数据行的某个字段)。这种场景对于 R 语言来讲是相似的。数据库
DataFrame 是按命名列方式组织的一个 Dataset。从概念上来说,它等同于关系型数据库中的一张表或者 R 和 Python 中的一个 data frame,只不过在底层进行了更多的优化。DataFrame 能够从不少数据源构造获得,好比:结构化的数据文件,Hive 表,外部数据库或现有的 RDD。DataFrame API 支持 Scala, Java, Python 以及 R 语言。在 Scala 和 Java 语言中, DataFrame 由 Row 的 Dataset 来表示的。在 Scala API 中, DataFrame 仅仅只是 Dataset[Row] 的一个类型别名,而在 Java API 中, 开发人员须要使用 Dataset<Row> 来表示一个 DataFrame。apache
在本文中, 咱们每每把 Scala/Java 中 Row 的 Dataset 当作 DataFrame。编程
#入门json
##入口:SparkSession
Spark 中全部的功能入口都是 SparkSession 类。要建立一个基本的 SparkSession 对象,只须要使用 SparkSession.builder() 方法。
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL Example") .config("spark.some.config.option", "some-value") .getOrCreate() // 用于隐式转换,像将RDD转换成DataFrame import spark.implicits._
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
#####Java
import org.apache.spark.sql.SparkSession; SparkSession spark = SparkSession .builder() .appName("Java Spark SQL Example") .config("spark.some.config.option", "some-value") .getOrCreate();
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
#####Python
from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("PythonSQL")\ .config("spark.some.config.option", "some-value")\ .getOrCreate()
完整示例代码参见 Spark 仓库中的 "examples/src/main/python/sql.py" 文件。
Spark 2.0 版本中的 SparkSession 对于 Hive 的一些功能特性提供了内置支持, 包括使用 HiveQL 编写查询语句, 访问 Hive UDF 以及从 Hive 表中读取数据。想要使用这些特性, 须要确保你已经安装了 Hive。
##建立DataFrame
Spark 应用程序可使用 SparkSession 从现有的RDD、Hive 表或 Spark 数据源建立DataFrame。
下面这个示例基于一个 JSON 文件内容建立了一个 DataFrame:
#####Scala
val df = spark.read.json("examples/src/main/resources/people.json") // 将 DataFrame 内容展现到标准输出 df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
#####Java
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json"); // 将 DataFrame 内容展现到标准输出 df.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
#####Python
# spark是已存在的 SparkSession 对象 df = spark.read.json("examples/src/main/resources/people.json") # 将 DataFrame 内容展现到标准输出 df.show()
##非强类型Dataset操做(即DataFrame操做)
DataFrame 为 Scala, Java, Python 以及 R 语言中的结构化数据操做提供了一个领域特定语言。
上面咱们也提到过, Spark 2.0 版本中, Scala 和 Java API 中的 DataFrame 只是 Row 的 Dataset。与强类型 Scala/Java Dataset 提供的"强类型变换(typed transformations)"相比,这些操做也被称为"非强类型变换(untyped transformations)" 。
下面提供了几个使用 Dataset 处理结构化数据的基础示例:
#####Scala
// import语句须要使用 $ 符号 import spark.implicits._ // 按照tree格式打印schema df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // 只选择"name"列 df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // 选择全部人, 可是age加 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // 选择年龄大于21岁的人 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // 按年龄统计人数 df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
DataFrame 上可执行的操做类型的一个完整列表能够参考:API文档 。
除了简单的列引用和表达式以外,DataFrame 还提供了丰富的函数库,包括字符串操做,日期计算,常见的数学操做等。完整列表能够参见:DataFrame函数参考文档 。
#####Java
// col("...") 比 df.col("...") 更好 import static org.apache.spark.sql.functions.col; // 按照tree格式打印schema df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // 只选择"name"列 df.select("name").show(); // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // 选择全部人, 可是age加 1 df.select(col("name"), col("age").plus(1)).show(); // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // 选择年龄大于21岁的人 df.filter(col("age").gt(21)).show(); // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // 按年龄统计人数 df.groupBy("age").count().show(); // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
DataFrame 上可执行的操做类型的一个完整列表能够参考:API文档 。
除了简单的列引用和表达式以外,DataFrame 还提供了丰富的函数库,包括字符串操做,日期计算,常见的数学操做等。完整列表能够参见:DataFrame函数参考文档。
#####Python
在 Python 中, 可使用属性 (df.age) 或者经过索引 (df['age']) 来访问 DataFrame 的列。 虽然前者对于交互式数据检索很是方便, 可是咱们仍是很是建议开发人员使用后者, 由于后者更面向于将来,而且不会与也是 DataFrame 类上的属性的列名冲突。
# spark是一个已存在的 SparkSession 对象 # 建立 DataFrame df = spark.read.json("examples/src/main/resources/people.json") # 将 DataFrame 内容展现到标准输出 df.show() ## age name ## null Michael ## 30 Andy ## 19 Justin # 按照tree格式打印schema df.printSchema() ## root ## |-- age: long (nullable = true) ## |-- name: string (nullable = true) # 只选择"name"列 df.select("name").show() ## name ## Michael ## Andy ## Justin # 选择全部人, 可是age加 1 df.select(df['name'], df['age'] + 1).show() ## name (age + 1) ## Michael null ## Andy 31 ## Justin 20 # 选择年龄大于21岁的人 df.filter(df['age'] > 21).show() ## age name ## 30 Andy # 按年龄统计人数 df.groupBy("age").count().show() ## age count ## null 1 ## 19 1 ## 30 1
DataFrame 上可执行的操做类型的一个完整列表能够参考:API文档 。
除了简单的列引用和表达式以外,DataFrame 还提供了丰富的函数库,包括字符串操做,日期计算,常见的数学操做等。完整列表能够参见:DataFrame函数参考文档。
##编程方式运行SQL查询
#####Scala
SparkSession 中的 sql 函数可使应用程序可以以编程方式运行 SQL 查询并将查询结果做为一个 DataFrame 返回。
// 将 DataFrame 注册为一个SQL临时视图 df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
#####Java
SparkSession 中的 sql 函数可使应用程序可以以编程方式运行 SQL 查询并将结果做为一个 Dataset<Row> 返回。
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // 将 DataFrame 注册为一个SQL临时视图 df.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people"); sqlDF.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
#####Python
SparkSession 中的 sql 函数可使应用程序可以以编程方式运行 SQL 查询并将结果做为一个 DataFrame 返回。
# spark 是一个已存在的 SparkSession 对象 df = spark.sql("SELECT * FROM table")
##建立Dataset
Dataset 和 RDD 相似, 可是 Dataset 使用的是一个专门的编码器(Encoder )来序列化对象以进行跨网络的数据处理和传输, 而不是使用 Java 序列化或者 Kryo 。 虽然编码器和标准的序列化均可以将对象转化成字节, 可是编码器能够根据代码动态生成而且使用一种能够容许 Spark 执行不少像过滤、排序、哈希等操做而不须要将字节反序列化成一个对象的特殊的数据格式。
#####Scala
// 注意: Scala 2.10 中的Case classes 最多只能支持到22个字段,想要突破这个限制, // 你可使用实现了Product接口的自定义类 case class Person(name: String, age: Long) // 为 case classes 建立Encoders val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // 经过引入spark.implicits._来自动地提供可用于大多数经常使用类型的Encoders val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // 经过提供一个类,DataFrames能够转换成一个 Dataset。映射是基于名称的。 val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
#####Java
import java.util.Arrays; import java.util.Collections; import java.io.Serializable; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } } // 建立一个Person类的实例 Person person = new Person(); person.setName("Andy"); person.setAge(32); // 建立用于Java bean的Encoders Encoder<Person> personEncoder = Encoders.bean(Person.class); Dataset<Person> javaBeanDS = spark.createDataset( Collections.singletonList(person), personEncoder ); javaBeanDS.show(); // +---+----+ // |age|name| // +---+----+ // | 32|Andy| // +---+----+ // 大多数经常使用类型的Encoders是由 Encoder 类提供的 Encoder<Integer> integerEncoder = Encoders.INT(); Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() { @Override public Integer call(Integer value) throws Exception { return value + 1; } }, integerEncoder); transformedDS.collect(); // Returns [2, 3, 4] // 经过提供一个类,DataFrames能够转换成一个 Dataset。映射是基于名称的。 String path = "examples/src/main/resources/people.json"; Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder); peopleDS.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
##与RDD互操做
Spark SQL 支持两种不一样的方法来将现有 RDD 转化成 Dataset. 第一种方法是使用反射来推导包含特定类型对象的 RDD 的 schema。当你编写 Spark 应用程序的时候若是已经知道了 schema, 那么这种基于反射机制的方法可以使代码更加简洁而且运行良好。
第二种用于建立 Dataset 的方法是经过一个容许你构造一个 schema 并将其应用到一个现有 RDD 上的编程接口。尽管这种方法代码很是冗长, 可是它容许你在不知道列和列类型的状况下构造 Dataset。
###使用反射推导Schema
#####Scala
Spark SQL 的 Scala 接口支持自动地将包含 case class 的 RDD 转化为一个 DataFrame, 其中 case class 定义了表的 schema。case class 的参数名称使用反射读取并映射成表的列名。Case classes能够是嵌套的或者包含一些像 Seq 或 Array 这样的复杂类型。这个 RDD 能够隐式地转换成一个 DataFrame 而后注册成一个表。 能够在后续的 SQL 语句中使用这些表。
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder // 用于从RDD到DataFrame的隐式转换 import spark.implicits._ // 从一个文本文件中建立一个Person对象的RDD,将其转换成一个Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // 将DataFrame注册成一个临时视图 peopleDF.createOrReplaceTempView("people") // SQL语句能够经过Spark的sql方法来运行 val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // 查询结果中某一行的列能够经过字段索引来访问 teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // 或者经过字段名称 teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // 没有用于Dataset[Map[K,V]]的预约义 encoders , 须要显示地定义 implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // 基本类型和 case classes 也能够定义成: implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder() // row.getValuesMap[T] 一次检索多个列到一个 Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
#####Java
Spark SQL 支持自动地将一个 JavaBean 的 RDD 转换成一个 DataFrame。使用反射得到的 BeanInfo 对象定义了表的 schema。目前 Spark SQL 不支持包含 Map 字段的 JavaBean,可是支持嵌套的 JavaBean 和 List 或 Array 字段。你能够建立一个实现了 Serializable 接口的JavaBean,而且为它全部的字段生成 getters 和 setters 方法。
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; // 从一个文本文件中建立Person对象的RDD JavaRDD<Person> peopleRDD = spark.read() .textFile("examples/src/main/resources/people.txt") .javaRDD() .map(new Function<String, Person>() { @Override public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); // 将 schema 应用到 JavaBean 的 RDD 上以获取一个 DataFrame Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class); // 将DataFrame注册成一个临时视图 peopleDF.createOrReplaceTempView("people"); // SQL语句能够经过Spark的sql方法来运行 Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); // 查询结果中某一行的列能够经过字段索引来访问 Encoder<String> stringEncoder = Encoders.STRING(); Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() { @Override public String call(Row row) throws Exception { return "Name: " + row.getString(0); } }, stringEncoder); teenagerNamesByIndexDF.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // 或者经过字段名称 Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() { @Override public String call(Row row) throws Exception { return "Name: " + row.<String>getAs("name"); } }, stringEncoder); teenagerNamesByFieldDF.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
#####Python
Spark SQL 能够将 Row 对象的 RDD 转化成一个 DataFrame, 并推导其数据类型。Row 是经过将 key/value 键值对列表做为 kwargs 参数传递给 Row 类构造而来。这个列表的键定义了表的列名, 而且其类型是经过抽样整个数据集推导而来, 相似于 JSON 文件上执行的推导。
# spark是一个已存在的SparkSession对象. from pyspark.sql import Row sc = spark.sparkContext # 加载一个文本文件并将每一行转换成一个Row对象. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # 推导 schema , 并将DataFrame注册成一个表. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL 能够运行在已经注册成表的DataFrame上 teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # SQL查询的结果是RDD并支持全部标准的RDD操做 teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName)
###编程方式指定Schema
#####Scala
若是不能事先定义 case class (例如, 要解析编码于一个字符串或是一个文本数据集中的记录结构, 并且字段对于不一样的用户字段映射不一样), 那么按照下面三个步骤能够以编程方式建立一个 DataFrame :
例如:
import org.apache.spark.sql.types._ // 建立一个 RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // schema编码于一个字符串中 val schemaString = "name age" // 基于schema的字符串生成schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // 将RDD (people) 的记录转换成 Row val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // 将schema应用到RDD上 val peopleDF = spark.createDataFrame(rowRDD, schema) // 使用DataFrame建立一个临时视图 peopleDF.createOrReplaceTempView("people") // SQL能够运行在一个使用DataFrame建立的临时视图上 val results = spark.sql("SELECT name FROM people") // SQL查询的结果是DataFrame并支持全部标准的RDD操做 // 查询结果中某一行的列能够经过字段索引或字段名称来访问 results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
#####Java
若是不能事先定义 JavaBean (例如, 要解析编码于一个字符串或是一个文本数据集中的记录结构, 并且字段对于不一样的用户字段映射不一样), 那么按照下面三个步骤能够以编程方式建立一个 Dataset<Row>:
例如:
import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; // 建立一个 RDD JavaRDD<String> peopleRDD = spark.sparkContext() .textFile("examples/src/main/resources/people.txt", 1) .toJavaRDD(); // schema编码于一个字符串中 String schemaString = "name age"; // 基于schema的字符串生成schema List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) { StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true); fields.add(field); } StructType schema = DataTypes.createStructType(fields); // 将RDD (people) 的记录转换成 Row JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() { @Override public Row call(String record) throws Exception { String[] attributes = record.split(","); return RowFactory.create(attributes[0], attributes[1].trim()); } }); // 将schema应用到RDD上 Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema); // 使用DataFrame建立一个临时视图 peopleDataFrame.createOrReplaceTempView("people"); // SQL能够运行在一个使用DataFrame建立的临时视图上 Dataset<Row> results = spark.sql("SELECT name FROM people"); // SQL查询的结果是DataFrame并支持全部标准的RDD操做 // 查询结果中某一行的列能够经过字段索引或字段名称来访问 Dataset<String> namesDS = results.map(new MapFunction<Row, String>() { @Override public String call(Row row) throws Exception { return "Name: " + row.getString(0); } }, Encoders.STRING()); namesDS.show(); // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。
#####Python
若是不能事先定义 kwargs 字典 (例如, 要解析编码于一个字符串或是一个文本数据集中的记录结构, 并且字段对于不一样的用户字段映射不一样), 那么按照下面三个步骤能够以编程方式建立一个 DataFrame:
例如:
# 引入 SparkSession 和数据类型 from pyspark.sql.types import * # spark 是一个已存在的SparkSession对象 sc = spark.sparkContext # 加载一个文本文件并将每一行转换成一个tuple lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: (p[0], p[1].strip())) # schema编码于一个字符串中 schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # 将schema应用到RDD上 schemaPeople = spark.createDataFrame(people, schema) # 使用DataFrame建立一个临时视图 schemaPeople.createOrReplaceTempView("people") # SQL能够运行在一个已经注册成表的DataFrame上 results = spark.sql("SELECT name FROM people") # SQL查询的结果是DataFrame并支持全部标准的RDD操做 names = results.map(lambda p: "Name: " + p.name) for name in names.collect(): print(name)
#数据源
经过统一的 DataFrame 接口, Spark SQL 支持在不一样的数据源上进行操做。既能够在 DataFrame 上使用关系型的变换 (transformations) 进行操做,也能够用其建立一个临时视图。将 DataFrame 注册成一个临时视图能够容许你在它的数据上运行SQL查询。本节将描述使用 Spark 数据源加载和保存数据的一些通用方法,而后深刻介绍一下专门用于内置数据源的一些选项。
##通用的加载/保存函数
最简单的方式就是全部操做都使用默认的数据源(除非使用了spark.sql.sources.default 进行配置,不然默认值是 parquet)。
#####Scala
val usersDF = spark.read.load("examples/src/main/resources/users.parquet") usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。
#####Java
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet"); usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"文件。
#####Python
df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
###手动指定选项
你也能够手动指定要使用的数据源,并设置一些你想传递给数据源的额外选项。数据源可由其全限定名指定(例如,org.apache.spark.sql.parquet),而对于内置数据源,你可使用简写(json, parquet, jdbc)。使用下面的语法能够将从任意类型的数据源中加载的 DataFrame 转换成其它的类型。
#####Scala
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。
#####Java
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json"); peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。
#####Python
df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
###直接在文件上运行SQL
除了使用读取 API 将文件加载到 DataFrame 而后执行查询,你还能够直接使用 SQL 查询文件。
#####Scala
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。
#####Java
Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。
####Python
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
###保存模式
Save 操做能够选择使用一个 SaveMode,它指定了如何处理数据已经存在的状况。很重要的一点咱们须要意识到的是,这些保存模式都没有加锁而且都不是原子操做。另外,当执行 Overwrite 操做时,写入新数据以前原有的数据会被删除。
Scala/Java | 任何语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists (默认) | "error" (默认) | 当保存 DataFrame 到数据源时,若是数据已经存在,将会抛出异常。 |
SaveMode.Append | "append" | 当保存 DataFrame 到数据源时,若是数据或表已经存在,则将 DataFrame 内容追加到现有数据末尾。 |
SaveMode.Overwrite | "overwrite" | Overwrite 模式意味着保存 DataFrame 到数据源时,若是数据或表已经存在,则使用 DataFrame 内容覆盖已有数据。 |
SaveMode.Ignore | "ignore" | Ignore 模式意味着保存 DataFrame 到数据源时,若是数据已经存在,那么保存操做将不会保存 DataFrame 内容而且不会改变现有数据。这和 SQL 里的 CREATE TABLE IF NOT EXISTS 相似。 |
###保存到持久化表
可使用 saveAsTable 命令将 DataFrame 做为持久化表保存在 Hive metastore 中。须要注意的是现有的 Hive 部署不必使用这个特性。Spark 将会建立一个默认的本地 Hive metastore (使用 Derby)。不一样于 createOrReplaceTempView 命令,saveAsTable 将会物化 DataFrame 内容并建立一个指向 Hive metastore 中数据的指针。即便 Spark 应用程序重启, 持久化表也会一直存在,只要你维持到同一个 metastore 的链接。经过调用 SparkSession 上带有表名参数的 table 方法就能够建立 DataFrame 的一个持久化表。
默认状况下,saveAsTable 会建立一个 "managed table (托管表或内部表)", 意味着数据的位置将由 metastore 控制。当表被删除时, 托管表(或内部表)也会自动删除它们的数据。
##Parquet文件
Parquet 是一种列式存储格式,不少其它的数据处理系统都支持它。Spark SQL 提供了对可以自动保存原始数据的 schema 的 Parquet 文件的读写支持。写 Parquet 文件时,处于兼容性考虑,全部列都自动地转换为nullable。
###编程方式加载数据
仍然使用上面例子中的数据:
#####Scala
// 引入spark.implicits._以自动提供用于大多数经常使用类型的Encoder import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrame能够保存为Parquet文件,以维护schema信息 peopleDF.write.parquet("people.parquet") // 读入上面建立的parquet文件 // Parquet文件是自描述的,所以schema得以保存 // Parquet文件的加载结果也是一个DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet文件也能够用于建立一个临时视图接着能够用在SQL语句中 parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。
#####Java
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; // import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json"); // DataFrame能够保存为Parquet文件, 以维护schema信息 peopleDF.write().parquet("people.parquet"); // 读入上面建立的parquet文件 // Parquet文件是自描述的,所以schema得以保存 // Parquet文件的加载结果也是一个DataFrame Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet"); // Parquet文件也能够用于建立一个临时视图接着能够用在SQL语句中 parquetFileDF.createOrReplaceTempView("parquetFile"); Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }, Encoders.STRING()); namesDS.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中。
#####Python
# 以前示例使用的spark变量用在这个示例中 schemaPeople # 以前示例使用的 DataFrame # DataFrame能够保存为Parquet文件,以维护schema信息 schemaPeople.write.parquet("people.parquet") # 读入上面建立的parquet文件 # Parquet文件是自描述的,所以schema得以保存 # Parquet文件的加载结果也是一个DataFrame parquetFile = spark.read.parquet("people.parquet") # Parquet文件也能够用于建立一个临时视图接着能够用在SQL语句中 parquetFile.createOrReplaceTempView("parquetFile"); teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName)
#####Sql
CREATE TEMPORARY VIEW parquetTable USING org.apache.spark.sql.parquet OPTIONS ( path "examples/src/main/resources/people.parquet" ) SELECT * FROM parquetTable
###分区发现(Partition Discovery)
像 Hive 这样的系统中,表分区是一个经常使用的优化方法。在一个分区表中,数据一般存储在不一样的目录下,分区列值编码于各个分区目录的路径中。Parquet 数据源如今能够自动地发现和推导分区信息。例如,咱们可使用下面的目录结构把以前使用的人口数据存储到一个分区表中,并使用2个额外的列gender和country来做为分区列:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
经过将 path/to/table 传递给 SparkSession.read.parquet 或 SparkSession.read.load 方法, Spark SQL 将会自动从路径中提取分区信息。如今返回的 DataFrame 的 schema 以下:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
注意:分区列的数据类型是自动推导出来的。目前,分区列只支持数值类型和字符串类型。有时候用户可能不想要自动推导分区列的数据类型,对于这些使用案例,自动类型推导能够经过 spark.sql.sources.partitionColumnTypeInference.enabled 来配置是否开启,其默认值是true。当禁用类型推导后,字符串类型将用于分区列类型。
从Spark 1.6.0 版本开始,分区发现(partition discovery)默认只查找给定路径下的分区。拿上面的例子来讲,若是用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或者 SparkSession.read.load 方法,那么 gender 将不会被看成分区列。若是用户想要指定分区发现(partition discovery)开始的基路径,能够在数据源选项中设置 basePath。例如,若是 path/to/table/gender=male 是数据路径,而且用户将 basePath 设置为 path/to/table,那么 gender 将是一个分区列。
###Schema合并(Schema Merging)
和 ProtocolBuffer、Avro 以及 Thrift 同样,Parquet 也支持 schema 演变。用户能够从一个简单的 schema 开始,逐渐往 schema 中增长所须要的列。经过这种方式,用户最终可能会获得多个有着不一样 schema 但互相兼容 的 Parquet 文件。Parquet 数据源如今可以自动检测这种状况并合并全部这些文件的 schema。
由于 schema 合并相对来讲是一个代价高昂的操做,而且在大多数状况下都不须要,因此从Spark 1.5.0 版本开始,默认关闭了Schema合并。你能够这样启用这一功能:
#####Scala
// 用于隐式地将RDD转换成DataFrame import spark.implicits._ // 建立一个简单的DataFrame,将其存储到一个分区目录下 val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // 在一个新的分区目录下建立另外一个DataFrame, // 增长一个新列并删除一个现有的列 val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // 读取分区表 val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // 最终的schema由Parquet文件中全部3列以及出如今分区目录路径中的分区列组成 // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key : int (nullable = true)
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。
#####Java
import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public static class Square implements Serializable { private int value; private int square; // Getters and setters... } public static class Cube implements Serializable { private int value; private int cube; // Getters and setters... } List<Square> squares = new ArrayList<>(); for (int value = 1; value <= 5; value++) { Square square = new Square(); square.setValue(value); square.setSquare(value * value); squares.add(square); } // 建立一个简单的DataFrame,将其存储到一个分区目录下 Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class); squaresDF.write().parquet("data/test_table/key=1"); List<Cube> cubes = new ArrayList<>(); for (int value = 6; value <= 10; value++) { Cube cube = new Cube(); cube.setValue(value); cube.setCube(value * value * value); cubes.add(cube); } // 在一个新的分区目录下建立另外一个DataFrame, // 增长一个新列并删除一个现有的列 Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class); cubesDF.write().parquet("data/test_table/key=2"); // 读取分区表 Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table"); mergedDF.printSchema(); // 最终的schema由Parquet文件中全部3列以及出如今分区目录路径中的分区列组成 // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key : int (nullable = true)
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。
#####Python
# 以前示例使用的spark变量用在这个示例中 # 建立一个简单的DataFrame,将其存储到一个分区目录下 df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\ .map(lambda i: Row(single=i, double=i * 2))) df1.write.parquet("data/test_table/key=1") # 在一个新的分区目录下建立另外一个DataFrame, # 增长一个新列并删除一个现有的列 df2 = spark.createDataFrame(sc.parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i * 3))) df2.write.parquet("data/test_table/key=2") # 读取分区表 df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() # 最终的schema由Parquet文件中全部3列以及出如今分区目录路径中的分区列组成 # root # |-- single: int (nullable = true) # |-- double: int (nullable = true) # |-- triple: int (nullable = true) # |-- key : int (nullable = true)
###Hive metastore Parquet表转换
当读写 Hive metastore Parquet 表时,Spark SQL 会使用自带的 Parquet 支持而不是 Hive SerDe,以便得到更好的性能。这是由 spark.sql.hive.convertMetastoreParquet 配置项控制的,而且默认是开启的。
####Hive/Parquet Schema调整
从表的 schema 处理的角度来看, Hive 和 Parquet 有2个关键的不一样点:
因为这个缘由,在将一个 Hive metastore Parquet 表转换成一个 Spark SQL Parquet 表时,必需要使 Hive metastore schema 和 Parquet schema 协调一致。调整规则以下:
####元数据刷新(Metadata Refreshing)
Spark SQL 会缓存 Parquet 元数据以提升性能。若是启用了Hive metastore Parquet表转换,那么那些转换后的表的 schema 也会被缓存起来。若是这些表被 Hive 或其它外部工具更新, 那么你须要手动地刷新它们以确保元数据的一致性。
#####Scala
// spark是一个已存在的SparkSession对象 spark.catalog.refreshTable("my_table")
#####Java
// spark是一个已存在的SparkSession对象 spark.catalog().refreshTable("my_table");
#####Python
# spark是一个已存在的 HiveContext spark.refreshTable("my_table")
#####Sql
REFRESH TABLE my_table;
###配置
Parquet 配置可使用 SparkSession 上的 setConf 方法或者经过使用 SQL语句运行 SET key=value 命令来进行设置。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 其它一些使用 Parquet 的系统, 特别是 Impala,Hive 以及老版本的 Spark SQL,当写 Parquet schema 时都不区分二进制数据和字符串。这个标识告诉 Spark SQL 把二进制数据当字符串处理,以兼容这些系统。 |
spark.sql.parquet.int96AsTimestamp | true | 一些使用 Parquet 的系统, 特别是 Impala 和 Hive,将 Timestamp 存储成 INT96。这个标识告诉 Spark SQL 将 INT96 数据解析成 Timestamp,以兼容这些系统。 |
spark.sql.parquet.cacheMetadata | true | 开启 Parquet schema 元数据缓存。能够提高查询静态数据的速度。 |
spark.sql.parquet.compression.codec | gzip | 设置写 Parquet 文件使用的压缩编码格式。可接受的值有:uncompressed, snappy, gzip, lzo |
spark.sql.parquet.filterPushdown | true | 设置为 true 时启用 Parquet 过滤器 push-down 优化 |
spark.sql.hive.convertMetastoreParquet | true | 设置为false时,Spark SQL将使用 Hive SerDe 而不是内置的 parquet 表支持 |
spark.sql.parquet.mergeSchema | false | 若是设为 true,Parquet 数据源将会合并全部数据文件的 schema,不然,从汇总文件或随机选取一个数据文件(若是没有汇总文件)选取schema |
##JSON数据集
#####Scala
Spark SQL 能够自动推导一个 JSON 数据集的 schema 并将其加载为一个 Dataset[Row]。这种转换能够经过在一个 String 的 RDD 或一个 JSON 文件上使用 SparkSession.read.json() 方法来完成。
注意: 做为_json文件_提供的文件并非一个典型的JSON文件。JSON 文件的每一行必须包含一个独立的、完整有效的 JSON 对象。所以,一个常规的多行 json 文件常常会加载失败。
// 由路径指向的JSON数据集,这个路径既能够是一个简单文本文件也能够是一个存储文本文件的目录 val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) // 推导出的schema可使用printSchema()方法可视化 peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // 使用DataFrame建立一个临时视图 peopleDF.createOrReplaceTempView("people") // 使用spark提供的sql方法能够运行SQL语句 val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ // 或者, 能够为JSON数据集建立一个由RDD[String]表示的DataFrame,每一个字符串存储一个JSON对象 val otherPeopleRDD = spark.sparkContext.makeRDD( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。
#####Java
Spark SQL 能够自动推导一个 JSON 数据集的 schema 并将其加载为一个 Dataset<Row>。这种转换能够经过在一个 String 的 RDD 或一个 JSON 文件上使用 SparkSession.read.json() 方法来完成。
注意: 做为_json文件_提供的文件并非一个典型的JSON文件。JSON 文件的每一行必须包含一个独立的、完整有效的 JSON 对象。所以,一个常规的多行 json 文件常常会加载失败。
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // 由路径指向的JSON数据集,这个路径既能够是一个简单文本文件也能够是一个存储文本文件的目录 Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json"); // 推导出的schema可使用printSchema()方法可视化 people.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // 使用DataFrame建立一个临时视图 people.createOrReplaceTempView("people"); // 使用spark提供的sql方法能够运行SQL语句 Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); namesDF.show(); // +------+ // | name| // +------+ // |Justin| // +------+
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。
#####Python
Spark SQL 能够自动推导一个 JSON 数据集的 schema 并将其加载为一个 DataFrame。这种转换能够经过在一个 JSON 文件上使用 SparkSession.read.json 方法来完成。
注意: 做为_json文件_提供的文件并非一个典型的JSON文件。JSON 文件的每一行必须包含一个独立的、完整有效的 JSON 对象。所以,一个常规的多行 json 文件常常会加载失败。
# spark是一个已存在的SparkSession对象 # 由路径指向的JSON数据集,这个路径既能够是一个简单文本文件也能够是一个存储文本文件的目录 people = spark.read.json("examples/src/main/resources/people.json") # 推导出的schema可使用printSchema()方法可视化 people.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # 使用DataFrame建立一个临时视图 people.createOrReplaceTempView("people") # 使用spark提供的sql方法能够运行SQL语句 teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // 或者, 能够为JSON数据集建立一个由RDD[String]表示的DataFrame,每一个字符串存储一个JSON对象 anotherPeopleRDD = sc.parallelize([ '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']) anotherPeople = spark.jsonRDD(anotherPeopleRDD)
#####Sql
CREATE TEMPORARY VIEW jsonTable USING org.apache.spark.sql.json OPTIONS ( path "examples/src/main/resources/people.json" ) SELECT * FROM jsonTable
##Hive表
Spark SQL 还支持读写存储于 Apache Hive 中的数据。然而因为 Hive 的依赖项太多,在默认的 Spark 发行版本中并无包含这些依赖。Spark 会自动加载 classpath 上的 Hive 依赖。注意:这些 Hive 依赖也必须放到全部的 worker 节点上,由于若是要访问 Hive 中的数据它们须要访问Hive 序列化和反序列化库(SerDes)。
能够将 hive-site.xml,core-site.xml(用于安全配置)以及 hdfs-site.xml(用于HDFS配置)文件放置在 conf/ 目录下来完成 Hive 配置。
使用 Hive 时, 你必需要实例化一个支持 Hive 的 SparkSession, 包括链接到一个持久化的 Hive metastore, Hive serdes 以及 Hive 用户自定义函数。即便用户没有安装部署 Hive 也仍然能够启用Hive支持。若是没有在 hive-site.xml 文件中配置, Spark应用程序启动以后,上下文会自动在当前目录下建立一个 metastore_db 目录并建立一个由 spark.sql.warehouse.dir 配置的、默认值是当前目录下的 spark-warehouse 目录的目录。注意: 从 Spark 2.0.0 版本开始, hive-site.xml 中的 hive.metastore.warehouse.dir 属性已通过时, 你可使用 spark.sql.warehouse.dir 来指定 Hive仓库中数据库的默认存储位置。你可能还须要给启动 Spark 应用程序的用户赋予写权限。
#####Scala
import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) // warehouseLocation 指向用于管理数据库和表的默认路径 val warehouseLocation = "file:${system:user.dir}/spark-warehouse" val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // HiveQL表示的查询语句 sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // 还支持聚合查询 sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // SQL查询结果自己就是DataFrame并支持全部标准函数 val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // DaraFrame中数据项的类型是Row,它容许你有序的访问每一列 val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // 你也能够在HiveContext内部使用DataFrame来建立临时视图 val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") // 能够关联查询DataFrame数据和存储在Hive中的数据 sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // ...
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" 文件。
#####Java
import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public static class Record implements Serializable { private int key; private String value; public int getKey() { return key; } public void setKey(int key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } } // warehouseLocation 指向用于管理数据库和表的默认路径 String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse"; SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // HiveQL表示的查询语句 spark.sql("SELECT * FROM src").show(); // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // 还支持聚合查询 spark.sql("SELECT COUNT(*) FROM src").show(); // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // SQL查询结果自己就是DataFrame并支持全部标准函数 Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // DaraFrame中数据项的类型是Row,它容许你有序的访问每一列 Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() { @Override public String call(Row row) throws Exception { return "Key: " + row.get(0) + ", Value: " + row.get(1); } }, Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // 你也能够在HiveContext内部使用DataFrame来建立临时视图 List<Record> records = new ArrayList<>(); for (int key = 1; key < 100; key++) { Record record = new Record(); record.setKey(key); record.setValue("val_" + key); records.add(record); } Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class); recordsDF.createOrReplaceTempView("records"); // 能够关联查询DataFrame数据和存储在Hive中的数据 spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show(); // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // ...
完整示例代码参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" 文件。
#####Python
# spark是一个已存在的SparkSession对象 spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # HiveQL表示的查询语句 results = spark.sql("FROM src SELECT key, value").collect()
###与不一样版本的Hive Metastore交互
Spark SQL 对 Hive 最重要的一个支持就是能够和 Hive metastore 进行交互,这使得 Spark SQL 能够访问 Hive 表的元数据。从 Spark 1.4.0 版本开始,经过使用下面描述的配置, Spark SQL一个简单的二进制编译版本能够用来查询不一样版本的 Hive metastore。注意:无论用于访问 metastore 的 Hive 是什么版本,Spark SQL 内部都使用 Hive 1.2.1 版本进行编译, 而且使用这个版本的一些类用于内部执行(serdes,UDFs,UDAFs等)。
下面的选项可用来配置用于检索元数据的 Hive 版本:
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.hive.metastore.version | 1.2.1 | Hive metastore版本,可用选项从0.12.0 到 1.2.1 |
spark.sql.hive.metastore.jars | builtin | 存放用于实例化 HiveMetastoreClient 的 jar 包位置。这个属性能够是下面三个选项之一:1. builtin。使用 Hive 1.2.1 版本,当启用 -Phive 时会和 Spark一块儿打包。若是使用了这个选项, 那么 spark.sql.hive.metastore.version 要么是1.2.1,要么就不定义。2. maven。使用从 Maven 仓库下载的指定版本的 Hive jar包。生产环境部署一般不建议使用这个选项。3. 标准格式的 JVM classpath。这个 classpath 必须包含全部 Hive 及其依赖的 jar包,包括正确版本的 Hadoop。这些 jar 包只须要部署在 driver 节点上,可是若是你使用 yarn cluster模式运行,那么你必需要确保这些 jar 包是和应用程序一块儿打包的。 |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 使用类加载器加载的一个逗号分隔的类名前缀列表,它能够在 Spark SQL 和特定版本的 Hive 间共享。一个共享类的示例就是用来访问 Hive metastore 的 JDBC driver。其它须要共享的类,是须要与已经共享的类进行交互的类。例如,log4j 使用的自定义 appender |
spark.sql.hive.metastore.barrierPrefixes | (empty) | 一个逗号分隔的类名前缀列表,对于 Spark SQL 访问的每一个 Hive 版本都须要显式地从新加载这个列表。例如,在一个共享前缀列表(org.apache.spark.*)中声明的 Hive UDF一般须要被共享。 |
##使用JDBC链接其它数据库
Spark SQL 还包括一个可使用 JDBC 从其它数据库读取数据的数据源。该功能比使用 JdbcRDD 更好,由于它的返回结果是一个 DataFrame,这样能够很容易地在 Spark SQL 中进行处理或和其它数据源进行关联操做。JDBC 数据源在 Java 和 Python 中用起来很简单,由于不须要用户提供一个 ClassTag。(注意,这和 Spark SQL JDBC server 不一样,Spark SQL JDBC server 容许其它应用程序使用 Spark SQL 执行查询)
首先,你须要在 Spark classpath 中包含特定数据库的 JDBC driver。例如,为了从 Spark Shell链接到 postgres 数据库,你须要运行下面的命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
经过使用数据源 API, 远程数据库的表能够加载为一个 DataFrame 或 Spark SQL 临时表。支持的一些选项以下:
属性名 | 含义 |
---|---|
url | 链接的 JDBC URL |
dbtable | 须要读取的 JDBC 表。注意:可使用一个 SQL 查询语句的 FROM 子句中任何有效内容。例如, 除使用一个完整的表名以外,你还可使用括号中的子查询。 |
driver | 用来链接到 JDBC URL 的 JDBC driver 类名。 |
partitionColumn, lowerBound, upperBound, numPartitions | 这几个选项,若是指定了其中一个,那么其它选项必须所有指定。它们描述了从多个 worker 并行读入数据时如何进行表分区。partitionColumn 必须是所查询表中的一个数值类型的列。注意,lowerBound 和 upperBound 只是用来决定分区跨度的,而不是过滤表中的行。所以,表中全部的行都会被分区而后返回。 |
fetchSize | JDBC fetch size,用来限制每次获取多少行数据。默认设置成一个比较小的 fetch size (如,Oracle上设为10)有助于 JDBC驱动上的性能提高。 |
#####Scala
val jdbcDF = spark.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
#####Java
Map<String, String> options = new HashMap<>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); Dataset<Row> jdbcDF = spark.read().format("jdbc"). options(options).load();
#####Python
df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load()
#####Sql
CREATE TEMPORARY VIEW jdbcTable USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", dbtable "schema.tablename" )
##故障排查
在客户端会话以及全部的executor上,JDBC 驱动器类必须对启动类加载器可见,这是由于 Java 的 DriverManager 类在打开一个链接以前会作一个安全检查,这样就致使它忽略了对于启动类加载器不可见的全部驱动器。一种简单的方法就是修改全部 worker 节点上的 compute_classpath.sh 以包含你驱动器的 jar 包。
有些数据库,好比H2,会把全部的名称转换成大写。在Spark SQL中你也须要使用大写来引用这些名称。
#性能优化
对于有些负载的 Spark 任务,能够将数据放入内存进行缓存或开启一些试验选项来提高性能。
##在内存中缓存数据
经过调用 spark.cacheTable(“tableName”) 或者 dataFrame.cache() 方法, Spark SQL 可使用一种内存列存储格式缓存表。而后 Spark SQL 只扫描必要的列,而且自动调整压缩比例,以最小化内存占用和GC压力。你能够调用 spark.uncacheTable(“tableName”) 方法删除内存中的表。
可使用 SparkSession 类中的 setConf 方法或经过 SQL 语句运行 SET key=value 命令来完成内存缓存配置。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 若是设置为 true,Spark SQL 将会基于统计数据自动地为每一列选择一种压缩编码方式。 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制列式缓存批处理大小。缓存数据时, 较大的批处理大小能够提升内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。 |
##其它配置选项
下面的选项也能够用来提高执行的查询性能。随着 Spark 自动地执行愈来愈多的优化操做, 这些选项在将来的发布版本中可能会废弃。
属性名 | 默认值 | 含义 |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | 读取文件时单个分区可容纳的最大字节数 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | 打开文件的估算成本, 按照同一时间可以扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。 |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 用于配置一个表在执行 join 操做时可以广播给全部 worker 节点的最大字节大小。经过将这个值设置为 -1 能够禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。 |
spark.sql.shuffle.partitions | 200 | 用于配置 join 或聚合操做混洗(shuffle)数据时使用的分区数。 |
#分布式SQL引擎
经过使用 JDBC/ODBC 或者命令行接口,Spark SQL 还能够做为一个分布式查询引擎。在这种模式下,终端用户或应用程序能够运行 SQL 查询来直接与 Spark SQL 交互,而不须要编写任何代码。
##运行Thrift JDBC/ODBC server
这里实现的Thrift JDBC/ODBC server 对应于 Hive 1.2.1 版本中的 HiveServer2。你可使用Spark 或者 Hive 1.2.1 自带的 beeline 脚原本测试这个 JDBC server。
要启动 JDBC/ODBC server, 须要在 Spark 安装目录下运行下面这个命令:
./sbin/start-thriftserver.sh
这个脚本能接受全部 bin/spark-submit 命令行选项,外加一个用于指定 Hive 属性的 --hiveconf 选项。你能够运行 ./sbin/start-thriftserver.sh --help 来查看全部可用选项的一个完整列表。默认状况下,启动的 server 将会在 localhost:10000 上进行监听。你能够覆盖该行为, 好比使用如下环境变量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
或者系统属性:
./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
如今你可使用 beeline 来测试这个 Thrift JDBC/ODBC server:
./bin/beeline
在 beeline 中使用如下命令链接到 JDBC/ODBC server:
beeline> !connect jdbc:hive2://localhost:10000
Beeline 会要求你输入用户名和密码。在非安全模式下,只须要输入你本机的用户名和一个空密码便可。对于安全模式,请参考 beeline 文档中的指示.
将 hive-site.xml,core-site.xml 以及 hdfs-site.xml 文件放置在 conf/ 目录下能够完成 Hive 配置。
你也可使用Hive 自带的 beeline 的脚本。
Thrift JDBC server 还支持经过 HTTP 传输来发送 Thrift RPC 消息。使用下面的设置来启用 HTTP 模式:
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice
为了测试,下面在 HTTP 模式中使用 beeline 链接到 JDBC/ODBC server:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
##运行Spark SQL CLI
Spark SQL CLI 是一个很方便的工具,它能够在本地模式下运行 Hive metastore 服务,而且执行从命令行中输入的查询语句。注意:Spark SQL CLI 没法与 Thrift JDBC server 通讯。
要启动用Spark SQL CLI, 能够在Spark安装目录运行下面的命令:
./bin/spark-sql
将 hive-site.xml,core-site.xml 以及 hdfs-site.xml 文件放置在 conf/ 目录下能够完成 Hive 配置。你能够运行 ./bin/spark-sql –help 来获取全部可用选项的一个完整列表。
#迁移指南
##Spark SQL从1.6版本升级到2.0版本
SparkSession 如今是 Spark 新的切入点, 它替代了老的 SQLContext 和 HiveContext。注意:为了向下兼容, 老的 SQLContext 和 HiveContext 仍然保留。能够从 SparkSession 获取一个新的 catalog 接口- 现有的访问数据库和表的API, 如 listTables, createExternalTable, dropTempView, cacheTable 都被移到该接口。
Dataset API 和 DataFrame API 进行了统一。在 Scala 中, DataFrame 变成了 Dataset[Row] 的一个类型别名, 而 Java API 使用者必须将 DataFrame 替换成 Dataset<Row>。Dataset 类既提供了强类型变换操做 (如 map, filter 以及 groupByKey) 也提供了非强类型变换操做 (如 select 和 groupBy) 。因为编译期的类型安全不是 Python 和 R 语言的一个特性, Dataset 的概念并不适用于这些语言的 API。相反, DataFrame 仍然是最基本的编程抽象, 就相似于这些语言中单节点数据帧的概念。
Dataset 和 DataFrame API 中 unionAll 已通过时而且由union替代。
Dataset 和 DataFrame API 中 explode 已通过时。或者 functions.explode() 能够结合 select 或 flatMap 一块儿使用。
Dataset 和 DataFrame API 中 registerTempTable 已通过时而且由 createOrReplaceTempView替代。
##Spark SQL从1.5版本升级到1.6版本
从 Spark 1.6 版本开始,Thrift server 默认运行于多会话模式下, 这意味着每一个 JDBC/ODBC 链接都有独有一份 SQL 配置和临时函数注册表的拷贝。尽管如此, 缓存的表仍然能够共享。若是你更喜欢在老的单会话模式中运行 Thrift server,只须要将 spark.sql.hive.thriftServer.singleSession 选项设置为 true 便可。固然,你也可在 spark-defaults.conf 文件中添加这个选项,或者经过 --conf 将其传递给 start-thriftserver.sh:
./sbin/start-thriftserver.sh
--conf spark.sql.hive.thriftServer.singleSession=true
... ```
从1.6.1版本开始, sparkR 中的 withColumn 方法支持向 DataFrame 新增一列 或 替换已有的名称相同的列。
从 Spark 1.6 版本开始, LongType 转换成 TimestampType 将源值以秒而不是毫秒做为单位处理。作出这个变动是为了的匹配 Hive 1.2 版本中从数值类型转换成TimestampType的这个行为以得到更一致的类型。更多细节请参见 SPARK-11724 。
##Spark SQL从1.4版本升级到1.5版本
使用手动管理内存(Tungsten引擎)的执行优化以及用于表达式求值的代码自动生成如今默认是启用的。这些特性能够经过将 spark.sql.tungsten.enabled 的值设置为 false 来同时禁用。
默认不启用 Parquet schema 合并。能够将 spark.sql.parquet.mergeSchema 的值设置为 true 来从新启用。
Python 中对于列的字符串分解如今支持使用点号(.)来限定列或访问内嵌值,例如 df[‘table.column.nestedField’]。然而这也意味着若是你的列名包含任何点号(.)的话,你就必需要使用反引号来转义它们(例如:table.column.with.dots
.nested)。
默认启用内存中列式存储分区修剪。能够经过设置 spark.sql.inMemoryColumarStorage.partitionPruning 值为 false 来禁用它。
再也不支持无精度限制的 decimal,相反, Spark SQL 如今强制限制最大精度为38位。从 BigDecimal 对象推导 schema 时会使用(38,18)这个精度。若是在 DDL 中没有指定精度,则默认使用精度 Decimal(10,0)。
存储的时间戳(Timestamp)如今精确到1us(微秒),而不是1ns(纳秒)
在 sql 方言中,浮点数如今被解析成 decimal。HiveQL 的解析保持不变。
SQL/DataFrame 函数的规范名称均为小写(例如:sum vs SUM)。
JSON 数据源不会再自动地加载其余应用程序建立的新文件(例如,不是由 Spark SQL 插入到dataset中的文件)。对于一个 JSON 持久化表(例如:存储在 Hive metastore 中的表的元数据),用户可使用 REFRESH TABLE 这个 SQL 命令或者 HiveContext 的 refreshTable 方法来把新文件添加进表。对于一个表示 JSON 数据集的 DataFrame, 用户须要重建这个 DataFrame, 这样新的 DataFrame 就会包含新的文件。
pySpark 中的 DataFrame.withColumn 方法支持新增一列或是替换名称相同列。
##Spark SQL从1.3版本升级到1.4版本
###DataFrame数据读写接口
根据用户的反馈,咱们提供了一个用于数据读入(SQLContext.read)和数据写出(DataFrame.write)的新的、更加流畅的API,同时老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)将被废弃。
有关 SQLContext.read ( Scala, Java, Python ) 和 DataFrame.write ( Scala, Java, Python ) 的更多信息,请参考API文档。
###DataFrame.groupBy保留分组列
根据用户的反馈,咱们改变了 DataFrame.groupBy().agg() 的默认行为,就是在返回的 DataFrame 结果中保留分组的列。若是你想保持1.3版本中的行为,能够将 spark.sql.retainGroupColumns 设置为 false。
#####Scala
// 在1.3.x版本中, 为了显示分组列 "department", 它必需要做为 agg 函数调用的一部分显示地包含进来 df.groupBy("department").agg($"department", max("age"), sum("expense")) // 在1.4+版本中, 分组列 "department"自动包含进来 df.groupBy("department").agg(max("age"), sum("expense")) // 要回滚到 1.3 的行为 (不包含分组列), 能够进行以下设置: sqlContext.setConf("spark.sql.retainGroupColumns", "false")
#####Java
// 在1.3.x版本中, 为了显示分组列 "department", 它必需要做为 agg 函数调用的一部分显示地包含进来 df.groupBy("department").agg(col("department"), max("age"), sum("expense")); // 在1.4+版本中, 分组列 "department"自动包含进来 df.groupBy("department").agg(max("age"), sum("expense")); // 要回滚到 1.3 的行为 (不包含分组列), 能够进行以下设置: sqlContext.setConf("spark.sql.retainGroupColumns", "false");
#####Python
import pyspark.sql.functions as func # 在1.3.x版本中, 为了显示分组列 "department", 它必需要做为 agg 函数调用的一部分显示地包含进来 df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense")) # 在1.4+版本中, 分组列 "department"自动包含进来 df.groupBy("department").agg(func.max("age"), func.sum("expense")) # 要回滚到 1.3 的行为 (不包含分组列), 能够进行以下设置: sqlContext.setConf("spark.sql.retainGroupColumns", "false")
###DataFrame.withColumn行为变动
1.4版本以前, DataFrame.withColumn() 只支持新增一列。在 DataFrame 结果中指定名称的列老是做为一个新列添加进来,即便已经存在了相同名称的列。从1.4版本开始, DataFrame.withColumn() 支持新增一个和现有列名不重复的新列和替换有相同名称的列。
注意:这个变动只针对 Scala API, 不针对 PySpark 和 SparkR。
##Spark SQL从1.0-1.2版本升级到1.3版本
Spark 1.3 版本咱们去掉了 Spark SQL 的 "Alpha" 标签而且清理了现有的API。从 Spark 1.3 版本开始,Spark SQL 将提供 1.x 系列中其它发行版本的二进制兼容。这个兼容性保证不包括显式地标注为不稳定(例如:DeveloperAPI 或 Experimental)的 API。
###SchemaRDD重命名为DataFrame
升级到 Spark SQL 1.3 后,用户将会注意到最大的改动就是 SchemaRDD 更名为 DataFrame。主要缘由是 DataFrame 再也不直接继承于 RDD,而是经过本身的实现来提供 RDD 中提供的绝大多数功能。经过调用 .rdd 方法 DataFrame 仍然能够转换成 RDD。
在 Scala 中有一个从 SchemaRDD 到 DataFrame 的类型别名来提供某些使用场景下的代码兼容性。但仍然建议用户在代码中改用 DataFrame。Java 和 Python 用户必需要修改代码。
###统一Java和Scala API
Spark 1.3 以前的版本中有两个单独的 Java 兼容类(JavaSQLContext 和 JavaSchemaRDD)能够映射到 Scala API。Spark 1.3 版本将 Java API 和 Scala API 进行了统一。两种语言的用户都应该使用 SQLContext 和 DataFrame。一般状况下这些类都会使用两种语言中都支持的类型(例如:使用 Array 来取代语言特有的集合)。有些状况下没有通用的类型(例如:闭包或maps中用于传值),则会使用函数重载。
另外,移除了 Java 特有的类型 API。Scala 和 Java 用户都应该使用 org.apache.spark.sql.types 包中的类来编程式地描述 schema。
###隔离隐式转换并移除dsl包 (只针对Scala)
Spark 1.3 版本以前的不少示例代码都以 import sqlContext._ 语句做为开头,这样会引入 sqlContext的全部函数。在Spark 1.3 版本中咱们隔离了 RDD 到 DataFrame的隐式转换,将其单独放到 SQLContext 内部的一个对象中。用户如今应该这样写:import sqlContext.implicits._。
另外,隐式转换如今也只能使用 toDF 方法来增长由 Product(例如:case classes 或 元祖)组成的 RDD,而不是自动转换。
使用 DSL(如今被 DataFrame API取代)的内部方法时,用户须要引入 import org.apache.spark.sql.catalyst.dsl。而如今应该要使用公用的 DataFrame 函数 API:import org.apache.spark.sql.functions._
###移除org.apache.spark.sql包中DataType的类型别名 (只针对Scala)
Spark 1.3 版本删除了基础 sql 包中 DataType 的类型别名。开发人员应该引入 org.apache.spark.sql.types 中的类。
###UDF注册迁移到sqlContext.udf中 (针对Java和Scala)
用于注册 UDF 的函数,无论是 DataFrame DSL 仍是 SQL 中用到的,都被迁移到 SQLContext中的 udf 对象中。
#####Scala
sqlContext.udf.register("strLen", (s: String) => s.length())
#####Java
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);
Python UDF注册保持不变。
###Python的DataType再也不是单例的
在 Python 中使用 DataTypes 时,你须要先构造它们(如:StringType()),而不是引用一个单例对象。
##兼容Apache Hive
Spark SQL 在设计时就考虑到了和 Hive metastore,SerDes 以及 UDF 之间的兼容性。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本,而且 Spark SQL 能够链接到不一样版本的 Hive metastore(从0.12.0到1.2.1,能够参考[与不一样版本的Hive Metastore交互])
###在现有的Hive仓库中部署
Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不须要修改现有的 Hive Metastore , 或者改变数据的位置和表的分区。
###支持的Hive特性
Spark SQL 支持绝大部分的 Hive 功能,如:
###不支持的Hive功能
如下是目前还不支持的 Hive 功能列表。在Hive部署中这些功能大部分都用不到。
####Hive核心功能
####Hive高级功能
####Hive输入输出格式
####Hive优化
有少数Hive优化尚未包含在 Spark 中。其中一些(好比索引)因为 Spark SQL 的这种内存计算模型而显得不那么重要。另一些在 Spark SQL 将来的版本中会持续跟踪。
#参考
##数据类型
Spark SQL和 DataFrames 支持下面的数据类型:
数值类型
字符串类型
二进制类型
布尔类型
日期类型
Complex types(复杂类型)
StructType(fields):表示由StructField序列描述的结构。
#####Scala
Spark SQL 全部的数据类型都放在 org.apache.spark.sql.types 这个包下。你能够这样获取它们:
import org.apache.spark.sql.types._
完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。
数据类型 | Scala中值类型 | 用于获取或建立一个数据类型的API |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
DoubleType | Double | DoubleType |
DecimalType | java.math.BigDecimal | DecimalType |
StringType | String | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | Boolean | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
DateType | java.sql.Date | DateType |
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull])注意:containsNull 的默认值是true |
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull])注意:valueContainsNull 的默认值是true |
StructType | org.apache.spark.sql.Row | StructType(fields)注意:fields是一个StructField序列,另外不容许出现名称重复的字段。 |
StructField | Scala中该字段的数据类型对应的值类型(例如,若是StructField的数据类型为IntegerType,则Scala中其值类型为Int) | StructField(name, dataType, nullable) |
#####Java
Spark SQL全部的数据类型都放在 org.apache.spark.sql.types 这个包下。为了获取或建立一个数据类型, 请使用 org.apache.spark.sql.types.DataTypes 中提供的工厂方法。
数据类型 | Java中值类型 | 用于获取或建立一个数据类型的API |
---|---|---|
ByteType | byte 或 Byte | DataTypes.ByteType |
ShortType | short 或 Short | DataTypes.ShortType |
IntegerType | int 或 Integer | DataTypes.IntegerType |
LongType | long 或 Long | DataTypes.LongType |
FloatType | float 或 Float | DataTypes.FloatType |
DoubleType | double 或 Double | DataTypes.DoubleType |
DecimalType | java.math.BigDecimal | DataTypes.createDecimalType(),DataTypes.createDecimalType(precision, scale). |
StringType | String | DataTypes.StringType |
BinaryType | byte[] | DataTypes.BinaryType |
BooleanType | boolean 或 Boolean | DataTypes.BooleanType |
TimestampType | java.sql.Timestamp | DataTypes.TimestampType |
DateType | java.sql.Date | DataTypes.DateType |
ArrayType | java.util.List | DataTypes.createArrayType(elementType) 注意:containsNull 的默认值是true, DataTypes.createArrayType(elementType, containsNull) |
MapType | java.util.Map | DataTypes.createMapType(keyType, valueType) 注意:valueContainsNull 的默认值是true, DataTypes.createMapType(keyType, valueType,valueContainsNull) |
StructType | org.apache.spark.sql.Row | DataTypes.createStructType(fields) 注意:fields 是一个StructField 列表或数组,另外不容许出现名称重复的字段。 |
StructField | Java中该字段的数据类型对应的值类型 (例如,若是StructField的数据类型为IntegerType,则Java中其值类型为 int) | DataTypes.createStructField(name, dataType, nullable) |
#####Python
Spark SQL全部的数据类型都放在 pyspark.sql.types 这个包下. 你能够这样获取它们:
from pyspark.sql.types import *
数据类型 | Python中值类型 | 用于获取或建立一个数据类型的API |
---|---|---|
ByteType | int 或 long,注意: 数字在运行时会被转化成1字节的有符号整数。请确保数字是在 -128 到 127这个范围内 | ByteType() |
ShortType | int 或 long,注意: 数字在运行时会被转化成2字节的有符号整数。请确保数字是在 -32768 到 32767这个范围内。 | ShortType() |
IntegerType | int 或 long | IntegerType() |
LongType | long,注意: 数字在运行时会被转化成8字节的有符号整数。请确保数字是在 -9223372036854775808 到 9223372036854775807这个范围内.不然, 请将数据转化成 decimal.Decimal 并使用 DecimalType | LongType() |
FloatType | float,注意: 数字在运行时会被转化成4字节的单精度浮点数。 | FloatType() |
DoubleType | float | DoubleType() |
DecimalType | decimal.Decimal | DecimalType() |
StringType | string | StringType() |
BinaryType | bytearray | BinaryType() |
BooleanType | bool | BooleanType() |
TimestampType | datetime.datetime | TimestampType() |
DateType | datetime.date | DateType() |
ArrayType | list, tuple 或 array | ArrayType(elementType, [containsNull]) 注意:containsNull 的默认值是true |
MapType | dict | MapType(keyType, valueType, [valueContainsNull]) 注意:valueContainsNull 的默认值是True |
StructType | list 或 tuple | StructType(fields) 注意:fields 是一个StructFields序列,另外不容许出现名称重复的字段。 |
StructField | Python中该字段的数据类型对应的值类型 (例如,若是StructField的数据类型为IntegerType,则Python中其值类型为 Int) | StructField(name, dataType, nullable) |
##NaN语义
当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 须要作一些特殊处理。具体以下: