Spark SQL 官方文档-中文翻译

Spark SQL 官方文档-中文翻译

Spark版本:Spark 1.5.2html


转载请注明出处:http://www.cnblogs.com/BYRans/

java

1 概述(Overview)

Spark SQL是Spark的一个组件,用于结构化数据的计算。Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames能够充当分布式SQL查询引擎。sql

2 DataFrames

DataFrame是一个分布式的数据集合,该数据集合以命名列的方式进行整合。DataFrame能够理解为关系数据库中的一张表,也能够理解为R/Python中的一个data frame。DataFrames能够经过多种数据构造,例如:结构化的数据文件、hive中的表、外部数据库、Spark计算过程当中生成的RDD等。
DataFrame的API支持4种语言:Scala、Java、Python、R。shell

2.1 入口:SQLContext(Starting Point: SQLContext)

Spark SQL程序的主入口是SQLContext类或它的子类。建立一个基本的SQLContext,你只须要SparkContext,建立代码示例以下:数据库

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

除了基本的SQLContext,也能够建立HiveContext。SQLContext和HiveContext区别与联系为:express

  • SQLContext如今只支持SQL语法解析器(SQL-92语法)
  • HiveContext如今支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户能够经过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法。
  • 使用HiveContext可使用Hive的UDF,读写Hive表数据等Hive操做。SQLContext不能够对Hive进行操做。
  • Spark SQL将来的版本会不断丰富SQLContext的功能,作到SQLContext和HiveContext的功能容和,最终可能二者会统一成一个Context

HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,能够在部署基本的Spark的时候就不须要Hive的依赖包,须要使用HiveContext时再把Hive的各类依赖包加进来。apache

SQL的解析器能够经过配置spark.sql.dialect参数进行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默认解析器为”hiveql“,也支持”sql“解析器。编程

2.2 建立DataFrames(Creating DataFrames)

使用SQLContext,spark应用程序(Application)能够经过RDD、Hive表、JSON格式数据等数据源建立DataFrames。下面是基于JSON文件建立DataFrame的示例:json

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();

2.3 DataFrame操做(DataFrame Operations)

DataFrames支持Scala、Java和Python的操做接口。下面是Scala和Java的几个操做示例:api

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
  • Java
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

详细的DataFrame API请参考 API Documentation

除了简单列引用和表达式,DataFrames还有丰富的library,功能包括string操做、date操做、常见数学操做等。详细内容请参考 DataFrame Function Reference

2.4 运行SQL查询程序(Running SQL Queries Programmatically)

Spark Application可使用SQLContext的sql()方法执行SQL查询操做,sql()方法返回的查询结果为DataFrame格式。代码以下:

  • Scala
val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
  • Java
SQLContext sqlContext = ...  // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")

2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs)

Spark SQL支持两种RDDs转换为DataFrames的方式:

  • 使用反射获取RDD内的Schema
    • 当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁并且效果也很好。
  • 经过编程接口指定Schema
    • 经过Spark SQL的接口建立RDD的Schema,这种方式会让代码比较冗长。
    • 这种方法的好处是,在运行时才知道数据的列以及列的类型的状况下,能够动态生成Schema

2.5.1 使用反射获取Schema(Inferring the Schema Using Reflection)

Spark SQL支持将JavaBean的RDD自动转换成DataFrame。经过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。建立一个实现Serializable接口包含全部属性getters和setters的类来建立一个JavaBean。经过调用createDataFrame并提供JavaBean的Class object,指定一个Schema给一个RDD。示例以下:

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

2.5.2 经过编程接口指定Schema(Programmatically Specifying the Schema)

当JavaBean不能被预先定义的时候,编程建立DataFrame分为三步:

  • 从原来的RDD建立一个Row格式的RDD
  • 建立与RDD 中Rows结构匹配的StructType,经过该StructType建立表示RDD 的Schema
  • 经过SQLContext提供的createDataFrame方法建立DataFrame,方法参数为RDD 的Schema

示例以下:

import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return RowFactory.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

3 数据源(Data Source)

Spark SQL的DataFrame接口支持多种数据源的操做。一个DataFrame能够进行RDDs方式的操做,也能够被注册为临时表。把DataFrame注册为临时表以后,就能够对该DataFrame执行SQL查询。Data Sources这部分首先描述了对Spark的数据源执行加载和保存的经常使用方法,而后对内置数据源进行深刻介绍。

3.1 通常Load/Save方法

Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL能够方便的执行全部的操做。修改配置项spark.sql.sources.default,可修改默认数据源格式。读取Parquet文件示例以下:

  • Scala
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • Java
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

3.1.1 手动指定选项(Manually Specifying Options)

当数据源格式不是parquet格式文件时,须要手动指定数据源的格式。数据源格式须要指定全名(例如:org.apache.spark.sql.parquet),若是数据源格式为内置格式,则只须要指定简称(json,parquet,jdbc)。经过指定的数据源格式名,能够对DataFrames进行类型转换操做。示例以下:

  • Scala
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • Java
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

3.1.2 存储模式(Save Modes)

能够采用SaveMode执行存储操做,SaveMode定义了对数据的处理模式。须要注意的是,这些保存模式不使用任何锁定,不是原子操做。此外,当使用Overwrite方式执行时,在输出新数据以前原数据就已经被删除。SaveMode详细介绍以下表:

SaveModes

3.1.3 持久化到表(Saving to Persistent Tables)

当使用HiveContext时,能够经过saveAsTable方法将DataFrames存储到表中。与registerTempTable方法不一样的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可使用SQLContext的table方法。table先建立一个表,方法参数为要建立的表的表名,而后将DataFrame持久化到这个表中。

默认的saveAsTable方法将建立一个“managed table”,表示数据的位置能够经过metastore得到。当存储数据的表被删除时,managed table也将自动删除。

3.2 Parquet文件

Parquet是一种支持多种数据处理系统的柱状的数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。

3.2.1 读取Parquet文件(Loading Data Programmatically)

读取Parquet文件示例以下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • Java
// sqlContext from the previous example is used in this example.

DataFrame schemaPeople = ... // The DataFrame from the previous example.

// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write().parquet("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

3.2.2 解析分区信息(Partition Discovery)

对表进行分区是对数据进行优化的方式之一。在分区的表内,数据经过分区列将数据存储在不一样的目录下。Parquet数据源如今可以自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

经过传递path/to/table给 SQLContext.read.parquet或SQLContext.read.load,Spark SQL将自动解析分区信息。返回的DataFrame的Schema以下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

须要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。若是想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,再也不进行类型解析。

3.2.3 Schema合并(Schema Merging)

像ProtocolBuffer、Avro和Thrift那样,Parquet也支持Schema evolution(Schema演变)。用户能够先定义一个简单的Schema,而后逐渐的向Schema中增长列描述。经过这种方式,用户能够获取多个有不一样Schema但相互兼容的Parquet文件。如今Parquet数据源能自动检测这种状况,并合并这些文件的schemas。

由于Schema合并是一个高消耗的操做,在大多数状况下并不须要,因此Spark SQL从1.5.0开始默认关闭了该功能。能够经过下面两种方式开启该功能:

  • 当数据源为Parquet文件时,将数据源选项mergeSchema设置为true
  • 设置全局SQL选项spark.sql.parquet.mergeSchema为true

示例以下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

3.2.4 Hive metastore Parquet表转换(Hive metastore Parquet table conversion)

当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。

3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)

从表Schema处理的角度对比Hive和Parquet,有两个区别:

  • Hive区分大小写,Parquet不区分大小写
  • hive容许全部的列为空,而Parquet不容许全部的列全为空

因为这两个区别,当将Hive metastore Parquet表转换为Spark SQL Parquet表时,须要将Hive metastore schema和Parquet schema进行一致化。一致化规则以下:

  • 这两个schema中的同名字段必须具备相同的数据类型。一致化后的字段必须为Parquet的字段类型。这个规则同时也解决了空值的问题。
  • 一致化后的schema只包含Hive metastore中出现的字段。
    • 忽略只出如今Parquet schema中的字段
    • 只在Hive metastore schema中出现的字段设为nullable字段,并加到一致化后的schema中

3.2.4.2 元数据刷新(Metadata Refreshing)

Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。因此,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例以下:

  • Scala
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
  • Java
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

3.2.5 配置(Configuration)

配置Parquet可使用SQLContext的setConf方法或使用SQL执行SET key=value命令。详细参数说明以下:

Configuration

3.3 JSON数据集

Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。

须要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自知足有效的JSON对象。若是用多行描述一个JSON对象,会致使读取出错。读取JSON数据集示例以下:

  • Scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
  • Java
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

3.4 Hive表

Spark SQL支持对Hive的读写操做。须要注意的是,Hive所依赖的包,没有包含在Spark assembly包中。增长Hive时,须要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到全部的worker节点上。由于worker节点在访问Hive中数据时,会调用Hive的 serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包。

Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令以前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须能够被driverhe和全部的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中经过--jars选项和--file选项指定。

操做Hive时,必须建立一个HiveContext对象,HiveContext继承了SQLContext,并增长了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法能够执行HiveQL语法的查询语句。示例以下:

  • Scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
  • Java
// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc);

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

3.4.1 访问不一样版本的Hive Metastore(Interacting with Different Versions of Hive Metastore)

Spark SQL常常须要访问Hive metastore,Spark SQL能够经过Hive metastore获取Hive表的元数据。从Spark 1.4.0开始,Spark SQL只需简单的配置,就支持各版本Hive metastore的访问。注意,涉及到metastore时Spar SQL忽略了Hive的版本。Spark SQL内部将Hive反编译至Hive 1.2.1版本,Spark SQL的内部操做(serdes, UDFs, UDAFs, etc)都调用Hive 1.2.1版本的class。版本配置项见下面表格:

ShiveMetastore

3.5 JDBC To Other Databases

Spark SQL支持使用JDBC访问其余数据库。当时用JDBC访问其它数据库时,最好使用JdbcRDD。使用JdbcRDD时,Spark SQL操做返回的DataFrame会很方便,也会很方便的添加其余数据源数据。JDBC数据源由于不须要用户提供ClassTag,因此很适合使用Java或Python进行操做。
使用JDBC访问数据源,须要在spark classpath添加JDBC driver配置。例如,从Spark Shell链接postgres的配置为:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

远程数据库的表,可用DataFrame或Spark SQL临时表的方式调用数据源API。支持的参数有:

option

代码示例以下:

  • Scala
val jdbcDF = sqlContext.read.format("jdbc").options( 
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
  • Java
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();

3.6 故障排除(Troubleshooting)

  • 在客户端session和全部的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。由于当建立一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略全部对启动类加载器为非visible的driver。一个很方便的解决方法是,修改全部worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。
  • 有些数据库(例:H2)将全部的名字转换为大写,因此在这些数据库中,Spark SQL也须要将名字所有大写。

4 性能调优

4.1 缓存数据至内存(Caching Data In Memory)

Spark SQL能够经过调用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an in­memory columnar format)缓存至内存中。而后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减小扫描数据量、提升性能。经过缓存数据,Spark SQL还能够自动调节压缩,从而达到最小化内存使用率和下降GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

可经过两种配置方式开启缓存数据功能:

  • 使用SQLContext的setConf方法
  • 执行SQL命令 SET key=value

Cache-In-Memory

4.2 调优参数(Other Configuration Options)

