《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南

spark-1.6.0 [原文地址]html

Spark SQL, DataFrames 以及 Datasets 编程指南

概要

Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不一样,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会可以用于作优化的信息比RDD API更多一些。Spark SQL现在有了三种不一样的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,不管你使用哪一种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者能够在不一样的API之间来回切换,你能够选择一种最天然的方式,来表达你的需求。java

 

本文中全部的示例都使用Spark发布版本中自带的示例数据,而且能够在spark-shell、pyspark shell以及sparkR shell中运行。python

 

SQL

Spark SQL的一种用法是直接执行SQL查询语句,你可以使用最基本的SQL语法,也能够选择HiveQL语法。Spark SQL能够从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。若是用其余编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还能够经过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。mysql

DataFrames

DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来讲,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame能够从不少数据源(sources)加载数据并构造获得,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。sql

DataFrame API支持ScalaJavaPython, and Rshell

Datasets

Dataset是Spark-1.6新增的一种API,目前仍是实验性的。Dataset想要把RDD的优点(强类型,可使用lambda表达式函数)和Spark SQL的优化执行引擎的优点结合到一块儿。Dataset能够由JVM对象构建(constructed )获得,然后Dataset上可使用各类transformation算子(map,flatMap,filter 等)。数据库

Dataset API 对 Scala 和 Java的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优点(例如,你可使用字段名来访问数据,row.columnName)。对Python的完整支持在将来的版本会增长进来。apache

入门

入口:SQLContext

Spark SQL全部的功能入口都是SQLContext 类,及其子类。不过要建立一个SQLContext对象,首先须要有一个SparkContext对象。编程

val sc: SparkContext // 假设已经有一个 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用于包含RDD到DataFrame隐式转换操做
import sqlContext.implicits._

除了SQLContext以外,你也能够建立HiveContext,HiveContext是SQLContext 的超集。json

除了SQLContext的功能以外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不须要安装Hive,并且SQLContext能用的数据源,HiveContext也同样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含全部的Hive依赖。若是这些依赖对你来讲不是问题(不会形成依赖冲突等),建议你在Spark-1.3以前使用HiveContext。然后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差很少的状态。

spark.sql.dialect选项能够指定不一样的SQL变种(或者叫SQL方言)。这个参数能够在SparkContext.setConf里指定,也能够经过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前惟一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,固然你也能够将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,因此大部分状况下,推荐使用HiveContext。

建立DataFrame

Spark应用能够用SparkContext建立DataFrame,所需的数据来源能够是已有的RDD(existing RDD),或者Hive表,或者其余数据源(data sources.)

如下是一个从JSON文件建立DataFrame的小栗子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 将DataFrame内容打印到stdout
df.show()

DataFrame操做

DataFrame提供告终构化数据的领域专用语言支持,包括ScalaJavaPython and R.

这里咱们给出一个结构化数据处理的基本示例:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 建立一个 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展现 DataFrame 的内容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展现全部人,但全部人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 计算各个年龄的人数
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

DataFrame的完整API列表请参考这里:API Documentation

除了简单的字段引用和表达式支持以外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.

编程方式执行SQL查询

SQLContext.sql能够执行一个SQL查询,并返回DataFrame结果。

val sqlContext = ... // 已有一个 SQLContext 对象
val df = sqlContext.sql("SELECT * FROM table")

建立Dataset

Dataset API和RDD相似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通讯。若是这个编码器和标准序列化都能把对象转字节,那么编码器就能够根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不须要反序列化回来,就能容许Spark进行操做,如过滤、排序、哈希等。

// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 如下这行不只定义了case class,同时也自动为其建立了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 只需提供一个和数据schema对应的class便可转换为 Dataset。Spark会根据字段名进行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

和RDD互操做

Spark SQL有两种方法将RDD转为DataFrame。

1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,并且若是你事先知道数据schema,推荐使用这种方式;

2. 编程方式构建一个schema,而后应用到指定RDD上。这种方式更啰嗦,但若是你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你极可能须要用这种方式。

利用反射推导schema

Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名经过反射,映射为表的字段名。case class还能够嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,能够进一步注册成表。随后,你就能够对表中数据使用SQL语句查询了。

// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可使用自定义class,并实现Product接口。固然,你也能够改用编程方式定义schema
case class Person(name: String, age: Int)

