Spark学习——性能调优(二)

其余更多java基础文章:
java基础学习(目录)java


继续上一篇Spark学习——性能调优(一)的讲解缓存

下降cache操做的内存占比

关于RDD内存的使用,后面会专门写一篇,能够配合学习阅读bash

spark中,堆内存又被划分红了两块儿,一起是专门用来给RDD的cache、persist操做进行RDD数据缓存用的;另一块儿,就是咱们刚才所说的,用来给spark算子函数的运行使用的,存放函数中本身建立的对象。网络

默认状况下,给RDD cache操做的内存占比,是0.6,60%的内存都给了cache操做了。可是问题是,若是某些状况下,cache不是那么的紧张,问题在于task算子函数中建立的对象过多,而后内存又不太大,致使了频繁的minor gc,甚至频繁full gc,致使spark频繁的中止工做。性能影响会很大。app

针对上述这种状况,你们能够在spark ui界面,若是经过yarn去运行的话,那么就经过yarn的界面,去查看你的spark做业的运行统计。能够看到每一个stage的运行状况,包括每一个task的运行时间、gc时间等等。若是发现gc太频繁,时间太长。此时就能够适当调价这个比例。jvm

下降cache操做的内存占比,大不了用persist操做,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式,配合Kryo序列化类,减小RDD缓存的内存占用;下降cache操做内存占比;对应的,算子函数的内存占比就提高了。这个时候,可能,就能够减小minor gc的频率,同时减小full gc的频率。对性能的提高是有必定的帮助的。函数

一句话,让task执行算子函数时,有更多的内存可使用。post

SparkConf conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")
复制代码

调节executor堆外内存

有时候,若是你的spark做业处理的数据量特别特别大,几亿数据量;而后spark做业一运行,时不时的报错,shuffle file cannot find,executor、task lost,out of memory(内存溢出);

多是说executor的堆外内存不太够用,致使executor在运行的过程当中,可能会内存溢出;而后可能致使后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件,可是executor可能已经挂掉了,关联的block manager也没有了;因此可能会报shuffle output file not found;resubmitting task;executor lost;spark做业完全崩溃。性能

上述状况下,就能够去考虑调节一下executor的堆外内存。也许就能够避免报错;此外,有时,堆外内存调节的比较大的时候,对于性能来讲,也会带来必定的提高。学习

如何调节executor堆外内存

--conf spark.yarn.executor.memoryOverhead=2048
复制代码

spark-submit脚本里面,去用--conf的方式,去添加配置;必定要注意!!!切记,不是在你的spark做业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置,是没有用的!必定要在spark-submit脚本中去设置。

spark.yarn.executor.memoryOverhead(看名字,顾名思义,针对的是基于yarn的提交模式)

默认状况下,这个堆外内存上限大概是300多M;咱们一般项目中,真正处理大数据的时候,这里都会出现问题,致使spark做业反复崩溃,没法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G

一般这个参数调节上去之后,就会避免掉某些JVM OOM的异常问题,同时呢,会让总体spark做业的性能,获得较大的提高。

调节链接等待时长

若是本地block manager没有的话,那么会经过TransferService,去远程链接其余节点上executor的block manager去获取。若是正好其余节点上的executor正在GC,此时呢,就会没有响应,没法创建网络链接;会卡住;ok,spark默认的网络链接的超时时长,是60s;若是卡住60s都没法创建链接的话,那么就宣告失败了。

碰到一种状况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。

这种状况下,颇有多是有那份数据的executor在jvm gc。因此拉取数据的时候,创建不了链接。而后超过默认60s之后,直接宣告失败。

报错几回,几回都拉取不到数据的话,可能会致使spark做业的崩溃。也可能会致使DAGScheduler,反复提交几回stage。TaskScheduler,反复提交几回task。大大延长咱们的spark做业的运行时间。

能够考虑调节链接的超时时长。

--conf spark.core.connection.ack.wait.timeout=300
复制代码

spark-submit脚本,切记,不是在new SparkConf().set()这种方式来设置的。

合并map端输出文件

实际生产环境的条件:
100个节点(每一个节点一个executor):100个executor
每一个executor:2个cpu core
总共1000个task:每一个executor平均10个task
每一个节点,10个task,每一个节点会输出多少份map端文件?10 * 1000=1万个文件
总共有多少份map端输出文件?100 * 10000 = 100万。

  • shuffle中的写磁盘的操做,基本上就是shuffle中性能消耗最为严重的部分。 经过上面的分析,一个普通的生产环境的spark job的一个shuffle环节,会写入磁盘100万个文件。 磁盘IO对性能和spark做业执行速度的影响,是极其惊人和吓人的。 基本上,spark做业的性能,都消耗在shuffle中了,虽然不仅是shuffle的map端输出文件这一个部分,可是这里也是很是大的一个性能消耗点。

开启map端输出文件的合并机制

经过一下命令能够开启map端输出文件的合并机制

new SparkConf().set("spark.shuffle.consolidateFiles", "true")
复制代码

如上图:

  • 第一个stage,同时就运行cpu core个task,好比cpu core是2个,并行运行2个task;每一个task都建立下一个stage的task数量个文件;
  • 第一个stage,并行运行的2个task执行完之后;就会执行另外两个task;另外2个task不会再从新建立输出文件;而是复用以前的task建立的map端输出文件,将数据写入上一批task的输出文件中。
  • 第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每个task为本身建立的那份输出文件了;而是拉取少许的输出文件,每一个输出文件中,可能包含了多个task给本身的map端输出。