能够经过配置下表中的参数调节Spark SQL的性能。在后续的Spark版本中将逐渐加强自动调优功能,下表中的参数在后续的版本中或许将再也不须要配置。

optionsTunningPfms

5 分布式SQL引擎

使用Spark SQL的JDBC/ODBC或者CLI,能够将Spark SQL做为一个分布式查询引擎。终端用户或应用不须要编写额外的代码,能够直接使用Spark SQL执行SQL查询。

5.1 运行Thrift JDBC/ODBC服务

这里运行的Thrift JDBC/ODBC服务与Hive 1.2.1中的HiveServer2一致。能够在Spark目录下执行以下命令来启动JDBC/ODBC服务:

./sbin/start-thriftserver.sh

这个命令接收全部 bin/spark-submit 命令行参数,添加一个 --hiveconf 参数来指定Hive的属性。详细的参数说明请执行命令 ./sbin/start-thriftserver.sh --help
服务默认监听端口为localhost:10000。有两种方式修改默认监听端口:

  • 修改环境变量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
  • 修改系统属性
./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

使用 beeline 来测试Thrift JDBC/ODBC服务:

./bin/beeline

链接到Thrift JDBC/ODBC服务

beeline> !connect jdbc:hive2://localhost:10000

在非安全模式下,只须要输入机器上的一个用户名便可,无需密码。在安全模式下,beeline会要求输入用户名和密码。安全模式下的详细要求,请阅读beeline documentation的说明。

配置Hive须要替换 conf/ 目录下的 hive-site.xml

Thrift JDBC服务也支持经过HTTP传输发送thrift RPC messages。开启HTTP模式须要将下面的配参数配置到系统属性或 conf/: 下的 hive-site.xml

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

测试http模式,可使用beeline连接JDBC/ODBC服务:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

5.2 运行Spark SQL CLI

Spark SQL CLI能够很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。须要注意的是,Spark SQL CLI不能与Thrift JDBC服务交互。
在Spark目录下执行以下命令启动Spark SQL CLI:

./bin/spark-sql

配置Hive须要替换 conf/ 下的 hive-site.xml 。执行 ./bin/spark-sql --help 可查看详细的参数说明 。

6 Migration Guide

6.1 与Hive的兼容(Compatibility with Apache Hive)

Spark SQL与Hive Metastore、SerDes、UDFs相兼容。Spark SQL兼容Hive Metastore从0.12到1.2.1的全部版本。Spark SQL也与Hive SerDes和UDFs相兼容,当前SerDes和UDFs是基于Hive 1.2.1。

6.1.1 在Hive warehouse中部署Spark SQL

Spark SQL Thrift JDBC服务与Hive相兼容,在已存在的Hive上部署Spark SQL Thrift服务不须要对已存在的Hive Metastore作任何修改,也不须要对数据作任何改动。

6.1.2 Spark SQL支持的Hive特性

