spark优化参数调节和故障参数调节

1:“物尽其用”,但给spark分配多个机器后,先需配置spark-submit shell以下:

/usr/local/spark/bin/spark-submit \
--class com.spark.test.Top3UV \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /usr/local/hive/conf/hive-site.xml \
--driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar \
/usr/local/jars/spTest-1.0-SNAPSHOT-jar-with-dependencies.jar \

executor的cpu核心数为3,而且executor数量为3,那么总cpu核心数就是9,task并行度推荐设置2~3倍的cpu-core才能“物尽其用”,由于难以保证全部task都在同一时间执行完成!

  SparkConf conf = new SparkConf()java

  conf.set("spark.default.parallelism", "500")mysql

2:重复使用的Rdd,须要缓存:StorageLevel.MEMORY_AND_DISK_SER_2()

可选择:
(1)内存缓存(2)内存磁盘缓存(3)带有序列化的缓存(4)带有副本的缓存-》以防数据丢失,形如 _2。
***复用的rdd还能够持久化到hdfs,使用checkpoint机制,以下

javaSparkContext.setCheckpointDir("hdfs://xxx:9000/checkPointPath");//设置checkpoint的存储路径
rdd_date_range.checkpoint();//对rdd_date_range的Rdd进行checkpoint存储(若是rdd使用cache则checkpoint数据从缓存中获取)
3:shuffle操做时优化
 (1)开启 consolidateFile,这样map端产生的file数量会和下游stage的task数量一致,不会由于重复建立文件致使性能降低。
 (2)"spark.shuffle.file.buffer", "128k",设置以后可减小map端数据输出到文件的次数减小,提高性能。
 (3)"spark.reducer.maxSizeInFlight", "96m",下游stage拉取map造成file中的内容,每次拉取的数据量,值太小会致使屡次网络通讯。
 (4)"spark.shuffle.io.maxRetries", "6",若是上游jvm出现stw的话,有可能下游获取file时会出现,没法获取的状况,这个参数表明能够重试的次数,"spark.shuffle.io.retryWait", "10s"而这个参数设定每次重试的间隔时间。
4:使用  fastUtil工具  代替jdk中带有的基础数据类型,减少内存开销;如:ArrayList或者HashMap等。
5:使用kryo序列化工具,这样序列化数据的速度可以提高,并且可以减少内存的开销;***但要注意的是
使用kryo序列化,若是涉及到自定义类型必需要注册,这样才能被kryo序列化***
6:spark1.2.x 之后默认使用sort shuffle manager,但若是没有必要使用排序功能能够在SparkConf中设置便可;
以下:
   new SparkConf().set("spark.shuffle.manager","hash");

***固然spark1.5.x之后又出现了tungsten-sort shuffle manager,要比sort shuffle manager性能更好***
(若是使用sort shuffle manager能够经过new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold","350");若是实际task的数量大于350才开启sort shuffle manager)
7:若是在某处使用了spark sql,那么这个阶段的partition数量是不受控制的spark.default.parallelism设置控制的,若是想要设置须要使用repartition方法来设置实际的partition数量;
以下:
actionDF.javaRDD().repartition(1000);//在使用DataFrame转化成javaRDD的时候,使用repartition来提升实际的数据分块数量,从而提升并行度。
8:若是可用内存比较大的话,那么map操做可使用mapPartitions来代替;
如:
rdd.mapToPair使用rdd.mapPartitionsToPair来代替,这样会一次性得到rdd中的某个partition,方法变成迭代的方式仅仅执行一次(可是这样很是大的可能致使oom直接挂掉);
 rdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, Object, Object>() {
            @Override
            public Iterable<Tuple2<Object, Object>> call(Iterator<Row> rowIterator) throws Exception {
                return null;
            }
        });
        return rdd.mapToPair(new PairFunction<Row, String, Row>() {
            @Override
            public Tuple2<String, Row> call(Row row) throws Exception {
                return new Tuple2<String, Row>(row.getString(2), row);
            }
        });
9:shuffle阶段,在reduce中有时须要调节拉取数据时内存缓冲区(从map端输出的action触发job计算的文件),默认是48MB,若是当数据量特别大的时候颇有可能出现OOM的问题,这个问题除了增长硬件条件外,必须经过牺牲性能来换取执行能力了;
调整参数:
 将默认48MB下调到10MB,增长数据拉取(增长了网络通讯次数)的次数,来避免OOM
 new SparkConf().set("spark.reducer.maxSizeInFlight","10");
10:若是在日志中出现了shuffle file not found 错误! 极可能是因为reduce时executor的jvm发生gc致使了reduce阶段没法得到文件,
解决问题能够经过,增长重试次数,并调节重试的周期:
.set("spark.shuffle.io.maxRetries",3);//默认重试次数是3次,能够调成60
.set("spark.shuffle.io.retryWait",5);//默认每隔5s重试一次,能够调成60
11:若是报错 Scala.Math(NULL) 相似于这种异常,那就是说明在算子中出现了null值的直接返回。
12:spark默认状况下cache缓存配置占比为spark.storage.memoryFraction:0.6,咱们能够调整的小些如0.3,必要的时候可使用persist进行内存+磁盘的缓存方式(StorageLevel.MEMORY_AND_DISK())进行缓存;这样可以保证运行spark核心业务的各类算子可以有足够的运行空间,防止因为内存不足而且频繁的GC而形成spark做业执行的卡顿。
相关文章
相关标签/搜索