Spark (三) 性能优化

参数配置java

一、spark-env.sh数据库

二、程序经过SparkConf或System.setPropertyapache

 

性能观察与日志数组

1)Web UI。
2)Driver程序控制台日志。
3)logs文件夹下日志。
4)work文件夹下日志。
5)Profiler工具。缓存

 

调度与分区优化安全

1.小分区合并性能优化

频繁的过滤或者过滤掉的数据量过大就会产生问题,形成大量小分区的产生。Spark是每一个数据分区都会分配一个任务执行,若是任务过多,则每一个任务处理的数据量很小,会形成线程切换开销大,不少任务等待执行,并行度不高;服务器

解决方式:能够采用RDD中重分区的函数进行数据紧缩,减小分区数,将小分区合并变为大分区。
经过coalesce函数来减小分区。这个函数会返回一个含有numPartitions数量个分区的新RDD,即将整个RDD重分区。
当分区由10000重分区到100时,因为先后两个阶段的分区是窄依赖的,因此不会产生Shuffle的操做。
可是若是分区数量急剧减小,如极端情况从10000重分区为一个分区时,就会形成一个问题:数据会分布到一个节点上进行计算,彻底没法开掘集群并行计算的能力。为了规避这个问题,能够设置shuffle=true网络

因为Shuffle能够分隔Stage,这就保证了上一阶段Stage中的上游任务还是10000个分区在并行计算。若是不加Shuffle,则两个上下游的任务合并为一个Stage计算,这个Stage便会在1个分区情况下进行并行计算。数据结构

另外一个需求,即当前的每一个分区数据量过大,须要将分区数量增长,以利用并行计算能力,这就须要把Shuffle设置为true,而后执行coalesce函数,将分区数增大,在这个过程当中,默认使用Hash分区器将数据进行重分区。

reparition本质上就是调用的coalesce方法。所以若是用户不想进行Shuffle,就需用coalese配置重分区,为了方便起见,能够直接用repartition进行重分区。

 

2.倾斜问题

倾斜(skew)问题是分布式大数据计算中的重要问题,倾斜有数据倾斜和任务倾斜两种状况,数据倾斜致使的结果即为任务倾斜,在个别分区上,任务执行时间过长。当少许任务处理的数据量和其余任务差别过大时,任务进度长时间维持在99%(或100%),此时,任务监控页面中有少许(1个或几个)reduce子任务
未完成。单一reduce的记录数与平均记录数差别过大,最长时长远大于平均时长,常可能达到3倍甚至更多

数据倾斜
产生数据倾斜的缘由大体有如下几种。
1)key的数据分布不均匀(通常是分区key取得很差或者分区函数设计得很差)。
2)业务数据自己就会产生数据倾斜(像TPC-DS为了模拟真实环境负载特地用有倾斜的数据进行测试)。
3)结构化数据表设计问题。
4)某些SQL语句会产生数据倾斜

任务倾斜

任务倾斜的缘由较为隐蔽,通常就是那台机器的正在执行的Executor执行时间过长,由于服务器架构,或JVM,也多是来自线程池的问题,等等。

解决方式:能够经过考虑在其余并行处理方式中间加入汇集运算,以减小倾斜数据量。
数据倾斜通常能够经过在业务上将极度不均匀的数据剔除解决。这里其实还有Skew Join的一种处理方式,将数据分两个阶段处理,倾斜的key数据做为数据源处理,剩下的key的数据再作一样的处理。两者分开作一样的处理

任务执行速度倾斜

产生缘由多是数据倾斜,也多是执行任务的机器在架构,OS、JVM各节点配置不一样或其余缘由。
解决方式:设置spark.speculation=true把那些执行时间过长的节点去掉,从新调度分配任务,这个方式和Hadoop MapReduce的speculation是相通的。同时能够配置多长时间来推测执行,spark.speculation.interval用来设置执行间隔进行配置。在源码中默认是配置的100

解决方案

1)增大任务数,减小每一个分区数据量:增大任务数,也就是扩大分区量,同时减小单个分区的数据量。
2)对特殊key处理:空值映射为特定Key,而后分发到不一样节点,对空值不作处理。
3)广播。
①小数据量表直接广播。
②数据量较大的表能够考虑切分为多个小表,多阶段进行Map Side Join。
4)汇集操做能够Map端汇集部分结果,而后Reduce端合并,减小Reduce端压力。
5)拆分RDD:将倾斜数据与原数据分离,分两个Job进行计算。

 