// 建立一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法能够直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且可以支持全部常见的RDD算子
// 查询结果中每行的字段能够按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)

编程方式定义Schema

若是不能事先经过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其余文本数据集中,须要先解析,又或者字段对不一样用户有所不一样),那么你可能须要按如下三个步骤,以编程方式的建立一个DataFrame:

  1. 从已有的RDD建立一个包含Row对象的RDD
  2. 用StructType建立一个schema,和步骤1中建立的RDD的结构相匹配
  3. 把获得的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame

For example:

例如:

// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 建立一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 数据的schema被编码与一个字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD[people]的各个记录转换为Rows,即:获得一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 将schema应用到包含Row对象的RDD上,获得一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")

// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查询的结果是DataFrame,且可以支持全部常见的RDD算子
// 而且其字段能够以索引访问,也能够用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)

数据源

Spark SQL支持基于DataFrame操做一系列不一样的数据源。DataFrame既能够当成一个普通RDD来操做,也能够将其注册成一个临时表来查询。把DataFrame注册为table以后,你就能够基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不一样的Spark数据源,而后深刻介绍一下内建数据源可用选项。

通用加载/保存函数

在最简单的状况下,全部操做都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定选项

你也能够手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可使用简写名(json, parquet, jdbc)。任意类型数据源建立的DataFrame均可以用下面这种语法转成其余类型数据格式。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

直接对文件使用SQL

Spark SQL还支持直接对文件使用SQL查询,不须要用read方法把文件加载进来。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

Save操做有一个可选参数SaveMode,用这个参数能够指定如何处理数据已经存在的状况。很重要的一点是,这些保存模式都没有加锁,因此其操做也不是原子性的。另外,若是使用Overwrite模式,实际操做是,先删除数据,再写新数据。

仅Scala/Java 全部支持的语言 含义
SaveMode.ErrorIfExists (default) "error" (default) (默认模式)从DataFrame向数据源保存数据时,若是数据已经存在,则抛异常。
SaveMode.Append "append" 若是数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。
SaveMode.Overwrite "overwrite" 若是数据或表已经存在,则用DataFrame数据覆盖之。
SaveMode.Ignore "ignore" 若是数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点相似。

保存到持久化表

在使用HiveContext的时候,DataFrame能够用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不一样,saveAsTable会将DataFrame的实际数据内容保存下来,而且在HiveMetastore中建立一个游标指针。持久化的表会一直保留,即便Spark程序重启也没有影响,只要你链接到同一个metastore就能够读取其数据。读取持久化表时,只须要用用表名做为参数,调用SQLContext.table方法便可获得对应DataFrame。

默认状况下,saveAsTable会建立一个”managed table“,也就是说这个表数据的位置是由metastore控制的。一样,若是删除表,其数据也会同步删除。

Parquet文件

Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,并且Parquet文件可以自动保存原始数据的schema。写Parquet文件的时候,全部的字段都会自动转成nullable,以便向后兼容。

编程方式加载数据

仍然使用上面例子中的数据:

// 咱们继续沿用以前例子中的sqlContext对象
// 为了支持RDD隐式转成DataFrame
import sqlContext.implicits._

val people: RDD[Person] = ... // 和上面例子中相同,一个包含case class对象的RDD

// 该RDD将隐式转成DataFrame,而后保存为parquet文件
people.write.parquet("people.parquet")

// 读取上面保存的Parquet文件(多个文件 - Parquet保存完实际上是不少个文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加载Parquet文件,并返回DataFrame结果
val parquetFile = sqlContext.read.parquet("people.parquet")

// Parquet文件(多个)能够注册为临时表,而后在SQL语句中直接查询
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分区发现

像Hive这样的系统,一个很经常使用的优化手段就是表分区。在一个支持分区的表中,数据是保存在不一样的目录中的,而且将分区键以编码方式保存在各个分区目录路径中。Parquet数据源如今也支持自动发现和推导分区信息。例如,咱们能够把以前用的人口数据存到一个分区表中,其目录结构以下所示,其中有2个额外的字段,gender和country,做为分区键:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

在这个例子中,若是须要读取Parquet文件数据,咱们只须要把 path/to/table 做为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL可以自动的从路径中提取出分区信息,随后返回的DataFrame的schema以下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分区键的数据类型将是自动推导出来的。目前,只支持数值类型和字符串类型数据做为分区键。

