==> 什么是 Spark SQL?sql
---> Spark SQL 是 Spark 用来处理结构化数据的一个模块
数据库
---> 做用:提供一个编程抽象(DataFrame) 而且做为分布式 SQL 查询引擎apache
---> 运行原理:将 Spark SQL 转化为 RDD, 而后提交到集群执行
编程
---> 特色:json
---- 容易整合
分布式
---- 统一的数据访问方式ide
---- 兼容 Hive函数
---- 标准的数据链接优化
==> SparkSessionspa
---> 特色:(2.0引用 SparkSession)
---- 为用户提供一个统一的切入点使用Spark 各项功能
---- 容许用户经过它调用 DataFrame 和 Dataset 相关 API 来编写程序
---- 减小了用户须要了解的一些概念,能够很容易的与 Spark 进行交互
---- 与 Spark 交互之时不须要显示的建立 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中
==> DataFrames 组织成命名列的数据集,等同于数据库中的表
---> 与 RDD 相比较:
---- RDD 是分布式的 Java 对象 的集合
---- DataFrame 是分布式 Row 对象的集合
---> 建立 DataFrames
---- 经过 case class 建立 DataFrames
// 定义 case class (至关于表的结构) case class Emp(Empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int) // 将 HDFS 上的数据读入 RDD, 并将 RDD 与 case class 关联 val lines = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(",")) val emp = lines.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) ` // 将RDD 转换成 DataFrames val empDF = emp.toDF // 经过 DataFrames 查询数据 empDF.show
---- 经过 SparkSession 建立 DataFrames
// 建立 StructType 来定义结构,注意,须要先导入模块 import org.apache.spark.sql.types._ val myschema = StructType(List( StructField("empno", DataTypes.IntegerType), StructField("ename", DataTypes.StringType), StructField("job", DataTypes.StringType), StructField("mgr", DataTypes.StringType), StructField("hiredate", DataTypes.StringType), StructField("sal", DataTypes.IntegerType), StructField("comm", DataTypes.StringType), StructField("deptno", DataTypes.IntegerType) )) // 读入数据且切分数据 val empcsvRDD = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(",")) // 将 RDD 数据映射成 Row,须要 import org.apache.spark.sql.Row import org.apache.spark.sql.Row val rowRDD = empcsvRDD.map(line=> Row(line(0).toInt, line(1), line(2), line(3),line(4), line(5).toInt, line(6), line(7).toInt) // 建立 DataFrames val df = spark.createDataFrame(rowRDD, myschema) // 查看表 df.show
---- 使用 Json 文件来建立 DataFrame
val df = spark.read.json("Json 文件") // 查看数据 df.show
---> DataFrame 操做 DataFrame 操做也称为无类型的 Dataset操做
---- 查询全部员工姓名
df.select("ename").show
---- 查询全部员工姓名和薪水,并给薪水加 100 元
df.select($"ename", $"sal", $"sal"+ 100).show
---- 查询工资大于2000的员工
df.select($"sal" > 2000).show
---- 求每一个部门员工数
df.groupBy($"deptno").count.show
---- 在 DataFrame 中使用 SQL 语句 注: 须要首先将 DataFrame 注册成表(视图)
df.createOrReplaceTempView("emp") // 执行查询 spark.sql("select * from emp").show
---> 临时视图(2种):
---- 只在当前会话中有效 df.createOrReplaceTempView("emp1")
---- 在全局有效 df.createGlobalTempView("emp2")
==> Datasets
---> 数据的分布式集合
--->特色:
---- Spark1.6中添加的新接口,是DataFrame之上更高一级的抽象
---- 提供了 RDD的优势(强类型化,使用 lambda函数的能力)
---- Spark SQL 优化后的执行引擎
---- 能够从 JVM 对象构造,而后使用函数转换(map, flatMap, filter等)去操做
---- 支持 Scala 和 Java,不支持 Python
---> 建立 DataSet
---- 使用序列
// 定义 case class case class MyData(a:String, b:String) // 生成序列并建立 DataSet val ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS // 查看结果 ds.show
---- 使用 Json 数据
// 定义 case class case class Person(name:String, gender:String) //经过 Json 数据生成 DataFrame val df = spark.read.json(sc.parallelize("""{"gender":"Male", "name": "Tom"}""" ::Nil)) // 将 DataFrame 转成 DataSet df.as[Person].show df.as[Person].collect
---- 经过使用 DHFS 执行 WordCount 程序
// 读取 HDFS 数据,并建立 DataSet val linesDS = spark.read.text("hdfs://bigdata0:9000/input/data.txt").as[String] // 对DataSet 进行操做:分词后, 查询长度大于3 的单词 val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3) // 查看结果 words.show words.collect // 执行wordcount 程序 val result = linesDS.flatMap(_.split(" ").map((_.1)).groupByKey(x=> x._1).count) result.show // 排序 result.orderBy($"value").show
==> Datasets 操做
---> joinWith 和 join 的区别是链接后的新 Dataset 的 schema 会不同
// 使用 emp.json 生成 DataFrame val empDF = spark.read.json("/root/resources/emp.json") // 查询工资大于 3000 的员工 empDF.where($"sal" > 3000).show // 建立 case class case class Emp(empno:Lone, ename:String, job:String, hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long) // 生成 DataSets,并查询数据 val empDS = empDF.as[Emp] // 查询工资大于 3000 的员工 empDS.filter(_.sal > 3000).show // 查看 10 号部门的员工 empDS.filter(_.deptno == 10) // 多表查询 // 1.建立部门表 val deptRDD = sc.textFile("/test/dept.csv").map(_.split(",")) case class Dept(deptno:Int, dname:String, loc:String) val deptDS = deptRDD.map(x=>Dept(x(0).toInt, x(1), x(2))).toDS // 2.建立员工表 case class Emp(empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int) val empRDD = sc.textFile("/test/emp.csv").map(_.split(",")) val empDS = empRDD.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) // 3.执行多表查询: 等值连接 val result = deptDF.join(empDS, "deptno") // 另外一种写法: 注意有三个等号 val result = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno")) // 查看执行计划 result.explain