本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归做者(秦凯新)全部,禁止转载,欢迎学习。apache
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
复制代码
程序开发调优 :避免建立重复的RDD数组
val rdd1 = sc.textFile("hdfs://master01:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://master01:9000/hello.txt")
rdd2.reduce(...)
复制代码
须要对名为“hello.txt”的HDFS文件进行一次map操做,再进行一次reduce操做。 也就是说,须要对一份数据执行两次算子操做。 错误的作法:对于同一份数据执行屡次算子操做时,建立多个RDD。 这里执行了两次textFile方法,针对同一个HDFS文件,建立了两个RDD出来 ,而后分别对每一个RDD都执行了一个算子操做。 这种状况下,Spark须要从HDFS上两次加载hello.txt文件的内容,并建立两个单独的RDD; 第二次加载HDFS文件以及建立RDD的性能开销,很明显是白白浪费掉的。网络
程序开发调优 :尽量复用同一个RDD数据结构
错误的作法: 有一个<long , String>格式的RDD,即rdd1。 接着因为业务须要,对rdd1执行了一个map操做,建立了一个rdd2, 而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。ide
JavaPairRDD<long , String> rdd1 = ... JavaRDD rdd2 = rdd1.map(...)函数
分别对rdd1和rdd2执行了不一样的算子操做。post
rdd1.reduceByKey(...)
rdd2.map(...)
复制代码
rdd2的数据彻底就是rdd1的子集而已,却建立了两个rdd,并对两个rdd都执行了一次算子操做。 此时会由于对rdd1执行map算子来建立rdd2,而多执行一次算子操做,进而增长性能开销。 其实在这种状况下彻底能够复用同一个RDD。 咱们可使用rdd1,既作reduceByKey操做,也作map操做。性能
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
复制代码
程序开发调优 :对屡次使用的RDD进行持久化学习
// 正确的作法。
// cache()方法表示:使用非序列化的方式将RDD中的数据所有尝试持久化到内存中。
// 此时再对rdd1执行两次算子操做时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
正确的作法:
// 序列化的方式能够减小持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,
//从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
复制代码
一般不建议使用DISK_ONLY和后缀为_2的级别:由于彻底基于磁盘文件进行数据的读写,会致使性能急剧下降,已经网络较大开销测试
若是有可能的话,要尽可能避免使用shuffle类算子,最消耗性能的地方就是shuffle过程。
shuffle过程当中,各个节点上的相同key都会先写入本地磁盘文件中,而后其余节点须要经过网络传输拉取各个节点上的磁盘文件中的相同key。并且相同key都拉取到同一个节点进行聚合操做时,还有可能会由于一个节点上处理的key过多,致使内存不够存放,进而溢写到磁盘文件中。所以在shuffle过程当中,可能会发生大量的磁盘文件读写的IO操做,以及数据的网络传输操做。磁盘IO和网络数据传输也是shuffle性能较差的主要缘由。
尽量避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽可能使用map类的非shuffle算子。
// 传统的join操做会致使shuffle操做。
// 由于两个RDD中,相同的key都须要经过网络拉取到一个节点上,由一个task进行join操做。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操做,不会致使shuffle操做。
// 使用Broadcast将一个数据量较小的RDD做为广播变量。
// 注意,以上操做,建议仅仅在rdd2的数据量比较少(好比几百M,或者一两G)的状况下使用。
// 由于每一个Executor的内存中,都会驻留一份rdd2的全量数据。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)
复制代码
若是由于业务须要,必定要使用shuffle操做,没法用map类的算子来替代,那么尽可能使用能够map-side预聚合的算子 相似于MapReduce中的本地combiner。map-side预聚合以后,每一个节点本地就只会有一条相同的key,由于多条相同的key都被聚合起来了。其余节点在拉取全部节点上的相同key时,就会大大减小须要拉取的数据数量,从而也就减小了磁盘IO以及网络传输开销。
建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,若是须要在repartition重分区以后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子
有时在开发过程当中,会遇到须要在算子函数中使用外部变量的场景(尤为是大变量,好比100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提高性能。 默认状况下,Spark会将该变量复制多个副本,经过网络传输到task中,此时每一个task都有一个变量副本。若是变量自己比较大的话(好比100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存致使的频繁GC,都会极大地影响性能。 广播后的变量,会保证每一个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。
Spark默认使用的是Java的序列化机制,你可使用Kryo做为序列化类库,效率要比 Java的序列化机制要高:
// 建立SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
复制代码
Java中,有三种类型比较耗费内存:
一、对象,每一个Java对象都有对象头、引用等额外的信息,所以比较占用内存空间。
二、字符串,每一个字符串内部都有一个字符数组以及长度等额外信息。
三、集合类型,好比HashMap、LinkedList等,由于集合类型内部一般会使用一些内部类来封装集合元素,好比Map.Entry
Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽可能不要使用上述三种数据结构,尽可能使用字符串替代对象,使用原始类型(好比Int、Long)替代字符串,使用数组替代集合类型,这样尽量地减小内存占用,从而下降GC频率,提高性能。
由于开发程序调优相对成熟,因此在此参考大牛的笔记,加上本身的总结,一鼓作气。
秦凯新 于深圳