有的用户可能不想要自动推导出来的分区键数据类型。这种状况下,你能够经过 spark.sql.sources.partitionColumnTypeInference.enabled (默认是true)来禁用分区键类型推导。禁用以后,分区键老是被当成字符串类型。

从Spark-1.6.0开始,分区发现默认只在指定目录的子目录中进行。以上面的例子来讲,若是用户把 path/to/table/gender=male 做为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load,那么gender就不会被做为分区键。若是用户想要指定分区发现的基础目录,能够经过basePath选项指定。例如,若是把 path/to/table/gender=male做为数据目录,而且将basePath设为 path/to/table,那么gender仍然会最为分区键。

Schema合并

像ProtoBuffer、Avro和Thrift同样,Parquet也支持schema演变。用户从一个简单的schema开始,逐渐增长所需的新字段。这样的话,用户最终会获得多个schema不一样但互相兼容的Parquet文件。目前,Parquet数据源已经支持自动检测这种状况,并合并全部文件的schema。

由于schema合并相对代价比较大,而且在多数状况下不是必要的,因此从Spark-1.5.0以后,默认是被禁用的。你能够这样启用这一功能:

  1. 读取Parquet文件时,将选项mergeSchema设为true(见下面的示例代码)
  2. 或者,将全局选项spark.sql.parquet.mergeSchema设为true
// 继续沿用以前的sqlContext对象
// 为了支持RDD隐式转换为DataFrame
import sqlContext.implicits._

// 建立一个简单的DataFrame,存到一个分区目录中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// 建立另外一个DataFrame放到新的分区目录中,
// 并增长一个新字段,丢弃一个老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// 最终的schema将由3个字段组成(single,double,triple)
// 而且分区键出如今目录路径中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metastore Parquet table转换

在读写Hive metastore Parquet 表时,Spark SQL用的是内部的Parquet支持库,而不是Hive SerDe,由于这样性能更好。这一行为是由spark.sql.hive.convertMetastoreParquet 配置项来控制的,并且默认是启用的。

Hive/Parquet schema调和

Hive和Parquet在表结构处理上主要有2个不一样点:

  1. Hive大小写敏感,而Parquet不是
  2. Hive全部字段都是nullable的,而Parquet须要显示设置

因为以上缘由,咱们必须在Hive metastore Parquet table转Spark SQL Parquet table的时候,对Hive metastore schema作调整,调整规则以下:

  1. 两种schema中字段名和字段类型必须一致(不考虑nullable)。调和后的字段类型必须在Parquet格式中有相对应的数据类型,因此nullable是也是须要考虑的。
  2. 调和后Spark SQL Parquet table schema将包含如下字段:
    • 只出如今Parquet schema中的字段将被丢弃
    • 只出如今Hive metastore schema中的字段将被添加进来,并显式地设为nullable。

刷新元数据

Spark SQL会缓存Parquet元数据以提升性能。若是Hive metastore Parquet table转换被启用的话,那么转换过来的schema也会被缓存。这时候,若是这些表由Hive或其余外部工具更新了,你必须手动刷新元数据。

// 注意,这里sqlContext是一个HiveContext
sqlContext.refreshTable("my_table")

配置

Parquet配置能够经过 SQLContext.setConf 或者 SQL语句中 SET key=value来指定。

属性名 默认值 含义
spark.sql.parquet.binaryAsString false 有些老系统,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不区分二进制数据和字符串类型数据。这个标志的意思是,让Spark SQL把二进制数据当字符串处理,以兼容老系统。
spark.sql.parquet.int96AsTimestamp true 有些老系统,如:特定版本的Impala,Hive,把时间戳存成INT96。这个配置的做用是,让Spark SQL把这些INT96解释为timestamp,以兼容老系统。
spark.sql.parquet.cacheMetadata true 缓存Parquet schema元数据。能够提高查询静态数据的速度。
spark.sql.parquet.compression.codec gzip 设置Parquet文件的压缩编码格式。可接受的值有:uncompressed, snappy, gzip(默认), lzo
spark.sql.parquet.filterPushdown true 启用过滤器下推优化,能够讲过滤条件尽可能推导最下层,已取得性能提高
spark.sql.hive.convertMetastoreParquet true 若是禁用,Spark SQL将使用Hive SerDe,而不是内建的对Parquet tables的支持
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.
ParquetOutputCommitter
Parquet使用的数据输出类。这个类必须是 org.apache.hadoop.mapreduce.OutputCommitter的子类。通常来讲,它也应该是 org.apache.parquet.hadoop.ParquetOutputCommitter的子类。注意:1. 若是启用spark.speculation, 这个选项将被自动忽略

 

