第22课:Spark性能调优之使用更高性能算子及其源码剖析

第22课:Spark性能调优之使用更高性能算子及其源码剖析算法

Spark性能调优之使用更高性能算子的重要性在于一样的状况下,若是使用更高性能的算子,从算子级别给咱们带来更高的效率。Spark如今主推的是DataSet这个API接口,愈来愈多的算子能够基于DataSet去作,DataSet基于自然自带的优化引擎,理论上讲比RDD的性能更高,DataSet弱点是没法自定义不少功能。平时使用来说,使用的最基本是Shuffle的算子。Shuffle分为2部分:Mapper端和Reducer端。性能调优的准则是尽可能不使用Shuffle类的算子,尽可能避免Shuffle。在进行Shuffle的时候,将多个节点的同一个Key的数据汇聚到一样一个节点进行Shuffle的操做,基本思路是将数据放入内存中,内存中放不下的话就放入磁盘中。数据库

若是要避免Shuffle,全部的分布式计算框架是产生Map端的Join,2个RDD进行操做,先把RDD的数据收集过来,而后经过SparkContext进行BroadCast广播出去,假设原先是RDD一、RDD2,咱们把RDD2广播出去,原来是进行Join,Join的过程确定是进行某种计算,此时RDD2其实已经不是RDD了,就是一个数据结构体包裹着数据自己,对RDD1进行Join操做,就是一条条遍历数据跟另外一个集合里的数据产生某种算法操做。网络

若是不能避免Shuffle,咱们退而求其次,须要更多的机器承担Shuffle的工做,充分利用Mapper端和Reducer端机器的计算资源,尽可能让Mapper端承担聚合的任务。若是在Mapper端进行Aggregate的操做,在Mapper端的进程中就会把相同的Key进行合并。数据结构

(1)reduceByKey和aggregateByKey取代groupByKeyapp

PairRDDFunctions.scala的aggregateByKey,aggregateByKey算子使用给定的组合函数和一个中立的“零值”聚合每一个key的值。这个函数能够返回不一样的结果类型U,不一样于RDD的值的类型 V。在 scala.TraversableOnce中,咱们须要一个操做将V合并成U和一个操做合并两个U,前者的操做用于合并分区内的值,后者用于在分区之间合并值。为了不内存框架

分配,这两个函数都容许修改和返回他们的第一个参数而不是创造一个新的U。aggregateByKey代码以下:dom

1.          def aggregateByKey[U: ClassTag](zeroValue: U,partitioner: Partitioner)(seqOp: (U, V) => U,分布式

2.               combOp: (U, U) => U): RDD[(K, U)] =self.withScope {ide

3.             // Serialize the zero value to a byte arrayso that we can get a new clone of it on each key函数

4.             val zeroBuffer =SparkEnv.get.serializer.newInstance().serialize(zeroValue)

5.             val zeroArray = newArray[Byte](zeroBuffer.limit)

6.             zeroBuffer.get(zeroArray)

7.          

8.             lazy val cachedSerializer =SparkEnv.get.serializer.newInstance()

9.             val createZero = () =>cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

10.       

11.          // We will clean the combiner closure laterin `combineByKey`

12.          val cleanedSeqOp = self.context.clean(seqOp)

13.          combineByKeyWithClassTag[U]((v: V) =>cleanedSeqOp(createZero(), v),

14.            cleanedSeqOp, combOp, partitioner)

15.        }

例如:groupByKey会不会进行Mapper的聚合操做呢?不会。groupByKey重载函数都没有指定函数操做的功能。相对于groupByKey而言,咱们倾向于采用reduceByKey和aggregateByKey来取代groupByKey,由于groupByKey不会进行Mapper端的aggregate的操做,全部的数据会经过网络传输传到Reducer端,性能会比较差。而咱们进行aggregateByKey的时候,能够自定义Mapper端的操做和Reducer端的操做,固然reduceByKey和aggregateByKey算子是同样的。

PairRDDFunctions.scala的groupByKey代码以下:

1.          def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])] = self.withScope {

2.             // groupByKey shouldn't use map sidecombine because map side combine does not

3.             // reduce the amount of data shuffled andrequires all map side data be inserted

4.             // into a hash table, leading to moreobjects in the old gen.

5.             val createCombiner = (v: V) => CompactBuffer(v)

6.             val mergeValue = (buf: CompactBuffer[V], v:V) => buf += v

7.             val mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2

8.             val bufs =combineByKeyWithClassTag[CompactBuffer[V]](

9.               createCombiner, mergeValue, mergeCombiners,partitioner, mapSideCombine = false)

10.          bufs.asInstanceOf[RDD[(K, Iterable[V])]]

11.        }

reduceByKey和aggregateByKey在正常状况下取代groupByKey的2个问题:

1,groupByKey可进行分组,reduceByKey和aggregateByKey怎么进行分组?可采用算法控制。

2,reduceByKey和aggregateByKey均可以取代groupByKey,reduceByKey和aggregateByKey有什么区别?区别很简单,aggregateByKey给予咱们更多的控制,能够定义Mapper端的aggregate函数和Reducer端aggregate函数;

 

(2)批量处理数据mapPartitions算子取代map算子

咱们看一下RDD.scala源代码,RDD在处理一块又一块的写数据的时候,不要使用map算子,可使用mapPartitions算子,但mapPartitions有一个弊端,会出现OOM的问题,由于每次处理掉一个Partitions的数据,对JVM也是一个负担。

RDD.scala的mapPartitions代码以下:

1.         def  mapPartitions[U: ClassTag](

2.               f: Iterator[T] => Iterator[U],

3.               preservesPartitioning: Boolean = false):RDD[U] = withScope {

4.             val cleanedF = sc.clean(f)

5.             new MapPartitionsRDD(

6.               this,

7.               (context: TaskContext, index: Int, iter:Iterator[T]) => cleanedF(iter),

8.               preservesPartitioning)

9.           }      

 

(3)批量数据处理foreachPartition取代foreach

foreach处理一条条的数据,foreachPartition将一批数据写入数据库或Hbase,至少提高50%的性能。RDD.scala的foreachPartition foreach源代码以下:

1.           def foreach(f: T => Unit): Unit = withScope{

2.             val cleanF = sc.clean(f)

3.             sc.runJob(this, (iter: Iterator[T]) =>iter.foreach(cleanF))

4.           }

5.          

6.           /**

7.            * Applies a function f to each partition ofthis RDD.

8.            */

9.           def foreachPartition(f: Iterator[T] =>Unit): Unit = withScope {

10.          val cleanF = sc.clean(f)

11.          sc.runJob(this, (iter: Iterator[T]) =>cleanF(iter))

12.        }

 

(4) 使用coalesce算子整理碎片文件。coalesce 默认状况不产生Shuffle,基本工做机制把更多并行度的数据变成更少的并行度。例如1万个并行度的数据变成100个并行度。coalesce算子返回一个新的RDD,汇聚为`numPartitions`个分区。这将致使一个窄依赖,例如,若是从1000个分区变成100分区,将不会产生shuffle,而不是从当前分区的10个分区变成100新分区。然而,若是作一个激烈的合并,例如numpartitions= 1,这可能会致使计算发生在更少的节点。(例如设置numpartitions = 1,将计算在一个节点上)。为了不这个状况,能够设置shuffle = true。这将增长一个shuffle 的步骤,但意味着当前的上游分区将并行执行。shuffle = true,能够汇聚到一个更大的分区,对于少许的分区这是有用的,

例如说100个分区,可能有几个分区数据很是大。那使用coalesce算子合并(1000,shuffle = true),将致使使用哈希分区器将数据分布在1000个分区。注意可选的分区coalescer必须是可序列化的。

RDD.scala的coalesce算子代码以下:

1.           def coalesce(numPartitions: Int, shuffle:Boolean = false,

2.                        partitionCoalescer:Option[PartitionCoalescer] = Option.empty)

3.                       (implicit ord: Ordering[T] =null)

4.               : RDD[T] = withScope {

5.             require(numPartitions > 0, s"Numberof partitions ($numPartitions) must be positive.")

6.             if (shuffle) {

7.               /** Distributes elements evenly acrossoutput partitions, starting from a random partition. */

8.               val distributePartition = (index: Int,items: Iterator[T]) => {

9.                 var position = (newRandom(index)).nextInt(numPartitions)

10.              items.map { t =>

11.                // Note that the hash code of the keywill just be the key itself. The HashPartitioner

12.                // will mod it with the number oftotal partitions.

13.                position = position + 1

14.                (position, t)

15.              }

16.            } : Iterator[(Int, T)]

17.       

18.            // include a shuffle step so that ourupstream tasks are still distributed

19.            new CoalescedRDD(

20.              new ShuffledRDD[Int, T,T](mapPartitionsWithIndex(distributePartition),

21.              new HashPartitioner(numPartitions)),

22.              numPartitions,

23.              partitionCoalescer).values

24.          } else {

25.            new CoalescedRDD(this, numPartitions,partitionCoalescer)

26.          }

27.        }

从最优化的角度讲,使用coalesce通常在使用filter算子以后。由于filter算子会产生数据碎片,Spark的并行度会从上游传到下游,咱们在filter算子以后通常会使用coalesce算子。

 

(5) 使用repartition算子,其背后使用的还是coalesce。可是shuffle 值默认设置为true,repartition算子会产生Shuffle。Repartition代码以下:

1.              def repartition(numPartitions: Int)(implicitord: Ordering[T] = null): RDD[T] = withScope {

2.             coalesce(numPartitions, shuffle = true)

3.           }

 

(6)repartition算子碎片整理之后会进行排序,Spark官方提供了一个repartitionAndSortWithinPartitions算子。JavaPairRDD的repartitionAndSortWithinPartitions方法代码以下:

1.           def  repartitionAndSortWithinPartitions(partitioner:Partitioner): JavaPairRDD[K, V] = {

2.             val comp =com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]

3.             repartitionAndSortWithinPartitions(partitioner,comp)

4.           }

 

(7)persist:数据复用的时候使用持久化算子。

1.          def persist(newLevel: StorageLevel): this.type= {

2.             if (isLocallyCheckpointed) {

3.               // This means the user previously calledlocalCheckpoint(), which should have already

4.               // marked this RDD for persisting. Herewe should override the old storage level with

5.               // one that is explicitly requested bythe user (after adapting it to use disk).

6.               persist(LocalRDDCheckpointData.transformStorageLevel(newLevel),allowOverride = true)

7.             } else {

8.               persist(newLevel, allowOverride = false)

9.             }

10.        }

 

 

(8)mapPartitionsWithIndex算子:推荐使用,每一个分区有个index,实际运行的时候看RDD上面有数字,若是对数字感兴趣可使用mapPartitionsWithIndex算子。

mapPartitionsWithIndex算子代码以下:

1.           def mapPartitionsWithIndex[U: ClassTag](

2.               f: (Int, Iterator[T]) => Iterator[U],

3.               preservesPartitioning: Boolean = false):RDD[U] = withScope {

4.             val cleanedF = sc.clean(f)

5.             new MapPartitionsRDD(

6.               this,

7.               (context: TaskContext, index: Int, iter:Iterator[T]) => cleanedF(index, iter),

8.               preservesPartitioning)

9.           }