spark-shuffle调优

什么状况下会发生shuffle,而后shuffle的原理是什么?网络

  • 在spark中,主要是如下几个算子:groupByKey、reduceByKey、countByKey、join,等等。
  • groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一起,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。而后呢,集中一个key对应的values以后,才能交给咱们来进行处理,<key, Iterable<value>>;reduceByKey,算子函数去对values集合进行reduce操做,最后变成一个value;countByKey,须要在一个task中,获取到一个key对应的全部的value,而后进行计数,统计总共有多少个value;join,RDD<key, value>,RDD<key, value>,只要是两个RDD中,key相同对应的2个value,都能到一个节点的executor的task中,给咱们进行处理。
  • 问题在于,同一个单词,好比说(hello, 1),可能散落在不一样的节点上;对每一个单词进行累加计数,就必须让全部单词都跑到同一个节点的一个task中,给一个task来进行处理;
  • 每个shuffle的前半部分stage的task,每一个task都会建立下一个stage的task数量相同的文件,好比下一个stage会有100个task,那么当前stage每一个task都会建立100份文件;会将同一个key对应的values,必定是写入同一个文件中的;
  • shuffle的后半部分stage的task,每一个task都会从各个节点上的task写的属于本身的那一份文件中,拉取key, value对;而后task会有一个内存缓冲区,而后会用HashMap,进行key, values的汇聚;(key ,values);
  • task会用咱们本身定义的聚合函数,好比reduceByKey(_+_),把全部values进行一对一的累加,聚合出来最终的值。就完成了shuffle;
  • shuffle,必定是分为两个stage来完成的。由于这实际上是个逆向的过程,不是stage决定shuffle,是shuffle决定stage。
  • reduceByKey(_+_),在某个action触发job的时候,DAGScheduler,会负责划分job为多个stage。划分的依据,就是,若是发现有会触发shuffle操做的算子,好比reduceByKey,就将这个操做的前半部分,以及以前全部的RDD和transformation操做,划分为一个stage;shuffle操做的后半部分,以及后面的,直到action为止的RDD和transformation操做,划分为另一个stage;
  • shuffle前半部分的task在写入数据到磁盘文件以前,都会先写入一个一个的内存缓冲,内存缓冲满溢以后,再spill溢写到磁盘文件中。

若是不合并map端输出文件的话,会怎么样?app

  1. 减小网络传输、disk io、减小reduce端内存缓冲
    • 问题来了:默认的这种shuffle行为,对性能有什么样的恶劣影响呢?
      • 实际生产环境的条件:
        • 100个节点(每一个节点一个executor):100个executor,每一个executor:2个cpu core,总共1000个task:每一个executor平均10个task,上游1000个task,下游1000个task,每一个节点,10个task,每一个节点或者说每个executor会输出多少份map端文件?10 * 1000=1万个文件(M*R)
        • 总共有多少份map端输出文件?100 * 10000 = 100万。
    • shuffle中的写磁盘的操做,基本上就是shuffle中性能消耗最为严重的部分。
    • 经过上面的分析,一个普通的生产环境的spark job的一个shuffle环节,会写入磁盘100万个文件。
    • 磁盘IO对性能和spark做业执行速度的影响,是极其惊人和吓人的。
    • 基本上,spark做业的性能,都消耗在shuffle中了,虽然不仅是shuffle的map端输出文件这一个部分,可是这里也是很是大的一个性能消耗点。
      new SparkConf().set("spark.shuffle.consolidateFiles", "true")

       

  2. 开启shuffle map端输出文件合并的机制;默认状况下,是不开启的,就是会发生如上所述的大量map端输出文件的操做,严重影响性能。jvm

  • 开启了map端输出文件的合并机制以后:
    • 第一个stage,同时就运行cpu core个task,好比cpu core是2个,并行运行2个task;
    • 每一个task都建立下一个stage的task数量个文件;
    • 第一个stage,并行运行的2个task执行完之后,就会执行另外两个task;
    • 另外2个task不会再从新建立输出文件;而是复用以前的task建立的map端输出文件,将数据写入上一批task的输出文件中;
    • 第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每个task为本身建立的那份输出文件了;


提醒一下(map端输出文件合并):函数

  • 只有并行执行的task会去建立新的输出文件;
  • 下一批并行执行的task,就会去复用以前已有的输出文件;
  • 可是有一个例外,好比2个task并行在执行,可是此时又启动要执行2个task(不是同一批次);
  • 那么这个时候的话,就没法去复用刚才的2个task建立的输出文件了;
  • 而是仍是只能去建立新的输出文件。

要实现输出文件的合并的效果,必须是一批task先执行,而后下一批task再执行,
才能复用以前的输出文件;负责多批task同时起来执行,仍是作不到复用的。
性能

开启了map端输出文件合并机制以后,生产环境上的例子,会有什么样的变化?spa

