Timsort工做原理

Timsort是spark中用做外部排序的机制。一个典型的应用是在spark sql中用来作Order操做的实现。Order时候将行记录插入到ExternalSorter中,ExternalSorter用timsort排序数组,返回排序后的Iterator。sql

spark sql的物理计划中,排序Sort属于agg相关的聚合操做。相关的类有:SortAggregateExecSortBasedAggregationIteratorSortExec等。express

    1. SortAggregateExec

排序后数据的聚合操做。构造方法和入参以下:apache

case class SortAggregateExec(数组

    requiredChildDistributionExpressions: Option[Seq[Expression]],ide

    groupingExpressions: Seq[NamedExpression],测试

    aggregateExpressions: Seq[AggregateExpression],ui

    aggregateAttributes: Seq[Attribute],this

    initialInputBufferOffset: Int,spa

    resultExpressions: Seq[NamedExpression],orm

    child: SparkPlan)

  extends UnaryExecNode

物理执行经过doExecute(): RDD[InternalRow]方法。主要代码:

val outputIter = new SortBasedAggregationIterator(

          groupingExpressions,

          child.output,

          iter,

          aggregateExpressions,

          aggregateAttributes,

          initialInputBufferOffset,

          resultExpressions,

          (expressions, inputSchema) =>

            newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled),

          numOutputRows)

        if (!hasInput && groupingExpressions.isEmpty) {

          // There is no input and there is no grouping expressions.

          // We need to output a single row as the output.

          numOutputRows += 1

          Iterator[UnsafeRow](outputIter.outputForEmptyGroupingKeyWithoutInput())

        } else {

          outputIter

        }

经过构造SortBasedAggregationIterator迭代器来生成聚合后的数据迭代。将聚合前的数据迭代器做为入参传入SortBasedAggregationIterator中。

 

    1. SortExec

真正执行外部排序的类。 定义:

case class SortExec(

    sortOrder: Seq[SortOrder],

    global: Boolean,

    child: SparkPlan,

    testSpillFrequency: Int = 0)

  extends UnaryExecNode with CodegenSupport

child不用说天然是子执行计划。

testSpillFrequency表示是否阶段性的spill数据到磁盘,Int型表示每隔多少条数据就spill到磁盘。通常在测试环境下使用。

sortOrder是排序的字段属性。

global表示是否全局排序,若是全局排序的话通常须要先将各分区的数据打散shuffle,而后再执行排序。

      1. 关键方法createSorter

def createSorter(): UnsafeExternalRowSorter

生成外部排序类,而后对原始数据的每行数据,插入到外部排序类,最后外部排序类返回排序后的迭代器Iterator。

protected override def doExecute(): RDD[InternalRow] = {

    val peakMemory = longMetric("peakMemory")

    val spillSize = longMetric("spillSize")

    val sortTime = longMetric("sortTime")

 

    child.execute().mapPartitionsInternal { iter =>

      val sorter = createSorter()

 

      val metrics = TaskContext.get().taskMetrics()

      // Remember spill data size of this task before execute this operator so that we can

      // figure out how many bytes we spilled for this operator.

      val spillSizeBefore = metrics.memoryBytesSpilled

      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])

      sortTime += sorter.getSortTimeNanos / 1000000

      peakMemory += sorter.getPeakMemoryUsage

      spillSize += metrics.memoryBytesSpilled - spillSizeBefore

      metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)

 

      sortedIterator

    }

  }

利用UnsafeExternalRowSorter生成排序后的Iterator。

 

      1. UnsafeExternalRowSorter

UnsafeExternalRowSorterSpark-catalyst包里。路径sql/execution/ UnsafeExternalRowSorter

它又使用UnsafeExternalSorter做为内部排序迭代器。UnsafeExternalRowSorter自己的逻辑不复杂,主要是封装了UnsafeExternalSorter来排序。它将原始数据插入到UnsafeExternalSorter中,最后获取UnsafeExternalSorter的排序迭代器。

 

      1. UnsafeExternalSorter

UnsafeExternalSorter在spark-core中。

路径:org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter。它是Java类。

最终它是经过Timsort来对内存数据排序的。

 

      1. Timsort

包名:org.apache.spark.util.collection;

MIN_MERGE:最小merge长度,若是待排序数组长度小于该值则直接用二分差值法排序,不然引用merge过程。

private final SortDataFormat<K, Buffer> s;

SortDataFormat定义了待排序数据的格式。

binarySort方法

少于32个元素的时候用binarySort排序。是简单实现。

先找出一段已经排序好的数组,lo~hi,而后从hi+1处开始知道数组的最后循环迭代。每次迭代排序lo~hi+index这么长的数组,这个数组有个特色就是前面的数据已经排序好只有最后一个元素没有排序。每次迭代过程大体以下:

找到比最后一个原色大的数组的位置start,而后复制start~hi+index到start+1~hi+index+1,而后将hi+index+1(也就是最后一个原色)复制到start位置处,完成整段数组的排序。

若是多于32个元素,则要复杂一点了。

SortState

利用SortState来作多于32个元素的排序。

/**

     * March over the array once, left to right, finding natural runs,

     * extending short natural runs to minRun elements, and merging runs

     * to maintain stack invariant.

     */

    SortState sortState = new SortState(a, c, hi - lo);

    int minRun = minRunLength(nRemaining);

    do {

      // Identify next run

      int runLen = countRunAndMakeAscending(a, lo, hi, c);

 

      // If run is short, extend to min(minRun, nRemaining)

      if (runLen < minRun) {

        int force = nRemaining <= minRun ? nRemaining : minRun;

        binarySort(a, lo, lo + force, lo + runLen, c);

        runLen = force;

      }

 

      // Push run onto pending-run stack, and maybe merge

      sortState.pushRun(lo, runLen);

      sortState.mergeCollapse();

 

      // Advance to find next run

      lo += runLen;

      nRemaining -= runLen;

    } while (nRemaining != 0);

 

    // Merge all remaining runs to complete sort

    assert lo == hi;

    sortState.mergeForceCollapse();

assert sortState.stackSize == 1;

大体过程以下:

对2段局部排序的数组,找出插入点,而后执行Range复制插入过程,一次将多个区间数据移动,这样对于2段局部排序好的数组,最多执行2-3次批量移动复制过程就能够完成总体排序。

对于内存不够放的局部排序数据,保存到多个磁盘文件,每一个磁盘文件都是一个排序好的文件,这里叫UnsafeSorterSpillReader。

UnsafeSorterSpillMerger作多个磁盘文件的排序类。每一个磁盘文件做为一个文件句柄插入到PriorityQueue排序队列中,将每次取数据时从这多个排序队列中取出最小的元素,实现排序。

相关文章
相关标签/搜索