Spark SQL
是 Spark
用来处理结构化数据的一个模块,它提供了2个编程抽象:php
DataFrame
和DataSet
,而且做为分布式 SQL 查询引擎的做用。html
以 Hive
做为对比,Hive
是将 Hive SQL
转换成 MapReduce
而后提交到集群上执行,大大简化了编写 MapReduce
的程序的复杂性,因为 MapReduce
这种计算模型执行效率比较慢。全部Spark SQL
的应运而生,它是将 Spark SQL
转换成 RDD
,而后提交到集群执行,执行效率很是快。java
这里引用 Spark 官网:mysql
在 Spark
中,DataFrame
是一种以 RDD
为基础的分布式数据集,相似于传统数据库中的二维表格。sql
DataFrame
与 RDD
的主要区别在于,前者带有 schema
元信息,即 DataFrame
所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL
得以洞察更多的结构信息,从而对藏于DataFrame
背后的数据源以及做用于 DataFrame
之上的变换进行了针对性的优化,最终达到大幅提高运行时效率的目标。shell
反观 RDD
,因为无从得知所存二维数据的内部结构,Spark Core
只能在 stage
层面进行简单、通用的流水线优化。数据库
图示apache
DataFrame
也是懒执行的,但性能上比 RDD
要高,主要缘由:编程
优化的执行计划,即查询计划经过 Spark Catalyst Optimiser
进行优化。好比下面一个例子:json
看看 Spark Core
和 Spark SQL
模块对这个计划的执行步骤:
DataSet
也是分布式数据集合。
DataSet
是 Spark 1.6
中添加的一个新抽象,是 DataFrame
的一个扩展。它提供了 RDD
的优点(强类型,使用强大的 Lambda
函数的能力)以及 Spark SQL
优化执行引擎的优势,DataSet
也可使用功能性的转换(操做 map
,flatMap
,filter
等等)。
具体的优点以下:
1)是 DataFrame API
的一个扩展,SparkSQL
最新的数据抽象;
2)用户友好的 API
风格,既具备类型安全检查也具备 DataFrame
的查询优化特性;
3)用样例类来对 DataSet
中定义数据的结构信息,样例类中每一个属性的名称直接映射到 DataSet
中的字段名称;
4)DataSet
是强类型的。好比能够有 DataSet[Car]
,DataSet[Person]
在老的版本中,Spark SQL
提供两种 SQL
查询起始点:一个叫 SQLContext
,用于 Spark
本身提供的 SQL
查询;一个叫 HiveContext
,用于链接 Hive
的查询。
SparkSession
是 Spark
最新的 SQL
查询起始点,实质上是 SQLContext
和 HiveContext
的组合,因此在 SQLContex
和 HiveContext
上可用的 API
在 SparkSession
上一样是可使用的。
SparkSession
内部封装了 SparkContext
,因此计算其实是由 SparkContext
完成的。
在 Spark SQL
中 SparkSession
是建立 DataFrame
和执行 SQL
的入口,建立 DataFrame
有三种方式
Spark
的数据源进行建立;RDD
进行转换;Hive Table
进行查询返回Spark
数据源进行建立的文件格式json
文件建立 DataFrame
RDD
转换(详见 2.5 节)Hive Table
转换(详见 3.3节)直接经过 SQL
语句对 DataFrame
的数据进行操做
DataFrame
DataFrame
建立一个临时表建立临时表的三种方式
SQL
语句实现查询全表注意:普通临时表是 Session
范围内的,若是想应用范围内有效,可使用全局临时表。使用全局临时表时须要全路径访问,如:global_temp.people
DataFrame
建立一个全局表df.createGlobalTempView("people")
复制代码
SQL
语句实现查询全表spark.sql("SELECT * FROM global_temp.people").show()
复制代码
spark.newSession().sql("SELECT * FROM global_temp.people").show()
复制代码
以上两行代码的执行效果一致~
使用更为简洁的语法对 DataFrame
的数据操做
建立一个 DataFrame
(同上)
查看 DataFrame
的 Schema
信息
name
列数据name
列数据以及 age+1
数据age
大于 21
的数据age
分组,查看数据条数我的感受简单的操做可使用 DSL
,复杂查询再使用 SQL
是一个很不错的方案
注意:DSL
方法由 DataFrame
调用,而 SQL
由 SparkSession
调用
注意:若是须要 RDD
与 DF
或者 DS
之间操做,那么都须要引入 import spark.implicits._
【spark不是包名,而是 SparkSession
对象的名称】
前置条件:导入隐式转换并建立一个 RDD
经过反射肯定(须要用到样例类)
case class People(name:String, age:Int)
复制代码
RDD
转换为 DataFrame
import org.apache.spark.sql.types._
复制代码
Schema
val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
复制代码
import org.apache.spark.sql.Row
复制代码
RDD
val data = rdd.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
复制代码
schema
建立 DataFrame
val dataFrame = spark.createDataFrame(data, structType)
复制代码
DataSet
是具备强类型的数据集合,须要提供对应的类型信息。
DataSet
的建立能够直接使用 Seq
静态方法建立 或者 RDD
转换 或者 DataFrame
转换
case class Person(name: String, age: Long)
复制代码
DataSet
Spark SQL
可以自动将包含有 case
类的 RDD
转换成 DataFrame
,case
类定义了 table
的结构,case
类属性经过反射变成了表的列名。case
类能够包含诸如 Seqs
或者 Array
等复杂的结构。
RDD
case class Person(name: String, age: Int)
复制代码
RDD
转化为 DataSet
DateFrame
DataSet
val ds = Seq(Person("Andy", 32)).toDS()
复制代码
DataSet
转化为 DataFrame
val df = ds.toDF
复制代码
使用 as
方法,转成 Dataset
,这在数据类型是 DataFrame
又须要针对各个字段处理时极为方便。在使用一些特殊的操做时,必定要加上 import spark.implicits._
否则 toDF
、toDS
没法使用。
在 Spark SQL
中 Spark
为咱们提供了两个新的抽象,分别是 DataFrame
和 DataSet
。他们和 RDD
有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
若是一样的数据都给到这三个数据结构,他们分别计算以后,都会给出相同的结果。不一样是的他们的执行效率和执行方式。在后期的 Spark
版本中,DataSet
有可能会逐步取代 RDD
和 DataFrame
成为惟一的 API
接口。
(1)RDD
、DataFrame
、Dataset
全都是 spark
平台下的分布式弹性数据集,为处理超大型数据提供便利;
(2)三者都有惰性机制,在进行建立、转换,如 map
方法时,不会当即执行,只有在遇到 Action
如 foreach
时,三者才会开始遍历运算;
(3)三者有许多共同的函数,如 filter
,排序
等;
(4)在对 DataFrame
和 Dataset
进行操做许多操做都须要这个包:import spark.implicits._
(在建立好 SparkSession
对象后尽可能直接导入)
这里给出关于这三者讲解比较深刻的文章
经过一个简单的案例快速入手如何在 IDEA
上开发 Spark SQL
程序
导入如下依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
复制代码
代码实现
object Main2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
import session.implicits._
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
//打印
dataFrame.show()
//DSL风格:查询年龄在21岁以上的
dataFrame.filter($"age" > 21).show()
//建立临时表
dataFrame.createOrReplaceTempView("persons")
//SQL风格:查询年龄在21岁以上的
session.sql("SELECT * FROM persons where age > 21").show()
//关闭链接
session.stop()
}
}
复制代码
若是在执行 Scala
或者是 java
程序中,报没法找到主类执行的异常,多是项目的结构有问题,将父模块直接移除掉,而后从新导入父模块便可
object MyFunc {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
/*用户自定义 UDF 函数*/
session.udf.register("addName", (x: String) => {
"cool:" + x
})
dataFrame.createOrReplaceTempView("people")
session.sql("select addName(name),age from people").show()
session.stop()
}
}
复制代码
结果以下
强类型的 Dataset
和弱类型的 DataFrame
都提供了相关的聚合函数, 如 count()
,countDistinct()
,avg()
,max()
,min()
。
除此以外,用户能够设定本身的自定义聚合函数。经过继承 UserDefinedAggregateFunction
来实现用户自定义聚合函数
/** * 定义本身的 UDAF 函数 * * @author cris * @version 1.0 **/
object MyFunc extends UserDefinedAggregateFunction {
// 聚合函数输入参数的数据类型
override def inputSchema: StructType = StructType(StructField("inputField", LongType) :: Nil)
// 聚合缓冲区中值得数据类型
override def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 返回值的数据类型
override def dataType: DataType = DoubleType
// 对于相同的输入是否一直返回相同的输出
override def deterministic: Boolean = true
// 初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 工资的总额
buffer(0) = 0L
// 员工人数
buffer(1) = 0L
}
// 相同 Executor 间的数据合并
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
// 不一样 Executor 间的数据合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 最终函数计算的返回值
override def evaluate(buffer: Row): Double = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
复制代码
测试代码
object MyFuncTest2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/employees.json")
session.udf.register("avg", MyFunc)
dataFrame.createTempView("emp")
session.sql("select avg(salary) as avg_sal from emp").show()
session.stop()
}
}
复制代码
测试以下
read
方法直接加载数据scala> spark.read.
csv jdbc json orc parquet textFile… …
复制代码
注意:加载数据的相关参数需写到上述方法中。如:textFile
需传入加载数据的路径,jdbc
需传入 JDBC
相关参数
format
方法(了解)scala> spark.read.format("…")[.option("…")].load("…")
复制代码
用法详解:
(1)format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)load("…"):在"csv"、"orc"、"parquet"和"textFile"格式下须要传入加载数据的路径。
(3)option("…"):在"jdbc"格式下须要传入JDBC相应参数,url、user、password和dbtable
write
直接保存数据scala> df.write.
csv jdbc json orc parquet textFile… …
复制代码
注意:保存数据的相关参数需写到上述方法中。如:textFile
需传入加载数据的路径,jdbc
需传入 JDBC
相关参数
format
指定保存数据类型(了解)scala> df.write.format("…")[.option("…")].save("…")
复制代码
用法详解:
(1)format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下须要传入保存数据的路径。
(3)option("…"):在"jdbc"格式下须要传入JDBC相应参数,url、user、password和dbtable
object Main2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
//建立临时表
dataFrame.createOrReplaceTempView("persons")
//SQL风格:查询年龄在21岁以上的
val frame: DataFrame = session.sql("SELECT * FROM persons where age > 21")
frame.show()
frame.write.json("/home/cris/output")
//关闭链接
session.stop()
}
}
复制代码
执行效果
能够采用SaveMode
执行存储操做,SaveMode
定义了对数据的处理模式。SaveMode
是一个枚举类,其中的常量包括:
(1)Append
:当保存路径或者表已存在时,追加内容;
(2)Overwrite
: 当保存路径或者表已存在时,覆写内容;
(3)ErrorIfExists
:当保存路径或者表已存在时,报错;
(4)Ignore
:当保存路径或者表已存在时,忽略当前的保存操做
使用以下
df.write.mode(SaveMode.Append).save("… …")
复制代码
记得保存选项放在 save
操做以前执行
Spark SQL
的默认数据源为 Parquet
格式。数据源为 Parquet
文件时,Spark SQL
能够方便的执行全部的操做。修改配置项 spark.sql.sources.default
,可修改默认数据源格式。
val df = spark.read.load("./examples/src/main/resources/users.parquet")
复制代码
df.select("name", " color").write.save("user.parquet")
复制代码
Spark SQL
可以自动推测 JSON
数据集的结构,并将它加载为一个 Dataset[Row]
. 能够经过 SparkSession.read.json()
去加载一个 一个 JSON
文件。
注意:这个JSON文件不是一个传统的JSON文件,每一行都得是一个JSON串。格式以下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
复制代码
Spark-Shell
操做以下:
import spark.implicits._
复制代码
JSON
文件val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
复制代码
peopleDF.createOrReplaceTempView("people")
复制代码
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
复制代码
Spark SQL
能够经过 JDBC
从关系型数据库中读取数据的方式建立 DataFrame
,经过对 DataFrame
一系列的计算后,还能够将数据再写回关系型数据库中。
可在启动 shell
时指定相关的数据库驱动路径,或者将相关的数据库驱动放到 Spark
的类路径下(推荐)。
Spark-Shell
[cris@hadoop101 spark-local]$ bin/spark-shell --master spark://hadoop101:7077 [--jars mysql-connector-java-5.1.27-bin.jar]
复制代码
建议将 MySQL
的驱动直接放入到 Spark
的类(jars
)路径下,就不用每次进入 Spark-Shell
带上 --jar
参数了
JDBC
相关参数配置信息val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
复制代码
read.jdbc
加载参数val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)
复制代码
format
形式加载配置参数(不推荐)val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark").option("dbtable", " person").option("user", "root").option("password", "000000").load()
复制代码
write.jdbc
保存数据(可使用文件保存选项)jdbcDF2.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)
复制代码
format
形式保存数据(不推荐)jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark")
.option("dbtable", "person")
.option("user", "root")
.option("password", "000000")
.save()
复制代码
pom.xml
导入 MySQL
驱动依赖<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
复制代码
MySQL
表数据IDEA
操做代码以下/** * IDEA 测试 Spark SQL 链接远程的 MySQL 获取数据和写入数据 * * @author cris * @version 1.0 **/
object MysqlTest {
def main(args: Array[String]): Unit = {
// 获取 SparkSession
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
// 设置配置参数
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "000000")
// 从 MySQL 获取数据,show() 方法实际调用的是 show(20),默认显示 20 行数据
val dataFrame: DataFrame = session.read.jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties)
dataFrame.show()
// 修改并保存数据到 MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties)
session.stop()
}
}
复制代码
注意:防止中文乱码,url
加上 ?characterEncoding=UTF-8
;写入数据最好指定保存模式 SaveMode
测试以下:
Apache Hive
是 Hadoop
上的 SQL
引擎,Spark SQL
编译时能够包含 Hive
支持,也能够不包含。包含 Hive
支持的 Spark SQL
能够支持 Hive
表访问、UDF
(用户自定义函数)以及 Hive
查询语言(HQL
)等。Spark-Shell
默认是Hive
支持的;代码中是默认不支持的,须要手动指定(加一个参数便可)。
若是要使用内嵌的 Hive
,直接用就能够了。
指定路径下就会生成该表的文件夹
在当前 Spark-local
路径下,建立文件 bb
1
2
3
4
5
复制代码
而后建立表,导入数据
查询也没有问题
对应目录下也生成了 bb
表的文件夹
若是想链接外部已经部署好的 Hive
,须要经过如下几个步骤:
Hive
中的 hive-site.xml
拷贝或者软链接到 Spark
安装目录下的 conf
目录下[cris@hadoop101 spark-local]$ cp /opt/module/hive-1.2.1/conf/hive-site.xml ./conf/
复制代码
JDBC
的驱动包放置在 Spark
的 .jars
目录下,启动 Spark-Shell
[cris@hadoop101 spark-local]$ cp /opt/module/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./jars/
复制代码
能够经过 Hive
的客户端建立一张表 users
hive> create table users(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
复制代码
并导入数据
hive> load data local inpath './user.txt' into users;
复制代码
此时 HDFS
显示数据导入成功
在 Spark-Shell
窗口查看
执行 Hive
的查询语句
能够在 Spark-Shell
执行全部的 Hive
语句,而且执行流程走的是 Spark
,而不是 MapReduce
Spark SQL CLI
能够很方便的在本地运行 Hive
元数据服务以及从命令行执行查询任务。在 Spark
目录下执行以下命令启动 Spark SQL CLI
,直接执行 SQL
语句,相似一个 Hive
窗口。
[cris@hadoop101 spark-local]$ bin/spark-sql
复制代码
若是使用这种模式进行测试,最好将 log4j
的日志级别设置为 error
,不然会有不少 info
级别的日志输出
首先 pom.xml
引入 Hive
依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
复制代码
而后将 Hive
的配置文件 hive-site.xml
放入 resource
路径下
hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>000000</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop101,hadoop102,hadoop103</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
</configuration>
复制代码
具体的配置介绍这里再也不赘述,能够参考个人 Hive 笔记
测试代码以下:
/** * IDEA 测试 Spark SQL 和 Hive 的联动 * * @author cris * @version 1.0 **/
object HiveTest {
def main(args: Array[String]): Unit = {
// 注意开启 enableHiveSupport
val session: SparkSession = SparkSession.builder().enableHiveSupport().appName("spark sql").master("local[*]")
.getOrCreate()
session.sql("show tables").show()
// 注意关闭 session 链接
session.stop()
}
}
复制代码
执行结果以下
正好就是刚才建立的 Hive
表
Cris
的 IDEA
设置一行字数最多 120
,不然就自动换行,大大提升阅读的温馨感和编码的规范性
由于 Cris
使用的是 Linux
桌面系统 Deepin
,因此常用自带的 Terminal
链接远程服务器,这里给出快速右键复制 Terminal
内容的设置
由于 Cris
以前使用的是 MacBook
,输入法搜狗会很智能的为输入的英文进行先后空格分割,换了 Deepin
后,自带的虽然也是搜狗输入法,可是没有对英文自动空格分割的功能了,后面想一想办法,看怎么解决~
由于要对英文和重要内容进行突出显示,Typora
中设置 code
的快捷键默认为 Ctrl+Shift+`,比较麻烦,网上找了找自定义快捷键的设置,最后设置成 Ctrl+C