This post covers basic Spark SQL operations such as filtering, deduplication, and set operations, along with commonly used date functions, random functions, and string functions, with example code for each. The data files used in the code are provided at the end of the post.
Spark SQL is a very important component of the Spark ecosystem. Its predecessor was Shark, a data warehouse built on Spark originally designed to be compatible with Hive; that project was discontinued in 2014 in favor of Spark SQL, which fully inherits Shark's functionality and optimizes it. Spark SQL introduces SchemaRDD (an RDD carrying schema information), letting users execute SQL statements in Spark SQL. The data can come from RDDs, from external sources such as Hive, HDFS, and Cassandra, or from JSON-formatted data. Spark SQL currently supports Scala, Java, and Python, and supports the SQL-92 standard.
Spark SQL supports SQL queries well: on the one hand, you can write Spark applications that query data with SQL statements; on the other, you can connect to Spark through standard database connectors (such as JDBC or ODBC) and run SQL queries.
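As a quick illustration of the first approach, here is a minimal sketch (the file path and view name are illustrative assumptions, reusing the employee.json data from the end of this post) that registers JSON data as a temporary view and queries it with a plain SQL statement:

package spark2x

import org.apache.spark.sql.SparkSession

object SqlQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SqlQuerySketch")
      .getOrCreate()

    // Register the JSON data as a temporary view, then query it with SQL
    val employeeDF = spark.read.json("E://temp/employee.json")
    employeeDF.createOrReplaceTempView("employee")
    spark.sql("SELECT name, salary FROM employee WHERE salary > 20000").show()

    spark.stop()
  }
}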
Deduplication
distinct: deduplicates on entire rows.
dropDuplicates: deduplicates on the given field(s).
package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   DistinctDemo
  * Author:  彭三青
  * Created: 2018-11-29 15:02
  * Version: 1.0
  * Description: deduplication with distinct and dropDuplicates
  */
object DistinctDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Operations")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    println("--------------------distinct---------------------")
    // Deduplicate on entire rows
    employeeDS.distinct().show()
    println("--------------------dropDuplicates---------------------")
    // Deduplicate on the given field(s)
    employeeDS.dropDuplicates(Seq("name")).show()

    spark.stop()
  }
}

case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
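dropDuplicates also accepts several column names and then deduplicates on their combination. A small sketch against the same employeeDS:

    // Rows count as duplicates only when both name and depId match;
    // columns not listed do not participate in the comparison.
    employeeDS.dropDuplicates(Seq("name", "depId")).show()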
Filtering
filter(): the argument can be a predicate function returning a Boolean (true keeps the row, false drops it), a column, or an expression string.
except: returns the rows that exist in the current Dataset but not in another Dataset.
intersect: returns the intersection of two Datasets.
Tip: except and intersect require both sides to be Datasets of the same type; replacing one side's Employee with a Person class that has exactly the same fields will still cause an error.
package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   FilterDemo
  * Author:  彭三青
  * Created: 2018-11-29 15:09
  * Version: 1.0
  */
object FilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FilterDemo")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
    val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json")
    val employee2DS: Dataset[Employee] = employee2DF.as[Employee]

    println("--------------------employee--------------------")
    employeeDS.show()
    println("--------------------employee2--------------------")
    employee2DS.show()
    println("-------------------------------------------------")

    // Keep an element when the predicate returns true; drop it otherwise
    employeeDS.filter(employee => employee.age == 35).show()
    employeeDS.filter(employee => employee.age > 30).show()
    // Elements present in this Dataset but absent from the other one
    employeeDS.except(employee2DS).show()
    // The intersection of the two Datasets
    employeeDS.intersect(employee2DS).show()

    spark.stop()
  }
}

case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
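The demo above only uses the predicate-function form. A minimal sketch of the column and expression-string forms mentioned earlier, assuming the same employeeDS and the demo's import spark.implicits._:

    // Column expression: keeps rows where the condition evaluates to true
    employeeDS.filter($"age" > 30).show()
    // SQL expression string: parsed and evaluated the same way
    employeeDS.filter("age > 30").show()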
Collection functions
collect_set: gathers the values of the given field within each group, removing duplicates.
collect_list: gathers the values of the given field within each group, keeping duplicates.
package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   CollectSetAndList
  * Author:  彭三青
  * Created: 2018-11-29 15:24
  * Version: 1.0
  * Description: collect_list, collect_set
  */
object CollectSetAndList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("CollectSetAndList")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    // collect_list: gathers the field's values within each group, keeping duplicates
    // collect_set: same, except duplicates are removed
    employeeDS
      .groupBy(employeeDS("depId"))
      .agg(collect_set(employeeDS("name")), collect_list(employeeDS("name")))
      .show()

    spark.stop()
  }
}

case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
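The order of elements inside the arrays produced by collect_list and collect_set is not guaranteed. If stable output matters, one option is wrapping the result in sort_array; a sketch against the same Dataset, assuming the demo's imports:

    employeeDS
      .groupBy($"depId")
      .agg(
        sort_array(collect_set($"name")).as("distinct_names"),  // deduplicated, sorted
        sort_array(collect_list($"name")).as("all_names"))      // duplicates kept, sorted
      .show(false)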
joinWith and sort
package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   JoinAndSort
  * Author:  彭三青
  * Created: 2018-11-29 15:19
  * Version: 1.0
  */
object JoinAndSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("JoinAndSort")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
    val departmentDF: DataFrame = spark.read.json("E://temp/department.json")
    val departmentDS: Dataset[Department] = departmentDF.as[Department]

    println("----------------------employeeDS----------------------")
    employeeDS.show()
    println("----------------------departmentDS----------------------")
    departmentDS.show()
    println("------------------------------------------------------------")

    // Equi-join on depId == id
    employeeDS.joinWith(departmentDS, $"depId" === $"id").show()
    // Sort by age in descending order
    employeeDS.sort($"age".desc).show()

    spark.stop()
  }
}

case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
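Unlike a regular join, joinWith returns a Dataset of typed pairs rather than a flattened row, so both sides stay accessible as case classes. A small sketch of mapping over the pair, reusing the demo's employeeDS and departmentDS:

    // Each element is an (Employee, Department) tuple; pull out a few fields
    employeeDS.joinWith(departmentDS, $"depId" === $"id")
      .map { case (emp, dept) => (emp.name, dept.name, emp.salary) }
      .show()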
Date functions:
current_date(): returns the current date.
current_timestamp(): returns the current timestamp.
Math functions
rand(): generates a random number in [0.0, 1.0).
round(e: Column, scale: Int): e is the column to round, scale is the number of decimal places to keep.
round(e: Column): the one-argument form rounds to 0 decimal places, i.e. to the nearest whole number; see the sketch below.
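A quick sketch of the rounding behavior, using lit to build constant columns (run inside a session like the demos below; the rand value differs on every run):

    import org.apache.spark.sql.functions._

    spark.range(1).select(
      rand().as("r"),                        // uniform random value in [0.0, 1.0)
      round(lit(3.14159), 2).as("two_dp"),   // => 3.14
      round(lit(3.14159)).as("zero_dp")      // => 3.0 (rounded to 0 decimal places)
    ).show()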
String functions
concat_ws(sep: String, exprs: Column*): string concatenation. sep is the separator string inserted between values; the remaining arguments are the columns to concatenate, and several columns can be passed, separated by commas.
package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   FunctionsDemo
  * Author:  彭三青
  * Created: 2018-11-29 15:56
  * Version: 1.0
  */
object FunctionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Operations")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    employeeDS
      .select(
        employeeDS("name"),
        current_date(),                                   // current date
        current_timestamp(),                              // current timestamp
        rand(),                                           // random number in [0.0, 1.0)
        round(employeeDS("salary"), 2),                   // salary rounded to 2 decimal places
        concat(employeeDS("gender"), employeeDS("age")),  // plain concatenation
        concat_ws("|", employeeDS("gender"), employeeDS("age")))  // concatenation with "|" separator
      .show()

    spark.stop()
  }
}

case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
employee.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123} {"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000} {"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000} {"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000} {"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000} {"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000} {"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000} {"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000} {"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}
employee2.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123} {"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000} {"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000} {"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000} {"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000} {"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
department.json
{"id": 1, "name": "Technical Department"} {"id": 2, "name": "Financial Department"} {"id": 3, "name": "HR Department"}