Spark SQL Join原理分析

Spark SQL Join原理分析

1. Join问题综述:

Join有inner,leftouter,rightouter,fullouter,leftsemi,leftanti六种类型,对单独版本的Join操做,能够将问题表述为:算法

IterA,IterB为两个Iterator,根据规则A将两个Iterator中相应的Row进行合并,而后按照规则B对合并后Row进行过滤。
好比Inner_join,它的合并规则A为:对IterA中每一条记录,生成一个key,并利用该key从IterB的Map集合中获取到相应记录,并将它们进行合并;而对于规则B能够为任意过滤条件,好比IterA和IterB任何两个字段进行比较操做。sql

对于IterA和IterB,当咱们利用iterA中key去IterB中进行一一匹配时,咱们称IterA为streamedIter,IterB为BuildIter或者hashedIter。即咱们流式遍历streamedIter中每一条记录,去hashedIter中去查找相应匹配的记录。apache

而这个查找过程当中,即为Build过程,每一次Build操做的结果即为一条JoinRow(A,B),其中JoinRow(A)来自streamedIterJoinRow(B)来自BuildIter,此时这个过程为BuildRight,而若是JoinRow(B)来自streamedIterJoinRow(A)来自BuildIter,即为BuildLeft分布式

有点拗口!那么为何要去区分BuildLeftBuildRight呢?对于leftouterrightouterleftsemi,leftanti,它们的Build类型是肯定,即left*BuildRightright*BuildLeft类型,可是对于inner操做,BuildLeftBuildRight两种均可以,并且选择不一样,可能有很大性能区别:ide

BuildIter也称为hashedIter,即须要将BuildIter构建为一个内存Hash,从而加速Build的匹配过程;此时若是BuildIter和streamedIter大小相差较大,显然利用小的来创建Hash,内存占用要小不少!性能

总结一下:Join即由下面几部分组成:ui

trait Join {
  val joinType: JoinType //Join类型
  val streamedPlan: SparkPlan //用于生成streamedIter
  val buildPlan: SparkPlan //用于生成hashedIter

  val buildSide: BuildSide //BuildLeft或BuildRight
  val buildKeys: Seq[Expression] //用于从streamedIter中生成buildKey的表达式
  val streamedKeys: Seq[Expression] //用于从hashedIter中生成streamedKey的表达式

  val condition: Option[Expression]//对joinRow进行过滤
}

注:对于fullouter,IterA和IterB同时为streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter进行leftouter,而后再用先IterB=streamedIter,IterA=hashedIter进行leftouter,再把两次结果进行合并。spa

1.1 几种Join的实现

1.1.1 InnerJoin

  1. 利用streamIter中每一个srow,从hashedIter中查找匹配项;
  2. 若是匹配成功,即构建多个JoinRow,不然返回emptycode

    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withLeft(srow)
        val matches = hashedIter.get(buildKeys(srow))
        if (matches != null) {
            matches.map(joinRow.withRight(_)).filter(condition)
        } else {
            Seq.empty
        }
    }

1.1.2 LeftOutJoin

  1. leftIter即为streamIter,而RightIter即为hashedIter,不能够改变
  2. 利用streamIter中每一个srow,从hashedIter中查找匹配项;
  3. 若是匹配成功,即构建多个JoinRow,不然返回JoinRow的Build部分为Null排序

    val nullRow = new NullRow()
    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withLeft(srow)
        val matches = hashedIter.get(buildKeys(srow))
        if (matches != null) {
            matches.map(joinRow.withRight(_)).filter(condition)
        } else {
            Seq(joinRow.withRight(nullRow))
        }
    }

1.1.3 RightOutJoin

  1. RightIter即为streamIter,而LeftIter即为hashedIter,不能够改变
  2. 利用streamIter中每一个srow,从hashedIter中查找匹配项;
  3. 若是匹配成功,即构建多个JoinRow,不然返回JoinRow的Build部分为Null

    val nullRow = new NullRow()
    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withRight(srow)//注意与LeftOutJoin的区别
        val matches = hashedIter.get(buildKeys(srow))
        if (matches != null) {
            matches.map(joinRow.withLeft(_)).filter(condition)
        } else {
            Seq(joinRow.withLeft(nullRow))
        }
    }

1.1.4 LeftSemi

  1. leftIter即为streamIter,而RightIter即为hashedIter,不能够改变
  2. 利用streamIter中每一个srow,从hashedIter中查找匹配项;
  3. 若是匹配成功,即返回srow,不然返回empty
  4. 它不是返回JoinRow,而是返回srow

    streamIter.filter{ srow =>
        val matches = hashedIter.get(buildKeys(srow))
        if(matches == null) {
            false //没有找到匹配项
        } else{
            if(condition.isEmpty == false) { //须要对`假想`后joinrow进行判断
                    val joinRow = new JoinedRow
                    joinRow.withLeft(srow)
                    ! matches.map(joinRow.withLeft(_)).filter(condition).isEmpty
            } else {
                true
            }
        }
    }

    LeftSemi从逻辑上来讲,它即为In判断。

