MapReduce shuffle中快速排序详解

MapReduce中使用的快速排序在经典的快速排序之上进行了一些列的优化,具体优化处理以下: java

  1. 因为快速排序的分割基数(基数左边的数都不大于该基数,而右边的都不小于该基数)选择的好坏直接影响快速排序的性能,最坏的状况是划分过程当中是中产生两个极端不对称称的子序列——一个长度为1而另外一个长度为n-1,此时有最坏的时间复杂度O(N^2),为了减少出现划分严重不对称的可能性,Hadoop将序列的守卫和中间元素中的中位数做为选择的分割基数;
  2.    子序列的划分方法,Hadoop使用了两个索引i和j分别从左右两端进行扫描,并让索引i扫描到大于等于分割基数为止,索引j扫描到小于等于分割基数为止,而后交换两个元素,重复这个过程直到两个索引相遇;
  3. 对相同的元素的优化,在每次划分子序列时,将于分割基数相同的元素放在中间位置,让他再也不参与后续的递归处理,即将序列划分为三部分:小于分割基数、等于分割基数和大于分割基数;
  4. 当子序列中元素数小于13时,直接使用插入排序算法,不在递归。

 

总结来讲 : 当数据量较小时(当前为小于13)采用插入排序,不然采用快速排序;当快速排序递归到必定深度时,采用堆排序。这种算法也称为introsort,避免了快速排序最坏状况的发生。算法

具体实现代码以下: express

/* 
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */  
package org.apache.hadoop.util;  
  
/** 
 * An implementation of the core algorithm of QuickSort. 
 */  
public final class QuickSort implements IndexedSorter {  
  
  private static final IndexedSorter alt = new HeapSort();  
  
  public QuickSort() { }  
  
  private static void fix(IndexedSortable s, int p, int r) {  
    if (s.compare(p, r) > 0) {  
      s.swap(p, r);  
    }  
  }  
  
  /** 
   * Deepest recursion before giving up and doing a heapsort. 
   * Returns 2 * ceil(log(n)). 
   */  
  protected static int getMaxDepth(int x) {  
    if (x <= 0)  
      throw new IllegalArgumentException("Undefined for " + x);  
    return (32 - Integer.numberOfLeadingZeros(x - 1)) << 2;  
  }  
  
  /** 
   * Sort the given range of items using quick sort. 
   * {@inheritDoc} If the recursion depth falls below {@link #getMaxDepth}, 
   * then switch to {@link HeapSort}. 
   */  
  public void sort(IndexedSortable s, int p, int r) {  
    sort(s, p, r, null);  
  }  
  
  /** 
   * {@inheritDoc} 
   */  
  public void sort(final IndexedSortable s, int p, int r,  
      final Progressable rep) {  
    sortInternal(s, p, r, rep, getMaxDepth(r - p));  
  }  
  
  private static void sortInternal(final IndexedSortable s, int p, int r,  
      final Progressable rep, int depth) {  
    if (null != rep) {  
      rep.progress();  
    }  
    while (true) {  
    if (r-p < 13) {  
      for (int i = p; i < r; ++i) {  
        for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {  
          s.swap(j, j-1);  
        }  
      }  
      return;  
    }  
    if (--depth < 0) {  
      // give up  
      alt.sort(s, p, r, rep);  
      return;  
    }  
  
    // select, move pivot into first position  
    fix(s, (p+r) >>> 1, p);  
    fix(s, (p+r) >>> 1, r - 1);  
    fix(s, p, r-1);  
  
    // Divide  
    int i = p;  
    int j = r;  
    int ll = p;  
    int rr = r;  
    int cr;  
    while(true) {  
      while (++i < j) {  
        if ((cr = s.compare(i, p)) > 0) break;  
        if (0 == cr && ++ll != i) {  
          s.swap(ll, i);  
        }  
      }  
      while (--j > i) {  
        if ((cr = s.compare(p, j)) > 0) break;  
        if (0 == cr && --rr != j) {  
          s.swap(rr, j);  
        }  
      }  
      if (i < j) s.swap(i, j);  
      else break;  
    }  
    j = i;  
    // swap pivot- and all eq values- into position  
    while (ll >= p) {  
      s.swap(ll--, --i);  
    }  
    while (rr < r) {  
      s.swap(rr++, j++);  
    }  
  
    // Conquer  
    // Recurse on smaller interval first to keep stack shallow  
    assert i != j;  
    if (i - p < r - j) {  
      sortInternal(s, p, i, rep, depth);  
      p = j;  
    } else {  
      sortInternal(s, j, r, rep, depth);  
      r = i;  
    }  
    }  
  }  
  
}