Spark SQL和DataFrame指南[中英对照]

翻译自http://spark.apache.org/docs/1.3.0/sql-programming-guide.htmlhtml

概述(Overview)

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.java

Spark SQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称为DataFrames,也能够做为分布式SQL查询引擎。node

DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.sql

DataFrame是一种以命名列方式组织的分布式数据集。它概念上至关于关系型数据库中的表,或者R/Python中的数据帧,可是具备更丰富的优化。有不少方式能够构造出一个DataFrame,例如:结构化数据文件,Hive中的tables,外部数据库或者存在的RDDs.shell

The DataFrame API is available in Scala, Java, and Python.数据库

All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell or the pyspark shell.express

DataFrame的API适用于Scala、Java和Python.apache

该页上全部的例子使用Spark分布式中的样本数据,能够运行在spark-shell或者pyspark shell中。编程

入口点: SQLContext

The entry point into all functionality in Spark SQL is the SQLContext class, or one of its descendants. To create a basic SQLContext, all you need is a SparkContext.json

Spark SQL中全部功能的入口点是SQLContext类,或者它子类中的一个。为了建立一个基本的SQLContext,你所须要的是一个SparkContext。

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality provided by the basic SQLContext. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available. HiveContext is only packaged separately to avoid including all of Hive’s dependencies in the default Spark build. If these dependencies are not a problem for your application then using HiveContext is recommended for the 1.3 release of Spark. Future releases will focus on bringing SQLContext up to feature parity with a HiveContext.

除了基本的SQLContext,你还能够建立一个HiveContext,它提供了基本的SQLContext的所提供的功能的超集。这些功能中包括附加的特性,能够编写查询,使用更彻底的HiveQL解析器,访问Hive UDFs,可以从Hive表中读取数据。想要使用HiveContext,你不须要有一个存在的Hive步骤,而且全部SQLContext可用的数据源仍旧可用。HiveContext只是单独打包,以免包含默认Spark build中的全部Hive依赖。若是这些依赖对于你的应用不是一个问题,那么推荐使用Spark 1.3版本的HiveContext。

The specific variant of SQL that is used to parse queries can also be selected using the spark.sql.dialect option. This parameter can be changed using either the setConf method on a SQLContext or by using a SET key=value command in SQL. For a SQLContext, the only dialect available is “sql” which uses a simple SQL parser provided by Spark SQL. In a HiveContext, the default is “hiveql”, though “sql” is also available. Since the HiveQL parser is much more complete, this is recommended for most use cases.

使用spark.sql.dialect选项,能够选择SQL的具体变种,用它来解析查询。这个参数可使用SQLContext上的setConf方法或者在SQL中使用一组key=value命令。对于SQLContext,惟一能够的dialect是“sql”,它可使用SparkSQL提供的一个简单的SQL解析器。在HiveContext中,默认的是“hiveql”,尽管“sql”也是可用的。由于HiveOL解析器更加完整,在大多数状况下, 推荐使用这个。

建立DataFrames

Creating DataFrames

With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.

使用SQLContext,应用能够从一个已经存在的RDD、Hive表或者数据源中建立DataFrames。

As an example, the following creates a DataFrame based on the content of a JSON file:

例如,如下根据一个JSON文件建立出一个DataFrame:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create the DataFrame

val df = sqlContext.jsonFile("examples/src/main/resources/people.json")// Show the content of the DataFrame

df.show()

// age  name

// null Michael

// 30   Andy

// 19   Justin

 // Print the schema in a tree format

df.printSchema()

// root// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

 // Select only the "name" column

df.select("name").show()

// name

// Michael

// Andy

// Justin

 // Select everybody, but increment the age by 1

// df.select("name", df("age") + 1).show() //官方文档这样的,可是测试时发现这样编译不经过。下面的形式能够

df.select(df("name"),df("age")+1).show()

// name    (age + 1)

// Michael null

// Andy    31

// Justin  20

 // Select people older than 21

df.filter(df("age") > 21).show()

// age name

// 30  Andy

 // Count people by age

df.groupBy("age").count().show()

// age  count

// null 1

// 19   1

// 30   1

以编程方式运行SQL查询

(Running SQL Queries Programmatically)

The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.

SQLContext中的sql函数使应用能够以编程方式运行SQL查询,而且将结果以DataFrame形式返回。

val sqlContext = ...  // An existing SQLContext

val df = sqlContext.sql("SELECT * FROM table")

RRDs之间的互操做(Interoperating with RDDs)

Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

Spark SQL支持两种不一样的方法,用于将存在的RDDs转换成DataFrames。第一种方法使用反射来推断包含特定类型的对象的RDD的模式。在写Spark应用时,当你已知schema的状况下,这种基于反射的方式使得代码更加简介,而且效果更好。

