本文主要介绍 Spark SQL 的多表链接,须要预先准备测试数据。分别建立员工和部门的 Datafame,并注册为临时视图,代码以下:java
val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() val empDF = spark.read.json("/usr/file/json/emp.json") empDF.createOrReplaceTempView("emp") val deptDF = spark.read.json("/usr/file/json/dept.json") deptDF.createOrReplaceTempView("dept")
两表的主要字段以下:git
emp 员工表 |-- ENAME: 员工姓名 |-- DEPTNO: 部门编号 |-- EMPNO: 员工编号 |-- HIREDATE: 入职时间 |-- JOB: 职务 |-- MGR: 上级编号 |-- SAL: 薪资 |-- COMM: 奖金
dept 部门表 |-- DEPTNO: 部门编号 |-- DNAME: 部门名称 |-- LOC: 部门所在城市
注:emp.json,dept.json 能够在本仓库的resources 目录进行下载。github
Spark 中支持多种链接类型:sql
其中内,外链接,笛卡尔积均与普通关系型数据库中的相同,以下图所示:数据库
这里解释一下左半链接和左反链接,这两个链接等价于关系型数据库中的 IN
和 NOT IN
字句:json
-- LEFT SEMI JOIN SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno -- 等价于以下的 IN 语句 SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept) -- LEFT ANTI JOIN SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno -- 等价于以下的 IN 语句 SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)
全部链接类型的示例代码以下:网络
// 1.定义链接表达式 val joinExpression = empDF.col("deptno") === deptDF.col("deptno") // 2.链接查询 empDF.join(deptDF,joinExpression).select("ename","dname").show() // 等价 SQL 以下: spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "outer").show() spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "left_outer").show() spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "right_outer").show() spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "left_semi").show() spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "left_anti").show() spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "cross").show() spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()
天然链接是在两张表中寻找那些数据类型和列名都相同的字段,而后自动地将他们链接起来,并返回全部符合条件的结果。app
spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()
如下是一个天然链接的查询结果,程序自动推断出使用两张表都存在的 dept 列进行链接,其实际等价于:ide
spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
因为天然链接经常会产生不可预期的结果,因此并不推荐使用。测试
在对大表与大表之间进行链接操做时,一般都会触发 Shuffle Join
,两表的全部分区节点会进行 All-to-All
的通信,这种查询一般比较昂贵,会对网络 IO 会形成比较大的负担。
而对于大表和小表的链接操做,Spark 会在必定程度上进行优化,若是小表的数据量小于 Worker Node 的内存空间,Spark 会考虑将小表的数据广播到每个 Worker Node,在每一个工做节点内部执行链接计算,这能够下降网络的 IO,但会加大每一个 Worker Node 的 CPU 负担。
是否采用广播方式进行 Join
取决于程序内部对小表的判断,若是想明确使用广播方式进行 Join
,则能够在 DataFrame API 中使用 broadcast
方法指定须要广播的小表:
empDF.join(broadcast(deptDF), joinExpression).show()
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南