Join有inner
,leftouter
,rightouter
,fullouter
,leftsemi
,leftanti
六种类型,对单独版本的Join操做,能够将问题表述为:算法
IterA,IterB为两个Iterator,根据规则A将两个Iterator中相应的Row进行合并,而后按照规则B对合并后Row进行过滤。
好比Inner_join,它的合并规则A为:对IterA中每一条记录,生成一个key,并利用该key从IterB的Map集合中获取到相应记录,并将它们进行合并;而对于规则B能够为任意过滤条件,好比IterA和IterB任何两个字段进行比较操做。sql
对于IterA和IterB,当咱们利用iterA中key去IterB中进行一一匹配时,咱们称IterA为streamedIter
,IterB为BuildIter
或者hashedIter
。即咱们流式遍历streamedIter
中每一条记录,去hashedIter
中去查找相应匹配的记录。apache
而这个查找过程当中,即为Build
过程,每一次Build
操做的结果即为一条JoinRow(A,B)
,其中JoinRow(A)
来自streamedIter
,JoinRow(B)
来自BuildIter
,此时这个过程为BuildRight
,而若是JoinRow(B)
来自streamedIter
,JoinRow(A)
来自BuildIter
,即为BuildLeft
,分布式
有点拗口!那么为何要去区分BuildLeft
和BuildRight
呢?对于leftouter
,rightouter
,leftsemi
,leftanti
,它们的Build类型是肯定,即left*
为BuildRight
,right*
为BuildLeft
类型,可是对于inner
操做,BuildLeft
和BuildRight
两种均可以,并且选择不一样,可能有很大性能区别:ide
BuildIter也称为hashedIter,即须要将BuildIter构建为一个内存Hash,从而加速Build的匹配过程;此时若是BuildIter和streamedIter大小相差较大,显然利用小的来创建Hash,内存占用要小不少!性能
总结一下:Join即由下面几部分组成:ui
trait Join { val joinType: JoinType //Join类型 val streamedPlan: SparkPlan //用于生成streamedIter val buildPlan: SparkPlan //用于生成hashedIter val buildSide: BuildSide //BuildLeft或BuildRight val buildKeys: Seq[Expression] //用于从streamedIter中生成buildKey的表达式 val streamedKeys: Seq[Expression] //用于从hashedIter中生成streamedKey的表达式 val condition: Option[Expression]//对joinRow进行过滤 }
注:对于fullouter,IterA和IterB同时为streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter进行leftouter,而后再用先IterB=streamedIter,IterA=hashedIter进行leftouter,再把两次结果进行合并。spa
若是匹配成功,即构建多个JoinRow,不然返回emptycode
streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) } else { Seq.empty } }
若是匹配成功,即构建多个JoinRow,不然返回JoinRow的Build部分为Null排序
val nullRow = new NullRow() streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) } else { Seq(joinRow.withRight(nullRow)) } }
若是匹配成功,即构建多个JoinRow,不然返回JoinRow的Build部分为Null
val nullRow = new NullRow() streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withRight(srow)//注意与LeftOutJoin的区别 val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withLeft(_)).filter(condition) } else { Seq(joinRow.withLeft(nullRow)) } }
它不是返回JoinRow,而是返回srow
streamIter.filter{ srow => val matches = hashedIter.get(buildKeys(srow)) if(matches == null) { false //没有找到匹配项 } else{ if(condition.isEmpty == false) { //须要对`假想`后joinrow进行判断 val joinRow = new JoinedRow joinRow.withLeft(srow) ! matches.map(joinRow.withLeft(_)).filter(condition).isEmpty } else { true } } }
LeftSemi从逻辑上来讲,它即为In判断。
它不是返回JoinRow,而是返回srow
streamIter.filter{ srow => val matches = hashedIter.get(buildKeys(srow)) if(matches == null) { true //没有找到匹配项 } else{ if(condition.isEmpty == false) { //须要对`假想`后joinrow进行判断 val joinRow = new JoinedRow joinRow.withLeft(srow) matches.map(joinRow.withLeft(_)).filter(condition).isEmpty } else { false } } }
上面描述的Join是须要将BuildIter
在内存中构建为hashedIter
,从而加速匹配过程,所以咱们也将这个Join称为HashJoin。可是创建一个Hash表须要占用大量的内存。
那么问题来:若是咱们的Iter太大,没法创建Hash表怎么吧?在分布式Join计算下,Join过程当中发生在Shuffle阶段,若是一个数据集的Key存在数据偏移,很容易出现一个BuildIter
超过内存大小,没法完成Hash表的创建,进而致使HashJoin失败,那么怎么办?
在HashJoin过程当中,针对
BuildIter
创建hashedIter
是为了加速匹配过程当中。匹配查找除了创建Hash表这个方法之外,将streamedIter和BuildIter进行排序,也是一个加速匹配过程,即咱们这里说的sortJoin。
排序不也是须要内存吗?是的,首先排序占用内存比创建一个hash表要小不少,其次排序若是内存不够,能够将一部分数据Spill到磁盘,而Hash为全内存,若是内存不够,将会致使整个Shuffle失败。
下面以InnerJoin的SortJoin实现为例子,讲述它与HashJoin的区别:
利用streamIter中每一个srow,从BuildIter中顺序查找,因为两边都是有序的,因此查找代价很小。
val buildIndex = 0 streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) //顺序查找 val matches = BuildIter.search(buildKeys(srow), buildIndex) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) buildIndex += matches.length } else { Seq.empty } }
对于FullOuter
Join,若是采用HashJoin方式来实现,代价较大,须要创建双向的Hash表,而基于SortJoin,它的代价与其余几种Join相差不大,所以`FullOuter默认都是基于SortJon来实现。
Spark针对Join提供了分布式实现,可是Join操做本质上也是单机进行,怎么理解?若是要对两个数据集进行分布式Join,Spark会先对两个数据集进行Exchange
,即进行ShuffleMap操做,将Key相同数据分到一个分区中,而后在ShuffleFetch过程当中利用HashJoin/SortJoin单机版算法来对两个分区进行Join操做。
另外若是Build端的整个数据集(非一个iter)大小较小,能够将它进行Broadcast操做,从而节约Shuffle的开销。
所以Spark支持ShuffledHashJoinExec
,SortMergeJoinExec
,BroadcastHashJoinExec
三种Join算法,那么它怎么进行选择的呢?
spark.sql.autoBroadcastJoinThreshold
,默认10M,那么优先进行BroadcastHashJoinExecspark.sql.join.preferSortMergeJoin
为True,那么优先选择SortMergeJoinExecShuffledHashJoinExec
了
这一块逻辑都在org.apache.spark.sql.execution.JoinSelection
中描述。ps:Spark也对Without joining keys
的Join进行支持,可是不在咱们此次讨论范围中。
BroadcastHashJoinExec
val p = spark.read.parquet("/Users/p.parquet") val p1 = spark.read.parquet("/Users/p1.parquet") p.joinWith(p1, p("to_module") === p1("to_module"),"inner") 此时因为p和p1的大小都较小,它会默认选择BroadcastHashJoinExec == Physical Plan == BroadcastHashJoin [_1#269.to_module], [_2#270.to_module], Inner, BuildRight :- Project p :- Project p1
SortMergeJoinExec
val p = spark.read.parquet("/Users/p.parquet") val p1 = spark.read.parquet("/Users/p1.parquet") p.joinWith(p1, p("to_module") === p1("to_module"),"fullouter") fullouterJoin不支持Broadcast和ShuffledHashJoinExec,所以为ShuffledHashJoinExec == Physical Plan == SortMergeJoin [_1#273.to_module], [_2#274.to_module], FullOuter :- Project p :- Project p1
因为ShuffledHashJoinExec通常状况下,不会被选择,它的条件比较苛责。
//首先不能进行Broadcast! private def canBroadcast(plan: LogicalPlan): Boolean = { plan.statistics.isBroadcastable || plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold(10M) } //其次spark.sql.join.preferSortMergeJoin必须设置false //而后build端能够放的进内存! private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = { plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions } //最后build端和stream端大小必须相差3倍!不然使用sort性能要好。 private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes } //或者RowOrdering.isOrderable(leftKeys)==false