不进行优化的代码就是耍流氓。html
整体来讲大数据优化主要分为三点,一是充分利用CPU,二是节省内存,三是减小网络传输。java
Hive默认采用集群模式进行计算,若是对于小数据量,能够设置为单台机器进行计算,这样能够大大缩减查询触发任务时间。node
用户能够经过设置hive.exec.mode.local.auto 的值为true,来让Hive在适当的时候自动启动这个优化。web
set hive.exec.mode.local.auto=true; //开启本地 mr //设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式, 默认为 134217728,即 128M set hive.exec.mode.local.auto.inputbytes.max=50000000; //设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方 式,默认为 4 set hive.exec.mode.local.auto.input.files.max=10;
null值过滤算法
对于key值倾斜,有的时候是无效的null致使的,这个时候能够考虑过滤掉。sql
hive (default)> insert overwrite table jointable select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id;
null值随机分配shell
若是null不是异常数据,那么能够采用随机分配将null值分到不一样分区,解决数据倾斜。数据库
insert overwrite table jointable select n.* from nullidtable n full join ori o on case when n.id is null then concat('hive', rand()) else n.id end = o.id;
对于大数据量去重,能够采用分组的方式进行优化。apache
hive (default)> select count(id) from (select id from bigtable group by id) a;
对关联表进行过滤时,能够考虑在关联时就进行过滤,提升查询时间。数组
hive (default)> select b.id from bigtable b join (select id from ori where id <= 10 ) o on b.id = o.id;
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
增长 map 的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就能够增长 map 的个数。
设置最大切片值为100个字节
hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100; hive (default)> select count(*) from emp;
调整 reduce 个数方法一
(1)每一个 Reduce 处理的数据量默认是 256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每一个任务最大的 reduce 数,默认为 1009
hive.exec.reducers.max=1009
(3)计算 reducer 数的公式
调整 reduce 个数方法二
在 hadoop 的 mapred-default.xml 文件中修改,设置每一个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;
在共享集群中设置并发执行能够提升运行速度。
set hive.exec.parallel=true; //打开任务并行执行 set hive.exec.parallel.thread.number=16; //同一个 sql 容许最大并行度,默认为 8。
固然,得是在系统资源比较空闲的时候才有优点,不然,没资源,并行也起不来。
Hive 提供了一个严格模式,能够防止用户执行那些可能意向不到的很差的影响的查询。经过设置属性 hive.mapred.mode 值为默认是非严格模式 nonstrict 。开启严格模式须要修改 hive.mapred.mode 值为 strict,开启严格模式能够禁止 3 种类型的查询
1).对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,不然不容许执行。
2).对于使用了 order by 语句的查询,要求必须使用 limit 语句。
3).限制笛卡尔积的查询。对关系型数据库很是了解的用户可能指望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就能够高效地将WHERE 语句转化成那个 ON 语句。
在小文件场景或者task特别多的状况下,执行时间都很短。JVM重用可使用同一个JVM在同一个Job里面重复使用N次。N值在mapred-site.xml文件中进行配置。
<property> <name>mapreduce.job.jvm.numtasks</name> <value>10</value> <description>How many tasks to run per jvm. If set to -1, there is no limit. </description> </property>
缺点:JVM重用会一直占用使用到的task插槽,以便进行重用,若是遇到了某个job里面的reduce task分配不均匀,致使出现某几个task占用task时间很长,其它task空闲也无法被其它job使用,只有全部的task都执行完后才会释放。
对于某些耗时的任务,能够启动推测执行,这样就会把“拖后腿”的任务找出来,而后启动个备份任务执行相同的数据。最后选出执行最快的为最终结果。
设置开启推测执行参数:Hadoop 的 mapred-site.xml 文件中进行配置:
<property> <name>mapreduce.map.speculative</name> <value>true</value> <description>If true, then multiple instances of some map tasks may be executed in parallel.</description> </property> <property> <name>mapreduce.reduce.speculative</name> <value>true</value> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description> </property>
不过 hive 自己也提供了配置项来控制 reduce-side 的推测执行:
<property> <name>hive.mapred.reduce.tasks.speculative.execution</name> <value>true</value> <description>Whether speculative execution for reducers should be turned on. </description> </property>
PS:对于时差要求很苛刻的建议关闭掉推测执行。对于执行很长的任务也不建议开启,由于会浪费很大资源。
1)Hadoop Archive:
是一个高效地将小文件放入 HDFS 块中的文件存档工具,它可以将多个小文件打包成
一个 HAR 文件,这样就减小了 namenode 的内存使用。
2)Sequence file:
sequence file 由一系列的二进制 key/value 组成,若是 key 为文件名,value 为文件内容,
则能够将大批小文件合并成一个大文件。
3)CombineFileInputFormat:
CombineFileInputFormat 是一种新的 inputformat,用于将多个文件合并成一个单独的
split,另外,它会考虑数据的存储位置。
经过 http://master:4040咱们能够得到运行中的程序信息。
(1)stages和tasks调度状况;
(2)RDD大小和内存使用状况;
(3)系统环境信息;
(4)正在执行的executor信息;
设置历史服务器记录历史信息:
(1)在$SPARK_HOME/conf/spark-env.sh中设置:
export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50 Dspark.history.fs.logDirectory=hdfs://master01:9000/directory"
说明:spark.history.retainedApplica-tions仅显示最近50个应用。
spark.history.fs.logDirectory:Spark History Server页面只显示该路径下的信息。
(2)$SPARK_HOME/conf/spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop000:8020/directory #应用在运行过程当中全部的信息均记录在该属性指定的路径下
spark.eventLog.compress true
(3)HistoryServer 启动
$SPARK_HOMR/bin/start-histrory-server.sh
(4)HistoryServer 中止
$SPARK_HOMR/bin/stop-histrory-server.sh
--一样executor的logs也是查看的一个出处:
Standalone 模式:$SPARK_HOME/logs
YARN 模式:在 yarn-site.xml 文件中配置了 YARN 日志的存放位置:yarn.nodemanager.log-dirs,或使用命令获取 yarn logs -applicationId。
Nmon
Jmeter
Jprofiler
1.对象占内存,优化数据结构
(1)使用对象数组以及原始类型(primitive type)数组以替代 Java 或 者 Scala 集合类(collection class)。fastutil 库为原始数据类型提供了很是方便的集合类,且兼容 Java 标准类库。
(2)尽量地避免采用含有指针的嵌套数据结构来保存小对象。
(3)考虑采用数字 ID 或者枚举类型以便替代 String 类型的主键。
(4)若是内存少于 32GB,设置 JVM 参数-XX:+UseCom-pressedOops以便将 8 字节指针修改为 4 字节。与此同时,在 Java 7 或者更高版本,设置 JVM 参数-XX:+UseC-----ompressedStrings 以便采用 8 比特来编码每个 ASCII 字符。
2.频繁 GC 或者 OOM
针对这种状况,首先要肯定现象是发生在 Driver 端仍是在 Executor 端,而后在分别处理。
Driver 端:一般因为计算过大的结果集被回收到 Driver 端致使,须要调大 Driver 端的内存解决,或者进一步减小结果集的数量。
Executor 端:
(1)之外部数据做为输入的 Stage:能够增长 partition 的数量(即 task 的数量)来减小每一个 task 要处理的数据,来减小 GC 的可能性。
(2)以 shuffle 做为输入的 Stage:解决数据倾斜问题。
在 spark-default.conf 中添加:spark.speculation true
推测机制与如下几个参数有关:
查找数据倾斜代码
根据shuffler肯定数据倾斜代码,而后经过随机取样找到倾斜数据。
val sampledPairs = pairs.sample(false, 0.1) val sampledWordCounts = sampledPairs.countByKey() sampledWordCounts.foreach(println(_))
好比数据源是Kafka,一般一个分区对应一个Task,因此若是分区数据不均衡,则致使spark处理不均衡。
好比数据源是Hive,若是Hive数据不均衡,也会致使Spark数据倾斜。
解决方案是预处理或者其它。
好比reduceByKey(1000)。若是是group by,join须要设置参数即spark.sql.shuffle.partitions,该参数表明了shuffle read task的并行度,该值默认是200,对于不少场景来讲有点太小。设置完后不一样的key就能分到不一样的task去处理。
针对一个大表一个小表的join操做,使用广播变量将较小的数据进行广播,这样就能够把join改成map操做。
针对RDD执行ReduceByKey等聚合shuffler算子,以及Spark Sql执行GroupByKey等聚合算子,针对数据倾斜,能够先在key前面打上随机前缀,进行聚合,而后再把前缀去掉进行聚合,有效解决值分配不均匀问题。
示例以下:
// 第一步,给 RDD 中的每一个 key 都打上一个随机前缀。 JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair( new PairFunction<Tuple2<Long,Long>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception { Random random = new Random(); int prefix = random.nextInt(10); return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2); } }); // 第二步,对打上随机前缀的 key 进行局部聚合。 JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }); // 第三步,去除 RDD 中每一个 key 的随机前缀。 JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair( new PairFunction<Tuple2<String,Long>, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception { long originalKey = Long.valueOf(tuple._1.split("_")[1]); return new Tuple2<Long, Long>(originalKey, tuple._2); } }); // 第四步,对去除了随机前缀的 RDD 进行全局聚合。 JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey( new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } });
具体实现代码以下:
public class SparkDataSkew{ public static void main(String[] args) { int parallelism = 48; SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("SolveDataSkewWithRandomPrefix"); sparkConf.set("spark.default.parallelism", parallelism + ""); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); String[] skewedKeyArray = new String[]{"9500048", "9500096"}; Set<String> skewedKeySet = new HashSet<String>(); List<String> addList = new ArrayList<String>(); for(int i = 1; i <=24; i++) { addList.add(i + ""); } for(String key : skewedKeyArray) { skewedKeySet.add(key); } Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet); Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList); JavaPairRDD<String, String> leftSkewRDD = leftRDD .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())) .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2())); JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())) .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream() .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())) .collect(Collectors.toList()) .iterator() ); JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD .join(rightSkewRDD, parallelism) .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2())); JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1())); JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2())); skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { AtomicInteger atomicInteger = new AtomicInteger(); iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); }); javaSparkContext.stop(); javaSparkContext.close(); } }
代码开发,资源分配和数据倾斜是重中之重,除此以外,Shuffler做为一个补充,也须要学习下。
spark.shuffle.file.buffer
默认值:32K
参数说明:缓冲大小,超过缓冲大小才会写入磁盘。
调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(),从而减小 shuffle write 过程当中溢写磁盘文件的次数,也就能够减小磁盘 IO 次数,进而提高性能。在实践中发现,合理调节该参数,性能会有 1%~5%的提高。
spark.reducer.maxSizeInFlight
默认值:48m
参数说明:这个 buffer 缓冲决定了每次可以拉取多少数据。
调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比 96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。
spark.shuffle.io.maxRetries
默认值:3
参数说明:拉去失败重试次数。
调优建议:对于那些包含了特别耗时的 shuffle 操做的做业,建议增长重试最大次数(好比 60 次),以免因为 JVM 的 full gc 或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数能够大幅度提高稳定性。
spark.shuffle.io.retryWait
默认值:5s
参数说明:重试拉取数据的等待时间,默认是5s。
调优建议:建议加大间隔时长(好比 60s),以增长 shuffle 操做的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数说明:分配给聚合操做的内存比例,默认是20%。
spark.shuffle.manager
默认值:sort
对同一个数据源不要创建多个RDD。
数据有包含关系的RDD能重用的就重用。
每次你对RDD执行算子操做时,都会从源头处从新计算一遍,因此通常会采起持久化方式,这样就直接从内存取了。
对屡次使用的RDD进行持久化示例:
// 若是要对一个 RDD 进行持久化,只要对这个 RDD 调用 cache()和 persist()便可。 // 正确的作法。 // cache()方法表示:使用非序列化的方式将 RDD 中的数据所有尝试持久化到内存中。 // 此时再对 rdd1 执行两次算子操做时,只有在第一次执行 map 算子时,才会将这个 rdd1 从源头处计 算一次。 // 第二次执行 reduce 算子时,就会直接从内存中提取数据进行计算,不会重复计算一个 rdd。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache() rdd1.map(...) rdd1.reduce(...) // persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。 // 好比说,StorageLevel.MEMORY_AND_DISK_SER 表示,内存充足时优先持久化到内存中, //内存不充足时持久化到磁盘文件中。 // 并且其中的_SER 后缀表示,使用序列化的方式来保存 RDD 数据,此时 RDD 中的每一个 partition //都会序列化成一个大的字节数组,而后再持久化到内存或磁盘中 // 序列化的方式能够减小持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多, //从而发生频繁 GC。 val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") .persist(StorageLevel.MEMORY_AND_DISK_SER) rdd1.map(...) rdd1.reduce(...)
开发过程当中,能避免则尽量避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽可能使用 map 类的非shuffle 算子。这样的话,没有 shuffle 操做或者仅有较少 shuffle 操做的Spark 做业,能够大大减小性能开销。
Broadcast 与 map 进行 join 代码示例:
// 传统的 join 操做会致使 shuffle 操做。 // 由于两个 RDD 中,相同的 key 都须要经过网络拉取到一个节点上,由一个 task 进行 join 操做。 val rdd3 = rdd1.join(rdd2) // Broadcast+map 的 join 操做,不会致使 shuffle 操做。 // 使用 Broadcast 将一个数据量较小的 RDD 做为广播变量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在 rdd1.map 算子中,能够从 rdd2DataBroadcast 中,获取 rdd2 的全部数据。 // 而后进行遍历,若是发现 rdd2 中某条数据的 key 与 rdd1 的当前数据的 key 是相同的, //那么就断定能够进行 join。 // 此时就能够根据本身须要的方式,将 rdd1 当前数据与 rdd2 中能够链接的数据, //拼接在一块儿(String 或 Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 注意,以上操做,建议仅仅在 rdd2 的数据量比较少(好比几百 M,或者一两 G)的状况下使用。 // 由于每一个 Executor 的内存中,都会驻留一份 rdd2 的全量数据。
若是由于业务须要,必定要使用 shuffle 操做,没法用 map 类的算子来替代,那么尽可能使用能够 map-side 预聚合的算子。
使用reduceByKey,aggregateByKey代替groupByKey,由于reduceByKey和aggregateByKey会进行预聚合,groupByKey不会。
使用 reduceByKey/aggregateByKey 替代 groupByKey,详情见“原则五:使用 map-side 预聚合的 shuffle 操做”。
使用 mapPartitions 替代普通 map。
使用 filter 以后进行 coalesce 操做。
使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操做。
有时在开发过程当中,会遇到须要在算子函数中使用外部变量的场景,那么此时就应该使用 Spark的广播(Broadcast)功能来提高性能。由于若是不使用广播变量,那么每一个任务会拉取数据并建立一个副本,这样会大大增长网络开销,并占用系统内存。若是使用广播变量的话,数据就会保留一份。
广播大变量代码示例:
// 如下代码在算子函数中,使用了外部的变量。 // 此时没有作任何特殊操做,每一个 task 都会有一份 list1 的副本。 val list1 = ... rdd1.map(list1...) // 如下代码将 list1 封装成了 Broadcast 类型的广播变量。 // 在算子函数中,使用广播变量时,首先会判断当前 task 所在 Executor 内存中,是否有变量副本。 // 若是有则直接使用;若是没有则从 Driver 或者其余 Executor 节点上远程拉取一份放到本地 Executor 内存中。 // 每一个 Executor 内存中,就只会驻留一份广播变量副本。 val list1 = ... val list1Broadcast = sc.broadcast(list1) rdd1.map(list1Broadcast...)
代码示例:
// 建立 SparkConf 对象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 设置序列化器为 KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册要序列化的自定义类型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
Java 中,有三种类型比较耗费内存:
1.对象。 2.集合类型,好比HashMap,LinedList等。3.字符串,每一个字符串内部都有一个字符数组以及长度等额外信息。
在spark-submit调节资源参数来提升资源利用率。
num-executors
参数说明:设置spark做业总共用多少个executor来执行。
参数调优建议:每一个spark做业通常设置50~100个左右的Executor进程比较合适。过小不能充分利用资源,太大队列没法提供足够的资源。
executor-memory
参数说明:设置每一个Executor进程的内存。
参数调优建议:每一个 Executor 进程的内存设置 4G~8G 较为合适。可是这只是一个参考值,具体的设置仍是得根据不一样部门的资源队列来定。能够看看本身团队的资源队列的最大内存限制是多少,num-executors 乘 以 executor-memory,是不能超过队列的最大内存量的。此外,若是你是跟团队里其余人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的 1/3~1/2,避免你本身的 Spark 做业占用了队列全部的资源,致使别的同窗的做业没法运行。
executor-cores
参数说明:设置每一个Executor进程CUP core数量。由于每一个cpu core一个时间只能执行一个task,因此cpu core数量越多,执行速度越快。
参数调优建议:Executor 的 CPU core 数量设置为 2~4 个较为合适。一样得根据不一样部门的资源队列来定,能够看看本身的资源队列的最大CPU core 限制是多少,再依据设置的 Executor 数量,来决定每一个 Executor进程能够分配到几个 CPU core。一样建议,若是是跟他人共享这个队列,那 么 num-executors * executor-cores 不 要 超 过 队 列 总 CPU core 的1/3~1/2 左右比较合适,也是避免影响其余同窗的做业运行。
driver-memory
参数说明:设置Driver进程的内存。
参数调优建议:Driver 的内存一般来讲不设置,或者设置 1G 左右应该就够了。
spark.default.parallelism
参数说明:该参数用于设置每一个 stage 的默认 task 数量。这个参数极为重要,若是不设置可能会直接影响你的 Spark 做业性能。
参数调优建议:Spark 做业的默认 task 数量为 500~1000 个较为合适。很 多 同 学 常 犯 的 一 个 错 误 就 是 不 去 设 置 这 个 参 数 , 那 么 此 时 就 会 导 致Spark 本身根据底层 HDFS 的 block 数量来设置 task 的数量,默认是一个HDFS block 对应一个 task。一般来讲,Spark 默认设置的数量是偏少的(好比就几十个 task),若是 task 数量偏少的话,就会致使你前面设置好的Executor 的参数都前功尽弃。试想一下,不管你的 Executor 进程有多少个,内存和 CPU 有多大,可是 task 只有 1 个或者 10 个,那么 90%的 Executor进程可能根本就没有 task 执行,也就是白白浪费了资源!所以 Spark 官网建议的设置原则是,设置该参数为 num-executors * executor-cores 的 2~3倍较为合适,好比 Executor 的总 CPU core 数量为 300 个,那么设置 1000个 task 是能够的,此时能够充分地利用 Spark 集群的资源。
spark.storage.memoryFraction
参数说明:设置持久化数据在Executor占比,默认是0.6。
根据你选择的不一样的持久化策略,若是内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:根据实际,能够适当提升,让数据写入内存。
spark.shuffle.memoryFraction
参数说明:该参数用于设置 shuffle 过程当中一个 task 拉取到上个 stage的 task 的输出后,进行聚合操做时可以使用的 Executor 内存的比例,默认是 0.2。
参数调优建议:若是 Spark 做业中的 RDD 持久化操做较少,shuffle 操做较多时,建议下降持久化操做的内存占比,提升 shuffle 操做的内存占比比例,避免 shuffle 过程当中数据过多时内存不够用,必须溢写到磁盘上,下降了性能。此外,若是发现做业因为频繁的 gc 致使运行缓慢,意味着 task执行用户代码的内存不够用,那么一样建议调低这个参数的值。
资源参数的调优,没有一个固定的值,须要同窗们根据本身的实际状况(包括 Spark 做业中的 shuffle 操做数量、RDD 持久化操做数量以及 spark web ui 中显示的做业 gc 状况),同时参考给出的原理以及调优建议,合理地设置上述参数。
资源参数参考示例:
如下是一份 spark-submit 命令的示例,你们能够参考一下,并根据本身的实际状况进行调节。
./bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
web.backpressure.cleanup-interval
说明:当启动反压数据采集后,获取反压前等待时间,默认是60s。
web.backpressure.delay-between-samples:Stack Trace
说明:抽样到确认反压状态之间的时延,默认为50ms。
web.backpressure.num-samples
说明:设定Stack Trace抽样数以肯定反压状态,默认为100。
经过调整Checkpointing之间的时间间隔进行优化。
val env=StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setMinPauseBetweenCheckpoints(milliseconds)
目前可用的压缩算法是Snappy,设置以下:
val env=StreamExecutionEnvironment.getExecutionEnvironment val config = env.getConfig config.setUseSnapshotCompression(true)
Flink,Spark等大数据引擎都实现了本身的内存管理,有效解决JVM内存溢出问题。
JobManager配置
jobmanager.heap.size:设定JobManager堆内存大小,默认为1024MB。
TaskManager配置
TaskManager做为Flink集群中的工做节点,全部任务的计算逻辑均执行在TaskManager之上,所以对TaskManager内存配置显得尤其重要,能够经过如下参数配置对TaskManager进行优化和调整。
taskmanager.heap.size
说明:设定TaskManager堆内存大小,默认值为1024M,若是在Yarn的集群中,TaskManager取决于Yarn分配给TaskManager Container的内存大小,且Yarn环境下通常会减掉一部份内存用于Container的容错。
taskmanager.jvm-exit-on-oom
说明:设定TaskManager是否会由于JVM发生内存溢出而中止,默认为false,当TaskManager发生内存溢出时,也不会致使TaskManager中止。
taskmanager.memory.size
说明:设定TaskManager内存大小,默认为0,若是不设定该值将会使用taskmanager.memory.fraction做为内存分配依据。
taskmanager.memory.fraction
说明:设定TaskManager堆中去除Network Buffers内存后的内存分配比例。该内存主要用于TaskManager任务排序、缓存中间结果等操做。例如,若是设定为0.8,则表明TaskManager保留80%内存用于中间结果数据的缓存,剩下20%的内存用于建立用户定义函数中的数据对象存储。注意,该参数只有在taskmanager.memory.size不设定的状况下才生效。
taskmanager.memory.off-heap
说明:设置是否开启堆外内存供Managed Memory或者Network Buffers使用。
taskmanager.memory.preallocate
说明:设置是否在启动TaskManager过程当中直接分配TaskManager管理内存。
taskmanager.numberOfTaskSlots
说明:每一个TaskManager分配的slot数量。
taskmanager.network.memory.fraction
说明:JVM中用于Network Buffers的内存比例。
taskmanager.network.memory.min
说明:最小的Network Buffers内存大小,默认为64MB。
taskmanager.network.memory.max
说明:最大的Network Buffers内存大小,默认1GB。
taskmanager.memory.segment-size
说明:内存管理器和Network栈使用的Buffer大小,默认为32KB。
默认Flink使用的Parallel Scavenge的垃圾回收器,能够改用G1垃圾回收器。
启动参数:
env.java.opts= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:+PrintGCDetails
-XX:MaxGCPauseMillis:设置容许的最大GC停顿时间,默认是200ms。
-XX:G1HeapRegionSize:每一个分区的大小,默认值会根据整个堆区的大小计算出来,范围是1M~32M,取值是2的幂,计算的倾向是尽可能有2048个分区数。
-XX:MaxTenuringThreshold=n:晋升到老年代的“年龄”阈值,默认值为15。
-XX:InitiatingHeapOccupancyPercent:通常会简写IHOP,默认是45%,这个占比跟并发周期的启动相关,当空间占比达到这个值时,会启动并发周期。若是常常出现FullGC,能够调低该值,今早的回收能够减小FullGC的触发,但若是太低,则并发阶段会更加频繁,下降应用的吞吐。
-XX:G1NewSizePercent:年轻代最小的堆空间占比,默认是5%。
-XX:G1MaxNewSizePercent:年轻代最大的堆空间占比,默认是60%。
-XX:ConcGCThreads:并发执行的线程数,默认值接近整个应用程序数的1/4。
-XX:-XX:G1HeapWastePercent:容许的浪费空间的占比,默认是5%。若是并发标记可回收的空间小于5%,则不会抛出MixedGC。
-XX:G1MixedGCCountTarget:一次全局并发标记以后,后续最多执行的MixedGC次数。默认值是8。