1.1.5 LeftAnti

  1. leftIter即为streamIter,而RightIter即为hashedIter,不能够改变
  2. 利用streamIter中每一个srow,从hashedIter中查找匹配项;
  3. 它匹配逻辑为LeftSemi基本相反,即至关于No In判断。
  4. 若是匹配不成功,即返回srow,不然返回empty
  5. 它不是返回JoinRow,而是返回srow

    streamIter.filter{ srow =>
        val matches = hashedIter.get(buildKeys(srow))
        if(matches == null) {
            true //没有找到匹配项
        } else{
            if(condition.isEmpty == false) { //须要对`假想`后joinrow进行判断
                    val joinRow = new JoinedRow
                    joinRow.withLeft(srow)
                    matches.map(joinRow.withLeft(_)).filter(condition).isEmpty
            } else {
                false
            }
        }
    }

1.2 HashJoin与SortJoin

上面描述的Join是须要将BuildIter在内存中构建为hashedIter,从而加速匹配过程,所以咱们也将这个Join称为HashJoin。可是创建一个Hash表须要占用大量的内存。
那么问题来:若是咱们的Iter太大,没法创建Hash表怎么吧?在分布式Join计算下,Join过程当中发生在Shuffle阶段,若是一个数据集的Key存在数据偏移,很容易出现一个BuildIter超过内存大小,没法完成Hash表的创建,进而致使HashJoin失败,那么怎么办?

在HashJoin过程当中,针对BuildIter创建hashedIter是为了加速匹配过程当中。匹配查找除了创建Hash表这个方法之外,将streamedIter和BuildIter进行排序,也是一个加速匹配过程,即咱们这里说的sortJoin。

排序不也是须要内存吗?是的,首先排序占用内存比创建一个hash表要小不少,其次排序若是内存不够,能够将一部分数据Spill到磁盘,而Hash为全内存,若是内存不够,将会致使整个Shuffle失败。

下面以InnerJoin的SortJoin实现为例子,讲述它与HashJoin的区别:

  1. streamIter和BuildIter都须要为有序。
  2. 利用streamIter中每一个srow,从BuildIter中顺序查找,因为两边都是有序的,因此查找代价很小。

    val buildIndex = 0
    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withLeft(srow)
        //顺序查找
        val matches = BuildIter.search(buildKeys(srow), buildIndex)
        if (matches != null) {
            matches.map(joinRow.withRight(_)).filter(condition)
            buildIndex += matches.length
        } else {
            Seq.empty
        }
    }

对于FullOuterJoin,若是采用HashJoin方式来实现,代价较大,须要创建双向的Hash表,而基于SortJoin,它的代价与其余几种Join相差不大,所以`FullOuter默认都是基于SortJon来实现。

2. Spark中的Join实现

Spark针对Join提供了分布式实现,可是Join操做本质上也是单机进行,怎么理解?若是要对两个数据集进行分布式Join,Spark会先对两个数据集进行Exchange,即进行ShuffleMap操做,将Key相同数据分到一个分区中,而后在ShuffleFetch过程当中利用HashJoin/SortJoin单机版算法来对两个分区进行Join操做。

另外若是Build端的整个数据集(非一个iter)大小较小,能够将它进行Broadcast操做,从而节约Shuffle的开销。

所以Spark支持ShuffledHashJoinExec,SortMergeJoinExec,BroadcastHashJoinExec三种Join算法,那么它怎么进行选择的呢?

  • 若是build-dataset支持Broadcastable,而且它的大小小于spark.sql.autoBroadcastJoinThreshold,默认10M,那么优先进行BroadcastHashJoinExec
  • 若是dataset支持Sort,而且spark.sql.join.preferSortMergeJoin为True,那么优先选择SortMergeJoinExec
  • 若是dataset不支持Sort,那么只能选择ShuffledHashJoinExec
    • 若是Join同时支持BuildRight和BuildLeft,那么根据两边数据大小,优先选择数据量小的进行Hash。

这一块逻辑都在org.apache.spark.sql.execution.JoinSelection 中描述。ps:Spark也对Without joining keys的Join进行支持,可是不在咱们此次讨论范围中。

BroadcastHashJoinExec

val p = spark.read.parquet("/Users/p.parquet")
val p1 = spark.read.parquet("/Users/p1.parquet")
p.joinWith(p1, p("to_module") === p1("to_module"),"inner")

此时因为p和p1的大小都较小,它会默认选择BroadcastHashJoinExec
== Physical Plan ==
BroadcastHashJoin [_1#269.to_module], [_2#270.to_module], Inner, BuildRight
    :- Project p
    :- Project p1

SortMergeJoinExec

val p = spark.read.parquet("/Users/p.parquet")
val p1 = spark.read.parquet("/Users/p1.parquet")
p.joinWith(p1, p("to_module") === p1("to_module"),"fullouter")

fullouterJoin不支持Broadcast和ShuffledHashJoinExec,所以为ShuffledHashJoinExec

== Physical Plan ==
SortMergeJoin [_1#273.to_module], [_2#274.to_module], FullOuter
    :- Project p
    :- Project p1

因为ShuffledHashJoinExec通常状况下,不会被选择,它的条件比较苛责。

//首先不能进行Broadcast!
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.statistics.isBroadcastable ||
    plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold(10M)
}
//其次spark.sql.join.preferSortMergeJoin必须设置false
//而后build端能够放的进内存!
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
 //最后build端和stream端大小必须相差3倍!不然使用sort性能要好。
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
}
//或者RowOrdering.isOrderable(leftKeys)==false
相关文章
相关标签/搜索