2. 这个选项必须用hadoop configuration设置,而不是Spark SQLConf

3. 这个选项会覆盖 spark.sql.sources.outputCommitterClass

Spark SQL有一个内建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 这个类的在输出到S3的时候比默认的ParquetOutputCommitter类效率高。

spark.sql.parquet.mergeSchema false 若是设为true,那么Parquet数据源将会merge 全部数据文件的schema,不然,schema是从summary file获取的(若是summary file没有设置,则随机选一个)

JSON数据集

Spark SQL在加载JSON数据的时候,能够自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,便可实现这一转换。

注意,一般所说的json文件只是包含一些json数据的文件,而不是咱们所须要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。所以,一个常规的多行json文件常常会加载失败。

// sc是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 数据集是由路径指定的
// 路径既能够是单个文件,也能够仍是存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// 推导出来的schema,可由printSchema打印出来
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// 将DataFrame注册为table
people.registerTempTable("people")

// 跑SQL语句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另外一种方法是,用一个包含JSON字符串的RDD来建立DataFrame
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive表

Spark SQL支持从Apache Hive读写数据。然而,Hive依赖项太多,因此没有把Hive包含在默认的Spark发布包里。要支持Hive,须要在编译spark的时候增长-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出如今全部的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。

Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,若是在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和全部执行器(executor)均可用。一种简便的方法是,经过spark-submit命令的–jars和–file选项来提交这些文件。

若是使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也能够建立一个HiveContext。若是没有在hive-site.xml里配置,那么HiveContext将会自动在当前目录下建立一个metastore_db目录,再根据HiveConf设置建立一个warehouse目录(默认/user/hive/warehourse)。因此请注意,你必须把/user/hive/warehouse的写权限赋予启动spark应用程序的用户。

// sc是一个已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 这里用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

和不一样版本的Hive Metastore交互

Spark SQL对Hive最重要的支持之一就是和Hive metastore进行交互,这使得Spark SQL能够访问Hive表的元数据。从Spark-1.4.0开始,Spark SQL有专门单独的二进制build版本,能够用来访问不一样版本的Hive metastore,其配置表以下。注意,无论所访问的hive是什么版本,Spark SQL内部都是以Hive 1.2.1编译的,并且内部使用的Hive类也是基于这个版本(serdes,UDFs,UDAFs等)

如下选项可用来配置Hive版本以便访问其元数据:

属性名 默认值 含义
spark.sql.hive.metastore.version 1.2.1 Hive metastore版本,可选的值为0.12.0 到 1.2.1
spark.sql.hive.metastore.jars builtin 初始化HiveMetastoreClient的jar包。这个属性能够是如下三者之一:

 

  1. builtin

目前内建为使用Hive-1.2.1,编译的时候启用-Phive,则会和spark一块儿打包。若是没有-Phive,那么spark.sql.hive.metastore.version要么是1.2.1,要就是未定义

  1. maven

使用maven仓库下载的jar包版本。这个选项建议不要再生产环境中使用

  1. JVM格式的classpath。这个classpath必须包含全部Hive及其依赖的jar包,且包含正确版本的hadoop。这些jar包必须部署在driver节点上,若是你使用yarn-cluster模式,那么必须确保这些jar包也随你的应用程序一块儿打包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且能够在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就须要这种共享。其余须要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender
spark.sql.hive.metastore.barrierPrefixes (empty) 一个逗号分隔的类名前缀列表,这些类在每一个Spark SQL所访问的Hive版本中都会被显式的reload。例如,某些在共享前缀列表(spark.sql.hive.metastore.sharedPrefixes)中声明为共享的Hive UD函数

用JDBC链接其余数据库

Spark SQL也能够用JDBC访问其余数据库。这一功能应该优先于使用JdbcRDD。由于它返回一个DataFrame,而DataFrame在Spark SQL中操做更简单,且更容易和来自其余数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不须要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不一样,Spark SQL JDBC server容许其余应用执行Spark SQL查询)

首先,你须要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

远程数据库的表能够经过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。如下是选项列表:

