其余更多java基础文章:
java基础学习(目录)java
继续上一篇Spark学习——性能调优(二)的讲解算法
数据量不是特别大的时候,均可以用这种MapPartitions系列操做,性能仍是很是不错的,是有提高的。缓存
在项目中,本身先去估算一下RDD的数据量,以及每一个partition的量,还有本身分配给每一个executor的内存资源。看看一会儿内存容纳全部的partition数据,行不行。若是行,能够试一下,能跑通就好。性能确定是有提高的。bash
可是试了一下之后,发现,不行,OOM了,那就放弃吧。网络
针对上述的两个问题,咱们但愿应该可以怎么样?app
针对第一个问题,咱们但愿能够进行partition的压缩吧,由于数据量变少了,那么partition其实也彻底能够对应的变少。好比原来是4个partition,如今彻底能够变成2个partition。那么就只要用后面的2个task来处理便可。就不会形成task计算资源的浪费。(没必要要,针对只有一点点数据的partition,还去启动一个task来计算)post
针对第二个问题,其实解决方案跟第一个问题是同样的;也是去压缩partition,尽可能让每一个partition的数据量差很少。那么这样的话,后面的task分配到的partition的数据量也就差很少。不会形成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。性能
coalesce算子
主要就是用于在filter操做以后,针对每一个partition的数据量各不相同的状况,来压缩partition的数量。减小partition的数量,并且让每一个partition的数据量都尽可能均匀紧凑。学习
从而便于后面的task进行计算操做,在某种程度上,可以必定程度的提高性能。优化
RDD.filter(XXX).coalesce(100);
复制代码
前说过,并行度是本身能够调节,或者说是设置的。
一、spark.default.parallelism
二、textFile(),传入第二个参数,指定partition数量(比较少用)
复制代码
官方推荐,根据你的application的总cpu core数量(在spark-submit中能够指定,好比 200个),本身手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。
你设置的这个并行度,在哪些状况下会生效?哪些状况下,不会生效?
若是你压根儿没有使用Spark SQL(DataFrame),那么你整个spark application默认全部stage的并行度都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)
问题来了,若是用Spark SQL,那含有Spark SQL的那个stage的并行度,你无法本身指定。Spark SQL本身会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你本身经过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。
好比你第一个stage,用了Spark SQL从hive表中查询出了一些数据,而后作了一些transformation操做,接着作了一个shuffle操做(groupByKey);下一个stage,在shuffle操做以后,作了一些transformation操做。hive表,对应了一个hdfs文件,有20个block;你本身设置了spark.default.parallelism参数为100。
你的第一个stage的并行度,是不受你的控制的,就只有20个task;第二个stage,才会变成你本身设置的那个并行度,100。
问题在哪里?
Spark SQL默认状况下,它的那个并行度,我们无法设置。可能致使的问题,也可能没什么问题。Spark SQL所在的那个stage中,后面的那些transformation操做,可能会有很是复杂的业务逻辑,甚至说复杂的算法。若是你的Spark SQL默认把task数量设置的不多,20个,而后每一个task要处理为数很多的数据量,而后还要执行特别复杂的算法。
这个时候,就会致使第一个stage的速度,特别慢。第二个stage,刷刷刷,很是快。
如何解决
repartition算子,你用Spark SQL这一步的并行度和task数量,确定是没有办法去改变了。可是呢,能够将你用Spark SQL查询出来的RDD,使用repartition算子,去从新进行分区,此时能够分区成多个partition,好比从20个partition,分区成100个。
而后呢,从repartition之后的RDD,再日后,并行度和task数量,就会按照你预期的来了。就能够避免跟Spark SQL绑定在一个stage中的算子,只能使用少许的task去处理大量数据以及复杂的算法逻辑。
return dataDF.javaRDD().repartition(1000);
复制代码
用reduceByKey对性能的提高: