Spark SQL和DataFrame指南[中]

翻译自: http://spark.apache.org/docs/1.3.0/sql-programming-guide.htmlhtml

概述(Overview)

Spark SQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称为DataFrames,也能够做为分布式SQL查询引擎。java

DataFrames

DataFrame是一种以命名列方式组织的分布式数据集。它概念上至关于关系型数据库中的表,或者R/Python中的数据帧,可是具备更丰富的优化。有不少方式能够构造出一个DataFrame,例如:结构化数据文件,Hive中的tables,外部数据库或者存在的RDDs。node

DataFrame的API适用于Scala、Java和Python。sql

该页上全部的例子使用Spark分布式中的样本数据,能够运行在spark-shell或者pyspark shell中。shell

入口点: SQLContext

Spark SQL中全部功能的入口点是SQLContext类,或者它子类中的一个。为了建立一个基本的SQLContext,你所须要的是一个SparkContext。数据库

 

val sc: SparkContext // An existing SparkContext.express

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.apache

import sqlContext.implicits._编程

 

除了基本的SQLContext,你还能够建立一个HiveContext,它提供了基本的SQLContext的所提供的功能的超集。这些功能中包括附加的特性,能够编写查询,使用更彻底的HiveQL解析器,访问Hive UDFs,可以从Hive表中读取数据。想要使用HiveContext,你不须要有一个存在的Hive步骤,而且全部SQLContext可用的数据源仍旧可用。HiveContext只是单独打包,以免包含默认Spark build中的全部Hive依赖。若是这些依赖对于你的应用不是一个问题,那么推荐使用Spark 1.3版本的HiveContext。json

使用spark.sql.dialect选项,能够选择SQL的具体变种,用它来解析查询。这个参数可使用SQLContext上的setConf方法或者在SQL中使用一组key=value命令。对于SQLContext,惟一能够的dialect是“sql”,它可使用SparkSQL提供的一个简单的SQL解析器。在HiveContext中,默认的是“hiveql”,尽管“sql”也是可用的。由于HiveOL解析器更加完整,在大多数状况下, 推荐使用这个。

建立DataFrames

使用SQLContext,应用能够从一个已经存在的RDD、Hive表或者数据源中建立DataFrames。

例如,如下根据一个JSON文件建立出一个DataFrame:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create the DataFrame

val df = sqlContext.jsonFile("examples/src/main/resources/people.json")// Show the content of the DataFrame

df.show()

// age  name

// null Michael

// 30   Andy

// 19   Justin

 // Print the schema in a tree format

df.printSchema()

// root// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

 // Select only the "name" column

df.select("name").show()

// name

// Michael

// Andy

// Justin

 // Select everybody, but increment the age by 1

// df.select("name", df("age") + 1).show() //官方文档这样的,可是测试时发现这样编译不经过。下面的形式能够

df.select(df("name"),df("age")+1).show()

// name    (age + 1)

// Michael null

// Andy    31

// Justin  20

 // Select people older than 21

df.filter(df("age") > 21).show()

// age name

// 30  Andy

 // Count people by age

df.groupBy("age").count().show()

// age  count

// null 1

// 19   1

// 30   1

以编程方式运行SQL查询

SQLContext中的sql函数使应用能够以编程方式运行SQL查询,而且将结果以DataFrame形式返回。

val sqlContext = ...  // An existing SQLContext

val df = sqlContext.sql("SELECT * FROM table")

RRDs之间的互操做(Interoperating with RDDs)

Spark SQL支持两种不一样的方法,用于将存在的RDDs转换成DataFrames。第一种方法使用反射来推断包含特定类型的对象的RDD的模式。在写Spark应用时,当你已知schema的状况下,这种基于反射的方式使得代码更加简介,而且效果更好。

建立DataFrames的第二种方法是经过编程接口,它容许你构建一个模式,而后将其应用到现有的RDD上。这种方式更加的繁琐,它容许你构建一个DataFrame当列以及类型未知,直到运行时才能知道时。

使用反射推断模式

Spark SQL中的Scala接口支持自动地将包含case类的RDD转换成DataFrame。case类定义了表的模式,case类的参数的名称使用反射来读取,而后称为列的名称。case类还能够嵌套或者包含复杂的类型,例如Sequences或者Arrays。这个RDD能够隐式地转换为DataFrame,而后注册成表,表能够在后续SQL语句中使用

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

