Spark性能调优的第一步,就是为任务分配更多的资源,在必定范围内,增长资源的分配与性能的提高是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。算法
资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如代码清单2-1所示:sql
代码清单2-1 标准Spark提交脚本数据库
/usr/opt/modules/spark/bin/spark-submit \apache
--class com.atguigu.spark.Analysis \数组
--num-executors 80 \缓存
--driver-memory 6g \网络
--executor-memory 6g \架构
--executor-cores 3 \app
/usr/opt/modules/spark/jar/spark.jar \运维
能够进行分配的资源如表2-1所示:
表2-1 可分配资源表
名称 |
说明 |
--num-executors |
配置Executor的数量 |
--driver-memory |
配置Driver内存(影响不大) |
--executor-memory |
配置每一个Executor的内存大小 |
--executor-cores |
配置每一个Executor的CPU core数量 |
调节原则:尽可能将任务分配的资源调节到可使用的资源的最大限度。
对于具体资源的分配,咱们分别讨论Spark的两种Cluster运行模式:
第一种是Spark Standalone模式,你在提交任务前,必定知道或者能够从运维部门获取到你可使用的资源状况,在编写submit脚本的时候,就根据可用的资源状况进行资源的分配,好比说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每一个Executor分配8G内存,2个CPU core。
第二种是Spark Yarn模式,因为Yarn使用资源队列进行资源的分配和调度,在表写submit脚本的时候,就根据Spark做业要提交到的资源队列,进行资源的分配,好比资源队列有400G内存,100个CPU core,那么指定50个Executor,每一个Executor分配8G内存,2个CPU core。
对表2-1中的各项资源进行了调节后,获得的性能提高如表2-2所示:
表2-2 资源调节后的性能提高
名称 |
解析 |
增长Executor·个数 |
在资源容许的状况下,增长Executor的个数能够提升执行task的并行度。好比有4个Executor,每一个Executor有2个CPU core,那么能够并行执行8个task,若是将Executor的个数增长到8个(资源容许的状况下),那么能够并行执行16个task,此时的并行能力提高了一倍。 |
增长每一个Executor的CPU core个数 |
在资源容许的状况下,增长每一个Executor的Cpu core个数,能够提升执行task的并行度。好比有4个Executor,每一个Executor有2个CPU core,那么能够并行执行8个task,若是将每一个Executor的CPU core个数增长到4个(资源容许的状况下),那么能够并行执行16个task,此时的并行能力提高了一倍。 |
增长每一个Executor的内存量 |
在资源容许的状况下,增长每一个Executor的内存量之后,对性能的提高有三点:
|
补充:生产环境Spark submit脚本配置
/usr/local/spark/bin/spark-submit \
--class com.atguigu.spark.dataetl \
--num-executors 80 \
--driver-memory 6g \
--executor-memexecutoory 6g \
--r-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout =300 \
/usr/local/spark/spark.jar
参数配置参考值:
--num-executors:50~100
--driver-memory:1G~5G
--executor-memory:6G~10G
--executor-cores:3
--master:实际生产环境必定使用yarn-cluster
1.2.1 RDD复用
在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如图2-1所示:
图2-1 RDD的重复计算
对图2-1中的RDD计算架构进行修改,获得如图2-2所示的优化结果:
图2-2 RDD架构优化
1.2.2 RDD持久化
在Spark中,当屡次对同一个RDD执行算子操做时,每一次都会对这个RDD以以前的父RDD从新计算一次,这种状况是必需要避免的,对同一个RDD的重复计算是对资源的极大浪费,所以,必须对屡次使用的RDD进行持久化,经过持久化将公共RDD的数据缓存到内存/磁盘中,以后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据。
对于RDD的持久化,有两点须要说明:
第一,RDD的持久化是能够进行序列化的,当内存没法将RDD的数据完整的进行存放的时候,能够考虑使用序列化的方式减少数据体积,将数据完整存储在内存中。
第二,若是对于数据的可靠性要求很高,而且内存充足,可使用副本机制,对RDD数据进行持久化。当持久化启用了复本机制时,对于持久化的每一个数据单元都存储一个副本,放在其余节点上面,由此实现数据的容错,一旦一个副本数据丢失,不须要从新计算,还可使用另一个副本。
1.2.3 RDD尽量早的filter操做
获取到初始RDD后,应该考虑尽早地过滤掉不须要的数据,进而减小对内存的占用,从而提高Spark做业的运行效率。
默认状况下,task中的算子中若是使用了外部的变量,每一个task都会获取一份变量的复本,这就形成了内存的极大消耗。一方面,若是后续对RDD进行持久化,可能就没法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另外一方面,task在建立对象的时候,也许会发现堆内存没法存放新建立的对象,这就会致使频繁的GC,GC会致使工做线程中止,进而致使Spark暂停工做一段时间,严重影响Spark性能。
假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被全部task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,若是使用了广播变量, 那么每一个Executor保存一个副本,一共消耗400M内存,内存消耗减小了5倍。
广播变量在每一个Executor保存一个副本,此Executor的全部task共用此广播变量,这让变量产生的副本数量大大减小。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的BlockManager中尝试获取变量,若是本地没有,BlockManager就会从Driver或者其余节点的BlockM=anager上远程拉取变量的复本,并由本地的BlockManager进行管理;以后此Executor的全部task都会直接从本地的BlockManager中获取变量。
默认状况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不须要额外的配置,在算子中使用的变量实现Serializable接口便可,可是,Java序列化机制的效率不高,序列化速度慢而且序列化后的数据所占用的空间依然较大。
Kryo序列化机制比Java序列化机制性能提升10倍左右,Spark之因此没有默认使用Kryo做为序列化类库,是由于它不支持全部对象的序列化,同时Kryo须要用户在使用前注册须要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。
Kryo序列化注册方式的实例代码如代码清单2-3所示:
代码清单2-3 Kryo序列化机制配置代码
public class MyKryoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
kryo.register(StartupReportLogs.class);
}
}
配置Kryo序列化方式的实例代码如代码清单2-4所示:
代码清单2-4 Kryo序列化机制配置代码
//建立SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库,若是要使用Java序列化库,须要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化库中注册自定义的类集合,若是要使用Java序列化库,须要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
Spark做业运行过程当中,Driver会对每个stage的task进行分配。根据Spark的task分配算法,Spark但愿task可以运行在它要计算的数据算在的节点(数据本地化思想),这样就能够避免数据的网络传输。一般来讲,task可能不会被分配到它处理的数据所在的节点,由于这些节点可用的资源可能已经用尽,此时,Spark会等待一段时间,默认3s,若是等待指定时间后仍然没法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,好比将task分配到离它要计算的数据比较近的一个节点,而后进行计算,若是当前级别仍然不行,那么继续降级。
当task要处理的数据不在task所在节点上时,会发生数据的传输。task会经过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,户经过网络传输组件从数据所在节点的BlockManager处获取数据。
网络传输数据的状况是咱们不肯意看到的,大量的网络传输会严重影响性能,所以,咱们但愿经过调节本地化等待时长,若是在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会获得执行,这样就可以改善Spark做业的总体性能。
Spark的本地化等级如表2-3所示:
表2-3 Spark本地化等级
名称 |
解析 |
PROCESS_LOCAL |
进程本地化,task和数据在同一个Executor中,性能最好。 |
NODE_LOCAL |
节点本地化,task和数据在同一个节点中,可是task和数据不在同一个Executor中,数据须要在进程间进行传输。 |
RACK_LOCAL |
机架本地化,task和数据在同一个机架的两个节点上,数据须要经过网络在节点之间进行传输。 |
NO_PREF |
对于task来讲,从哪里获取都同样,没有好坏之分。 |
ANY |
task和数据能够在集群的任何地方,并且不在一个机架中,性能最差。 |
在Spark项目开发阶段,可使用client模式对程序进行测试,此时,能够在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,若是大部分都是PROCESS_LOCAL,那么就无需进行调节,可是若是发现不少的级别都是NODE_LOCAL、ANY,那么须要对本地化的等待时长进行调节,经过延长本地化等待时长,看看task的本地化级别有没有提高,并观察Spark做业的运行时间有没有缩短。
注意,过犹不及,不要将本地化等待时长延长地过长,致使由于大量的等待时长,使得Spark做业的运行时间反而增长了。
Spark本地化等待时长的设置如代码清单2-5所示:
代码清单2-5 Spark本地化等待时长设置示例
val conf = new SparkConf()
.set("spark.locality.wait", "6")
普通的map算子对RDD中的每个元素进行操做,而mapPartitions算子对RDD中每个分区进行操做。若是是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每一个元素进行操做。
图2-3 map算子
若是是mapPartition算子,因为一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收全部的partition数据,效率比较高。
图2-4 mapPartitions算子
好比,当要把RDD中的全部数据经过JDBC写入数据,若是使用map算子,那么须要对RDD中的每个元素都建立一个数据库链接,这样对资源的消耗很大,若是使用mapPartitions算子,那么针对一个分区的数据,只须要创建一个数据库链接。
mapPartitions算子也存在一些缺点:对于普通的map操做,一次处理一条数据,若是在处理了2000条数据后内存不足,那么能够将已经处理完的2000条数据从内存中垃圾回收掉;可是若是使用mapPartitions算子,但数据量很是大时,function一次处理一个分区的数据,若是一旦内存不足,此时没法回收内存,就可能会OOM,即内存溢出。
所以,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提高效果仍是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)
在项目中,应该首先估算一下RDD的数据量、每一个partition的数据量,以及分配给每一个Executor的内存资源,若是资源容许,能够考虑使用mapPartitions算子代替map。
在生产环境中,一般使用foreachPartition算子来完成数据库的写入,经过foreachPartition算子的特性,能够优化写数据库的性能。
若是使用foreach算子完成数据库的操做,因为foreach算子是遍历RDD的每条数据,所以,每条数据都会创建一个数据库链接,这是对资源的极大浪费,所以,对于写数据库操做,咱们应当使用foreachPartition算子。
与mapPartitions算子很是类似,foreachPartition是将RDD的每一个分区做为遍历对象,一次处理一个分区的数据,也就是说,若是涉及数据库的相关操做,一个分区的数据只须要建立一次数据库链接,如图2-5所示:
图2-5 foreachPartition算子
使用了foreachPartition算子后,能够得到如下的性能提高:
1. 对于咱们写的function函数,一次处理一整个分区的数据;
2. 对于一个分区内的数据,建立惟一的数据库链接;
3. 只须要向数据库发送一次SQL语句和多组参数;
在生产环境中,所有都会使用foreachPartition算子完成数据库操做-。foreachPartition算子存在一个问题,与mapPartitions算子相似,若是一个分区的数据量特别大,可能会形成OOM,即内存溢出。
在Spark任务中咱们常常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,可是一旦进过filter过滤后,每一个分区的数据量有可能会存在较大差别,如图2-6所示:
图2-6 分区数据过滤结果
根据图2-6咱们能够发现两个问题:
1. 每一个partition的数据量变小了,若是还按照以前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;
2. 每一个partition的数据量不同,会致使后面的每一个task处理每一个partition数据的时候,每一个task要处理的数据量不一样,这颇有可能致使数据倾斜问题。
如图2-6所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会致使运行速度可能存在数倍的差距,这也就是数据倾斜问题。
针对上述的两个问题,咱们分别进行分析:
1. 针对第一个问题,既然分区的数据量变小了,咱们但愿能够对分区数据进行从新分配,好比将原来4个分区的数据转化到2个分区中,这样只须要用后面的两个task进行处理便可,避免了资源的浪费。
2. 针对第二个问题,解决方法和第一个问题的解决方法很是类似,对分区数据从新分配,让每一个partition中的数据量差很少,这就避免了数据倾斜问题。
那么具体应该如何实现上面的解决思路?咱们须要coalesce算子。
repartition与coalesce均可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认状况下不进行shuffle,可是能够经过参数进行设置。
假设咱们但愿将本来的分区个数A经过从新分区变为B,那么有如下几种状况:
① A与B相差值不大
此时使用coalesce便可,无需shuffle过程。
② A与B相差值很大
此时可使用coalesce而且不启用shuffle过程,可是会致使合并过程性能低下,因此推荐设置coalesce的第二个参数为true,即启动shuffle过程。
此时使用repartition便可,若是使用coalesce须要将shuffle设置为true,不然coalesce无效。
咱们能够在filter操做以后,使用coalesce算子针对每一个partition的数据量各不相同的状况,压缩partition的数量,并且让每一个partition的数据量尽可能均匀紧凑,以便于后面的task进行计算操做,在某种程度上可以在必定程度上提高性能。
注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了必定的内部优化,所以不用去设置并行度和分区数量。
Spark SQL的并行度不容许用户本身指定,Spark SQL本身会默认根据hive表对应的HDFS文件的block个数自动设置Spark SQL所在的那个stage的并行度,但有时此默认并行度太低,致使任务运行缓慢。
因为Spark SQL所在stage的并行度没法手动设置,若是数据量较大,而且此stage中后续的transformation操做有着复杂的业务逻辑,而Spark SQL自动设置的task数量不多,这就意味着每一个task要处理为数很多的数据量,而后还要执行很是复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,然后续的没有Spark SQL的stage运行速度很是快。
为了解决Spark SQL没法设置并行度和task数量的问题,咱们可使用repartition算子。
图2-7 repartition算子使用先后对比图
Spark SQL这一步的并行度和task数量确定是没有办法去改变了,可是,对于Spark SQL查询出来的RDD,当即使用repartition算子,去从新进行分区,这样能够从新分区为多个partition,从repartition以后的RDD操做,因为再也不设计Spark SQL,所以stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少许的task去处理大量数据并执行复杂的算法逻辑。使用repartition算子的先后对好比图2-7所示。
reduceByKey相较于普通的shuffle操做一个显著的特色就是会进行map端的本地聚合,map端会先对本地的数据进行combine操做,而后将数据写入给下个stage的每一个task建立的文件中,也就是在map端,对每个key对应的value,执行reduceByKey算子函数。reduceByKey算子的执行过程如图2-8所示:
图2-8 reduceByKey算子执行过程
使用reduceByKey对性能的提高以下:
基于reduceByKey的本地聚合特征,咱们应该考虑使用reduceByKey代替其余的shuffle算子,例如groupByKey。reduceByKey与groupByKey的运行原理如图2-9和图2-10所示:
图2-9 groupByKey原理
图2-10 reduceByKey原理
根据上图可知,groupByKey不会进行map端的聚合,而是将全部map端的数据shuffle到reduce端,而后在reduce端进行数据的聚合操做。因为reduceByKey有map端聚合的特性,使得网络传输的数据量减少,所以效率要明显高于groupByKey。
在Spark任务运行过程当中,若是shuffle的map端处理的数据量比较大,可是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的状况,使得性能很是低下,经过调节map端缓冲的大小,能够避免频繁的磁盘IO操做,进而提高Spark任务的总体性能。
map端缓冲的默认配置是32KB,若是每一个task处理640KB的数据,那么会发生640/32 = 20次溢写,若是每一个task处理64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是很是严重的。
map端缓冲的配置方法如代码清单2-7所示:
代码清单2-7 map端缓冲配置
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
Spark Shuffle过程当中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次可以缓冲的数据量,也就是每次可以拉取的数据量,若是内存资源较为充足,适当增长拉取数据缓冲区的大小,能够减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。
reduce端数据拉取缓冲区的大小能够经过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB,该参数的设置方法如代码清单2-8所示:
代码清单2-8 reduce端数据拉取缓冲区配置
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "96")
Spark Shuffle过程当中,reduce task拉取属于本身的数据时,若是由于网络异常等缘由致使失败会自动进行重试。对于那些包含了特别耗时的shuffle操做的做业,建议增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。
reduce端拉取数据重试次数能够经过spark.shuffle.io.maxRetries参数进行设置,该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败,默认为3,该参数的设置方法如代码清单2-9所示:
代码清单2-9 reduce端拉取数据重试次数配置
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
Spark Shuffle过程当中,reduce task拉取属于本身的数据时,若是由于网络异常等缘由致使失败会自动进行重试,在一次失败后,会等待必定的时间间隔再进行重试,能够经过加大间隔时长(好比60s),以增长shuffle操做的稳定性。
reduce端拉取数据等待间隔能够经过spark.shuffle.io.retryWait参数进行设置,默认值为5s,该参数的设置方法如代码清单2-10所示:
代码清单2-10 reduce端拉取数据等待间隔配置
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "10s")
对于SortShuffleManager,若是shuffle reduce task的数量小于某一阈值则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。
当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减小了排序的性能开销,可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。
SortShuffleManager排序操做阈值的设置能够经过spark.shuffle.sort. bypassMergeThreshold这一参数进行设置,默认值为200,该参数的设置方法如代码清单2-11所示:
代码清单2-10 reduce端拉取数据等待间隔配置
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
对于JVM调优,首先应该明确,full gc/minor gc,都会致使JVM的工做线程中止工做,即stop the world。
1. 静态内存管理机制
根据Spark静态内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程当中产生的中间数据,Storage占系统内存的60%,Execution占系统内存的20%,而且二者彻底独立。
在通常状况下,Storage的内存都提供给了cache操做,可是若是在某些状况下cache操做内存不是很紧张,而task的算子中建立的对象不少,Execution内存又相对较小,这回致使频繁的minor gc,甚至于频繁的full gc,进而致使Spark频繁的中止工做,性能影响会很大。
在Spark UI中能够查看每一个stage的运行状况,包括每一个task的运行时间、gc时间等等,若是发现gc太频繁,时间太长,就能够考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可使用。
Storage内存区域能够经过spark.storage.memoryFraction参数进行指定,默认为0.6,即60%,能够逐级向下递减,如代码清单2-6所示:
代码清单2-6 Storage内存占比设置
val conf = new SparkConf()
.set("spark.storage.memoryFraction", "0.4")
2. 统一内存管理机制
根据Spark统一内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存数据,Execution主要用于缓存在shuffle过程当中产生的中间数据,二者所组成的内存部分称为统一内存,Storage和Execution各占统一内存的50%,因为动态占用机制的实现,shuffle过程须要的内存过大时,会自动占用Storage的内存区域,所以无需手动进行调节。
Executor的堆外内存主要用于程序的共享库、Perm Space、 线程Stack和一些Memory mapping等, 或者类C方式allocate object。
有时,若是你的Spark做业处理的数据量很是大,达到几亿的数据量,此时运行Spark做业会时不时地报错,例如shuffle output file cannot find,executor lost,task lost,out of memory等,这多是Executor的堆外内存不太够用,致使Executor在运行的过程当中内存溢出。
stage的task在运行的时候,可能要从一些Executor中去拉取shuffle map output文件,可是Executor可能已经因为内存溢出挂掉了,其关联的BlockManager也没有了,这就可能会报出shuffle output file cannot find,executor lost,task lost,out of memory等错误,此时,就能够考虑调节一下Executor的堆外内存,也就能够避免报错,与此同时,堆外内存调节的比较大的时候,对于性能来说,也会带来必定的提高。
默认状况下,Executor堆外内存上限大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,致使Spark做业反复崩溃,没法运行,此时就会去调节这个参数,到至少1G,甚至于2G、4G。
Executor堆外内存的配置须要在spark-submit脚本里配置,如代码清单2-7所示:
代码清单2-7 Executor堆外内存配置
--conf spark.yarn.executor.memoryOverhead=2048
以上参数配置完成后,会避免掉某些JVM OOM的异常问题,同时,能够提高总体Spark做业的性能。
在Spark做业运行过程当中,Executor优先从本身本地关联的BlockManager中获取某份数据,若是本地BlockManager没有的话,会经过TransferService远程链接其余节点上Executor的BlockManager来获取数据。
若是task在运行过程当中建立大量对象或者建立的对象较大,会占用大量的内存,这回致使频繁的垃圾回收,可是垃圾回收会致使工做现场所有中止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会中止工做,没法提供相应,此时,因为没有响应,没法创建网络链接,会致使网络链接超时。
在生产环境下,有时会遇到file not found、file lost这类错误,在这种状况下,颇有多是Executor的BlockManager在拉取数据的时候,没法创建链接,而后超过默认的链接等待时长60s后,宣告数据拉取失败,若是反复尝试都拉取不到数据,可能会致使Spark做业的崩溃。这种状况也可能会致使DAGScheduler反复提交几回stage,TaskScheduler返回提交几回task,大大延长了咱们的Spark做业的运行时间。
此时,能够考虑调节链接的超时时长,链接等待时长须要在spark-submit脚本中进行设置,设置方式如代码清单2-8所示:
代码清单2-8 链接等待时长配置
--conf spark.core.connection.ack.wait.timeout=300
调节链接等待时长后,一般能够避免部分的XX文件拉取失败、XX文件lost等报错。
Spark中的数据倾斜问题主要指shuffle过程当中出现的数据倾斜问题,是因为不一样的key对应的数据量不一样致使的不一样task所处理的数据量不一样的问题。
例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到了1万条数据,计算5分钟内完成,第三个task分配到了98万数据,此时第三个task可能须要10个小时完成,这使得整个Spark做业须要10个小时才能运行完成,这就是数据倾斜所带来的后果。
注意,要区分开数据倾斜与数据量过量这两种状况,数据倾斜是指少数task被分配了绝大多数的数据,所以少数task运行缓慢;数据过量是指全部task被分配的数据量都很大,相差很少,全部task都运行缓慢。
数据倾斜的表现:
1. Spark做业的大部分task都执行迅速,只有有限的几个task执行的很是慢,此时可能出现了数据倾斜,做业能够运行,可是运行得很是慢;
2. Spark做业的大部分task都执行迅速,可是有的task在运行过程当中会忽然报出OOM,反复执行几回都在某一个task报出OOM错误,此时可能出现了数据倾斜,做业没法正常运行。
定位数据倾斜问题:
1. 查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根据代码逻辑判断此处是否会出现数据倾斜;
2. 查看Spark做业的log文件,log文件对于错误的记录会精确到代码的某一行,能够根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪个;
绝大多数状况下,Spark做业的数据来源都是Hive表,这些Hive表基本都是通过ETL以后的昨天的数据。
为了不数据倾斜,咱们能够考虑避免shuffle过程,若是避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能。
若是Spark做业的数据来源于Hive表,那么能够先在Hive表中对数据进行聚合,例如按照key进行分组,将同一key对应的全部value用一种特殊的格式拼接到一个字符串里去,这样,一个key就只有一条数据了;以后,对一个key的全部value进行处理时,只须要进行map操做便可,无需再进行任何的shuffle操做。经过上述方式就避免了执行shuffle操做,也就不可能会发生任何的数据倾斜问题。
对于Hive表中数据的操做,不必定是拼接成一个字符串,也能够是直接对key的每一条数据进行累计计算。
若是在Spark做业中容许丢弃某些数据,那么能够考虑将可能致使数据倾斜的key进行过滤,滤除可能致使数据倾斜的key对应的数据,这样,在Spark做业中就不会发生数据倾斜了。
当方案一和方案二对于数据倾斜的处理没有很好的效果时,能够考虑提升shuffle过程当中的reduce端并行度,reduce端并行度的提升就增长了reduce端task的数量,那么每一个task分配到的数据量就会相应减小,由此缓解数据倾斜问题。
在大部分的shuffle算子中,均可以传入一个并行度的设置参数,好比reduceByKey(500),这个参数会决定shuffle过程当中reduce端的并行度,在进行shuffle操做的时候,就会对应着建立指定数量的reduce task。对于Spark SQL中的shuffle类语句,好比group by、join等,须要设置一个参数,即spark.sql.shuffle.partitions,该参数表明了shuffle read task的并行度,该值默认是200,对于不少场景来讲都有点太小。
增长shuffle read task的数量,可让本来分配给一个task的多个key分配给多个task,从而让每一个task处理比原来更少的数据。举例来讲,若是本来有5个key,每一个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task之后,每一个task就分配到一个key,即每一个task就处理10条数据,那么天然每一个task的执行时间都会变短了。
提升reduce端并行度并无从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽量地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题,适用于有较多key对应的数据量都比较大的状况。
该方案一般没法完全解决数据倾斜,由于若是出现一些极端状况,好比某个key对应的数据量有100万,那么不管你的task数量增长到多少,这个对应着100万数据的key确定仍是会分配到一个task中去处理,所以注定仍是会发生数据倾斜的。因此这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其余方案结合起来使用。
在理想状况下,reduce端并行度提高后,会在必定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;可是,在一些状况下,只会让原来因为数据倾斜而运行缓慢的task运行速度稍有提高,或者避免了某些task的OOM问题,可是,仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。
当使用了相似于groupByKey、reduceByKey这样的算子时,能够考虑使用随机key实现双重聚合,如图3-1所示:
图3-1 随机key实现双重聚合
首先,经过map算子给每一个数据的key添加随机数前缀,对key进行打散,将原先同样的key变成不同的key,而后进行第一次聚合,这样就可让本来被一个task处理的数据分散到多个task上去作局部聚合;随后,去除掉每一个key的前缀,再次进行聚合。
此方法对于由groupByKey、reduceByKey这类算子形成的数据倾斜由比较好的效果,仅仅适用于聚合类的shuffle操做,适用范围相对较窄。若是是join类的shuffle操做,还得用其余的解决方案。
此方法也是前几种方案没有比较好的效果时要尝试的解决方案。
正常状况下,join操做都会执行shuffle过程,而且执行的是reduce join,也就是先将全部相同的key和对应的value汇聚到一个reduce task中,而后再进行join。普通join的过程以下图所示:
图3-2 普通join过程
普通的join是会走shuffle过程的,而一旦shuffle,就至关于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。可是若是一个RDD是比较小的,则能够采用广播小RDD全量数据+map算子来实现与join一样的效果,也就是map join,此时就不会发生shuffle操做,也就不会发生数据倾斜。
(注意,RDD是并不能进行广播的,只能将RDD内部的数据经过collect拉取到Driver内存而后再进行广播)
不使用join算子进行链接操做,而使用Broadcast变量与map类算子实现join操做,进而彻底规避掉shuffle类的操做,完全避免数据倾斜的发生和出现。将较小RDD中的数据直接经过collect算子拉取到Driver端的内存中来,而后对其建立一个Broadcast变量;接着对另一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照链接key进行比对,若是链接key相同的话,那么就将两个RDD的数据用你须要的方式链接起来。
根据上述思路,根本不会发生shuffle操做,从根本上杜绝了join操做可能致使的数据倾斜问题。
当join操做有数据倾斜问题而且其中一个RDD的数据量较小时,能够优先考虑这种方式,效果很是好。map join的过程如图3-3所示:
图3-3 map join过程
因为Spark的广播变量是在每一个Executor中保存一个副本,若是两个RDD数据量都比较大,那么若是将一个数据量比较大的 RDD作成广播变量,那么颇有可能会形成内存溢出。
在Spark中,若是某个RDD只有一个key,那么在shuffle过程当中会默认将此key对应的数据打散,由不一样的reduce端task进行处理。
当由单个key致使数据倾斜时,可有将发生数据倾斜的key单独提取出来,组成一个RDD,而后用这个本来会致使倾斜的key组成的RDD根其余RDD单独join,此时,根据Spark的运行机制,此RDD中的数据会在shuffle阶段被分散到多个task中去进行join操做。倾斜key单独join的流程如图3-4所示:
图3-4 倾斜key单独join流程
1. 适用场景分析:
对于RDD中的数据,能够将其转换为一个中间表,或者是直接使用countByKey()的方式,看一个这个RDD中各个key对应的数据量,此时若是你发现整个RDD就一个key的数据量特别多,那么就能够考虑使用这种方法。
当数据量很是大时,能够考虑使用sample采样获取10%的数据,而后分析这10%的数据中哪一个key可能会致使数据倾斜,而后将这个key对应的数据单独提取出来。
2. 不适用场景分析:
若是一个RDD中致使数据倾斜的key不少,那么此方案不适用。
若是在进行join操做时,RDD中有大量的key致使数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了,对于join操做,咱们能够考虑对其中一个RDD数据进行扩容,另外一个RDD进行稀释后再join。
咱们会将原先同样的key经过附加随机前缀变成不同的key,而后就能够将这些处理后的“不一样key”分散到多个task中去处理,而不是让一个task处理大量的相同key。这一种方案是针对有大量倾斜key的状况,无法将部分key拆分出来进行单独处理,须要对整个RDD进行数据扩容,对内存资源要求很高。
1. 核心思想:
选择一个RDD,使用flatMap进行扩容,对每条数据的key添加数值前缀(1~N的数值),将一条数据映射为多条数据;(扩容)
选择另一个RDD,进行map映射操做,每条数据的key都打上一个随机数做为前缀(1~N的随机数);(稀释)
将两个处理后的RDD,进行join操做。
图3-6 使用随机数以及扩容进行join
2. 局限性:
若是两个RDD都很大,那么将RDD进行N倍的扩容显然行不通;
使用扩容的方式只能缓解数据倾斜,不能完全解决数据倾斜问题。
当RDD中有几个key致使数据倾斜时,方案六再也不适用,而方案七又很是消耗资源,此时能够引入方案七的思想完善方案六:
1. 对包含少数几个数据量过大的key的那个RDD,经过sample算子采样出一份样原本,而后统计一下每一个key的数量,计算出来数据量最大的是哪几个key。
2. 而后将这几个key对应的数据从原来的RDD中拆分出来,造成一个单独的RDD,并给每一个key都打上n之内的随机数做为前缀,而不会致使倾斜的大部分key造成另一个RDD。
3. 接着将须要join的另外一个RDD,也过滤出来那几个倾斜key对应的数据并造成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会致使倾斜的大部分key也造成另一个RDD。
4. 再将附加了随机前缀的独立RDD与另外一个膨胀n倍的独立RDD进行join,此时就能够将原先相同的key打散成n份,分散到多个task中去进行join了。
5. 而另外两个普通的RDD就照常join便可。
6. 最后将两次join的结果使用union算子合并起来便可,就是最终的join结果。
在Shuffle过程,reduce端task并非等到map端task将其数据所有写入磁盘后再去拉取,而是map端写一点数据,reduce端task就会拉取一小部分数据,而后当即进行后面的聚合、算子函数的使用等操做。
reduce端task可以拉取多少数据,由reduce拉取数据的缓冲区buffer来决定,由于拉取过来的数据都是先放在buffer中,而后再进行后续的处理,buffer的默认大小为48MB。
reduce端task会一边拉取一边计算,不必定每次都会拉满48MB的数据,可能大多数时候拉取一部分数据就处理掉了。
虽说增大reduce端缓冲区大小能够减小拉取次数,提高Shuffle性能,可是有时map端的数据量很是大,写出的速度很是快,此时reduce端的全部task在拉取的时候,有可能所有达到本身缓冲的最大极限值,即48MB,此时,再加上reduce端执行的聚合函数的代码,可能会建立大量的对象,这可难会致使内存溢出,即OOM。
若是一旦出现reduce端内存溢出的问题,咱们能够考虑减少reduce端拉取数据缓冲区的大小,例如减小为12MB。
在实际生产环境中是出现过这种问题的,这是典型的以性能换执行的原理。reduce端拉取数据的缓冲区减少,不容易致使OOM,可是相应的,reudce端的拉取次数增长,形成更多的网络传输开销,形成性能的降低。
注意,要保证任务可以运行,再考虑性能的优化。
在Spark做业中,有时会出现shuffle file not found的错误,这是很是常见的一个报错,有时出现这种错误之后,选择从新执行一遍,就再也不报出这种错误。
出现上述问题可能的缘由是Shuffle操做中,后面stage的task想要去上一个stage的task所在的Executor拉取数据,结果对方正在执行GC,执行GC会致使Executor内全部的工做现场所有中止,好比BlockManager、基于netty的网络通讯等,这就会致使后面的task拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found的错误,而第二次再次执行就不会再出现这种错误。
能够经过调整reduce端拉取数据重试次数和reduce端拉取数据时间间隔这两个参数来对Shuffle性能进行调整,增大参数值,使得reduce端拉取数据的重试次数增长,而且每次失败后等待的时间间隔加长。
代码清单4-1 JVM GC致使的shuffle文件拉取失败
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
.set("spark.shuffle.io.retryWait", "6s")
当Spark做业在运行过程当中报错,并且报错信息中含有Serializable等相似词汇,那么多是序列化问题致使的报错。
序列化问题要注意如下三点:
在一些算子函数里,须要咱们有一个返回值,可是在一些状况下咱们不但愿有返回值,此时咱们若是直接返回NULL,会报错,例如Scala.Math(NULL)异常。
若是你遇到某些状况,不但愿有返回值,那么能够经过下述方式解决:
2. 在经过算子获取到了一个RDD以后,能够对这个RDD执行filter操做,进行数据过滤,将数值为-1的数据给过滤掉;
3. 在使用完filter算子后,继续调用coalesce算子进行优化。
YARN-client模式的运行原理以下图所示:
图4-1 YARN-client模式运行原理
在YARN-client模式下,Driver启动在本地机器上,而Driver负责全部的任务调度,须要与YARN集群上的多个Executor进行频繁的通讯。
假设有100个Executor, 1000个task,那么每一个Executor分配到10个task,以后,Driver要频繁地跟Executor上运行的1000个task进行通讯,通讯数据很是多,而且通讯品类特别高。这就致使有可能在Spark任务运行过程当中,因为频繁大量的网络通信,本地机器的网卡流量会激增。
注意,YARN-client模式只会在测试环境中使用,而之因此使用YARN-client模式,是因为能够看到详细全面的log信息,经过查看log,能够锁定程序中存在的问题,避免在生产环境下发送故障。
在生产环境下,使用的必定是YARN-cluster模式。在YARN-cluster模式下,就不会形成本地机器网卡流量激增问题,若是YARN-cluster模式下存在网络通讯的问题,须要运维团队进行解决。
YARN-cluster模式的运行原理以下图所示:
图4-1 YARN-client模式运行原理
当Spark做业中包含SparkSQL的内容时,可能会碰到YARN-client模式下能够运行,可是YARN-cluster模式下没法提交运行(报出OOM错误)的状况。
YARN-client模式下,Driver是运行在本地机器上的,Spark使用的JVM的PermGen的配置(JDK1.8以前),是本地机器上的spark-class文件,JVM永久代的大小是128MB,这个是没有问题的,可是在YARN-cluster模式下,Driver运行在YARN集群的某个节点上,使用的是没有通过配置的默认设置,PermGen永久代大小为82MB。
SparkSQL的内部要进行很复杂的SQL的语义解析、语法树转换等等,很是复杂,若是sql语句自己就很是复杂,那么颇有可能会致使性能的损耗和内存的占用,特别是对PermGen的占用会比较大。
因此,此时若是PermGen的占用好过了82MB,可是又小于128MB,就会出现YARN-client模式下能够运行,YARN-cluster模式下没法运行的状况。
解决上述问题的方法时增长PermGen的容量,须要在spark-submit脚本中对相关参数进行设置,设置方法如代码清单4-2所示。
代码清单4-2 配置
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
经过上述方法就设置了Driver永久代的大小,默认为128MB,最大256MB,这样就能够避免上面所说的问题。
当SparkSQL的sql语句有成百上千的or关键字时,就可能会出现Driver端的JVM栈内存溢出。
JVM栈内存溢出基本上就是因为调用的方法层级过多,产生了大量的,很是深的,超出了JVM栈深度限制的递归。(咱们猜想SparkSQL有大量or语句的时候,在解析SQL时,例如转换为语法树或者进行执行计划的生成的时候,对于or的处理是递归,or很是多时,会发生大量的递归)
此时,建议将一条sql语句拆分为多条sql语句来执行,每条sql语句尽可能保证100个之内的子句。根据实际的生产环境试验,一条sql语句的or关键字控制在100个之内,一般不会致使JVM栈内存溢出。
Spark持久化在大部分状况下是没有问题的,可是有时数据可能会丢失,若是数据一旦丢失,就须要对丢失的数据从新进行计算,计算完后再缓存和使用,为了不数据的丢失,能够选择对这个RDD进行checkpoint,也就是将数据持久化一份到容错的文件系统上(好比HDFS)。
一个RDD缓存并checkpoint后,若是一旦发现缓存丢失,就会优先查看checkpoint数据存不存在,若是有,就会使用checkpoint数据,而不用从新计算。也便是说,checkpoint能够视为cache的保障机制,若是cache失败,就使用checkpoint的数据。
使用checkpoint的优势在于提升了Spark做业的可靠性,一旦缓存出现问题,不须要从新计算数据,缺点在于,checkpoint时须要将数据写入HDFS等文件系统,对性能的消耗较大。