3.并行度

Spark会根据文件的大小,默认配置Map阶段任务数量,也就是分区数量(也能够经过SparkContext.textFile等方法进行配置)。而Reduce的阶段任务数量配置能够有两种方式

第一种方式:写函数的过程当中经过函数的第二个参数进行配置

第二种方式:经过配置spark.default.parallelism来进行配置。它们的本质原理一致,均是控制Shuffle过程的默认任务数量

Spark官方推荐选择每一个CPU Core分配2~3个任务,即cpu corenum*2(或3)数量的并行度。
若是并行度过高,任务数太多,就会产生大量的任务启动和切换开销。
若是并行度过低,任务数过小,就会没法发挥集群的并行计算能力,任务执行过慢,同时可能会形成内存combine数据过多占用内存,而出现内存溢出(out of memory)的异常。

 

3. DAG调度执行优化

1)同一个Stage中尽可能容纳更多的算子,以减小Shuffle的发生。

因为Stage中的算子是按照流水线方式执行的,因此更多的Transformation放在一块儿执行可以减小Shuffle的开销和任务启动和切换的开销

2)复用已经cache过的数据。可使用cache和persist函数将数据缓存在内存,其实能够按单机的方式理解,存储仍然是多级存储,数据存储在访问快的存储设备中,提升快速存储命中率会提高整个应用程序的性能

 

内存存储优化

1.JVM调优

内存调优过程的大方向上有三个方向是值得考虑的。
1)应用程序中对象所占用的内存空间。
2)访问这些内存对象的代价。
3)垃圾回收的开销。
一般情况下,Java的对象访问速度是很快的,可是相对于对象中存储的原始数据,Java对象总体会耗费2~5倍的内存空间。

1)不一样的Java对象都会有一个对象头(object header),这个对象头大约为16byte,包含指向这个对象的类的指针等信息,对一些只有少许数据的对象,这是极为不经济的。例如,只有一个Int属性的对象,这个头的信息所占空间会大于对象的数据空间。
2)Java中的字符串(String)占用40byte空间。String的内存是将真正字符串的信息存储在一个char数组中,而且还会存储其余的信息,如字符串长度,同时若是采用UTF-16编码,一个字符就占用2byte的空间。综合以上,一个10字符的字符串会占用超过60byte的内存空间。
3)经常使用的一些集合类,如LinkedList等是采用链式数据结构存储的,对底层的每一个数据项进行了包装,这个对象不仅存储数据,还会存储指向其余数据项的指针,这些指针也会产生数据空间的占用和开销。
4)集合类中的基本数据类型经常采用一些装箱的对象存储,如java.lang.Ingeger。装箱与拆箱的机制在不少程序设计语言中都有,Java中装箱意味着将这些基本数据类型包装为对象存储在内存的Java堆中,而拆箱意味着将堆中对象转换为栈中存储的数据。

计算数据在集群内存占用的空间的大小的最好方法是建立一个RDD,读取这些数据,将数据加载到cache,在驱动程序的控制台查看SparkContext的日志。这些日志信息会显示每一个分区占用多少空间

调整数据结构

减小对象嵌套;使用数字的ID或者枚举对象;序列化存储RDD;

当内存小于32GB时,官方推荐配置JVM参数-XX:+UseCompressedOops,进而将指针由8byte压缩为4byte。OOP的全称是ordinary object pointer,即普通对象指针。在64位HotSpot中,OOP使用32位指针,默认64位指针会比32位指针使用的内存多1.5倍,启用CompressOops后,会压缩的对象以下。
①每一个Class的属性指针(静态成员变量)。
②每一个对象的属性指针。
③普通对象数组每一个元素的指针。
可是,指向PermGen的Class对象指针、本地变量、堆栈元素、入参、返回值、NULL指针不会被压缩。能够经过配置文件spark-env.sh配置这个参数,从而在Spark中启用JVM指针压缩。

JVM垃圾回收(GC)调优

当Spark程序产生大数据量的RDD时,JVM的垃圾回收就会成为一个问题。当Spark任务的工做内存空间和RDD的缓存数据空间产生干扰时,垃圾回收一样会成为一个问题,能够经过控制分给RDD的缓存来缓解这个问题。GC来讲,一个重要的配置参数就是内存给RDD用于缓存的空间大小。默认状况下,Spark用配置好的Executor 60%的内存(spark.executor.memory)缓存RDD。这就意味着40%的剩余内存空间可让Task在执行过程当中缓存新建立的对象。在有些状况下,用户的任务变慢,并且JVM频繁地进行垃圾回收或者出现内存溢出(out of memory异常),这时能够调整这个百分比参数为50%。这个百分比参数能够经过配置spark-env.sh中的变量spark.storage.memoryFraction=0.5进行配置。同时结合序列化的缓存存储对象减小内存空间占用,将会更加有效地缓解垃圾回收问题

