1.OOM问题,reduce端的缓冲大小,太大的话,吃撑了,一下过来不少数据,容易OOM,默认48,能够改小哦。spark.reducer.maxSizeInFlight,48---》24算法
2.JVM-GC致使的shuffle文件拉取失败,shuffle file not found缓存
spark.shuffle.io.maxRetries 3 第一个参数,意思就是说,shuffle文件拉取的时候,若是没有拉取到(拉取失败),最多或重试几回(会从新拉取几回文件),默认是3次。 spark.shuffle.io.retryWait 5s 第二个参数,意思就是说,每一次重试拉取文件的时间间隔,默认是5s钟。 默认状况下,假如说第一个stage的executor正在进行漫长的full gc。第二个stage的executor尝试去拉取文件,结果没有拉取到,默认状况下,会反复重试拉取3次,每次间隔是五秒钟。最多只会等待3 * 5s = 15s。若是15s内,没有拉取到shuffle file。就会报出shuffle file not found。app
spark.shuffle.io.maxRetries 60运维
spark.shuffle.io.retryWait 60side
3.YARN队列资源不足致使的application直接失败 :oop
限制提交一次做业,用线程池确保一个线程执行任务。性能
采用以下方案:spa
一、在你的J2EE(咱们这个项目里面,spark做业的运行,以前说过了,J2EE平台触发的,执行spark-submit脚本),限制,同时只能提交一个spark做业到yarn上去执行,确保一个spark做业的资源确定是有的。线程
二、你应该采用一些简单的调度区分的方式,好比说,你有的spark做业多是要长时间运行的,好比运行30分钟;有的spark做业,多是短期运行的,可能就运行2分钟。此时,都提交到一个队列上去,确定不合适。极可能出现30分钟的做业卡住后面一大堆2分钟的做业。分队列,能够申请(跟大家的YARN、Hadoop运维的同窗申请)。你本身给本身搞两个调度队列。每一个队列的根据你要执行的做业的状况来设置。在你的J2EE程序里面,要判断,若是是长时间运行的做业,就干脆都提交到某一个固定的队列里面去把;若是是短期运行的做业,就统一提交到另一个队列里面去。这样,避免了长时间运行的做业,阻塞了短期运行的做业。队列
三、你的队列里面,不管什么时候,只会有一个做业在里面运行。那么此时,就应该用咱们以前讲过的性能调优的手段,去将每一个队列能承载的最大的资源,分配给你的每个spark做业,好比80个executor;6G的内存;3个cpu core。尽可能让你的spark做业每一次运行,都达到最满的资源使用率,最快的速度,最好的性能;并行度,240个cpu core,720个task。
四、在J2EE中,经过线程池的方式(一个线程池对应一个资源队列),来实现上述咱们说的方案。
4.解决yarn-cluster模式的JVM内存溢出没法执行问题,(PermGen 永久代内存溢出)
既然是JVM的PermGen永久代内存溢出,那么就是内存不够用。我们呢,就给yarn-cluster模式下的,driver的PermGen多设置一些。 spark-submit脚本中,加入如下配置便可: --conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M" 这个就设置了driver永久代的大小,默认是128M,最大是256M。那么,这样的话,就能够基本保证你的spark做业不会出现上述的yarn-cluster模式致使的永久代内存溢出的问题。
5.错误的持久化方式以及checkpoint的使用
checkpoint原理:
一、在代码中,用SparkContext,设置一个checkpoint目录,能够是一个容错文件系统的目录,好比hdfs;
二、在代码中,对须要进行checkpoint的rdd,执行RDD.checkpoint();
三、RDDCheckpointData(spark内部的API),接管你的RDD,会标记为marked for checkpoint,准备进行checkpoint
四、你的job运行完以后,会调用一个finalRDD.doCheckpoint()方法,会顺着rdd lineage,回溯扫描,发现有标记为待checkpoint的rdd,就会进行二次标记,inProgressCheckpoint,正在接受checkpoint操做
五、job执行完以后,就会启动一个内部的新job,去将标记为inProgressCheckpoint的rdd的数据,都写入hdfs文件中。(备注,若是rdd以前cache过,会直接从缓存中获取数据,写入hdfs中;若是没有cache过,那么就会从新计算一遍这个rdd,再checkpoint)
六、将checkpoint过的rdd以前的依赖rdd,改为一个CheckpointRDD*,强制改变你的rdd的lineage。后面若是rdd的cache数据获取失败,直接会经过它的上游CheckpointRDD,去容错的文件系统,好比hdfs,中,获取checkpoint的数据。
数据倾斜
一、定位缘由与出现问题的位置: 根据log去定位 出现数据倾斜的缘由,基本只多是由于发生了shuffle操做,在shuffle的过程当中,出现了数据倾斜的问题。由于某个,或者某些key对应的数据,远远的高于其余的key。 一、你在本身的程序里面找找,哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join
二、看log log通常会报是在你的哪一行代码,致使了OOM异常;或者呢,看log,看看是执行到了第几个stage!!! 咱们这里不会去剖析stage的划分算法
方案一:聚合源数据
将致使数据倾斜的一些shuffle操做,好比,groupbykey,reducebykey 或者join等,提早到ETL过程进行处理,通常状况spark处理的源数据来之hive ETL的hive表。
方案二:过滤致使倾斜的key
从源头过滤掉啦
方案三:提升reduce Task的数量,调用方式时,传入numPartitions
治标不治本的意思,由于,它没有从根本上改变数据倾斜的本质和问题。不像第一个和第二个方案(直接避免了数据倾斜的发生)。原理没有改变,只是说,尽量地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。
方案四:双重groupby 在数据量过多的key上作文章咯。把这个key作分解,打散。好比key是“hello”作个map操做,变成“hello1”“hello2”。
使用场景 (1)groupByKey (2)reduceByKey 比较适合使用这种方式;
第一轮聚合的时候,对key进行打散,将原先同样的key,变成不同的key,至关因而将每一个key分为多组; 先针对多个组,进行key的局部聚合;接着,再去除掉每一个key的前缀,而后对全部的key,进行全局的聚合。 对groupByKey、reduceByKey形成的数据倾斜,有比较好的效果。
方案五:(针对join操做)reduce join转换为map join,适合在什么样的状况下,能够来使用? 若是两个RDD要进行join,其中一个RDD是比较小的。一个RDD是100万数据,一个RDD是1万数据。(一个RDD是1亿数据,一个RDD是100万数据) 其中一个RDD必须是比较小的,broadcast出去那个小RDD的数据之后,就会在每一个executor的block manager中都驻留一份。要确保你的内存足够存放那个小RDD中的数据 这种方式下,根本不会发生shuffle操做,确定也不会发生数据倾斜;从根本上杜绝了join操做可能致使的数据倾斜的问题;
方案六:(针对join操做)sample采样倾斜key进行两次join:这种方案何时适合使用? 优先对于join,确定是但愿可以采用上一讲讲的,reduce join转换map join。两个RDD数据都比较大,那么就不要那么搞了。 针对你的RDD的数据,你能够本身把它转换成一个中间表,或者是直接用countByKey()的方式,你能够看一下这个RDD各个key对应的数据量;此时若是你发现整个RDD就一个,或者少数几个key,是对应的数据量特别多;尽可能建议,好比就是一个key对应的数据量特别多。 此时能够采用我们的这种方案,单拉出来那个最多的key;单独进行join,尽量地将key分散到各个task上去进行join操做。 何时不适用呢? 若是一个RDD中,致使数据倾斜的key,特别多;那么此时,最好仍是不要这样了;仍是使用咱们最后一个方案,终极的join数据倾斜的解决方案。
方案七:使用随机数以及扩容表进行join:两个rdd都很大的jion操做,key进行打散,扩容10倍,在join