当前SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前二者归根到底都属于hash join,只不过在hash join以前须要先shuffle仍是先broadcast。node
选择思路大概是:大表与小表进行join会使用broadcast hash join,一旦小表稍微大点再也不适合广播分发就会选择shuffle hash join,最后,两张大表的话无疑选择sort merge join。算法
将小表转换成Hash Table,用大表进行遍历,对每一个元素去Hash Table里查看是否存在,存在则进行jion。sql
先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id
,很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。如今假设这个Join采用的是hash join算法,整个过程会经历三步:数据库
肯定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就能够join在一块儿。一般状况下,小表会做为Build Table,大表做为Probe Table。此事例中item为Build Table,order为Probe Table。缓存
构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,若是内存放不下须要dump到外存。网络
探测(Probe):再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功以后再检查join条件(item.id = order.i_id),若是匹配成功就能够将二者join在一块儿。并发
基本流程能够参考上图,这里有两个小问题须要关注:分布式
hash join性能如何?很显然,hash join基本都只扫描两表一次,能够认为o(a+b)函数
为何Build Table选择小表?道理很简单,由于构建的Hash Table最好能所有加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用;oop
上文说过,hash join是传统数据库中的单机join算法,在分布式环境下须要通过必定的分布式改造,说到底就是尽量利用分布式计算资源进行并行化计算,提升整体效率。hash join分布式改造通常有两种经典方案:
broadcast hash join:将其中一张小表广播分发到另外一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,能够直接广播的场景。
shuffler hash join:一旦小表数据量较大,此时就再也不适合进行广播分发。这种状况下,能够根据join key相同必然分区相同的原理,将两张表分别按照join key进行从新组织分区,这样就能够将join分而治之,划分为不少小join,充分利用集群资源并行化。
broadcast hash join能够分为两步:
broadcast阶段:将小表广播分发到大表所在的全部主机。广播算法能够有不少,最简单的是先发给driver,driver再统一分发给全部executor;要不就是基于bittorrete的p2p思路;
hash join阶段:在每一个executor上执行单机版hash join,小表映射,大表试探;
SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold
,默认为10M。
BroadcastNestedLoopJoin
cross jion
在进行笛卡尔集运算时使用了嵌套云环的jion方式,也就是咱们经常使用的两个for循环嵌套进行判断。
在大数据条件下若是一张表很小,执行join操做最优的选择无疑是broadcast hash join,效率最高。可是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就再也不是最优方案。此时能够按照join key进行分区,根据key相同必然分区相同的原理,就能够将大表join分而治之,划分为不少小表的join,充分利用集群资源并行化。以下图所示,shuffle hash join也能够分为两步:
shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中全部节点。这个过程称为shuffle
hash join阶段:每一个分区节点上的数据单独执行单机hash join算法。
shuffle阶段:将两张大表根据join key进行从新分区,两张表数据会分布到整个集群,以便分布式并行处理
sort阶段:对单个分区节点的两表数据,分别进行排序
merge阶段:对排好序的两张分区表数据执行join操做。join操做很简单,分别遍历两个有序序列,碰到相同join key就merge输出(相比hash jion解决了大表不能所有hash到内存中的问题)
仔细分析的话会发现,sort-merge join的代价并不比shuffle hash join小,反而是多了不少。那为何SparkSQL还会在两张大表的场景下选择使用sort-merge join算法呢?这和Spark的shuffle实现有关,目前spark的shuffle实现都适用sort-based shuffle算法,所以在通过shuffle以后partition数据都是按照key排序的。所以理论上能够认为数据通过shuffle以后是不须要sort的,能够直接merge。
hash jion中使用了谓词布隆过滤器进行下推实现了FR(Runtime Filter)
, 对jion操做进一步作了优化。
因为spark CBO分析的不许确问题致使broadcastjoin 常常出现乱广播的情形。