spark RDD,reduceByKey vs groupByKey

Spark中有两个相似的api,分别是reduceByKey和groupByKey。这两个的功能相似,但底层实现却有些不一样,那么为何要这样设计呢?咱们来从源码的角度分析一下。api

先看二者的调用顺序(都是使用默认的Partitioner,即defaultPartitioner)ide

所用spark版本:spark2.1.0性能

先看reduceByKey

Step1ui

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

Setp2spa

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

Setp3设计

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

姑且不去看方法里面的细节,咱们会只要知道最后调用的是combineByKeyWithClassTag这个方法。这个方法有两个参数咱们来重点看一下,3d

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

首先是partitioner参数,这个便是RDD的分区设置。除了默认的defaultPartitioner,Spark还提供了RangePartitioner和HashPartitioner外,此外用户也能够自定义partitioner。经过源码能够发现若是是HashPartitioner的话,那么是会抛出一个错误的。code

而后是mapSideCombine参数,这个参数正是reduceByKey和groupByKey最大不一样的地方,它决定是是否会先在节点上进行一次Combine操做,下面会有更具体的例子来介绍。blog

而后是groupByKey

Step1ci

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

Step2

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

Setp3

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

结合上面reduceByKey的调用链,能够发现最终其实都是调用combineByKeyWithClassTag这个方法的,但调用的参数不一样。
reduceByKey的调用

combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

groupByKey的调用

combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

正是二者不一样的调用方式致使了两个方法的差异,咱们分别来看

  • reduceByKey的泛型参数直接是[V],而groupByKey的泛型参数是[CompactBuffer[V]]。这直接致使了reduceByKey和groupByKey的返回值不一样,前者是RDD[(K, V)],然后者是RDD[(K, Iterable[V])]

  • 而后就是mapSideCombine=false了,这个mapSideCombine参数的默认是true的。这个值有什么用呢,上面也说了,这个参数的做用是控制要不要在map端进行初步合并(Combine)。能够看看下面具体的例子。

从功能上来讲,能够发现ReduceByKey其实就是会在每一个节点先进行一次合并的操做,而groupByKey没有。

这么来看ReduceByKey的性能会比groupByKey好不少,由于有些工做在节点已经处理了。那么groupByKey为何存在,它的应用场景是什么呢?我也不清楚,若是观看这篇文章的读者知道的话不妨在评论里说出来吧。很是感谢!

相关文章
相关标签/搜索