度量GC的影响:-verbose:gc-XX:+PrintGCDetails-XX:+PrintGCTime-Stamps

若是任务是从HDFS读取数据,内存空间的占用能够经过从HDFS读取的数据块大小和数量估计。须要注意的是,通常状况下,压缩的数据压缩以后一般为原来数据块大小的2~3倍。所以若是一个JVM中要执行3~4个任务,同时HDFS的数据块大小是64MB,就能够估计须要的Eden代大小是4×3×64MB大小的空间。

OOM的缘由还极可能是Shuffle类操做符在任务执行过程当中在内存创建的Hash表过大。在这种状况下,能够经过增长任务数,即分区数来提高并行性度,减少每一个任务的输入数据,减小内存占用来解决

2.磁盘临时目录空间优化

配置参数spark.local.dir可以配置Spark在磁盘的临时目录,默认是/tmp目录。在Spark进行Shuffle的过程当中,中间结果会写入Spark在磁盘的临时目录中,或者当内存不可以彻底存储RDD时,内存放不下的数据会写到配置的磁盘临时目录中。这个临时目录设置太小会形成No space left on device异常。也能够配置多个盘块spark.local.dir=/mn1/spark,/mnt2/spar,/mnt3/spark来扩展Spark的磁盘临时目录,让更多的数据能够写到磁盘,加快I/O速度

 

网络传输优化

1.大任务分发优化

在任务的分发过程当中会序列化任务的元数据信息,以及任务须要的jar和文件。任务的分发是经过AKKA库中的Actor模型之间的消息传送的。由于Spark采用了Scala的函数式风格,传递函数的变量引用采用闭包方式传递,因此当须要传输的数据经过Task进行分发时,会拖慢总体的执行速度。配置参数spark.akka.frameSize(默认buffer的大小为10MB)能够缓解过大的任务形成AKKA缓冲区溢出的问题,可是这个方式并不能解决本质的问题。

spark.akka.frameSize控制Spark框架内使用的AKKA框架中,Actor通讯消息的最大容量(如任务(Task)的输出结果),由于整个Spark集群的消息传递都是经过Actor进行的,默认为10MB。当处理大规模数据时,任务的输出可能会大于这个值,须要根据实际数据设置一个更高的值。若是是这个值不够大而产生的错误,则能够从Worker节点的日志中排查。一般Worker上的任务失败后,主节点Master的运行日志上提示“Lost TID:”,可经过查看失败的Worker日志文件$SPARK_HOME/work/目录下面的日志文件中记录的任务的Serializedsize of result是否超过10MB来肯定通讯数据超过AKKA的Buffer异常

2.Broadcast在调优场景的使用

Spark的Broadcast(广播)变量对数据传输进行优化,经过Broadcast变量将用到的大数据量数据进行广播发送,能够提高总体速度。Broadcast主要用于共享Spark在计算过程当中各个task都会用到的只读变量,Broadcast变量只会在每台计算机器上保存一份,而不会每一个task都传递一份,这样就大大节省了空间,节省空间的同时意味着传输时间的减小,效率也高。在Spark的HadoopRDD实现中,就采用Broadcast进行Hadoop JobConf的传输。官方文档的说法是,当task大于20KB时,能够考虑使用Broadcast进行优化,还能够在控制台日志看到任务是多大,进而决定是否优化。还须要注意,每次迭代所传输的Broadcast变量都
会保存在从节点Worker的内存中,直至内存不够用,Spark才会把旧的Broadcast变量释放掉,不能提早进行释放。BroadCast变量有一些应用场景,如MapSideJoin中的小表进行广播、机器学习中须要共享的矩阵的广播等

3.Collect结果过大优化

当收集的最终结果数据过大时,能够将数据存储在分布式的HDFS或其余分布式持久化层上。将数据分布式地存储,能够减少单机数据的I/O开销和单机内存存储压力。或者当数据不太大,但会超出AKKA传输的Buffer大小时,须要增长AKKA Actor的buffer,能够经过配置参数spark.akka.frameSize(默认大小为10MB)进行调整。

 