属性名 含义
url 须要链接的JDBC URL
dbtable 须要读取的JDBC表。注意,任何能够填在SQL的where子句中的东西,均可以填在这里。(既能够填完整的表名,也可填括号括起来的子查询语句)
driver JDBC driver的类名。这个类必须在master和worker节点上均可用,这样各个节点才能将driver注册到JDBC的子系统中。
partitionColumn, lowerBound, upperBound, numPartitions 这几个选项,若是指定其中一个,则必须所有指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。所以,表中全部的行都会被分区而后返回。
fetchSize JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10)
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

疑难解答

  • JDBC driver class必须在全部client session或者executor上,对java的原生classloader可见。这是由于Java的DriverManager在打开一个链接以前,会作安全检查,并忽略全部对原声classloader不可见的driver。最简单的一种方法,就是在全部worker节点上修改compute_classpath.sh,并包含你所需的driver jar包。
  • 一些数据库,如H2,会把全部的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。

性能调整

对于有必定计算量的Spark做业来讲,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。

内存缓存

Spark SQL能够经过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减小内存占用和GC压力。你也能够用SQLContext.uncacheTable(“tableName”)来删除内存中的table。

你还可使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。

属性名 默认值 含义
spark.sql.inMemoryColumnarStorage.compressed true 若是设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存批量的大小。增大批量大小能够提升内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。

其余配置选项

如下选项一样也能够用来给查询任务调性能。不过这些选项在将来可能被放弃,由于spark将支持愈来愈多的自动优化。

属性名 默认值 含义
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置join操做时,可以做为广播变量的最大table的大小。设置为-1,表示禁用广播。注意,目前的元数据统计仅支持Hive metastore中的表,而且须要运行这个命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.tungsten.enabled true 设为true,则启用优化的Tungsten物理执行后端。Tungsten会显式的管理内存,并动态生成表达式求值的字节码
spark.sql.shuffle.partitions 200 配置数据混洗(shuffle)时(join或者聚合操做),使用的分区数。

分布式SQL引擎

Spark SQL能够做为JDBC/ODBC或者命令行工具的分布式查询引擎。在这种模式下,终端用户或应用程序,无需写任何代码,就能够直接在Spark SQL中运行SQL查询。

运行Thrift JDBC/ODBC server

这里实现的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2是相同的。你可使用beeline脚原本测试Spark或者Hive-1.2.1的JDBC server。

在Spark目录下运行下面这个命令,启动一个JDBC/ODBC server

./sbin/start-thriftserver.sh

这个脚本能接受全部 bin/spark-submit 命令支持的选项参数,外加一个 –hiveconf 选项,来指定Hive属性。运行./sbin/start-thriftserver.sh –help能够查看完整的选项列表。默认状况下,启动的server将会在localhost:10000端口上监听。要改变监听主机名或端口,能够用如下环境变量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者Hive系统属性 来指定

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

接下来,你就能够开始在beeline中测试这个Thrift JDBC/ODBC server:

./bin/beeline

下面的指令,能够链接到一个JDBC/ODBC server

beeline> !connect jdbc:hive2://localhost:10000

可能须要输入用户名和密码。在非安全模式下,只要输入你本机的用户名和一个空密码便可。对于安全模式,请参考beeline documentation.

Hive的配置是在conf/目录下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。

你也能够在beeline的脚本中指定。

Thrift JDBC server也支持经过HTTP传输Thrift RPC消息。如下配置(在conf/hive-site.xml中)将启用HTTP模式:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

一样,在beeline中也能够用HTTP模式链接JDBC/ODBC server:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

使用Spark SQL命令行工具

Spark SQL CLI是一个很方便的工具,它能够用local mode运行hive metastore service,而且在命令行中执行输入的查询。注意Spark SQL CLI目前还不支持和Thrift JDBC server通讯。

用以下命令,在spark目录下启动一个Spark SQL CLI

./bin/spark-sql

Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置。你能够用这个命令查看完整的选项列表:./bin/spark-sql –help

升级指南

1.5升级到1.6

  • 从Spark-1.6.0起,默认Thrift server 将运行于多会话并存模式下(multi-session)。这意味着,每一个JDBC/ODBC链接有其独立的SQL配置和临时函数注册表。table的缓存仍然是公用的。若是你更喜欢老的单会话模式,只需设置spark.sql.hive.thriftServer.singleSession为true便可。固然,你也可在spark-defaults.conf中设置,或者将其值传给start-thriftserver.sh –conf(以下):
