一:Spark的性能优化,主要手段包括:
一、使用高性能序列化类库
二、优化数据结构
三、对屡次使用的RDD进行持久化 / Checkpoint
四、使用序列化的持久化级别
五、Java虚拟机垃圾回收调优
六、提升并行度
七、广播共享数据
八、数据本地化
九、reduceByKey和groupByKey的合理使用
十、Shuffle调优(核心中的核心,重中之重)java
二:spark诊断内存消耗node
java主要的内存消耗算法
1、每一个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,好比指向它的类的指针。若是一个对象自己很小,好比就包括了一个int类型的field,那么它的对象头实际上比对象本身还要大。 2、Java的String对象,会比它内部的原始数据,要多出40个字节。由于它内部使用char数组来保存内部的字符序列的,而且还得保存诸如数组长度之类的信息。并且由于String使用的是UTF-16编码,因此每一个字符会占用2个字节。好比,包含10个字符的String,会占用60个字节。 3、Java中的集合类型,好比HashMap和LinkedList,内部使用的是链表数据结构,因此对链表中的每个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,一般占用8个字节。 4、元素类型为原始数据类型(好比int)的集合,内部一般会使用原始数据类型的包装类型,好比Integer,来存储元素。
怎么判断程序消耗的内存:apache
1、首先,本身设置RDD的并行度,有两种方式:要否则,在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task / partition的数量;要否则,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,能够统一设置这个application全部RDD的partition数量。 2、其次,在程序中将RDD cache到内存中,调用RDD.cache()方法便可。 3、最后,观察Driver的log,你会发现相似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息。这就显示了每一个partition占用了多少内存。 4、将这个内存信息乘以partition数量,便可得出RDD的内存占用量。
三:spark高性能序列化库编程
两种序列化机制json
spark默认使用了第一种序列化机制: 1、Java序列化机制:默认状况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是能够序列化的。并且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口便可实现本身的更高性能的序列化算法。Java序列化机制的速度比较慢,并且序列化后的数据占用的内存空间比较大。 2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,并且序列化后的数据占用的空间更小,一般比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之因此不是默认序列化机制的缘由是,有些类型虽然实现了Seriralizable接口,可是它也不必定可以进行序列化;此外,若是你要获得最佳的性能,Kryo还要求你在Spark应用程序中,对全部你须要序列化的类型都进行注册。
如何使用Kroyo序列数组
方式一:SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
方式二:若是要注册自定义的类型,那么就使用以下的代码,便可:
Scala版本:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)
Java版本:
SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)
使用Kroyo序列的建议:缓存
1、优化缓存大小 若是注册的要序列化的自定义的类型,自己特别大,好比包含了超过100个field。那么就会致使要序列化的对象过大。此时就须要对Kryo自己进行优化。由于Kryo内部的缓存可能不够存放那么大的class对象。此时就须要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。 默认状况下它的值是2,就是说最大能缓存2M的对象,而后进行序列化。能够在必要时将其调大。好比设置为10。 2、预先注册自定义类型 虽然不注册自定义类型,Kryo类库也能正常工做,可是那样的话,对于它要序列化的每一个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。所以一般都建议预先注册号要序列化的自定义的类。
使用场景:性能优化
首先,这里讨论的都是Spark的一些普通的场景,一些特殊的场景,好比RDD的持久化,在后面会讲解。这里先不说。
那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的状况。好比说吧,咱们在外部定义了一个封装了应用全部配置的对象,好比自定义了一个MyConfiguration对象,里面包含了100m的数据。而后,在算子函数里面,使用到了这个外部的大对象。
此时呢,若是默认状况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会致使,序列化速度缓慢,而且序列化之后的数据仍是比较大,比较占用内存空间。
所以,在这种状况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操做。一是,序列化速度会变快;二是,会减小序列化后的数据占用的内存空间。
四:Spark优化数据结构网络
目的:使用数据结构是为了减小数据的占用量,从而减小内存的开销。
优化的对象:主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。均可以进行数据结构的优化。优化以后,都会减小其对内存的消耗和占用。
优化方式:
1、优先使用数组以及字符串,而不是集合类。也就是说,优先用array,而不是ArrayList、LinkedList、HashMap等集合。 好比,有个List<Integer> list = new ArrayList<Integer>(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存的多。 还好比,一般企业级应用中的作法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,好比Map<Integer, Person> persons = new HashMap<Integer, Person>()。能够优化为,特殊的字符串格式:id:name,address|id:name,address...。 2、避免使用多层嵌套的对象结构。好比说,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是很是很差的例子。由于Teacher类的内部又嵌套了大量的小Student对象。 好比说,对于上述例子,也彻底可使用特殊的字符串来进行数据的存储。好比,用json字符串来存储数据,就是一个很好的选择。 {"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]} 3、对于有些可以避免的场景,尽可能使用int替代String。由于String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量少多了,可是以前分析过,仍是有额外信息的消耗。好比以前用String表示id,那么如今彻底能够用数字类型的int,来进行替代。 这里提醒,在spark应用中,id就不要用经常使用的uuid了,由于没法转成int,就用自增的int类型的id便可。(sdfsdfdf-234242342-sdfsfsfdfd)
五:对屡次使用的RDD进行持久化操做 或 CheckPoint
对屡次运算的RDD进行持久化或放到内存,能够减小对重复计算的代价;
若是要保证在RDD的持久化数据可能丢失的状况下,还要保证高性能,那么能够对RDD进行Checkpoint操做。
对数据的持久化有多重级别:
除了对屡次使用的RDD进行持久化操做以外,还能够进一步优化其性能。由于颇有可能,RDD的数据是持久化到内存,或者磁盘中的。那么,此时,若是内存大小不是特别充足,彻底可使用序列化的持久化级别,好比MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法便可。
这样的话,将数据序列化以后,再持久化,能够大大减少对内存的消耗。此外,数据量小了以后,若是要写入磁盘,那么磁盘io性能消耗也比较小。
对RDD持久化序列化后,RDD的每一个partition的数据,都是序列化为一个巨大的字节数组。这样,对于内存的消耗就小的多了。可是惟一的缺点就是,获取RDD数据时,须要对其进行反序列化,会增大其性能开销。
所以,对于序列化的持久化级别,还能够进一步优化,也就是说,使用Kryo序列化类库,这样,能够得到更快的序列化速度,而且占用更小的内存空间。可是要记住,若是RDD的元素(RDD<T>的泛型类型),是自定义类型的话,在Kryo中提早注册自定义类型。
六:JVM虚拟机垃圾回收
主要是建立少许的对象,以及建立对象的大小。编程中避免大对象。
还有一些jvm的通用方法。都是通用的,能够参考一些通用方法。
七:提升并行度
实际上Spark集群的资源并不必定会被充分利用到,因此要尽可能设置合理的并行度,来充分地利用集群的资源。才能充分提升Spark应用程序的性能。
Spark会自动设置以文件做为输入源的RDD的并行度,依据其大小,好比HDFS,就会给每个block建立一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操做,就使用并行度最大的父RDD的并行度便可。
能够手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可使用spark.default.parallelism参数,来设置统一的并行度。
好比说,spark-submit设置了executor数量是10个,每一个executor要求分配2个core,那么application总共会有20个core。此时能够设置new SparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源。
官方建议设置的并行数量为2-3倍的cpu cores的数量,这样可使一些计算能力较弱的cpu少计算一些数据。能力好的cpu计算多一些数据。
八:广播共享文件
若是你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每个task上去。而是给每一个节点拷贝一份,而后节点上的task共享该数据。
这样的话,就能够减小大数据在节点上的内存消耗。而且能够减小数据到节点的网络传输消耗。
九:数据本地化
基于移动计算的成本要远远小于移动数据的原则。
数据本地化级别:
数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别: 1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。 2、NODE_LOCAL:数据和计算它的代码在一个节点上,可是不在一个进程中,好比在不一样的executor进程中,或者是数据在HDFS文件的block中。 3、NO_PREF:数据从哪里过来,性能都是同样的。 4、RACK_LOCAL:数据和计算它的代码在一个机架上。 5、ANY:数据可能在任意地方,好比其余网络环境内,或者其余机架上。
优化方案:
Spark倾向于使用最好的本地化级别来调度task,可是这是不可能的。若是没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,当即在任意一个executor上启动一个task。
Spark默认会等待一下子,来指望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其余任意一个空闲的executor上。
能够设置参数,spark.locality系列参数,来调节Spark等待task能够进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。
十: groupByKey 和 ReduceByKey
若是能用reduceByKey,那就用reduceByKey,由于它会在map端,先进行本地combine,能够大大减小要传输到reduce端的数据量,减少网络传输的开销。
只有在reduceByKey处理不了时,才用groupByKey().map()来替代。
十一: shuffle优化
了解下shuffle的过程:
优化参数:
new SparkConf().set("spark.shuffle.consolidateFiles", "true") spark.shuffle.consolidateFiles:是否开启shuffle block file的合并,默认为false spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次 spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s spark.shuffle.memoryFraction:用于reduce端聚合的内存比例,默认0.2,超过比例就会溢出到磁盘上
1- spark.shuffle.consolidateFiles参数优化
没有开启consolidation机制的时候,shuffle write的性能是比较低下的,并且会影响到shuffle read的性能,也会比较低下。
由于在shuffle map端建立的磁盘文件太多了,致使shuffle write要耗费大量的性能到磁盘文件的建立,以及磁盘的io上。对于shuffle read,也是同样的,每一个result task可能都须要经过磁盘io读取多个文件的数据,都只shuffle read,性能可能也会受到影响。作主要的仍是shuffle write,由于要写的磁盘文件太多。
好比每一个节点有100个shuffle map task,10个CPU core是,总共有1000个result task。因此,每一个节点上的磁盘文件为100*1000个。
设置为true时,每一个cpu为每一个result task写一个文件(文件内容是以前的数据进行合并的结果),每一个节点上的磁盘文件为10*1000个。
2- spark.reducer.maxSizeInFlight
若是内存足够的话,这个量应该增大,这样,result task拉取的次数会减小(每次拉取数据量增长)。
3- spark.shuffle.file.buffer
能够适量增大,这样每次写入到文件的数据量减小,从而减小写文件的次数。
4- spark.shuffle.io.maxRetries
拉取数据的时候,可能jvm在full GC。
5- spark.shuffle.io.retryWait
能够适当增长时间。为了应对jvm 的full GC。
6- spark.shuffle.memoryFraction
能够适当的调大。
执行reduce task的Excetor中,有一部份内存是用来汇聚各个reduce task拉取的数据,放到map集合中,进行聚合。
当该数据超过总缓存*比例时,会把该内存的数据写入到磁盘上。
7- 若是jvm GC没有调优好,会致使每次gc都须要1min。那么拉取的最大默认时间为3*5s=15s。就会致使频繁的不少文件拉取失败。会报shuffle output file lost。而后DAGScheduler会重试task和stage。最后甚至致使application挂掉。
以上观点基本都借鉴自:中华石杉--spark从入门到精通的观点。