序列化与压缩

1.经过序列化优化

序列化的本质做用是将链式存储的对象数据,转化为连续空间的字节数组存储的数据

1)对象能够以数据流方式进行进程间传输(包含网络传输),一样能够以连续空间方式存储到文件或者其余持久化层中。
2)连续空间的存储意味着能够进行压缩。这样减小数据存储空间和传输时间。
3)减小了对象自己的元数据信息和基本数据类型的元数据信息的开销。
4)对象数减小也会减小GC的开销和压力。

经过spark.serializer="org.apache.spark.serializer.KryoSerializer"来配置是否使用Kyro进行序列化

Kyro相对于Java序列化库可以更加快速和紧凑地进行序列化(一般有10倍的性能优点),可是Kyro并不能支持全部可序列化的类型,若是对程序有较高的性能优化要求,就须要自定义注册类。官方推荐对于网络传输密集型(network-intensive)计算,采用Kyro序列化性能更好。

若是对象占用空间很大,须要增长Kryo的缓冲区容量,就须要增长配置项spark.kryoserializer.buffer.mb的数值,默认是2MB,但参数值应该足够大,以便容纳最大的序列化后对象的传输。若是用户不注册自定义的类,Kyro仍能够运行,可是它会针对每一个对象存储一次整个类名,这样会形成很大的空间浪费。

2.经过压缩方式优化

在Spark应用中,有很大一部分做业是I/O密集型的。数据压缩对I/O密集型的做业带来性能的大大提高,可是若是用户的jobs做业是CPU密集型的,那么再压缩就会下降性能,这就要判断做业的类型,权衡是否要压缩数据。

Spark目前支持LZF和Snappy两种解压缩方式。Snappy提供了更高的压缩速度,LZF提供了更高的压缩比,用户能够根据具体的需求选择压缩方式

 

批处理优化

调用外部资源,如数据库链接等,这些链接经过JDBC或者ODBC与外部数据源进行交互。将单条记录写转化为数据库的批量写,每一个分区的数据写一次,这样能够利用数据库的批量写优化减小开销和减轻数据库压力

不然,由于整个RDD的数据项很大,整个集群会在短期内产生高并发写入数据库的操做,对数据库压力很大,将产生很大的写入开销

 

reduce和reduceByKey的优化

reduce是Action操做,reduceByKey是Transformation操做

reduce是一种聚合函数,能够把各个任务的执行结果聚集到一个节点,还能够指定自定义的函数传入reduce执行。Spark也对reduce的实现进行了优化,能够把同一个任务内的结果先在本地Worker节点执行聚合函数,再把结果传给Driver执行聚合。但最终数据仍是要汇总到主节点,并且reduce会把接收到的数据保存到内存中,直到全部任务都完成为止。所以,当任务不少,任务的结果数据又比较大时Driver容易形成性能瓶颈,这样就应该考虑尽可能避免reduce的使用,而将数据转化为Key-Value对,并使用reduceByKey实现逻辑,使计算变为分布式计算。
reduceByKey也是聚合操做,是根据key聚合对应的value。一样的,在每个mapper把数据发送给reducer前,会在Map端本地先合并(相似于MapReduce中的Combiner)。与reduce不一样的是,reduceByKey不是把数据聚集到Driver节点,是分布式进行的,所以不会存在reduce那样的性能瓶颈。

 

Shuffle操做符的内存使用

有时候OOM并非由于内存大小不可以容纳RDD,而是由于执行任务中使用的数据集合太大(如groupByKey)。Spark的Shuffle操做符(sortByKey、groupByKey、reduceByKey、join等均可以算是Shuffle操做符,由于这些操做会引起Shuffle)在执行分组操做的过程当中,会在每一个任务执行过程当中,在内存建立Hash表来对数据进行分组,而这个Hash表在不少状况下一般变得很大。最简单的一种解决 方案就是增长并行度,即增长任务数量和分区数量。这样每轮次每一个Executor执行的任务数是固定的,每一个任务接收的输入数据变少会减小Hash表的大小,占用的内存就会减小,从而避免内存溢出OOM的发生。 Spark经过多任务复用Worker的JVM,每一个节点全部任务的执行是在同一个JVM上的线程池中执行的,这样就减小了线程的启动开销,能够高效地支持单个任务200ms的执行时间。经过这个机制,能够安全地将任务数量的配置扩展到超过集群的总体的CPU core数,而不会出现问题。

相关文章
相关标签/搜索