./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

1.4升级到1.5

  • Tungsten引擎如今默认是启用的,Tungsten是经过手动管理内存优化执行计划,同时也优化了表达式求值的代码生成。这两个特性均可以经过把spark.sql.tungsten.enabled设为false来禁用。
  • Parquet schema merging默认不启用。须要启用的话,设置spark.sql.parquet.mergeSchema为true便可
  • Python接口支持用点(.)来访问字段内嵌值,例如df[‘table.column.nestedField’]。但这也意味着,若是你的字段名包含点号(.)的话,你就必须用重音符来转义,如:table.`column.with.dots`.nested。
  • 列式存储内存分区剪枝默认是启用的。要禁用,设置spark.sql.inMemoryColumarStorage.partitionPruning为false便可
  • 再也不支持无精度限制的decimal。Spark SQL如今强制最大精度为38位。对于BigDecimal对象,类型推导将会使用(38,18)精度的decimal类型。若是DDL中没有指明精度,默认使用的精度是(10,0)
  • 时间戳精确到1us(微秒),而不是1ns(纳秒)
  • 在“sql”这个SQL变种设置中,浮点数将被解析为decimal。HiveQL解析保持不变。
  • 标准SQL/DataFrame函数均为小写,例如:sum vs SUM。
  • 当推测任务被启用是,使用DirectOutputCommitter是不安全的,所以,DirectOutputCommitter在推测任务启用时,将被自动禁用,且忽略相关配置。
  • JSON数据源再也不自动加载其余程序产生的新文件(例如,不是Spark SQL插入到dataset中的文件)。对于一个JSON的持久化表(如:Hive metastore中保存的表),用户可使用REFRESH TABLE这个SQL命令或者HiveContext.refreshTable来把新文件包括进来。

1.3升级到1.4

DataFrame数据读写接口

根据用户的反馈,咱们提供了一个新的,更加流畅的API,用于数据读(SQLContext.read)写(DataFrame.write),同时老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)将被废弃。

有关SQLContext.read和DataFrame.write的更详细信息,请参考API文档。

DataFrame.groupBy保留分组字段

根据用户的反馈,咱们改变了DataFrame.groupBy().agg()的默认行为,在返回的DataFrame结果中保留了分组字段。若是你想保持1.3中的行为,设置spark.sql.retainGroupColumns为false便可。

// 在1.3.x中,若是要保留分组字段"department", 你必须显式的在agg聚合时包含这个字段
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 而在1.4+,分组字段"department"默认就会包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))

// 要回滚到1.3的行为(不包含分组字段),按以下设置便可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

1.2升级到1.3

在Spark 1.3中,咱们去掉了Spark SQL的”Alpha“标签,并清理了可用的API。从Spark 1.3起,Spark SQL将对1.x系列二进制兼容。这个兼容性保证不包括显式的标注为”unstable(如:DeveloperAPI或Experimental)“的API。

SchemaRDD重命名为DataFrame

对于用户来讲,Spark SQL 1.3最大的改动就是SchemaRDD更名为DataFrame。主要缘由是,DataFrame再也不直接由RDD派生,而是经过本身的实现提供RDD的功能。DataFrame只须要调用其rdd方法就能转成RDD。

在Scala中仍然有SchemaRDD,只不过这是DataFrame的一个别名,以便兼容一些现有代码。但仍然建议用户改用DataFrame。Java和Python用户就没这个福利了,他们必须改代码。

统一Java和Scala API