一、使用case类定义schema

二、建立一个SQLContext

三、导入sqlContext.implicits._,用于隐式地将RDD转换成DataFrame

四、建立一个DataFrame,并将它注册成表。

五、使用sqlContext提供的sql方法,就可使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持全部的RDD操做

以编程方式指定模式

当case类不能提早定义时(例如,记录的结构被编码在一个String中,或者不一样的用户会将文本数据集和字段进行不一样的解析和投影),DataFrame可使用如下三步,以编程的方式实现:

1.Create an RDD of Rows from the original RDD;

2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.

3.Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

1.从原有的RDD中建立行的RDD。

2.建立一个由StructType表示的模式,StructType符合由步骤1建立的RDD的行的结构。

3.经过SQLContext提供的createDataFrame方法,将模式应用于行的RDD。

For example:

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a string

val schemaString = "name age"// Import Spark SQL data types and Row.

import org.apache.spark.sql._// Generate the schema based on the string of schema

val schema =  StructType(    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

results.map(t => "Name: " + t(0)).collect().foreach(println)

 

-----------------------------------------------------------

    //my code

    import org.apache.spark._

    import org.apache.spark.sql._

    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    val conf = new SparkConf().setMaster("local").setAppName("XX")

    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val schemaString = "fullName age"

    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

    val rowRDD = sc.textFile("data/people.txt").map(_.split(" ")).map(p=> Row(p(0),p(1).trim))

    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    peopleDataFrame.registerTempTable("people")

    val young = sqlContext.sql("select * from people where age <25")

    young.show()

数据源(Data Sources)

Spark SQL支持经过DataFrame接口在多种数据源上进行操做。一个DataFrame能够如同一个标准的RDDs那样进行操做,还能够注册成临时的表。将一个DataFrame注册成临时表容许你在它的数据上运行SQL查询。本节介绍使用Spark数据源装载和保存数据的经常使用方法,使用Spark数据源保存数据。而后进入可用于内置数据源的特定选项。

通用的加载/保存功能

在最简单的形式中,默认的数据源(parquet除非经过spark.sql.sources.default另外进行配置)将被用于全部的操做。

val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")

手动指定选项

你还能够手动指定数据源,这些数据源将与任何额外的选项一同使用,你但愿将这些选项传入到数据源中。数据源是经过它们的全名来指定的(如org.apache.spark.sql.parquet),可是对于内置的数据源,你也可使用简短的名称(json, parquet, jdbc)。任何类型的DataFrames使用这些语法能够转化成其余的数据源:

val df = sqlContext.load("people.json", "json")

df.select("name", "age").save("namesAndAges.parquet", "parquet")

保存模式

Save操做能够可选择性地接收一个SaveModel,若是数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。所以,若是有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据以前会将原来的数据进行删除。

 

Scala/Java

Python

Meaning

SaveMode.ErrorIfExists (default)

"error" (default)

When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 

当往一个数据源中保存一个DataFrame,若是数据已经存在,会抛出一个异常。

SaveMode.Append

"append"

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 

当往一个数据源中保存一个DataFrame,若是data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。

SaveMode.Overwrite

"overwrite"

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. 

Overwrite模式意味着当向数据源中保存一个DataFrame时,若是data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。

SaveMode.Ignore

"ignore"

Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. 

Ignore模式意味着当向数据源中保存一个DataFrame时,若是数据已经存在,save操做不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`类似。

 

保存为持久化表

当与HiveContext一块儿工做时,DataFrames也可使用saveAsTable命令保存为持久化的表。不像registerTempTable命令,saveAsTable会将DataFrame的内容进行物化,而且在HiveMetastore中建立一个指向数据的指针。持久化表会仍旧存在即便你的Spark程序从新启动。只要你保持链接到相同的元存储( metastore)。一个持久化表的DataFrame能够经过调用SQLContext上的带有表的名称的table方法来建立。

默认状况下,saveAsTable会建立一个“管理表(managed table)”,意味着元存储控制数据的位置。当一个表被删除后,managed table会自动地删除它们的数据。

Parquet Files

Parquet 是一种柱状的格式,被许多其余数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。

以编程方式加载数据

Loading Data Programmatically

Using the data from the above example:

使用上面例子中的数据:

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._val people: RDD[Person] = ...

 // An RDD of case class objects, from the previous example.// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.

people.saveAsParquetFile("people.parquet")// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.

// The result of loading a Parquet file is also a DataFrame.

val parquetFile = sqlContext.parquetFile("people.parquet")//Parquet files can also be registered as tables and then used in SQL statements.

parquetFile.registerTempTable("parquetFile")

val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分区发现

在系统中,如Hive,使用表分区是一个常见的优化途径。在一个分区表中,数据常常存储在不一样的目录中,对每个分区目录中的路径中,对分区列的值进行编码。Parquet数据源如今能够自动地发现而且推断出分区的信息。例如,咱们能够将以前使用的人口数据存储成下列目录结构的分区表,两个额外的列,gender和country做为分区列:

path└── to    └── table        ├── gender=male        │   ├── ...        │   │        │   ├── country=US        │   │   └── data.parquet        │   ├── country=CN        │   │   └── data.parquet        │   └── ...        └── gender=female            ├── ...            │            ├── country=US            │   └── data.parquet            ├── country=CN            │   └── data.parquet            └── ...

经过向SQLContext.parquetFile或者 SQLContext.load中传入path/to/table,Spark SQL会自动地从路径中提取分区信息。如今返回的DataFrame模式变成:

root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)

注意到分区列的数据类型自动被推断出来。目前支持数字的数据类型和string类型。

模式合并

像ProtocolBuffer, Avro和Thrift那样,Parquet还支持模式演化。用户能够从一个简单的模式开始,而且根据须要逐渐地向模式中添加更多的列。这样,用户最终可能会有多个不一样可是具备相互兼容的模式的Parquet文件。Parquet数据源如今能够自动地发现这种状况,而且将全部这些文件的模式进行合并。

// sqlContext from the previous example is used in this example

.// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Create a simple DataFrame, stored into a partition directory

val df1 = sparkContext.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.saveAsParquetFile("data/test_table/key=1")// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val df2 = sparkContext.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.saveAsParquetFile("data/test_table/key=2")// Read the partitioned table

val df3 = sqlContext.parquetFile("data/test_table")

df3.printSchema()// The final schema consists of all 3 columns in the Parquet files together

// with the partiioning column appeared in the partition directory paths.

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

配置

Parquet的配置可使用SQLContext的setConf来设置或者经过使用SQL运行SET key=value命令

Property Name

Default

Meaning

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.

 其余的一些产生Parquet的系统,特别是Impala和SparkSQL的老版本,当将Parquet模式写出时不会区分二进制数据和字符串。这个标志告诉Spark SQL将二进制数据解析成字符串,以提供对这些系统的兼容。

spark.sql.parquet.int96AsTimestamp

true

Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. 

其余的一些产生Parquet的系统,特别是Impala,将时间戳存储为INT96的形式。Spark也将时间戳存储为INT96,由于咱们要避免纳秒级字段的精度的损失。这个标志告诉Spark SQL将INT96数据解析为一个时间戳,以提供对这些系统的兼容。

spark.sql.parquet.cacheMetadata

true

Turns on caching of Parquet schema metadata. Can speed up querying of static data. 

打开Parquet模式的元数据的缓存。可以加快对静态数据的查询。

spark.sql.parquet.compression.codec

gzip

Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. 

设置压缩编码解码器,当写入一个Parquet文件时。可接收的值包括:uncompressed, snappy, gzip, lzo

spark.sql.parquet.filterPushdown

false

Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known bug in Paruet 1.6.0rc3 (PARQUET-136). However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn this feature on. 

打开Parquet过滤器的后进先出存储的优化。这个功能默认是被关闭的,由于一个Parquet中的一个已知的bug 1.6.0rc3 (PARQUET-136)。然而,若是你的表中不包含任何的可为空的(nullable)字符串或者二进制列,那么打开这个功能是安全的。

spark.sql.hive.convertMetastoreParquet

true

When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support. 

当设置成false,Spark SQL会为parquet表使用Hive SerDe(Serialize/Deserilize)而不是内置的支持。

 

JSON数据集

Spark SQL能够自动推断出JSON数据集的模式,将它做为DataFrame进行加载。这个转换能够经过使用SQLContext中的下面两个方法中的任意一个来完成。

• jsonFile - 从一个JSON文件的目录中加载数据,文件中的每个行都是一个JSON对象。

• jsonRDD - 从一个已经存在的RDD中加载数据,每个RDD的元素是一个包含一个JSON对象的字符串。

注意,做为jsonFile提供deep文件不是一个典型的JSON文件。每一行必须包含一个分开的独立的有效JSON对象。所以,常规的多行JSON文件一般会失败。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// A JSON dataset is pointed to by path.

// The path can be either a single text file or a directory storing text files.

val path = "examples/src/main/resources/people.json"

// Create a DataFrame from the file(s) pointed to by path

val people = sqlContext.jsonFile(path)// The inferred schema can be visualized using the printSchema() method.

people.printSchema()

// root

//  |-- age: integer (nullable = true)

//  |-- name: string (nullable = true)// Register this DataFrame as a table.

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize(  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Hive表

Spark SQL还支持对存储在Apache Hive中的数据的读和写。可是,由于Hive有大量的依赖,它不包含在默认的Spark assembly中。对Hive的支持是经过在Spark的build中添加 -Phive 和 -Phive-thriftserver 标志来完成。这个命令构建了一个新的assembly jar ,它包含Hive。注意,这个HIve assembly jar还必须出如今全部的worker节点,由于为了访问存储在Hive中的数据,它们会访问Hive的序列化和反序列化库(SerDes) 。

Hive的配置是经过将你的hive-site.xml文件放到conf/下来完成。

当使用Hive时,必须构建一个HiveContext,它继承自SQLContext,对寻找MetaStore中的表和使用HiveQL编写查询提供了支持。没有部署Hive的用户也能够建立一个HiveContext。当没有经过hive-site.xml进行配置,context会在当前目录下自动建立一个metastore_db和warehouse。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC To Other Databases

Spark SQL还包含了一个数据源,它可使用JDBC从其余数据库中读取数据。这个功能应该优先使用JdbcRDD。这是由于结果是做为DataFrame返回的,而且在Spark SQL中能够简单的被使用或加入其余数据源。JDBC数据源也可以轻松地被Java或者Python来使用,由于它不须要用户提供一个ClassTag(注意,这不一样于Spark SQL JDBC服务,它容许其余应用使用Spark SQL运行查询)。

要开始使用时,你须要为你的特定数据块在Spark的类路径中添加JDBC驱动。例如,从Spark Shell中链接到postgres,你须要运行如下的命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

经过使用数据源的API,能够将远程数据库中的表加载为DataFrame或者Spark SQL 临时表。如下的选项是被支持的:

Property Name

Meaning

url

The JDBC URL to connect to. 

要链接到的JDBC URL

dbtable

The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. 

须要读取的JDBC表。注意,SQL查询中的“From”子句中的任何部分都是可使用的。例如,你能够在括号中使用子查询,而不是一个完整的表。

driver

The class name of the JDBC driver needed to connect to this URL. This class with be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem. 

须要链接到的URL的JDBC驱动的类名。这个类会在运行一个JDBC命令以前被加载到master和workers上,容许驱动注册本身和JDBC子系统。

partitionColumn, lowerBound, upperBound, numPartitions

These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. 

若是这些选项中的一个被指定了,那么全部的选项必须被指定。它们描述了当从多个workers上并行读取时如何进行分区。partitionColumn必须是表中的一个数字列。

 

val jdbcDF = sqlContext.load("jdbc", Map(  "url" -> "jdbc:postgresql:dbserver",  "dbtable" -> "schema.tablename"))

故障排除

* 在客户端session以及全部executors上,JDBC驱动类必须对原来的类加载器是可见的。这是由于Java的DriverManager 类进行安全检查,这致使打开一个链接时,它会忽略全部对于原始来加载器不可见的驱动。一个方便的方法是在全部的worker节点上修改compute_classpath.sh以包含驱动的JARs。

* 一些数据库,例如H2,将全部的名称转化成大写的。在Spark SQL中你须要使用大写来指定那些名称。

性能调节(Performance Tuning)

对于一些工做负载来讲,经过将数据缓存到内存中或者打开一些实验性的选项,可能会提升性能。

在内存中缓存数据

Spark SQL可使用内存中的柱状格式来对表进行缓存,经过调用sqlContext.cacheTable("tableName") 或者rdataFrame.cache()。而后Spark SQL会扫描仅仅须要的列以及自动地调节压缩,以减小内存使用和GC压力。你能够调用sqlContext.uncacheTable("tableName") 来将表从内存中移除。内存缓存的配置可使用SQLContext上的setConf或者经过使用SQL执行SET key=value命令的方式来完成。

Property Name

Default

Meaning

spark.sql.inMemoryColumnarStorage.compressed

true

When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. 

当设置为true时,SparkSQL会根据统计出的各项数据为每个列选择一种压缩编解码器。

spark.sql.inMemoryColumnarStorage.batchSize

10000

Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 

为列状缓存控制批处理大小。较大的批大小能够提升内存的利用率和压缩,可是缓存数据时有OOMs的风险。

其余的配置选项

如下选项也能够用来调节执行查询时的性能。随着更多的优化被自动地执行,这些选项有可能会在未来的版本中被弃用,

Property Name

Default

Meaning

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. 

spark.sql.codegen

false

When true, code will be dynamically generated at runtime for expression evaluation in a specific query. For some queries with complicated expression this option can lead to significant speed-ups. However, for simple queries this can actually slow down query execution. 

spark.sql.shuffle.partitions

200

Configures the number of partitions to use when shuffling data for joins or aggregations. 

分布式SQL引擎

Spark SQL也可使用它的JDBC/ODBC或者命令行接口做为分布式查询引擎。以这种模式,终端用户或者应用能够直接与Spark SQL交互来运行SQL查询,而不须要写任何代码。

运行Thrift JDBC/ODBC 服务

这里实现的Thrift JDBC/ODBC至关于Hive0.13中的HiveServer2.你能够用Spark或者Hive0.13自带的beeline脚原本测试JDBC服务。

在Spark目录中运行下面的命令来启动JDBC/ODBC server:

./sbin/start-thriftserver.sh

该脚本接受全部的bin/spark-submit命令行选项,外加一个--hiveconf选项来指定Hive属性。可能执行./sbin/start-thriftserver.sh --help来查看完整的可用选项列表。默认状况下,该服务会监听localhost:10000。你能够经过任一环境变量覆盖这个bahaviour,也就是:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>

export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>./sbin/start-thriftserver.sh \  --master <master-uri> \  ...

或者系统属性

./sbin/start-thriftserver.sh \  --hiveconf hive.server2.thrift.port=<listening-port> \  --hiveconf hive.server2.thrift.bind.host=<listening-host> \  --master <master-uri>  ...

如今你可使用beeline来测试Thrift JDBC/ODBC服务:

./bin/beeline

使用beeline来链接JDBC/ODBC:

beeline> !connect jdbc:hive2://localhost:10000

Beeline会向你询问用户名和密码。在非安全模式下,仅仅输入你机子的用户名以及一个空白的密码。对于安全模式,请按照beeline文档给出的指示。

经过将hive-site.xml文件放到conf/下来完成Hive的配置。

你也可使用Hive自带的beeline脚本。

Thrift JDBC服务还支持经过HTTP传输发送Thrift RPC消息。使用下列设置做为系统属性或者经过conf/中hive-site.xml文件来启用HTTP模式:

hive.server2.transport.mode - Set this to value: httphive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001hive.server2.http.endpoint - HTTP endpoint; default is cliservice

为了测试,使用beeline以http模式链接JDBC/ODBC服务:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行Spark SQL CLI

Spark SQL CLI是一种方便的工具用来以本地模式运行Hive metastore服务,而且从命令行输入执行查询。注意,Spark SQL CLI不能和Thrift JDBC服务进行会话。

在Spark目录下,执行如下命令来启动Spark SQL CLI:

./bin/spark-sql

Hive的配置已经完成,经过将 hive-site.xml 文件放入conf/中。你能够运行 ./bin/spark-sql --help来查看完整的可用选项的列表。

迁移指南

从Spark SQL1.0-1.2升级到1.3Upgrading from Spark SQL 1.0-1.2 to 1.3。

在Spark 1.3中,咱们从Spark SQL中移除了“Alpha”标签,做为其中的一部分,对可用的APIs进行了清理。从Spark 1.3之后,Spark SQL会与1.X系列中的其余版本提供二进制兼容。这种兼容性保证不包括的APIs明确地标记为不稳定。(也就是 DeveloperAPI 或者rExperimental)

重命名SchemaRDD为DataFrame

当升级到Spark SQL 1.3后,用户能够发现最大的改变是SchemaRDD重命名为DataFrame。这主要是由于DataFrame再也不直接继承自RDD。而是本身实现了RDDs提供的大多数功能。DataFrames仍旧能够经过调用.rdd方法来转化成RDDs。

Scala中有一种类型别名,从 SchemaRDD到DataFrame为一些用例提供了源的兼容性。但仍是建议用户更新它们的代码来使用DataFrame。Java和Python 用户须要更新它们的代码。

Java和Scala APIs的统一

Spark1.3以前,有单独的Java兼容性类(JavaSQLContext 和 JavaSchemaRDD)映射成Scala API。在Spark1.3中,Java API和Scala API进行了统一。任意语言的用户应该使用SQLContext和DataFrame。一般,这些类会尝试使用两种语言中均可用的类型(即,Array而不是语言特定的集合)。在有些状况下,若是没有相同的类型存在,则会使用函数重载来替代。

此外,Java指定的类型API被移除了。Scala和Java的用户应该使用inorg.apache.spark.sql.types中的类来以编程方式描述模式。

隐式转换的隔离以及dsl包的删除(只有Scala)

在Spark 1.3以前许多代码的例子以import sqlContext._开始,它会将sqlContext中全部的函数引入到scope中。在Spark 1.3中,咱们将在SQLContext内部的RDDs转换成DataFrames转成成对象进行了隐式转化的隔离。用户如今须要写 import sqlContext.implicits._。

此外,隐式转换如今只有组成Rroducts的RDDs参数带有一个toDF方法,而不是自动地应用。

当使用DSL(如今替代为 DataFrame API)内部的方法时,用户以前会import org.apache.spark.sql.catalyst.dsl。如今使用公共的dataframe方法API应该用import org.apache.spark.sql.functions._。

为DataType删除在org.apache.spark.sql中的类型别名(只有Scala)

Spark 1.3为DataType删除了出如今根本的sql包中的类型别名。如今,用户应该引入org.apache.spark.sql.types中的类。

UDF注册移到sqlContext.udf 中(Java 和 Scala)

用于注册UDFs的函数,不是用于DataFrame DSL就是SQL,已经被移动了SQLContext中的udf对象中。

sqlCtx.udf.register("strLen", (s: String) => s.length())

Python的UDF注册没有改变。

Python中的DataTypes再也不是单例了(Python DataTypes No Longer Singletons)。

当使用Python中的DataTypes,你须要建立它们(i.e. StringType()),而不是引用一个单例。

与Apache Hive的兼容性

Spark SQL被设计出来用于兼容Hive Metastore, SerDes 以及 UDFs。目前的Spark SQL是基于Hive 0.12.0和0.13.1。

部署在现有的Hive仓库中(Deploying in Existing Hive Warehouses)

The Spark SQL Thrigt JDBC服务被设计出来“当即可用” 的兼容现有的Hive安装。你不须要修改已经存在的Hive Metastore 或者更改数据位置或者表分区。

支持的Hive特性

Spark SQL 支持大量的Hive特性,例如:

· Hive query statements, including:

o SELECT

o GROUP BY

o ORDER BY

o CLUSTER BY

o SORT BY

· All Hive operators, including:

o Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc)

o Arithmetic operators (+, -, *, /, %, etc)

o Logical operators (AND, &&, OR, ||, etc)

o Complex type constructors

o Mathematical functions (sign, ln, cos, etc)

o String functions (instr, length, printf, etc)

· User defined functions (UDF)

· User defined aggregation functions (UDAF)

· User defined serialization formats (SerDes)

· Joins

o JOIN

o {LEFT|RIGHT|FULL} OUTER JOIN

o LEFT SEMI JOIN

o CROSS JOIN

· Unions

· Sub-queries

o SELECT col FROM ( SELECT a + b AS col from t1) t2

· Sampling

· Explain

· Partitioned tables

· View

· All Hive DDL Functions, including:

o CREATE TABLE

o CREATE TABLE AS SELECT

o ALTER TABLE

· Most Hive Data types, including:

o TINYINT

o SMALLINT

o INT

o BIGINT

o BOOLEAN

o FLOAT

o DOUBLE

o STRING

o BINARY

o TIMESTAMP

o DATE

o ARRAY<>

o MAP<>

o STRUCT<>

不支持的Hive功能

下面是不支持的Hive特性的列表。大多数特性不多会在Hive部署中用到。

Major Hive Features

· Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn’t support buckets yet.

· 带有buckets的Table:bucket是Hive表分区中的哈希分区。Spark SQL不支持buckets。

Esoteric Hive Features 

* UNION type * Unique join * Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

· File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.

· Hadoop archive

· CLI文件格式:对于结果,显示回CLI,Spark SQL 只支持TextOutputFormat

· Hadoop存档

Hive Optimizations

少许的Hive优化没有包含在Spark中。因为Spark SQL的内存计算模型它们中的有一些(例如索引)是次要的。其余的一些会来的Spark SQL版本中加入。

· Block level bitmap indexes and virtual columns 

· Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.

· Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.

· Skew data flag: Spark SQL does not follow the skew data flags in Hive.

· STREAMTABLE hint in join: Spark SQL does not follow the STREAMTABLE hint.

· Merge multiple small files for query results: if the result output contains multiple small files, Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that.

· 块级别的位图索引和虚拟列(用来建立索引)

· 自动为joins和groupbys决定reducers的数量:目前在Spark SQL中,你须要使用“SET spark.sql.shuffle.partitions=[num_tasks];”来控制并行的post-shuffle的度。

· 仅元数据查询:对于仅使用元数据来回答的查询,Spark SQL仍是启动任务来计算结果。

· 偏斜数据标志:Spark SQL不遵循Hive中的偏斜数据标志。

· join中的STREAMTABLE hint:Spark SQL不遵循STREAMTABLE hint。

· 为查询结果合并多个小文件:若是结果输出中包含多个小文件,Hive能够选择性地将多个小文件合并成更少的更大的文件,避免HDFS元数据的溢出。Spark SQL不支持这些。

数据类型

Spark SQL 和 DataFrames支持如下的数据类型:

· Numeric types

o ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.

o ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.

o IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.

o LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.

o FloatType: Represents 4-byte single-precision floating point numbers.

o DoubleType: Represents 8-byte double-precision floating point numbers.

o DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.

· String type

o StringType: Represents character string values.

· Binary type

o BinaryType: Represents byte sequence values.

· Boolean type

o BooleanType: Represents boolean values.

· Datetime type

o TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.

o DateType: Represents values comprising values of fields year, month, day.

· Complex types

o ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType.containsNull is used to indicate if elements in a ArrayType value can have null values.

o MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.

o StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).

§ StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL中全部的数据类型的都在包 package org.apache.spark.sql.types中,你能够这样访问它们:

import  org.apache.spark.sql.types._

 

Data type

Value type in Scala

API to access or create a data type

ByteType

Byte

ByteType

ShortType

Short

ShortType

IntegerType

Int

IntegerType

LongType

Long

LongType

FloatType

Float

FloatType

DoubleType

Double

DoubleType

DecimalType

java.math.BigDecimal

DecimalType

StringType

String

StringType

BinaryType

Array[Byte]

BinaryType

BooleanType

Boolean

BooleanType

TimestampType

java.sql.Timestamp

TimestampType

DateType

java.sql.Date

DateType

ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull])
Note: The default value of containsNull is true.

MapType

scala.collection.Map

MapType(keyType, valueType, [valueContainsNull])
Note: The default value of valueContainsNull is true.

StructType

org.apache.spark.sql.Row

StructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.

StructField

The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

StructField(name, dataType, nullable)

相关文章
相关标签/搜索