With the arrival of the big data era, Hadoop became enormously popular. To let engineers who were familiar with RDBMSs but not with MapReduce start developing on big data quickly, Hive was created; at the time it was the only SQL-on-Hadoop tool running on Hadoop.
However, MapReduce writes large amounts of intermediate data to disk during computation, which costs heavy I/O and drags down execution efficiency. To improve the efficiency of SQL on Hadoop, a wave of SQL-on-Hadoop tools appeared, one of the most prominent being Shark.
Shark was one of the components of the Spark ecosystem from Berkeley's lab. It modified three modules of the Hive driver (memory management, physical planning, and execution) so that they ran on the Spark engine, speeding SQL queries up by 10-100x.
But Shark's heavy dependence on Hive (it reused Hive's SQL parser, query optimizer, and so on) conflicted with Spark's established "One Stack to Rule Them All" strategy and constrained the integration between Spark's components, so the Spark SQL project was proposed.
Spark SQL discarded Shark's code but retained its strengths, such as in-memory columnar storage and Hive compatibility, and was rewritten from scratch. Freed from the dependency on Hive, Spark SQL improved enormously in data compatibility, performance optimization, and component extensibility.
In terms of data compatibility, besides being compatible with Hive, Spark SQL can load data from RDDs, Parquet files, and JSON files, and it also supports RDBMSs as well as NoSQL stores such as Cassandra.
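As a rough illustration of loading a few of these sources in Scala (the file paths, table name, and JDBC connection settings below are placeholders, and reading Cassandra additionally needs the external spark-cassandra-connector package):

// A minimal sketch, assuming an existing SparkSession named spark
val jsonDF    = spark.read.json("/data/people.json")        // JSON file
val parquetDF = spark.read.parquet("/data/people.parquet")  // Parquet file

// RDBMS over JDBC; the URL, table, and credentials are hypothetical
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/testdb")
  .option("dbtable", "people")
  .option("user", "user")
  .option("password", "password")
  .load()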
In terms of performance, in addition to techniques such as In-Memory Columnar Storage and byte-code generation, it introduces a cost model that evaluates queries dynamically to pick the best physical plan.
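You can inspect which physical plan the optimizer finally settles on with explain(); a minimal sketch, assuming an existing DataFrame named df:

// Show the parsed, analyzed, and optimized logical plans plus the physical plan
df.explain(true)

// Show only the chosen physical plan
df.explain()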
In terms of extensibility, the SQL parser, analyzer, and optimizer can all be redefined and extended.
In 2014 Shark's development was halted and the team put all of its resources into the Spark SQL project. That ended Shark's story, but two lines of development grew out of it: Spark SQL and Hive on Spark.
Spark SQL continues to evolve as a member of the Spark ecosystem; it is no longer constrained by Hive, only compatible with it. Hive on Spark, on the other hand, is a Hive roadmap item that makes Spark one of Hive's underlying execution engines, so Hive is no longer tied to a single engine and can run on MapReduce, Tez, Spark, and others.
Spark SQL is a module for structured data processing. It attaches schema information to the data being processed, and you can interact with it either through SQL statements or through the Dataset API.
Spark SQL can read and write data in Hive with SQL; you can also run SQL from within a program and get the result back as a Dataset/DataFrame.
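A minimal Scala sketch of querying Hive from code; the table name people_hive is an assumption, and Hive support must be enabled when the SparkSession is built:

import org.apache.spark.sql.SparkSession

// Build a session that can see the Hive metastore
val spark = SparkSession.builder()
  .appName("Spark SQL with Hive")
  .enableHiveSupport()
  .getOrCreate()

// Run SQL against a Hive table; the result comes back as a DataFrame
val hiveDF = spark.sql("SELECT * FROM people_hive LIMIT 10")
hiveDF.show()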
A Dataset is a distributed collection of data that combines the benefits of RDDs with those of the Spark SQL execution engine. A Dataset can be constructed from JVM objects and then manipulated with functional transformations. The Dataset API is available in Scala and Java; Python and R, being dynamically typed, already get many of its benefits natively.
A DataFrame is a Dataset organized into a two-dimensional, tabular structure, the equivalent of a table in an RDBMS. DataFrames can be constructed in many ways, for example from structured data files, Hive tables, external databases, or RDDs. The DataFrame API is available in Scala, Java, Python, and R.
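For instance, a DataFrame can be derived from an in-memory RDD; a minimal sketch, assuming an existing SparkSession named spark (the Employee case class and its rows are made up for illustration):

import spark.implicits._

case class Employee(name: String, salary: Double)

// Build an RDD of case-class objects, then convert it to a DataFrame;
// the column names are taken from the case-class fields
val employeeRDD = spark.sparkContext.parallelize(
  Seq(Employee("Ann", 3000.0), Employee("Bob", 4500.0)))
val employeeDF = employeeRDD.toDF()
employeeDF.show()
// +----+------+
// |name|salary|
// +----+------+
// | Ann|3000.0|
// | Bob|4500.0|
// +----+------+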
Scala:

import org.apache.spark.sql.SparkSession

// Build the SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// Create a DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame operations
// This import is needed to use the $-notation
import spark.implicits._

// Print the schema in a tree format
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Build the SparkSession
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

// Create a DataFrame
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame operations
// Print the schema in a tree format
df.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
Python:

from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Create a DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# DataFrame operations
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
Datasets are similar to RDDs, but instead of Java serialization or Kryo they use a dedicated Encoder to serialize the objects that need to be processed or sent over the network. Encoders are dynamically generated code that lets Spark run many operations directly on the serialized form without deserializing it first.
Scala:

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Java:

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public int getAge() { return age; }
  public void setAge(int age) { this.age = age; }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
  (MapFunction<Integer, Integer>) value -> value + 1,
  integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Scala:

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
// df.createGlobalTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
// df.createGlobalTempView("people")

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Python:

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# df.createGlobalTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
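The createGlobalTempView call commented out above registers a view that is shared across SparkSessions within the same application. Such a view lives in the system-preserved database global_temp and must be referenced with that prefix; a minimal Scala sketch:

// Register a global temporary view instead of a session-scoped one
df.createGlobalTempView("people")

// Global temporary views are tied to the system database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

// The view is still visible from a newly created session of the same application
spark.newSession().sql("SELECT * FROM global_temp.people").show()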