在Spark 1.3以前,有单独的java兼容类(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的镜像。Spark 1.3中将Java API和Scala API统一。两种语言的用户都应该使用SQLContext和DataFrame。通常这些类中都会使用两种语言中都有的类型(如:Array取代各语言独有的集合)。有些状况下,没有通用的类型(例如:闭包或者maps),将会使用函数重载来解决这个问题。

另外,java特有的类型API被删除了。Scala和java用户都应该用org.apache.spark.sql.types来编程描述一个schema。

隐式转换隔离,DSL包移除 – 仅针对scala

Spark 1.3以前的不少示例代码,都在开头用 import sqlContext._,这行将会致使全部的sqlContext的函数都被引入进来。所以,在Spark 1.3咱们把RDDs到DataFrames的隐式转换隔离出来,单独放到SQLContext.implicits对象中。用户如今应该这样写:import sqlContext.implicits._

另外,隐式转换也支持由Product(如:case classes或tuples)组成的RDD,但须要调用一个toDF方法,而不是自动转换。

若是须要使用DSL(被DataFrame取代的API)中的方法,用户以前须要导入DSL(import org.apache.spark.sql.catalyst.dsl), 而如今应该要导入 DataFrame API(import org.apache.spark.sql.functions._)

移除org.apache.spark.sql中DataType别名 – 仅针对scala

Spark 1.3删除了sql包中的DataType类型别名。如今,用户应该使用 org.apache.spark.sql.types中的类。

UDF注册挪到sqlContext.udf中 – 针对java和scala

注册UDF的函数,无论是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF注册保持不变。

Python DataTypes再也不是单例

在python中使用DataTypes,你须要先构造一个对象(如:StringType()),而不是引用一个单例。

Shark用户迁移指南

调度

用户能够经过以下命令,为JDBC客户端session设定一个Fair Scheduler pool。

SET spark.sql.thriftserver.scheduler.pool=accounting;

Reducer个数

在Shark中,默认的reducer个数是1,而且由mapred.reduce.tasks设定。Spark SQL废弃了这个属性,改成 spark.sql.shuffle.partitions, 而且默认200,用户可经过以下SET命令来自定义:

SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

你也能够把这个属性放到hive-site.xml中来覆盖默认值。

目前,mapred.reduce.tasks属性仍然能被识别,而且自动转成spark.sql.shuffle.partitions

缓存

shark.cache表属性已经不存在了,而且以”_cached”结尾命名的表也再也不会自动缓存。取而代之的是,CACHE TABLE和UNCACHE TABLE语句,用以显式的控制表的缓存:

CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

注意:CACHE TABLE tbl 如今默认是饥饿模式,而非懒惰模式。不再须要手动调用其余action来触发cache了!

从Spark-1.2.0开始,Spark SQL新提供了一个语句,让用户本身控制表缓存是不是懒惰模式

CACHE [LAZY] TABLE [AS SELECT] ...

如下几个缓存相关的特性再也不支持:

  • 用户定义分区级别的缓存逐出策略
  • RDD 重加载
  • 内存缓存直接写入策略

兼容Apache Hive

Spark SQL设计时考虑了和Hive metastore,SerDes以及UDF的兼容性。目前这些兼容性斗是基于Hive-1.2.1版本,而且Spark SQL能够连到不一样版本的Hive metastore(从0.12.0到1.2.1,参考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore

部署在已有的Hive仓库之上

Spark SQL Thrift JDBC server采用了”out of the box”(开箱即用)的设计,使用很方便,并兼容已有的Hive安装版本。你不须要修改已有的Hive metastore或者改变数据的位置,或者表分区。

支持的Hive功能

Spark SQL 支持绝大部分Hive功能,如:

  • Hive查询语句:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部的Hive操做符:
    • Relational operators (===<><>>=<=, etc)
    • Arithmetic operators (+-*/%, etc)
    • Logical operators (AND&&OR||, etc)
    • Complex type constructors
    • Mathematical functions (signlncos, etc)
    • String functions (instrlengthprintf, etc)
  • 用户定义函数(UDF)
  • 用户定义聚合函数(UDAF)
  • 用户定义序列化、反序列化(SerDes)
  • 窗口函数(Window functions)
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 查询子句
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • 采样
  • 执行计划详细(Explain)
  • 分区表,包括动态分区插入
  • 视图
  • 全部Hive DDL(data definition language):
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 绝大部分Hive数据类型:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

不支持的Hive功能

如下是目前不支持的Hive特性的列表。多数是不经常使用的。

不支持的Hive常见功能

  • bucket表:butcket是Hive表的一个哈希分区

不支持的Hive高级功能

  • UNION类操做
  • 去重join
  • 字段统计信息收集:Spark SQL不支持同步的字段统计收集

Hive输入、输出格式

  • CLI文件格式:对于须要回显到CLI中的结果,Spark SQL仅支持TextOutputFormat。
  • Hadoop archive — Hadoop归档

Hive优化

一些比较棘手的Hive优化目前尚未在Spark中提供。有一些(如索引)对应Spark SQL这种内存计算模型来讲并不重要。另一些,在Spark SQL将来的版本中会支持。

  • 块级别位图索引和虚拟字段(用来建索引)
  • 自动计算reducer个数(join和groupBy算子):目前在Spark SQL中你须要这样控制混洗后(post-shuffle)并发程度:”SET spark.sql.shuffle.partitions=[num_tasks];”
  • 元数据查询:只查询元数据的请求,Spark SQL仍须要启动任务来计算结果
  • 数据倾斜标志:Spark SQL不会理会Hive中的数据倾斜标志
  • STREAMTABLE join提示:Spark SQL里没有这玩艺儿
  • 返回结果时合并小文件:若是返回的结果有不少小文件,Hive有个选项设置,来合并小文件,以免超过HDFS的文件数额度限制。Spark SQL不支持这个。

参考

数据类型

Spark SQL和DataFrames支持以下数据类型:

  • Numeric types(数值类型)
    • ByteType: 1字节长的有符号整型,范围:-128 到 127.
    • ShortType: 2字节长有符号整型,范围:-32768 到 32767.
    • IntegerType: 4字节有符号整型,范围:-2147483648 到 2147483647.
    • LongType: 8字节有符号整型,范围: -9223372036854775808 to 9223372036854775807.
    • FloatType: 4字节单精度浮点数。
    • DoubleType: 8字节双精度浮点数
    • DecimalType: 任意精度有符号带小数的数值。内部使用java.math.BigDecimal, BigDecimal包含任意精度的不缩放整型,和一个32位的缩放整型
  • String type(字符串类型)
    • StringType: 字符串
  • Binary type(二进制类型)
    • BinaryType: 字节序列
  • Boolean type(布尔类型)
    • BooleanType: 布尔类型
  • Datetime type(日期类型)
    • TimestampType: 表示包含年月日、时分秒等字段的日期
    • DateType: 表示包含年月日字段的日期
  • Complex types(复杂类型)
    • ArrayType(elementType, containsNull):数组类型,表达一系列的elementType类型的元素组成的序列,containsNull表示数组可否包含null值
    • MapType(keyType, valueType, valueContainsNull):映射集合类型,表示一个键值对的集合。键的类型是keyType,值的类型则由valueType指定。对应MapType来讲,键是不能为null的,而值可否为null则取决于valueContainsNull。
    • StructType(fields):表示包含StructField序列的结构体。
      • StructField(name, datatype, nullable): 表示StructType中的一个字段,name是字段名,datatype是数据类型,nullable表示该字段是否能够为空

全部Spark SQL支持的数据类型都在这个包里:org.apache.spark.sql.types,你能够这样导入之:

import  org.apache.spark.sql.types._
Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])注意:默认containsNull为true
MapType scala.collection.Map MapType(keyTypevalueType, [valueContainsNull])注意:默认valueContainsNull为true
StructType org.apache.spark.sql.Row StructType(fields)注意:fields是一个StructFields的序列,而且同名的字段是不容许的。
StructField 定义字段的数据对应的Scala类型(例如,若是StructField的dataType为IntegerType,则其数据对应的scala类型为Int) StructField(namedataTypenullable)