实际生产环境的条件:
100个节点(每一个节点一个executor):100个executor
每一个executor:2个cpu core
总共1000个task:每一个executor平均10个task
上游1000个task,下游1000个taskscala


每一个节点,2个cpu core,有多少份输出文件呢?2 * 1000 = 2000个(C*R)
总共100个节点,总共建立多少份输出文件呢?100 * 2000 = 20万个文件code

相比较开启合并机制以前的状况,100万个orm

map端输出文件,在生产环境中,立减5倍!进程

合并map端输出文件,对我们的spark的性能有哪些方面的影响呢?

  1. map task写入磁盘文件的IO,减小:100万文件 -> 20万文件
  2. 第二个stage,本来要拉取第一个stage的task数量份文件,1000个task,第二个stage的每一个task,都要拉取1000份文件,走网络传输;合并之后,100个节点,每一个节点2个cpu core,第二个stage的每一个task,主要拉取1000 * 2 = 2000个文件便可;网络传输的性能消耗是否是也大大减小分享一下,实际在生产环境中,使用了spark.shuffle.consolidateFiles机制之后,实际的性能调优的效果:对于上述的这种生产环境的配置,性能的提高,仍是至关的客观的。

spark做业,5个小时 -> 2~3个小时。

你们不要小看这个map端输出文件合并机制。实际上,在数据量比较大,你本身自己作了前面的性能调优,
executor上去->cpu core上去->并行度(task数量)上去,shuffle没调优,shuffle就很糟糕了;
大量的map端输出文件的产生。对性能有比较恶劣的影响。

这个时候,去开启这个机制,能够颇有效的提高性能。
spark.shuffle.manager hash M*R 个小文件
spark.shuffle.manager sort   C*R 个小文件  (默认的shuffle管理机制

spark.shuffle.file.buffer,默认32k
spark.shuffle.memoryFraction,0.2

        默认状况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入每一个task本身关联的一个内存缓冲区。这个缓冲区大小,默认是32kb。每一次,当内存缓冲区满溢以后,才会进行spill操做,溢写操做,溢写到磁盘文件中去reduce端task,在拉取到数据以后,会用hashmap的数据格式,来对各个key对应的values进行汇聚。针对每一个key对应的values,执行咱们自定义的聚合函数的代码,好比_ + _(把全部values累加起来)reduce task,在进行汇聚、聚合等操做的时候,实际上,使用的就是本身对应的executor的内存,executor(jvm进程,堆),默认executor内存中划分给reduce task进行聚合的比例,是0.2。问题来了,由于比例是0.2,因此,理论上,颇有可能会出现,拉取过来的数据不少,那么在内存中,放不下;这个时候,默认的行为,就是说,将在内存放不下的数据,都spill(溢写)到磁盘文件中去。

原理说完以后,来看一下,默认状况下,不调优,可能会出现什么样的问题?

默认,map端内存缓冲是每一个task,32kb。
默认,reduce端聚合内存比例,是0.2,也就是20%。

若是map端的task,处理的数据量比较大,可是呢,你的内存缓冲大小是固定的。
可能会出现什么样的状况?

每一个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。
每一个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。

在map task处理的数据量比较大的状况下,而你的task的内存缓冲默认是比较小的,32kb。可能会形成屡次的map端往磁盘文件的spill溢写操做,发生大量的磁盘IO,从而下降性能。

reduce端聚合内存,占比。默认是0.2。若是数据量比较大,reduce task拉取过来的数据不少,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操做,溢写到磁盘上去。并且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操做的时候,极可能会屡次读取磁盘中的数据,进行聚合。

默认不调优,在数据量比较大的状况下,可能频繁地发生reduce端的磁盘文件的读写。

这两个点之因此放在一块儿讲,是由于他们俩是有关联的。数据量变大,map端确定会出点问题;
reduce端确定也会出点问题;出的问题是同样的,都是磁盘IO频繁,变多,影响性能。

调优:

调节map task内存缓冲:spark.shuffle.file.buffer,默认32k(spark 1.3.x不是这个参数,
后面还有一个后缀,kb;spark 1.5.x之后,变了,就是如今这个参数)
调节reduce端聚合内存占比:spark.shuffle.memoryFraction,0.2

在实际生产环境中,咱们在何时来调节两个参数?

看Spark UI,若是你的公司是决定采用standalone模式,那么很简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,能够看到,你的每一个stage的详情,有哪些executor,有哪些task,每一个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据量;若是是用的yarn模式来提交,课程最前面,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

若是发现shuffle 磁盘的write和read,很大,能够调节这两个参数

调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,而后看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。不能调节的太大,太大了之后过犹不及,由于内存资源是有限的,你这里调节的太大了,其余环节的内存使用就会有问题了。

调节了之后,效果?map task内存缓冲变大了,减小spill到磁盘文件的次数;reduce端聚合内存变大了, 减小spill到磁盘的次数,并且减小了后面聚合读取磁盘文件的数量。  

相关文章
相关标签/搜索