Spark 之 算子调优(一)

这是我参与更文挑战的第21天,活动详情查看:更文挑战算法

算子调优一:mapPartitions

  普通的map算子对RDD中的每个元素进行操做,而mapPartitions算子对RDD中每个分区进行操做。若是是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每一个元素进行操做。数据库

image.png

  若是是mapPartition算子,因为一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收全部的partition数据,效率比较高。markdown

image.png

  好比,当要把RDD中的全部数据经过JDBC写入数据,若是使用map算子,那么须要对RDD中的每个元素都建立一个数据库链接,这样对资源的消耗很大,若是使用mapPartitions算子,那么针对一个分区的数据,只须要创建一个数据库链接。app

  mapPartitions算子也存在一些缺点:对于普通的map操做,一次处理一条数据,若是在处理了2000条数据后内存不足,那么能够将已经处理完的2000条数据从内存中垃圾回收掉;可是若是使用mapPartitions算子,但数据量很是大时,function一次处理一个分区的数据,若是一旦内存不足,此时没法回收内存,就可能会OOM,即内存溢出。函数

  所以,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提高效果仍是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)post

  在项目中,应该首先估算一下RDD的数据量、每一个partition的数据量,以及分配给每一个Executor的内存资源,若是资源容许,能够考虑使用mapPartitions算子代替map。性能

算子调优二:foreachPartition优化数据库操做

  在生产环境中,一般使用foreachPartition算子来完成数据库的写入,经过foreachPartition算子的特性,能够优化写数据库的性能。优化

  若是使用foreach算子完成数据库的操做,因为foreach算子是遍历RDD的每条数据,所以,每条数据都会创建一个数据库链接,这是对资源的极大浪费,所以,对于写数据库操做,咱们应当使用foreachPartition算子。ui

  与mapPartitions算子很是类似,foreachPartition是将RDD的每一个分区做为遍历对象,一次处理一个分区的数据,也就是说,若是涉及数据库的相关操做,一个分区的数据只须要建立一次数据库链接,如图所示:url

image.png

  • 使用了foreachPartition算子后,能够得到如下的性能提高:
    • 对于咱们写的function函数,一次处理一整个分区的数据;
    • 对于一个分区内的数据,建立惟一的数据库链接;
    • 只须要向数据库发送一次SQL语句和多组参数;

  在生产环境中,所有都会使用foreachPartition算子完成数据库操做。foreachPartition算子存在一个问题,与mapPartitions算子相似,若是一个分区的数据量特别大,可能会形成OOM,即内存溢出。

算子调优三:repartition解决SparkSQL低并行度问题

  在第一节的常规性能调优中咱们讲解了并行度的调节策略,可是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL之外的全部Spark的stage生效。

  Spark SQL的并行度不容许用户本身指定,Spark SQL本身会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个stage的并行度,用户本身通spark.default.parallelism参数指定的并行度,只会在没Spark SQL的stage中生效。

  因为Spark SQL所在stage的并行度没法手动设置,若是数据量较大,而且此stage中后续的transformation操做有着复杂的业务逻辑,而Spark SQL自动设置的task数量不多,这就意味着每一个task要处理为数很多的数据量,而后还要执行很是复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,然后续的没有Spark SQL的stage运行速度很是快。

为了解决Spark SQL没法设置并行度和task数量的问题,咱们可使用repartition算子。 image.png

  Spark SQL这一步的并行度和task数量确定是没有办法去改变了,可是,对于Spark SQL查询出来的RDD,当即使用repartition算子,去从新进行分区,这样能够从新分区为多个partition,从repartition以后的RDD操做,因为再也不设计Spark SQL,所以stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少许的task去处理大量数据并执行复杂的算法逻辑。

相关文章
相关标签/搜索