The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.

建立DataFrames的第二种方法是经过编程接口,它容许你构建一个模式,而后将其应用到现有的RDD上。这种方式更加的繁琐,它容许你构建一个DataFrame当列以及类型未知,直到运行时才能知道时。

使用反射推断模式

Inferring the Schema Using Reflection

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.

Spark SQL中的Scala接口支持自动地将包含case类的RDD转换成DataFrame。case类定义了表的模式,case类的参数的名称使用反射来读取,而后称为列的名称。case类还能够嵌套或者包含复杂的类型,例如Sequences或者Arrays。这个RDD能够隐式地转换为DataFrame,而后注册成表,表能够在后续SQL语句中使用

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

一、使用case类定义schema

二、建立一个SQLContext

三、导入sqlContext.implicits._,用于隐式地将RDD转换成DataFrame

四、建立一个DataFrame,并将它注册成表。

五、使用sqlContext提供的sql方法,就可使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持全部的RDD操做

以编程方式指定模式

Programmatically Specifying the Schema

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

当case类不能提早定义时(例如,记录的结构被编码在一个String中,或者不一样的用户会将文本数据集和字段进行不一样的解析和投影),DataFrame可使用如下三步,以编程的方式实现:

1.Create an RDD of Rows from the original RDD;

2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.

3.Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

1.从原有的RDD中建立行的RDD。

2.建立一个由StructType表示的模式,StructType符合由步骤1建立的RDD的行的结构。

3.经过SQLContext提供的createDataFrame方法,将模式应用于行的RDD。

For example:

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a string

val schemaString = "name age"// Import Spark SQL data types and Row.

import org.apache.spark.sql._// Generate the schema based on the string of schema

val schema =  StructType(    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

results.map(t => "Name: " + t(0)).collect().foreach(println)

 

-----------------------------------------------------------

    //my code

    import org.apache.spark._

    import org.apache.spark.sql._

    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    val conf = new SparkConf().setMaster("local").setAppName("XX")

    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val schemaString = "fullName age"

    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

    val rowRDD = sc.textFile("data/people.txt").map(_.split(" ")).map(p=> Row(p(0),p(1).trim))

    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    peopleDataFrame.registerTempTable("people")

    val young = sqlContext.sql("select * from people where age <25")

    young.show()

数据源(Data Sources)

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table. Registering a DataFrame as a table allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources.

Spark SQL支持经过DataFrame接口在多种数据源上进行操做。一个DataFrame能够如同一个标准的RDDs那样进行操做,还能够注册成临时的表。将一个DataFrame注册成临时表容许你在它的数据上运行SQL查询。本节介绍使用Spark数据源装载和保存数据的经常使用方法,使用Spark数据源保存数据。而后进入可用于内置数据源的特定选项。

通用的加载/保存功能

(Generic Load/Save Functions)

In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.

在最简单的形式中,默认的数据源(parquet除非经过spark.sql.sources.default另外进行配置)将被用于全部的操做。

val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")

手动指定选项

(Manually Specifying Options)

You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source. Data sources are specified by their fully qualified name (i.e., org.apache.spark.sql.parquet), but for built-in sources you can also use the shorted name (json, parquet, jdbc). DataFrames of any type can be converted into other types using this syntax.

你还能够手动指定数据源,这些数据源将与任何额外的选项一同使用,你但愿将这些选项传入到数据源中。数据源是经过它们的全名来指定的(如org.apache.spark.sql.parquet),可是对于内置的数据源,你也可使用简短的名称(json, parquet, jdbc)。任何类型的DataFrames使用这些语法能够转化成其余的数据源:

val df = sqlContext.load("people.json", "json")

df.select("name", "age").save("namesAndAges.parquet", "parquet")

保存模式

(Save Modes)

Save operations can optionally take a SaveMode, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Thus, it is not safe to have multiple writers attempting to write to the same location. Additionally, when performing a Overwrite, the data will be deleted before writing out the new data.

Save操做能够可选择性地接收一个SaveModel,若是数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。所以,若是有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据以前会将原来的数据进行删除。

 

Scala/Java

Python

Meaning

SaveMode.ErrorIfExists (default)

"error" (default)

When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 

当往一个数据源中保存一个DataFrame,若是数据已经存在,会抛出一个异常。

SaveMode.Append

"append"

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 

当往一个数据源中保存一个DataFrame,若是data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。

SaveMode.Overwrite

"overwrite"

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. 

Overwrite模式意味着当向数据源中保存一个DataFrame时,若是data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。

SaveMode.Ignore

"ignore"

Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. 

Ignore模式意味着当向数据源中保存一个DataFrame时,若是数据已经存在,save操做不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`类似。

 

保存为持久化表

(Saving to Persistent Tables)

When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the dataframe and create a pointer to the data in the HiveMetastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table.

当与HiveContext一块儿工做时,DataFrames也可使用saveAsTable命令保存为持久化的表。不像registerTempTable命令,saveAsTable会将DataFrame的内容进行物化,而且在HiveMetastore中建立一个指向数据的指针。持久化表会仍旧存在即便你的Spark程序从新启动。只要你保持链接到相同的元存储( metastore)。一个持久化表的DataFrame能够经过调用SQLContext上的带有表的名称的table方法来建立。

By default saveAsTable will create a “managed table”, meaning that the location of the data will be controlled by the metastore. Managed tables will also have their data deleted automatically when a table is dropped.

默认状况下,saveAsTable会建立一个“管理表(managed table)”,意味着元存储控制数据的位置。当一个表被删除后,managed table会自动地删除它们的数据。

Parquet Files

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.

Parquet是一种柱状的格式,被许多其余数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。

以编程方式加载数据

Loading Data Programmatically

Using the data from the above example:

使用上面例子中的数据:

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._val people: RDD[Person] = ...

 // An RDD of case class objects, from the previous example.// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.

people.saveAsParquetFile("people.parquet")// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.

// The result of loading a Parquet file is also a DataFrame.

val parquetFile = sqlContext.parquetFile("people.parquet")//Parquet files can also be registered as tables and then used in SQL statements.

parquetFile.registerTempTable("parquetFile")

val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分区发现

Partition discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory. The Parquet data source is now able to discover and infer partitioning information automatically. For exmaple, we can store all our previously used population data into a partitioned table using the following directory structure, with two extra columns, gender and country as partitioning columns:

在系统中,如Hive,使用表分区是一个常见的优化途径。在一个分区表中,数据常常存储在不一样的目录中,对每个分区目录中的路径中,对分区列的值进行编码。Parquet数据源如今能够自动地发现而且推断出分区的信息。例如,咱们能够将以前使用的人口数据存储成下列目录结构的分区表,两个额外的列,gender和country做为分区列:

path└── to    └── table        ├── gender=male        │   ├── ...        │   │        │   ├── country=US        │   │   └── data.parquet        │   ├── country=CN        │   │   └── data.parquet        │   └── ...        └── gender=female            ├── ...            │            ├── country=US            │   └── data.parquet            ├── country=CN            │   └── data.parquet            └── ...

By passing path/to/table to either SQLContext.parquetFile or SQLContext.load, Spark SQL will automatically extract the partitioning information from the paths. Now the schema of the returned DataFrame becomes:

经过向SQLContext.parquetFile或者 SQLContext.load中传入path/to/table,Spark SQL会自动地从路径中提取分区信息。如今返回的DataFrame模式变成:

root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types and string type are supported.

注意到分区列的数据类型自动被推断出来。目前支持数字的数据类型和string类型。

模式合并

(Schema merging)

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

像ProtocolBuffer, Avro和Thrift那样,Parquet还支持模式演化。用户能够从一个简单的模式开始,而且根据须要逐渐地向模式中添加更多的列。这样,用户最终可能会有多个不一样可是具备相互兼容的模式的Parquet文件。Parquet数据源如今能够自动地发现这种状况,而且将全部这些文件的模式进行合并。

// sqlContext from the previous example is used in this example

.// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Create a simple DataFrame, stored into a partition directory

val df1 = sparkContext.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.saveAsParquetFile("data/test_table/key=1")// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val df2 = sparkContext.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.saveAsParquetFile("data/test_table/key=2")// Read the partitioned table

val df3 = sqlContext.parquetFile("data/test_table")

df3.printSchema()// The final schema consists of all 3 columns in the Parquet files together

// with the partiioning column appeared in the partition directory paths.

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

配置

Configuration

Configuration of Parquet can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.

Parquet的配置可使用SQLContext的setConf来设置或者经过使用SQL运行SET key=value命令

Property Name

Default

Meaning

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.

 其余的一些产生Parquet的系统,特别是Impala和SparkSQL的老版本,当将Parquet模式写出时不会区分二进制数据和字符串。这个标志告诉Spark SQL将二进制数据解析成字符串,以提供对这些系统的兼容。

spark.sql.parquet.int96AsTimestamp

true

Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. 

其余的一些产生Parquet的系统,特别是Impala,将时间戳存储为INT96的形式。Spark也将时间戳存储为INT96,由于咱们要避免纳秒级字段的精度的损失。这个标志告诉Spark SQL将INT96数据解析为一个时间戳,以提供对这些系统的兼容。

spark.sql.parquet.cacheMetadata

true

Turns on caching of Parquet schema metadata. Can speed up querying of static data. 

打开Parquet模式的元数据的缓存。可以加快对静态数据的查询。

spark.sql.parquet.compression.codec

gzip

Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. 

设置压缩编码解码器,当写入一个Parquet文件时。可接收的值包括:uncompressed, snappy, gzip, lzo

spark.sql.parquet.filterPushdown

false

Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known bug in Paruet 1.6.0rc3 (PARQUET-136). However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn this feature on. 

打开Parquet过滤器的后进先出存储的优化。这个功能默认是被关闭的,由于一个Parquet中的一个已知的bug 1.6.0rc3 (PARQUET-136)。然而,若是你的表中不包含任何的可为空的(nullable)字符串或者二进制列,那么打开这个功能是安全的。

spark.sql.hive.convertMetastoreParquet

true

When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support. 

当设置成false,Spark SQL会为parquet表使用Hive SerDe(Serialize/Deserilize)而不是内置的支持。

 

JSON数据集

(JSON Datasets)

Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using one of two methods in a SQLContext:

• jsonFile - loads data from a directory of JSON files where each line of the files is a JSON object.

• jsonRDD - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

Spark SQL能够自动推断出JSON数据集的模式,将它做为DataFrame进行加载。这个转换能够经过使用SQLContext中的下面两个方法中的任意一个来完成。

• jsonFile - 从一个JSON文件的目录中加载数据,文件中的每个行都是一个JSON对象。

• jsonRDD - 从一个已经存在的RDD中加载数据,每个RDD的元素是一个包含一个JSON对象的字符串。

Note that the file that is offered as jsonFile is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.

注意,做为jsonFile提供deep文件不是一个典型的JSON文件。每一行必须包含一个分开的独立的有效JSON对象。所以,常规的多行JSON文件一般会失败。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// A JSON dataset is pointed to by path.

// The path can be either a single text file or a directory storing text files.

val path = "examples/src/main/resources/people.json"

// Create a DataFrame from the file(s) pointed to by path

val people = sqlContext.jsonFile(path)// The inferred schema can be visualized using the printSchema() method.

people.printSchema()

// root

//  |-- age: integer (nullable = true)

//  |-- name: string (nullable = true)// Register this DataFrame as a table.

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize(  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Hive表

(Hive Tables)

Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark’s build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.

Spark SQL还支持对存储在Apache Hive中的数据的读和写。可是,由于Hive有大量的依赖,它不包含在默认的Spark assembly中。对Hive的支持是经过在Spark的build中添加 -Phive 和 -Phive-thriftserver 标志来完成。这个命令构建了一个新的assembly jar ,它包含Hive。注意,这个HIve assembly jar还必须出如今全部的worker节点,由于为了访问存储在Hive中的数据,它们会访问Hive的序列化和反序列化库(SerDes) 。

Configuration of Hive is done by placing your hive-site.xml file in conf/.

Hive的配置是经过将你的hive-site.xml文件放到conf/下来完成。

When working with Hive one must construct a HiveContext, which inherits from SQLContext, and adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do not have an existing Hive deployment can still create a HiveContext. When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory.

当使用Hive时,必须构建一个HiveContext,它继承自SQLContext,对寻找MetaStore中的表和使用HiveQL编写查询提供了支持。没有部署Hive的用户也能够建立一个HiveContext。当没有经过hive-site.xml进行配置,context会在当前目录下自动建立一个metastore_db和warehouse。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC To Other Databases

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD. This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python as it does not require the user to provide a ClassTag. (Note that this is different than the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL).

Spark SQL还包含了一个数据源,它可使用JDBC从其余数据库中读取数据。这个功能应该优先使用JdbcRDD。这是由于结果是做为DataFrame返回的,而且在Spark SQL中能够简单的被使用或加入其余数据源。JDBC数据源也可以轻松地被Java或者Python来使用,由于它不须要用户提供一个ClassTag(注意,这不一样于Spark SQL JDBC服务,它容许其余应用使用Spark SQL运行查询)。

To get started you will need to include the JDBC driver for you particular database on the spark classpath. For example, to connect to postgres from the Spark Shell you would run the following command:

要开始使用时,你须要为你的特定数据块在Spark的类路径中添加JDBC驱动。例如,从Spark Shell中链接到postgres,你须要运行如下的命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

Tables from the remote database can be loaded as a DataFrame or Spark SQL Temporary table using the Data Sources API. The following options are supported:

经过使用数据源的API,能够将远程数据库中的表加载为DataFrame或者Spark SQL 临时表。如下的选项是被支持的:

Property Name

Meaning

url

The JDBC URL to connect to. 

要链接到的JDBC URL

dbtable

The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. 

须要读取的JDBC表。注意,SQL查询中的“From”子句中的任何部分都是可使用的。例如,你能够在括号中使用子查询,而不是一个完整的表。

driver

The class name of the JDBC driver needed to connect to this URL. This class with be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem. 

须要链接到的URL的JDBC驱动的类名。这个类会在运行一个JDBC命令以前被加载到master和workers上,容许驱动注册本身和JDBC子系统。

partitionColumn, lowerBound, upperBound, numPartitions

These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. 

若是这些选项中的一个被指定了,那么全部的选项必须被指定。它们描述了当从多个workers上并行读取时如何进行分区。partitionColumn必须是表中的一个数字列。

 

val jdbcDF = sqlContext.load("jdbc", Map(  "url" -> "jdbc:postgresql:dbserver",  "dbtable" -> "schema.tablename"))

故障排除

(Troubleshooting)

*  The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.

* 在客户端session以及全部executors上,JDBC驱动类必须对原来的类加载器是可见的。这是由于Java的DriverManager 类进行安全检查,这致使打开一个链接时,它会忽略全部对于原始来加载器不可见的驱动。一个方便的方法是在全部的worker节点上修改compute_classpath.sh以包含驱动的JARs。

*  Some databases, such as H2, convert all names to upper case. You’ll need to use upper case to refer to those names in Spark SQL.

* 一些数据库,例如H2,将全部的名称转化成大写的。在Spark SQL中你须要使用大写来指定那些名称。

性能调节(Performance Tuning)

For some workloads it is possible to improve performance by either caching data in memory, or by turning on some experimental options.

对于一些工做负载来讲,经过将数据缓存到内存中或者打开一些实验性的选项,可能会提升性能。

在内存中缓存数据

(Caching Data In Memory)

Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName") or dataFrame.cache(). Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call sqlContext.uncacheTable("tableName") to remove the table from memory.

Configuration of in-memory caching can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.

Spark SQL可使用内存中的柱状格式来对表进行缓存,经过调用sqlContext.cacheTable("tableName") 或者r dataFrame.cache()。而后Spark SQL会扫描仅仅须要的列以及自动地调节压缩,以减小内存使用和GC压力。你能够调用sqlContext.uncacheTable("tableName") 来将表从内存中移除。内存缓存的配置可使用SQLContext上的setConf或者经过使用SQL执行SET key=value命令的方式来完成。

Property Name

Default

Meaning

spark.sql.inMemoryColumnarStorage.compressed

true

When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. 

当设置为true时,SparkSQL会根据统计出的各项数据为每个列选择一种压缩编解码器。

spark.sql.inMemoryColumnarStorage.batchSize

10000

Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 

为列状缓存控制批处理大小。较大的批大小能够提升内存的利用率和压缩,可是缓存数据时有OOMs的风险。

其余的配置选项

Other Configuration Options

The following options can also be used to tune the performance of query execution. It is possible that these options will be deprecated in future release as more optimizations are performed automatically.

如下选项也能够用来调节执行查询时的性能。随着更多的优化被自动地执行,这些选项有可能会在未来的版本中被弃用,

Property Name

Default

Meaning

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. 

spark.sql.codegen

false

When true, code will be dynamically generated at runtime for expression evaluation in a specific query. For some queries with complicated expression this option can lead to significant speed-ups. However, for simple queries this can actually slow down query execution. 

spark.sql.shuffle.partitions

200

Configures the number of partitions to use when shuffling data for joins or aggregations. 

分布式SQL引擎

(Distributed SQL Engine)

Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, without the need to write any code.

Spark SQL也可使用它的JDBC/ODBC或者命令行接口做为分布式查询引擎。以这种模式,终端用户或者应用能够直接与Spark SQL交互来运行SQL查询,而不须要写任何代码。

运行Thrift JDBC/ODBC 服务

(Running the Thrift JDBC/ODBC server)

The Thrift JDBC/ODBC server implemented here corresponds to the HiveServer2 in Hive 0.13. You can test the JDBC server with the beeline script that comes with either Spark or Hive 0.13.

这里实现的Thrift JDBC/ODBC至关于Hive0.13中的HiveServer2.你能够用Spark或者Hive0.13自带的beeline脚原本测试JDBC服务。

To start the JDBC/ODBC server, run the following in the Spark directory:

在Spark目录中运行下面的命令来启动JDBC/ODBC server

./sbin/start-thriftserver.sh

This script accepts all bin/spark-submit command line options, plus a --hiveconf option to specify Hive properties. You may run ./sbin/start-thriftserver.sh --help for a complete list of all available options. By default, the server listens on localhost:10000. You may override this bahaviour via either environment variables, i.e.:

该脚本接受全部的bin/spark-submit命令行选项,外加一个--hiveconf选项来指定Hive属性。可能执行./sbin/start-thriftserver.sh --help来查看完整的可用选项列表。默认状况下,该服务会监听localhost:10000。你能够经过任一环境变量覆盖这个bahaviour,也就是:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>

export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>./sbin/start-thriftserver.sh \  --master <master-uri> \  ...

或者系统属性

./sbin/start-thriftserver.sh \  --hiveconf hive.server2.thrift.port=<listening-port> \  --hiveconf hive.server2.thrift.bind.host=<listening-host> \  --master <master-uri>  ...

Now you can use beeline to test the Thrift JDBC/ODBC server:

如今你可使用beeline来测试Thrift JDBC/ODBC服务:

./bin/beeline

Connect to the JDBC/ODBC server in beeline with:

使用beeline来链接JDBC/ODBC:

beeline> !connect jdbc:hive2://localhost:10000

Beeline will ask you for a username and password. In non-secure mode, simply enter the username on your machine and a blank password. For secure mode, please follow the instructions given in the beeline documentation.

Configuration of Hive is done by placing your hive-site.xml file in conf/.

You may also use the beeline script that comes with Hive.

Thrift JDBC server also supports sending thrift RPC messages over HTTP transport. Use the following setting to enable HTTP mode as system property or in hive-site.xml file in conf/:

Beeline会向你询问用户名和密码。在非安全模式下,仅仅输入你机子的用户名以及一个空白的密码。对于安全模式,请按照beeline文档给出的指示。

经过将hive-site.xml文件放到conf/下来完成Hive的配置。

你也可使用Hive自带的beeline脚本。

Thrift JDBC服务还支持经过HTTP传输发送Thrift RPC消息。使用下列设置做为系统属性或者经过conf/中hive-site.xml文件来启用HTTP模式:

hive.server2.transport.mode - Set this to value: httphive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001hive.server2.http.endpoint - HTTP endpoint; default is cliservice

To test, use beeline to connect to the JDBC/ODBC server in http mode with:

为了测试,使用beeline以http模式链接JDBC/ODBC服务:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行Spark SQL CLI

(Running the Spark SQL CLI)

The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute queries input from the command line. Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.

Spark SQL CLI是一种方便的工具用来以本地模式运行Hive metastore服务,而且从命令行输入执行查询。注意,Spark SQL CLI不能和Thrift JDBC服务进行会话。

To start the Spark SQL CLI, run the following in the Spark directory:

在Spark目录下,执行如下命令来启动Spark SQL CLI:

./bin/spark-sql

Configuration of Hive is done by placing your hive-site.xml file in conf/. You may run ./bin/spark-sql --help for a complete list of all available options.

Hive的配置已经完成,经过将 hive-site.xml 文件放入conf/中。你能够运行 ./bin/spark-sql --help来查看完整的可用选项的列表。

迁移指南

(Migration Guide)

从Spark SQL1.0-1.2升级到1.3Upgrading from Spark SQL 1.0-1.2 to 1.3

In Spark 1.3 we removed the “Alpha” label from Spark SQL and as part of this did a cleanup of the available APIs. From Spark 1.3 onwards, Spark SQL will provide binary compatibility with other releases in the 1.X series. This compatibility guarantee excludes APIs that are explicitly marked as unstable (i.e., DeveloperAPI or Experimental).

在Spark 1.3中,咱们从Spark SQL中移除了“Alpha”标签,做为其中的一部分,对可用的APIs进行了清理。从Spark 1.3之后,Spark SQL会与1.X系列中的其余版本提供二进制兼容。这种兼容性保证不包括的APIs明确地标记为不稳定。(也就是 DeveloperAPI 或者rExperimental)

重命名SchemaRDD为DataFrame

(Rename of SchemaRDD to DataFrame)

The largest change that users will notice when upgrading to Spark SQL 1.3 is that SchemaRDD has been renamed to DataFrame. This is primarily because DataFrames no longer inherit from RDD directly, but instead provide most of the functionality that RDDs provide though their own implementation. DataFrames can still be converted to RDDs by calling the .rdd method.

当升级到Spark SQL 1.3后,用户能够发现最大的改变是SchemaRDD重命名为DataFrame。这主要是由于DataFrame再也不直接继承自RDD。而是本身实现了RDDs提供的大多数功能。DataFrames仍旧能够经过调用.rdd方法来转化成RDDs。

In Scala there is a type alias from SchemaRDD to DataFrame to provide source compatibility for some use cases. It is still recommended that users update their code to use DataFrame instead. Java and Python users will need to update their code.

Scala中有一种类型别名,从 SchemaRDD到DataFrame为一些用例提供了源的兼容性。但仍是建议用户更新它们的代码来使用DataFrame。Java和Python 用户须要更新它们的代码。

Java和Scala APIs的统一

(Unification of the Java and Scala APIs)

Prior to Spark 1.3 there were separate Java compatible classes (JavaSQLContext and JavaSchemaRDD) that mirrored the Scala API. In Spark 1.3 the Java API and Scala API have been unified. Users of either language should use SQLContext and DataFrame. In general theses classes try to use types that are usable from both languages (i.e. Array instead of language specific collections). In some cases where no common type exists (e.g., for passing in closures or Maps) function overloading is used instead.

Spark1.3以前,有单独的Java兼容性类(JavaSQLContext 和 JavaSchemaRDD)映射成Scala API。在Spark1.3中,Java API和Scala API进行了统一。任意语言的用户应该使用SQLContext和DataFrame。一般,这些类会尝试使用两种语言中均可用的类型(即,Array而不是语言特定的集合)。在有些状况下,若是没有相同的类型存在,则会使用函数重载来替代。

Additionally the Java specific types API has been removed. Users of both Scala and Java should use the classes present inorg.apache.spark.sql.types to describe schema programmatically.

此外,Java指定的类型API被移除了。Scala和Java的用户应该使用inorg.apache.spark.sql.types中的类来以编程方式描述模式。

隐式转换的隔离以及dsl包的删除(只有Scala)

(Isolation of Implicit Conversions and Removal of dsl Package (Scala-only))

Many of the code examples prior to Spark 1.3 started with import sqlContext._, which brought all of the functions from sqlContext into scope. In Spark 1.3 we have isolated the implicit conversions for converting RDDs into DataFrames into an object inside of the SQLContext. Users should now write import sqlContext.implicits._.

在Spark 1.3以前许多代码的例子以import sqlContext._开始,它会将sqlContext中全部的函数引入到scope中。在Spark 1.3中,咱们将在SQLContext内部的RDDs转换成DataFrames转成成对象进行了隐式转化的隔离。用户如今须要写 import sqlContext.implicits._。

Additionally, the implicit conversions now only augment RDDs that are composed of Products (i.e., case classes or tuples) with a method toDF, instead of applying automatically.

此外,隐式转换如今只有组成Rroducts的RDDs参数带有一个toDF方法,而不是自动地应用。

When using function inside of the DSL (now replaced with the DataFrame API) users used to import org.apache.spark.sql.catalyst.dsl. Instead the public dataframe functions API should be used: import org.apache.spark.sql.functions._.

当使用DSL(如今替代为 DataFrame API)内部的方法时,用户以前会import org.apache.spark.sql.catalyst.dsl。如今使用公共的dataframe方法API应该用import org.apache.spark.sql.functions._。

为DataType删除在org.apache.spark.sql中的类型别名(只有Scala)(Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only))

Spark 1.3 removes the type aliases that were present in the base sql package for DataType. Users should instead import the classes in org.apache.spark.sql.types

Spark 1.3为DataType删除了出如今根本的sql包中的类型别名。如今,用户应该引入org.apache.spark.sql.types中的类。

UDF注册移到sqlContext.udf 中(Java 和 Scala)

(UDF Registration Moved to sqlContext.udf (Java & Scala))

Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been moved into the udf object in SQLContext.

用于注册UDFs的函数,不是用于DataFrame DSL就是SQL,已经被移动了SQLContext中的udf对象中。

sqlCtx.udf.register("strLen", (s: String) => s.length())

Python UDF registration is unchanged.

Python的UDF注册没有改变。

Python中的DataTypes再也不是单例了(Python DataTypes No Longer Singletons)

When using DataTypes in Python you will need to construct them (i.e. StringType()) instead of referencing a singleton.

当使用Python中的DataTypes,你须要建立它们,而不是引用一个单例。

与Apache Hive的兼容性

(Compatibility with Apache Hive)

Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark SQL is based on Hive 0.12.0 and 0.13.1.

Spark SQL被设计出来用于兼容Hive Metastore, SerDes 以及 UDFs。目前的Spark SQL是基于Hive 0.12.0和0.13.1。

部署在现有的Hive仓库中(Deploying in Existing Hive Warehouses)

The Spark SQL Thrift JDBC server is designed to be “out of the box” compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables.

The Spark SQL Thrigt JDBC服务被设计出来“当即可用” 的兼容现有的Hive安装。你不须要修改已经存在的Hive Metastore 或者更改数据位置或者表分区。

支持的Hive特性

(Supported Hive Features)

Spark SQL supports the vast majority of Hive features, such as:

Spark SQL 支持大量的Hive特性,例如:

· Hive query statements, including:

o SELECT

o GROUP BY

o ORDER BY

o CLUSTER BY

o SORT BY

· All Hive operators, including:

o Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc)

o Arithmetic operators (+, -, *, /, %, etc)

o Logical operators (AND, &&, OR, ||, etc)

o Complex type constructors

o Mathematical functions (sign, ln, cos, etc)

o String functions (instr, length, printf, etc)

· User defined functions (UDF)

· User defined aggregation functions (UDAF)

· User defined serialization formats (SerDes)

· Joins

o JOIN

o {LEFT|RIGHT|FULL} OUTER JOIN

o LEFT SEMI JOIN

o CROSS JOIN

· Unions

· Sub-queries

o SELECT col FROM ( SELECT a + b AS col from t1) t2

· Sampling

· Explain

· Partitioned tables

· View

· All Hive DDL Functions, including:

o CREATE TABLE

o CREATE TABLE AS SELECT

o ALTER TABLE

· Most Hive Data types, including:

o TINYINT

o SMALLINT

o INT

o BIGINT

o BOOLEAN

o FLOAT

o DOUBLE

o STRING

o BINARY

o TIMESTAMP

o DATE

o ARRAY<>

o MAP<>

o STRUCT<>

不支持的Hive功能

(Unsupported Hive Functionality)

Below is a list of Hive features that we don’t support yet. Most of these features are rarely used in Hive deployments.

下面是不支持的Hive特性的列表。大多数特性不多会在Hive部署中用到。

Major Hive Features

· Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn’t support buckets yet.

· 带有buckets的Table:bucket是Hive表分区中的哈希分区。Spark SQL不支持buckets。

Esoteric Hive Features 

* UNION type * Unique join * Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

· File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.

· Hadoop archive

· CLI文件格式:对于结果,显示回CLI,Spark SQL 只支持TextOutputFormat

· Hadoop存档

Hive Optimizations

A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are less important due to Spark SQL’s in-memory computational model. Others are slotted for future releases of Spark SQL.

少许的Hive优化没有包含在Spark中。因为Spark SQL的内存计算模型它们中的有一些(例如索引)是次要的。其余的一些会来的Spark SQL版本中加入。

· Block level bitmap indexes and virtual columns 

· Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.

· Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.

· Skew data flag: Spark SQL does not follow the skew data flags in Hive.

· STREAMTABLE hint in join: Spark SQL does not follow the STREAMTABLE hint.

· Merge multiple small files for query results: if the result output contains multiple small files, Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that.

· 块级别的位图索引和虚拟列(用来建立索引)

· 自动为joins和groupbys决定reducers的数量:目前在Spark SQL中,你须要使用“SET spark.sql.shuffle.partitions=[num_tasks];”来控制并行的post-shuffle的度。

· 仅元数据查询:对于仅使用元数据来回答的查询,Spark SQL仍是启动任务来计算结果。

· 偏斜数据标志:Spark SQL不遵循Hive中的偏斜数据标志。

· join中的STREAMTABLE hint:Spark SQL不遵循STREAMTABLE hint。

· 为查询结果合并多个小文件:若是结果输出中包含多个小文件,Hive能够选择性地将多个小文件合并成更少的更大的文件,避免HDFS元数据的溢出。Spark SQL不支持这些。

数据类型

(Data Types)

Spark SQL and DataFrames support the following data types:

Spark SQL 和 DataFrames支持如下的数据类型:

· Numeric types

o ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.

o ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.

o IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.

o LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.

o FloatType: Represents 4-byte single-precision floating point numbers.

o DoubleType: Represents 8-byte double-precision floating point numbers.

o DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.

· String type

o StringType: Represents character string values.

· Binary type

o BinaryType: Represents byte sequence values.

· Boolean type

o BooleanType: Represents boolean values.

· Datetime type

o TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.

o DateType: Represents values comprising values of fields year, month, day.

· Complex types

o ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType.containsNull is used to indicate if elements in a ArrayType value can have null values.

o MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.

o StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).

§ StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

All data types of Spark SQL are located in the package org.apache.spark.sql.types. You can access them by doing

Spark SQL中全部的数据类型的都在包 package org.apache.spark.sql.types中,你能够这样访问它们:

import  org.apache.spark.sql.types._

 

Data type

Value type in Scala

API to access or create a data type

ByteType

Byte

ByteType

ShortType

Short

ShortType

IntegerType

Int

IntegerType

LongType

Long

LongType

FloatType

Float

FloatType

DoubleType

Double

DoubleType

DecimalType

java.math.BigDecimal

DecimalType

StringType

String

StringType

BinaryType

Array[Byte]

BinaryType

BooleanType

Boolean

BooleanType

TimestampType

java.sql.Timestamp

TimestampType

DateType

java.sql.Date

DateType

ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull])
Note: The default value of containsNull is true.

MapType

scala.collection.Map

MapType(keyType, valueType, [valueContainsNull])
Note: The default value of valueContainsNull is true.

StructType

org.apache.spark.sql.Row

StructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.

StructField

The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

StructField(name, dataType, nullable)

相关文章
相关标签/搜索