Spark SQL支持多部分的Hive特性,例如:

  • Hive查询语句,包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部Hive运算符,包括
    • 比较操做符(=, ⇔, ==, <>, <, >, >=, <=, etc)
    • 算术运算符(+, -, *, /, %, etc)
    • 逻辑运算符(AND, &&, OR, ||, etc)
    • 复杂类型构造器
    • 数学函数(sign,ln,cos,etc)
    • 字符串函数(instr,length,printf,etc)
  • 用户自定义函数(UDF)
  • 用户自定义聚合函数(UDAF)
  • 用户自定义序列化格式器(SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 子查询
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • 表分区,包括动态分区插入
  • 视图
  • 全部的Hive DDL函数,包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的Hive数据类型,包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

6.1.3 不支持的Hive功能

下面是当前不支持的Hive特性,其中大部分特性在实际的Hive使用中不多用到。

Major Hive Features

  • Tables with buckets:bucket是在一个Hive表分区内进行hash分区。Spark SQL当前不支持。

Esoteric Hive Features

  • UNION type
  • Unique join
  • Column statistics collecting:当期Spark SQL不智齿列信息统计,只支持填充Hive Metastore的sizeInBytes列。

Hive Input/Output Formats

  • File format for CLI: 这个功能用于在CLI显示返回结果,Spark SQL只支持TextOutputFormat
  • Hadoop archive

Hive优化
部分Hive优化尚未添加到Spark中。没有添加的Hive优化(好比索引)对Spark SQL这种in-memory计算模型来讲不是特别重要。下列Hive优化将在后续Spark SQL版本中慢慢添加。

  • 块级别位图索引和虚拟列(用于创建索引)
  • 自动检测joins和groupbys的reducer数量:当前Spark SQL中须要使用“ SET spark.sql.shuffle.partitions=[num_tasks]; ”控制post-shuffle的并行度,不能自动检测。
  • 仅元数据查询:对于能够经过仅使用元数据就能完成的查询,当前Spark SQL仍是须要启动任务来计算结果。
  • 数据倾斜标记:当前Spark SQL不遵循Hive中的数据倾斜标记
  • jion中STREAMTABLE提示:当前Spark SQL不遵循STREAMTABLE提示
  • 查询结果为多个小文件时合并小文件:若是查询结果包含多个小文件,Hive能合并小文件为几个大文件,避免HDFS metadata溢出。当前Spark SQL不支持这个功能。

7 Reference

7.1 Data Types

Spark SQL和DataFrames支持的数据格式以下:

  • 数值类型
    • ByteType: 表明1字节有符号整数. 数值范围: -128 到 127.
    • ShortType: 表明2字节有符号整数. 数值范围: -32768 到 32767.
    • IntegerType: 表明4字节有符号整数. 数值范围: -2147483648 t到 2147483647.
    • LongType: 表明8字节有符号整数. 数值范围: -9223372036854775808 到 9223372036854775807.
    • FloatType: 表明4字节单精度浮点数。
    • DoubleType: 表明8字节双精度浮点数。
    • DecimalType: 表示任意精度的有符号十进制数。内部使用java.math.BigDecimal.A实现。
    • BigDecimal由一个任意精度的整数非标度值和一个32位的整数组成。
  • String类型
    • StringType: 表示字符串值。
  • Binary类型
    • BinaryType: 表明字节序列值。
  • Boolean类型
    • BooleanType: 表明布尔值。
  • Datetime类型
    • TimestampType: 表明包含的年、月、日、时、分和秒的时间值
    • DateType: 表明包含的年、月、日的日期值
  • 复杂类型
    • ArrayType(elementType, containsNull): 表明包含一系列类型为elementType的元素。若是在一个将ArrayType值的元素能够为空值,containsNull指示是否容许为空。
    • MapType(keyType, valueType, valueContainsNull): 表明一系列键值对的集合。key不容许为空,valueContainsNull指示value是否容许为空
    • StructType(fields): 表明带有一个StructFields(列)描述结构数据。
      • StructField(name, dataType, nullable): 表示StructType中的一个字段。name表示列名、dataType表示数据类型、nullable指示是否容许为空。

Spark SQL全部的数据类型在 org.apache.spark.sql.types 包内。不一样语言访问或建立数据类型方法不同:

  • Scala
    代码中添加 import org.apache.spark.sql.types._,再进行数据类型访问或建立操做。

scalaAccessDataTypes

  • Java
    可使用 org.apache.spark.sql.types.DataTypes 中的工厂方法,以下表:

javaAccessDataTypes

7.2 NaN 语义

当处理float或double类型时,若是类型不符合标准的浮点语义,则使用专门的处理方式NaN。须要注意的是:

  • NaN = NaN 返回 true
  • 能够对NaN值进行聚合操做
  • 在join操做中,key为NaN时,NaN值与普通的数值处理逻辑相同
  • NaN值大于全部的数值型数据,在升序排序中排在最后
相关文章
相关标签/搜索