SparkSQL的3种Join实现

引言Join是SQL语句中的经常使用操做,良好的表结构可以将数据分散在不一样的表中,使其符合某种范式,减小表冗余、更新容错等。而创建表和表之间关系的最佳方式就是Join操做。web

对于Spark来讲有3中Join的实现,每种Join对应着不一样的应用场景:算法

Broadcast Hash Join :适合一张较小的表和一张大表进行joinsql

Shuffle Hash Join : 适合一张小表和一张大表进行join,或者是两张小表之间的join数据库

Sort Merge Join :适合两张较大的表之间进行join缓存

前二者都基于的是Hash Join,只不过在hash join以前须要先shuffle仍是先broadcast。下面将详细的解释一下这三种不一样的join的具体原理。并发

Hash Join先来看看这样一条SQL语句:分布式

select * from order,item where item.id = order.i_id函数

  1. 肯定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就能够join在一块儿。一般状况下,小表会做为Build Table,大表做为Probe Table。此事例中item为Build Table,order为Probe Table;很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。如今假设这个Join采用的是hash join算法,整个过程会经历三步:
  2. 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,若是内存放不下须要dump到外存;
  3. 探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功以后再检查join条件(item.id = order.i_id),若是匹配成功就能够将二者join在一块儿。

基本流程能够参考上图,这里有两个小问题须要关注:性能

  1. hash join性能如何?很显然,hash join基本都只扫描两表一次,能够认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条街;
  2. 为何Build Table选择小表?道理很简单,由于构建的Hash Table最好能所有加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。

上文说过,hash join是传统数据库中的单机join算法,在分布式环境下须要通过必定的分布式改造,说到底就是尽量利用分布式计算资源进行并行化计算,提升整体效率。hash join分布式改造通常有两种经典方案:大数据

  1. broadcast hash join:将其中一张小表广播分发到另外一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,能够直接广播的场景;
  2. shuffler hash join:一旦小表数据量较大,此时就再也不适合进行广播分发。这种状况下,能够根据join key相同必然分区相同的原理,将两张表分别按照join key进行从新组织分区,这样就能够将join分而治之,划分为不少小join,充分利用集群资源并行化。

Broadcast Hash Join你们知道,在数据库的常见模型中(好比星型模型或者雪花模型),表通常分为两种:事实表和维度表。维度表通常指固定的、变更较少的表,例如联系人、物品种类等,通常数据有限。而事实表通常记录流水,好比销售清单等,一般随着时间的增加不断膨胀。

由于Join操做是对两个表中key值相同的记录进行链接,在SparkSQL中,对两个表作Join最直接的方式是先根据key分区,再在每一个分区中把key值相同的记录拿出来作链接操做。但这样就不可避免地涉及到shuffle,而shuffle在Spark中是比较耗时的操做,咱们应该尽量的设计Spark应用使其避免大量的shuffle。

当维度表和事实表进行Join操做时,为了不shuffle,咱们能够将大小有限的维度表的所有数据分发到每一个节点上,供事实表使用。executor存储维度表的所有数据,必定程度上牺牲了空间,换取shuffle操做大量的耗时,这在SparkSQL中称做Broadcast Join,以下图所示:

Table B是较小的表,黑色表示将其广播到每一个executor节点上,Table A的每一个partition会经过block manager取到Table A的数据。根据每条记录的Join Key取到Table B中相对应的记录,根据Join Type进行操做。这个过程比较简单,不作赘述。

Broadcast Join的条件有如下几个:

  1. 被广播的表须要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M (或者加了broadcast join的hint)
  2. 基表不能被广播,好比left outer join时,只能广播右表

看起来广播是一个比较理想的方案,但它有没有缺点呢?也很明显。这个方案只能用于广播较小的表,不然数据的冗余传输就远大于shuffle的开销;另外,广播时须要将被广播的表现collect到driver端,当频繁有广播出现时,对driver的内存也是一个考验。

以下图所示,broadcast hash join能够分为两步:

  1. broadcast阶段:将小表广播分发到大表所在的全部主机。广播算法能够有不少,最简单的是先发给driver,driver再统一分发给全部executor;要不就是基于bittorrete的p2p思路;
  2. hash join阶段:在每一个executor上执行单机版hash join,小表映射,大表试探;

SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。

Shuffle Hash Join当一侧的表比较小时,咱们选择将其广播出去以免shuffle,提升性能。但由于被广播的表首先被collect到driver段,而后被冗余分发到每一个executor上,因此当表比较大时,采用broadcast join会对driver端和executor端形成较大的压力。

但因为Spark是一个分布式的计算引擎,能够经过分区的形式将大批量的数据划分红n份较小的数据集进行并行计算。这种思想应用到Join上即是Shuffle Hash Join了。利用key相同必然分区相同的这个原理,两个表中,key相同的行都会被shuffle到同一个分区中,SparkSQL将较大表的join分而治之,先将表划分红n个分区,再对两个表中相对应分区的数据分别进行Hash Join,这样即在必定程度上减小了driver广播一侧表的压力,也减小了executor端取整张被广播表的内存消耗。其原理以下图:

Shuffle Hash Join分为两步:

  1. 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
  2. 对对应分区中的数据进行join,此处先将小表分区构造为一张hash表,而后根据大表分区中记录的join keys值拿出来进行匹配

Shuffle Hash Join的条件有如下几个:

  1. 分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
  2. 基表不能被广播,好比left outer join时,只能广播右表
  3. 一侧的表要明显小于另一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)

咱们能够看到,在必定大小的表中,SparkSQL从时空结合的角度来看,将两个表进行从新分区,而且对小表中的分区进行hash化,从而完成join。在保持必定复杂度的基础上,尽可能减小driver和executor的内存压力,提高了计算时的稳定性。

在大数据条件下若是一张表很小,执行join操做最优的选择无疑是broadcast hash join,效率最高。可是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就再也不是最优方案。此时能够按照join key进行分区,根据key相同必然分区相同的原理,就能够将大表join分而治之,划分为不少小表的join,充分利用集群资源并行化。以下图所示,shuffle hash join也能够分为两步:

  1. shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中全部节点。这个过程称为shuffle
  2. hash join阶段:每一个分区节点上的数据单独执行单机hash join算法。

看到这里,能够初步总结出来若是两张小表join能够直接使用单机版hash join;若是一张大表join一张极小表,能够选择broadcast hash join算法;而若是是一张大表join一张小表,则能够选择shuffle hash join算法;那若是是两张大表进行join呢?

Sort Merge Join上面介绍的两种实现对于必定大小的表比较适用,但当两个表都很是大时,显然不管适用哪一种都会对计算内存形成很大压力。这是由于join时二者采起的都是hash join,是将一侧的数据彻底加载到内存中,使用hash code取join keys值相等的记录进行链接。

当两个表都很是大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种实现方式不用将一侧数据所有加载后再进星hash join,但须要在join前将数据排序,以下图所示:

能够看到,首先将两张表按照join keys进行了从新shuffle,保证join keys值相同的记录会被分在相应的分区。分区后对每一个分区内的数据进行排序,排序后再对相应的分区内的记录进行链接,以下图示:

看着很眼熟吧?也很简单,由于两个序列都是有序的,从头遍历,碰到key相同的就输出;若是不一样,左边小就继续取左边,反之取右边。

能够看出,不管分区有多大,Sort Merge Join都不用把某一侧的数据所有加载到内存中,而是即用即取即丢,从而大大提高了大数据量下sql join的稳定性。

SparkSQL对两张大表join采用了全新的算法-sort-merge join,以下图所示,整个过程分为三个步骤:

  1. shuffle阶段:将两张大表根据join key进行从新分区,两张表数据会分布到整个集群,以便分布式并行处理;
  2. sort阶段:对单个分区节点的两表数据,分别进行排序;
  3. merge阶段:对排好序的两张分区表数据执行join操做。join操做很简单,分别遍历两个有序序列,碰到相同join key就merge输出,不然取更小一边,见下图示意:

通过上文的分析,能够明确每种Join算法都有本身的适用场景,数据仓库设计时最好避免大表与大表的join查询,SparkSQL也能够根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调大,让更多join实际执行为broadcast hash join。

相关文章
相关标签/搜索