Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现

 

Spark SQL 之 Join 实现

涂小刚 2017-07-19 217标签: spark , 数据库

Join做为SQL中一个重要语法特性,几乎全部稍微复杂一点的数据分析场景都离不开Join,现在Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,做为开发者,咱们有必要了解Join在Spark中是如何组织运行的。sql

SparkSQL整体流程介绍

在阐述Join实现以前,咱们首先简单介绍SparkSQL的整体流程,通常地,咱们有两种方式使用SparkSQL,一种是直接写sql语句,这个须要有元数据库支持,例如Hive等,另外一种是经过Dataset/DataFrame编写Spark应用程序。以下图所示,sql语句被语法解析(SQL AST)成查询计划,或者咱们经过Dataset/DataFrame提供的APIs组织成查询计划,查询计划分为两大类:逻辑计划和物理计划,这个阶段一般叫作逻辑计划,通过语法分析(Analyzer)、一系列查询优化(Optimizer)后获得优化后的逻辑计划,最后被映射成物理计划,转换成RDD执行。数据库

更多关于SparkSQL的解析与执行请参考文章【sql的解析与执行】。对于语法解析、语法分析以及查询优化,本文不作详细阐述,本文重点介绍Join的物理执行过程。数据结构

Join基本要素

以下图所示,Join大体包括三个要素:Join方式、Join条件以及过滤条件。其中过滤条件也能够经过AND语句放在Join条件中。分布式

Spark支持全部类型的Join,包括:性能

  • inner join
  • left outer join
  • right outer join
  • full outer join
  • left semi join
  • left anti join

下面分别阐述这几种Join的实现。优化

Join基本实现流程

整体上来讲,Join的基本实现流程以下图所示,Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),一般streamIter为大表,buildIter为小表,咱们不用担忧哪一个表为streamIter,哪一个表为buildIter,这个spark会根据join语句自动帮咱们完成。ui

在实际计算时,spark会基于streamIter来遍历,每次取出streamIter中的一条记录rowA,根据Join条件计算keyA,而后根据该keyA去buildIter中查找全部知足Join条件(keyB==keyA)的记录rowBs,并将rowBs中每条记录分别与rowAjoin获得join后的记录,最后根据过滤条件获得最终join的记录。spa

从上述计算过程当中不难发现,对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,因此buildIter必定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。3d

sort merge join实现

要让两条记录能join到一块儿,首先须要将具备相同key的记录在同一个分区,因此一般来讲,须要作一次shuffle,map阶段根据join条件肯定每条记录的key,基于该key作shuffle write,将可能join到一块儿的记录分到同一个分区中,这样在shuffle read阶段就能够将两个表中具备相同key的记录拉到同一个分区处理。前面咱们也提到,对于buildIter必定要是查找性能较优的数据结构,一般咱们能想到hash表,可是对于一张较大的表来讲,不可能将全部记录所有放到hash表中,另外也能够对buildIter先排序,查找时按顺序查找,查找代价也是能够接受的,咱们知道,spark shuffle阶段自然就支持排序,这个是很是好实现的,下面是sort merge join示意图。code

在shuffle read阶段,分别对streamIter和buildIter进行merge sort,在遍历streamIter时,对于每条记录,都采用顺序查找的方式从buildIter查找对应的记录,因为两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,因此说每次在buildIter中查找没必要重头开始,总体上来讲,查找性能仍是较优的。

broadcast join实现

为了能具备相同key的记录分到同一个分区,咱们一般是作shuffle,那么若是buildIter是一个很是小的表,那么其实就没有必要大动干戈作shuffle了,直接将buildIter广播到每一个计算节点,而后将buildIter放到hash表中,以下图所示。

从上图能够看到,不用作shuffle,能够直接在一个map中完成,一般这种join也称之为map join。那么问题来了,何时会用broadcast join实现呢?这个不用咱们担忧,spark sql自动帮咱们完成,当buildIter的估计大小不超过参数spark.sql.autoBroadcastJoinThreshold设定的值(默认10M),那么就会自动采用broadcast join,不然采用sort merge join。

hash join实现

除了上面两种join实现方式外,spark还提供了hash join实现方式,在shuffle read阶段不对记录排序,反正来自两格表的具备相同key的记录会在同一个分区,只是在分区内不排序,未来自buildIter的记录放到hash表中,以便查找,以下图所示。

不难发现,要未来自buildIter的记录放到hash表中,那么每一个分区来自buildIter的记录不能太大,不然就存不下,默认状况下hash join的实现是关闭状态,若是要使用hash join,必须知足如下四个条件:

  • buildIter整体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不知足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每一个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每一个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上

因此说,使用hash join的条件实际上是很苛刻的,在大多数实际场景中,即便能使用hash join,可是使用sort merge join也不会比hash join差不少,因此尽可能使用hash

下面咱们分别阐述不一样Join方式的实现流程。

inner join

inner join是必定要找到左右表中知足join条件的记录,咱们在写sql语句或者使用DataFrmae时,能够不用关心哪一个是左表,哪一个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。其基本实现流程以下图所示,在查找阶段,若是右表不存在知足join条件的记录,则跳过。

left outer join

left outer join是以左表为准,在右表中查找匹配的记录,若是查找失败,则返回一个全部字段都为null的记录。咱们在写sql语句或者使用DataFrmae时,通常让大表在左边,小表在右边。其基本实现流程以下图所示。

right outer join

right outer join是以右表为准,在左表中查找匹配的记录,若是查找失败,则返回一个全部字段都为null的记录。因此说,右表是streamIter,左表是buildIter,咱们在写sql语句或者使用DataFrmae时,通常让大表在右边,小表在左边。其基本实现流程以下图所示。

full outer join

full outer join相对来讲要复杂一点,整体上来看既要作left outer join,又要作right outer join,可是又不能简单地先left outer join,再right outer join,最后union获得最终结果,由于这样最终结果中就存在两份inner join的结果了。由于既然完成left outer join又要完成right outer join,因此full outer join仅采用sort merge join实现,左边和右表既要做为streamIter,又要做为buildIter,其基本实现流程以下图所示。

因为左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,若是key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录;若是keyA<keyB,则说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow,紧接着,rowA更新到左表的下一条记录;若是keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB,紧接着,rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录所有处理完。

left semi join

left semi join是以左表为准,在右表中查找匹配的记录,若是查找成功,则仅返回左边的记录,不然返回null,其基本实现流程以下图所示。

left anti join

left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,若是查找成功,则返回null,不然仅返回左边的记录,其基本实现流程以下图所示。

总结

Join是数据库查询中一个很是重要的语法特性,在数据库领域能够说是“得join者的天下”,SparkSQL做为一种分布式数据仓库系统,给咱们提供了全面的join支持,并在内部实现上无声无息地作了不少优化,了解join的实现将有助于咱们更深入的了解咱们的应用程序的运行轨迹。

 

原文地址:Spark SQL 之 Join 实现

相关文章
相关标签/搜索