NaN语义

这是Not-a-Number的缩写,某些float或double类型不符合标准浮点数语义,须要对其特殊处理:

  • NaN == NaN,即:NaN和NaN老是相等
  • 在聚合函数中,全部NaN分到同一组
  • NaN在join操做中能够当作一个普通的join key
  • NaN在升序排序中排到最后,比任何其余数值都大

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文连接地址: 《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南

Favorite添加本文到个人收藏

Related Posts:

  1. 《Spark 官方文档》Spark快速入门
  2. 《Spark 官方文档》
  3. 《Spark 官方文档》Spark安全性
  4. 《Spark 官方文档》硬件配置
  5. 《Spark 官方文档》监控和工具
  6. 《Spark 官方文档》Spark做业调度
  7. 《Spark 官方文档》在Mesos上运行Spark
  8. 《Spark 官方文档》Spark编程指南
  9. 《Spark 官方文档》Spark独立模式
  10. 《Spark 官方文档》在YARN上运行Spark
  11. 《Spark 官方文档》Spark配置
  12. 《Spark 官方文档》在Amazon EC2上运行Spark
  13. 《Spark 官方文档》Spark调优
  14. 《Spark官方文档》Spark Streaming编程指南
  15. Apache Storm 官方文档 —— 使用非 JVM 语言开发
相关文章
相关标签/搜索