合并map端输出文件后,上面的例子会有什么改变呢?

  1. map task写入磁盘文件的IO,减小:100万文件 -> 20万文件
  2. 第二个stage,本来要拉取第一个stage的task数量份文件,1000个task,第二个stage的每一个task,都要拉取1000份文件,走网络传输;合并之后,100个节点,每一个节点2个cpu core,第二个stage的每一个task,主要拉取100 * 2 = 200个文件便可;网络传输的性能消耗是否是也大大减小

提醒一下(map端输出文件合并):
只有并行执行的task会去建立新的输出文件;下一批并行执行的task,就会去复用以前已有的输出文件;可是有一个例外,好比2个task并行在执行,可是此时又启动要执行2个task;那么这个时候的话,就没法去复用刚才的2个task建立的输出文件了;而是仍是只能去建立新的输出文件。

要实现输出文件的合并的效果,必须是一批task先执行,而后下一批task再执行,才能复用以前的输出文件;负责多批task同时起来执行,仍是作不到复用的。

调节map端内存缓存和reduce端内存占比

默认map端内存缓冲是每一个task,32kb。reduce端聚合内存比例,是0.2,也就是20%。

若是map端的task,处理的数据量比较大,可是呢,你的内存缓冲大小是固定的。可能会出现什么样的状况?

每一个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。
每一个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。

在map task处理的数据量比较大的状况下,而你的task的内存缓冲默认是比较小的,32kb。可能会形成屡次的map端往磁盘文件的spill溢写操做,发生大量的磁盘IO,从而下降性能。

reduce端聚合内存,占比。默认是0.2。若是数据量比较大,reduce task拉取过来的数据不少,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操做,溢写到磁盘上去。并且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操做的时候,极可能会屡次读取磁盘中的数据,进行聚合。默认不调优,在数据量比较大的状况下,可能频繁地发生reduce端的磁盘文件的读写。

这两个点之因此放在一块儿讲,是由于他们俩是有关联的。数据量变大,map端确定会出点问题;reduce端确定也会出点问题;出的问题是同样的,都是磁盘IO频繁,变多,影响性能。

如何调优

调节map task内存缓冲:
new SparkConf().set("spark.shuffle.file.buffer", "64")
默认32k(spark 1.3.x不是这个参数,后面还有一个后缀,kb;spark 1.5.x之后,变了,就是如今这个参数)

调节reduce端聚合内存占比:
new SparkConf().set("spark.shuffle.memoryFraction", "0.3")
默认0.2
复制代码

在实际生产环境中,咱们在何时来调节两个参数?

看Spark UI,若是你的公司是决定采用standalone模式,那么很简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,能够看到,你的每一个stage的详情,有哪些executor,有哪些task,每一个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据量;若是是用的yarn模式来提交,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

若是发现shuffle 磁盘的write和read很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先固然是考虑开启map端输出文件合并机制。

调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,而后看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。

不能调节的太大,太大了之后过犹不及,由于内存资源是有限的,你这里调节的太大了,其余环节的内存使用就会有问题了。

SortShuffleManager调优

//阈值设置
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")
复制代码
  • 在spark 1.5.x之后,对于shuffle manager又出来了一种新的manager,tungsten-sort(钨丝),钨丝sort shuffle manager。官网上通常说,钨丝sort shuffle manager,效果跟sort shuffle manager是差很少的。 可是,惟一的不一样之处在于,钨丝manager,是使用了本身实现的一套内存管理机制,性能上有很大的提高, 并且能够避免shuffle过程当中产生的大量的OOM,GC,等等内存相关的异常。

来一个总结,如今至关于把spark的shuffle的东西又多讲了一些。你们理解的更加深刻了。hash、sort、tungsten-sort。如何来选择?

  1. 需不须要数据默认就让spark给你进行排序?就好像mapreduce,默认就是有按照key的排序。若是不须要的话,其实仍是建议搭建就使用最基本的HashShuffleManager,由于最开始就是考虑的是不排序,换取高性能;

  2. 何时须要用sort shuffle manager?若是你须要你的那些数据按key排序了,那么就选择这种吧,并且要注意,reduce task的数量应该是超过200的,这样sort、merge(多个文件合并成一个)的机制,才能生效把。可是这里要注意,你必定要本身考量一下,有没有必要在shuffle的过程当中,就作这个事情,毕竟对性能是有影响的。

  3. 若是你不须要排序,并且你但愿你的每一个task输出的文件最终是会合并成一份的,你本身认为能够减小性能开销;能够去调节·bypassMergeThreshold·这个阈值,好比你的reduce task数量是500,默认阈值是200,因此默认仍是会进行sort和直接merge的;能够将阈值调节成550,不会进行sort,按照hash的作法,每一个reduce task建立一份输出文件,最后合并成一份文件。(必定要提醒你们,这个参数,其实咱们一般不会在生产环境里去使用,也没有通过验证说,这样的方式,到底有多少性能的提高)

  4. 若是你想选用sort based shuffle manager,并且大家公司的spark版本比较高,是1.5.x版本的,那么能够考虑去尝试使用tungsten-sort shuffle manager。看看性能的提高与稳定性怎么样。

总结:

  1. 在生产环境中,不建议你们贸然使用第三点和第四点:
  2. 若是你不想要你的数据在shuffle时排序,那么就本身设置一下,用hash shuffle manager。
  3. 若是你的确是须要你的数据在shuffle时进行排序的,那么就默认不用动,默认就是sort shuffle manager;或者是什么?若是你压根儿不care是否排序这个事儿,那么就默认让他就是sort的。调节一些其余的参数(consolidation机制)。(80%,都是用这种)
spark.shuffle.manager:hash、sort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
// 默认就是,new SparkConf().set("spark.shuffle.manager", "sort")

复制代码
相关文章
相关标签/搜索