========== Spark SQL ==========
一、Spark SQL 是 Spark 的一个模块,能够和 RDD 进行混合编程、支持标准的数据源、能够集成和替代 Hive、能够提供 JDBC、ODBC 服务器功能。java
二、Spark SQL 的特色:
(1)和 Spark Core 的无缝集成,能够在写整个 RDD 应用的时候,配合 Spark SQL 来实现逻辑。
(2)统一的数据访问方式,Spark SQL 提供标准化的 SQL 查询。
(3)Hive 的集成,Spark SQL 经过内嵌的 Hive 或者链接外部已经部署好的 Hive 实例,实现了对 Hive 语法的集成和操做。
(4)标准化的链接方式,Spark SQL 能够经过启动 thrift Server 来支持 JDBC、ODBC 的访问,即将本身做为一个 BI Server 来使用。mysql
三、Spark SQL 能够执行 SQL 语句,也能够执行 HQL 语句,将运行的结果做为 Dataset 和 DataFrame(将查询出来的结果转换成 RDD,相似于 hive 将 sql 语句转换成 mapreduce)。算法
四、Spark SQL 的计算速度(Spark sql 比 Hive 快了至少一个数量级,尤为是在 Tungsten 成熟之后会更加无可匹敌),Spark SQL 推出的 DataFrame 可让数据仓库直接使用机器学习、图计算等复杂的算法库来对数据仓库进行复杂深度数据价值的挖掘。sql
五、老版本中使用 hivecontext,如今使用 sparkSession。shell
========== Spark SQL 的数据抽象 ==========
0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6)
一、Spark SQL 提供了 DataFrame 和 DataSet 数据抽象。
二、DataFrame 就是 RDD + Schema,能够认为是一张二维表格。DataFrame 也是懒执行的、不可变的。DataFrame 性能上比 RDD 要高。
三、DataFrame 是一个弱类型的数据对象,DataFrame 的劣势是在编译期不进行表格中的字段的类型检查。在运行期进行检查。相似于 java.sql.ResultSet 类,只能经过 getString 这种方式来获取具体数据。
四、DataSet 是 Spark 最新的数据抽象,Spark 的发展会逐步将 DataSet 做为主要的数据抽象,弱化 RDD 和 DataFrame。DataSet 包含了 DataFrame 全部的优化机制。除此以外提供了以样例类为 Schema 模型的强类型。
五、type DataFrame = Dataset[Row]
六、DataFrame 和 DataSet 都有可控的内存管理机制,全部数据都保存在非堆内存
上,节省了大量空间以外,还摆脱了GC的限制。都使用了 catalyst 进行 SQL 的优化。可使得不太会使用 RDD 的工程师写出相对高效的代码。
七、RDD 和 DataFrame 和 DataSet 之间能够进行数据转换。数据库
========== Spark SQL 的初探 -- 客户端查询 ==========
一、你能够经过 spark-shell 或者 spark-sql 来操做 Spark SQL,注意:spark 做为 SparkSession 的变量名,sc 做为 SparkContext 的变量名。
二、你能够经过 Spark 提供的方法读取 JSON 文件,将 JSON 文件转换成 DataFrame。
三、你能够经过 DataFrame 提供的 API 来操做 DataFrame 里面的数据。
四、你能够经过将 DataFrame 注册成为一个临时表的方式,来经过 Spark.sql 方法运行标准的 SQL 语句来查询。express
小细节:
show() --> 表格
collect() --> RDD 打印apache
========== IDEA 建立 Spark SQL 程序 ==========
一、Spark SQL 读取 json 须要 json 文件中一行是一个 json 对象。
二、经过建立 SparkSession 来使用 SparkSQL:
示例代码以下:编程
package com.atguigu.sparksql
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object HelloWorld {
val logger = LoggerFactory.getLogger(HelloWorld.getClass)
def main(args: Array[String]) {
// 建立 SparkSession 并设置 App 名称
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 经过隐式转换将 RDD 操做添加到 DataFrame 上(将 RDD 转成 DataFrame)
import spark.implicits._
// 经过 spark.read 操做读取 JSON 数据
val df = spark.read.json("examples/src/main/resources/people.json")
// show 操做相似于 Action,将 DataFrame 直接打印到 Console 上
df.show()
// DSL 风格的使用方式:属性的获取方法 $
df.filter($"age" > 21).show()
//将 DataFrame 注册为表
df.createOrReplaceTempView("persons")
// 执行 Spark SQL 查询操做
spark.sql("select * from perosns where age > 21").show()
// 关闭资源
spark.stop()
}
}
========== DataFrame 查询方式 ==========
一、DataFrame 支持两种查询方式:一种是 DSL 风格,另一种是 SQL 风格。
DSL 风格:
(1)你须要引入 import spark.implicit._ 这个隐式转换,能够将 DataFrame 隐式转换成 RDD。
示例:
df.select("name").show()
df.filter($"age" > 25).show()json
SQL 风格:
(1)你须要将 DataFrame 注册成一张表格,若是你经过 createOrReplaceTempView 这种方式来建立,那么该表当前 Session 有效,若是你经过 createGlobalTempView 来建立,那么该表跨 Session 有效,可是 SQL 语句访问该表的时候须要加上前缀 global_temp.xxx。
(2)你须要经过 sparkSession.sql 方法来运行你的 SQL 语句。
示例:
一个 SparkContext 能够屡次建立 SparkSession。
// Session 内可访问,一个 SparkSession 结束后,表自动删除。
df.createOrReplaceTempView("persons") // 使用表名不须要任何前缀
// 应用级别内可访问,一个 SparkContext 结束后,表自动删除。
df.createGlobalTempView("persons") // 使用表名须要加上“global_temp.” 前缀,好比:global_temp.persons
========== DataSet 建立方式 ==========
一、定义一个 DataSet,首先你须要先定义一个 case 类。
========== RDD、DataFrame、DataSet 之间的转换总结 ==========
一、RDD -> DataFrame : rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元组 -> toDF()(注意:这是第一种方式)
二、DataFrame -> RDD : df.rdd 注意输出类型
:res2: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
一、 RDD -> DataSet : rdd.map(para => Person(para(0).trim(), para(1).trim().toInt)).toDS() // 须要先定义样例类 -> toDS()
二、 DataSet -> RDD : ds.rdd注意输出类型
:res5: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))
一、 DataFrame -> DataSet : df.as[Person] // 传入类型
二、 DataSet -> DataFrame : ds.toDF()
========== DataFrame 的 Schema 的获取方式 ==========
RDD -> DataFram 的三种方式:
// 将没有包含 case 类的 RDD 转换成 DataFrame
rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元组 -> toDF()(注意:这是第一种方式)
// 将包含有 case 类的 RDD 转换成 DataFrame,注意:须要咱们先定义 case 类
// 经过反射的方式来设置 Schema 信息,适合于编译期能肯定列的状况
rdd.map(attributes => Person(attributes(0), attributes(1).trim().toInt)).toDF() // 样例类-> RDD -> toDF()(注意:这是第二种方式)
// 经过编程的方式来设置 Schema 信息,适合于编译期不能肯定列的状况(注意:这是第三种方式)
val schemaString = "name age" // 实际开发中 schemaString 是动态生成的
val fields = schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rdd[Row] = rdd.map(attributes => Row(attributes(0.trim), attributes(1).trim))
val peopeDF = spark.createDataFrame(rdd[Row], schema)
========== 对于 DataFrame Row 对象的访问方式 ==========
一、由 DataFrame = Dataset[Row] 可知, DataFrame 里面每一行都是 Row 对象。
二、若是须要访问 Row 对象中的每个元素,能够经过索引 row(0);也能够经过列名 row.getAsString 或者索引 row.getAsInt。
========== 应用 UDF 函数(用户自定义函数) ==========
一、经过 spark.udf.register(funcName, func) 来注册一个 UDF 函数,name 是 UDF 调用时的标识符,即函数名,fun 是一个函数,用于处理字段。
二、你须要将一个 DF 或者 DS 注册为一个临时表。
三、经过 spark.sql 去运行一个 SQL 语句,在 SQL 语句中能够经过 funcName(列名) 方式来应用 UDF 函数。
示例代码以下:
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select addName(name), age from people").show()
scala> spark.sql("select addName(name) as newName, age from people").show()
========== 应用 UDAF 函数(用户自定义聚合函数) ==========
一、弱类型用户自定义聚合函数
步骤以下:
(1)新建一个 Class 继承UserDefinedAggregateFunction,而后复写方法:
// 聚合函数须要输入参数的数据类型
override def inputSchema: StructType = ???
// 聚合缓冲区中值的数据类型
override def bufferSchema: StructType = ???
// 返回值的数据类型
override def dataType: DataType = ???
// 对于相同的输入一直有相同的输出
override def deterministic: Boolean = true
// 用于初始化你的数据结构
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
// 相同 Execute 间的数据合并(同一分区)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
// 不一样 Execute 间的数据合并(不一样分区)
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
// 计算最终结果
override def evaluate(buffer: Row): Any = ???
(2)你须要经过 spark.udf.resigter 去注册你的 UDAF 函数。
(3)须要经过 spark.sql 去运行你的 SQL 语句,能够经过 select UDAF(列名) 来应用你的用户自定义聚合函数。
二、强类型的用户自定义聚合函数
步骤以下:
(1)新建一个class,继承Aggregator[Employee, Average, Double]
其中 Employee 是在应用聚合函数的时候传入的对象,Average 是聚合函数在运行的时候内部须要的数据结构,Double 是聚合函数最终须要输出的类型。这些能够根据本身的业务需求去调整。
复写相对应的方法:
// 用于定义一个聚合函数内部须要的数据结构
override def zero: Average = ???
// 针对每一个分区内部每个输入来更新你的数据结构
override def reduce(b: Average, a: Employee): Average = ???
// 用于对于不一样分区的结构进行聚合
override def merge(b1: Average, b2: Average): Average = ???
// 计算输出
override def finish(reduction: Average): Double = ???
// 设定之间值类型的编码器,要转换成 case 类
// Encoders.product 是进行 scala 元组和 case 类转换的编码器
override def bufferEncoder: Encoder[Average] = ???
// 设定最终输出值的编码器
override def outputEncoder: Encoder[Double] = ???
二、新建一个 UDAF 实例,经过 DF 或者 DS 的 DSL 风格语法去应用。
========== Spark SQL 的输入和输出 ==========
一、对于 Spark SQL 的输入须要使用 sparkSession.read 方法
(1)通用模式 sparkSession.read.format("json").load("path") 支持的类型有:parquet、json、text、csv、orc、jdbc、......
(2)专业模式 sparkSession.read.json("path") 或 csv 或 ... 即直接指定类型
二、对于 Spark SQL 的输出须要使用 sparkSession.write 方法
(1)通用模式 dataFrame.write.format("json").save("path") 支持的类型有:parquet、json、text、csv、orc、jdbc、......
(2)专业模式 dataFrame.write.csv("path") 或 json 或 ... 即直接指定类型
三、若是使用通用模式,则 spark 默认的 parquet 是默认格式,那么 sparkSession.read.load 它加载的默认是 parquet 格式;dataFrame.write.save 也是默认保存成 parquet 格式。
四、注意
:若是须要保存成一个 text 文件,那么须要 dataFrame 里面只有一列数据。
========== Spark SQL 与 Hive 的集成 ==========
内置 Hive
一、Spark 内置有 Hive,Spark 2.1.1 内置的 Hive 是 1.2.1。
二、若是要使用内嵌的 Hive,什么都不用作,直接用就能够了。可是呢,此时的咱们只能建立表,若是查询表的话会报错,缘由是:本地有 spark-warehouse 目录,而其余机器节点没有 spark-warehouse 目录。解决办法以下:
三、须要将 core-site.xml 和 hdfs-site.xml 拷贝到 spark 的 conf 目录下,而后分发至其余机器节点。若是 spark 路径下发现有 metastore_db 和 spark-warehouse,删除掉。而后重启集群。
四、在第一次启动建立 metastore 的时候,须要指定 spark.sql.warehouse.dir 这个参数,
好比:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/spark_warehouse
五、注意
:若是在 load 数据的时候,须要先将数据放到 HDFS 上。
外部 Hive
一、须要将 hive-site.xml 拷贝到 spark 的 conf 目录下,而后分发至其余机器节点。
二、若是 hive 的 metestore 使用的是 mysql 数据库,那么须要将 mysql 的 jdbc 驱动包放到 spark 的 jars 目录下。
三、能够经过 spark-sql 或者 spark-shell 来进行 sql 的查询,完成和 hive 的链接。
hive、spark、hdfs 关系: spark 文件中有两个文件夹:spark-warehouse、metastore_db,当咱们拷贝 hive-site.xml 文件到 spark 的 conf 目录后,会读取 Hive 中的 warehouse 文件,获取到 hive 中的表格数据。