大致翻译自:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql.htmlhtml
如同通常的 Spark 处理,Spark SQL 本质上也是大规模的基于内存的分布式计算。git
Spark SQL 和 RDD 计算模型最大的区别在于数据处理的框架不一样。Spark SQL 能够经过多种不一样的方式对结构化的数据和半结构化的数据进行处理。它既可使用 SQL , HiveQL 这种结构化查询查询语言,也可使用类 SQL,声明式,类型安全的Dataset API 进行查询,这种被称为 Structured Query DSL。sql
Spark SQL 支持 批处理(Batch) 和流式处理(Struct streaming) 两种处理方式。数据库
不管使用什么样的查询方式,全部的查询都会转化为一个由 Catalyst expressions 组成的树,在这个过程当中会对不断的对查询进行优化。express
在 Spark 2.0 之后, Spark SQL 已经成为了 Spark 计算平台最主要的接口, 它经过更高层次的抽象封装了RDD,方便用户经过 SQL 处理数据。apache
// Define the schema using a case class case class Person(name: String, age: Int) // you could read people from a CSV file // It's been a while since you saw RDDs, hasn't it? // Excuse me for bringing you the old past. import org.apache.spark.rdd.RDD val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek", 10))) // Convert RDD[Person] to Dataset[Person] and run a query // Automatic schema inferrence from existing RDDs scala> val people = peopleRDD.toDS people: org.apache.spark.sql.Dataset[Person] = [name: string, age: int] // Query for teenagers using Scala Query DSL scala> val teenagers = people.where('age >= 10).where('age <= 19).select('name).as[String] teenagers: org.apache.spark.sql.Dataset[String] = [name: string] scala> teenagers.show +-----+ | name| +-----+ |Jacek| +-----+ // You could however want to use good ol' SQL, couldn't you? // 1. Register people Dataset as a temporary view in Catalog people.createOrReplaceTempView("people") // 2. Run SQL query val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19") scala> teenagers.show +-----+---+ | name|age| +-----+---+ |Jacek| 10| +-----+---+
经过启动 Hive 支持 (enableHiveSupport),用户能够 HiveQL 对 Hive 中的数据进行处理。json
sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('path'='people.csv', 'header'='true')") // Queries are expressed in HiveQL sql("FROM v1").show scala> sql("desc EXTENDED v1").show(false) +----------+---------+-------+ |col_name |data_type|comment| +----------+---------+-------+ |# col_name|data_type|comment| |key |int |null | |value |string |null | +----------+---------+-------+
和其它的数据库同样, Spark SQL 经过 Logical Query Plan Optimizer, code generation , Tungsten execution engine 来这些措施进行优化。安全
Spark SQL 引入了一种抽象的表格式的数据结构 Dataset。 经过 Dataset, Spark SQL 能够更加方便、快速的处理大批量的结构化数据。数据结构
下面的片断展现了如何读取JSON文件,而后将一种一部分数据保存为CSV文件。框架
spark.read .format("json") .load("input-json") .select("name", "score") .where($"score" > 15) .write .format("csv") .save("output-csv")
DataSet 是 Spark SQL 中最核心的抽象。他表示了一批已知 schema 的结构化数据。这些数据能够能够保存在JVM 堆外的内存中,而且变为列压缩的二进制串,来增长计算的速度,减小内存的使用和GC。
Spark SQL 支持 predicate pushdown 对 DataSet 的性能进行优化,而且能够在运行时生成优化代码。
Spark SQL 包含了如下几种 API:
Spark SQL 经过 DataFrameReader 和 DataFrameWrite 这两个统一的接口来访问 HDFS 等存储系统。
Spark SQL 定义了集中不一样类型的函数:
若是你已经将一个 CSV 加载到一个 dataframe 中了,那你能够经过将 dataframe 注册为 table, 而后使用 SQL 进行查询。
// Example 1 val df = Seq(1 -> 2).toDF("i", "j") val query = df.groupBy('i) .agg(max('j).as("aggOrdering")) .orderBy(sum('j)) .as[(Int, Int)] query.collect contains (1, 2) // true // Example 2 val df = Seq((1, 1), (-1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show +-------------------+ |(IF((a > 0), a, 0))| +-------------------+ | 1| | 0| +-------------------+