1. 数据序列化html
默认使用的是Java自带的序列化机制。优势是能够处理全部实现了java.io.Serializable 的类。
可是Java 序列化比较慢。java
可使用Kryo序列化机制,一般比Java 序列化机制性能高10倍。可是并不支持全部实现了java.io.Serializable 的类。使用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 开启Kryo序列化。不使用Kryo作为默认值的缘由是:须要注册自定义的类。例如:web
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
注意:若是Object很大,须要在配置中增长 spark.kryoserializer.buffer 的值。若是没有在Kryo中注册自定义的类,Kryo也能正常工做,这些类会彻底地保存下来(等于没有序列化就进行传输或保存了),会形成资源浪费。apache
2. 内存调优api
能够考虑3个方面:(1)对象须要的总内存 (2)指向这些对象的指针 (3)GC数组
一般状况下,指针占用的空间将是原始数据的2~5倍。有如下几个缘由:数据结构
(1)Java对象的“object header”(对象头),包含了指向它的类的指针,占用16bytes。对于一些只有不多数据的object,16bytes要比对象自己占用的空间要多。oracle
(2)Java String 中在原始String数据的基础上有另外40bytes的开销(String的保存形式是Char的数组,而且有length的额外数据)。由于String内部使用UTF-16编码,每一个char 占用2个byte。所以10个字符的String,将会很轻易地占用60个bytes性能
(3)诸如HashMap,LinkedList 的集合类,使用链式结构,每一个entry(Map.Entry)都有一个包装类。这些类不只有“object header”,还有指向下一个对象的指针(一般是8个bytes)。优化
(4)基本类型的集合,一般会被包装成对象类型。
3. 内存管理
Spark中的内存使用主要有两类:执行内存和存储内存。执行内存是指shuffles, joins, sorts and aggregations计算时用到的内存。存储内存主要是指cache和集群间传播的内部数据用到的内存。执行内存和存储内存使用的是同一块区域。当没有计算执行时, 存储将得到全部这块区域的可用内存,反之亦然。执行比存储具备更高的获取内存的优先级,也就是说,若是内存不够时,存储会释放一部份内存给执行用,直到存储须要的最低的阀值。
有两个相关的配置,可是一般来讲,用户不须要改变其默认值。
(1) spark.memory.fraction 表示使用的Java 堆内存的比例。默认值0.6. 剩下的40%的内存用于:(a)存储用户数据、Spark内部元数据 (b)防止OOM
(2)spark.memory.storageFraction 表示上面所说的存储内存最少占用的比例。默认值 是0.5
4. 肯定内存消耗
最好的方式是生成一个RDD并cache,在web UI 中的 Storage 中查看占用了多少内存。
肯定一个指定object 占用内存的大小,可使用 SizeEstimator.estimate(obj) 方法。
5. 调整数据结构
减小内存消耗,首先应该避免使用基于指针的数据结构和包装对象等诸如此类的Java特性。有如下几种途径:
(1)数据结构优先使用对象数组和基本类型,尽可能不使用Java和scala里的集合类(如:HashMap)。可使用 fastutil (http://fastutil.di.unimi.it/) 提供的集合类和基本类型。
(2)尽可能避免使用有不少小对象和指针的内嵌结构
(3)考虑使用数字ID 和枚举类代替做为key的String
(4)若是内存小于32GB,在Spark-env.sh 里设置 -XX:+UseCompressedOops,这样指针使用4bytes 而不是8bytes
6. 序列化RDD 存储
当你的 object 仍然很大,简单的下降内存消耗的方法是使用序列化的存储方法。强烈建议使用kyro序列化机制。
7. 垃圾回收调优
垃圾回收的时间主要是花费在寻找那些再也不被引用的对象。所以它跟Java Object 的数量有关。咱们应该使用具备较少object的数据结构(如:使用array代替linkedList)。一种较好的方法是用序列化的形式持久化Object,这样每一个RDD partition 只有一个字节数组。
测量GC的影响:在Java option 中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 后,可在worker 的 stdout 中找到GC的日志。
(1) 在任务完成以前,若是有屡次full GC,说明执行任务的内存不够
(2) 若是有屡次minor GC,可是 full GC 并很少,能够增大 Eden 区的大小
(3) 在GC的日志中,若是老年代快满了,减小 spark.memory.fraction 以下降cache所用的内存
(4) 尝试使用 G1 垃圾回收器(-XX:+UseG1GC)。若是堆比较大,应该增长 G1 区的大小(经过 -XX:G1HeapRegionSize 设置
)
(5) 若是任务是从HDFS上读数据,HDFS 块的大小为 128M,块解压后的大小通常为原始大小的2~3倍,若是要运行4个task,能够估算Eden区须要 4*3*128M。
8. 其它
(1)并行度。除非你手动每步都设置较高的并行度,不然,集群不会被最大化地利用。Spark会自动根据每一个文件的大小设置相应的task数量。对于诸如groupByKey,reduceByKey 等 reduce 操做,并行度为最大的父 RDD 的 partition 的数量。能够配置 spark.default.parallelism 设置默认的并行度。通常来说,建议一个CPU 运行 2~3个task。
(2)Reduce Task 的内存使用。有时候,发生OOM并非由于内存中放不下RDD,而是由于某个或几个task 分配的内存不够。例如:某个groupByKey 操做处理很大的数据集(由于数据倾斜的缘故)。 简单的解决方法是:设置较高的并行度。
(3)广播大的变量。 使用广播的功能能有效地减小序列化的 task 的大小和集群加载job的花消。若是你的task中须要使用一个来自driver的大的object(如:静态查询表),应该把它转化成广播变量。 Master端会打印序列化后的 task 的大小,一般若是大于20KB 的话,就值得去优化。
(4)数据本地性。数据本地性可分为如下几类:
(a) PROCESS_LOCAL 数据在运行代码的JVM中。
(b) NODE_LOCAL 数据和运行的代码在同一台机器上。如:当前节点上正好有HDFS的数据块。
(c) NO_PREF 数据能够较快获取,可是不在本地
(d) RACK_LOCAL 数据在同一 机架内,须要经过network获取
(e) Any 除上述外的数据
最好的状况就是 task 都运行在最好的数据本地性的环境,但一般不太可能。不少时候,某个executor 上的任务都完成了,而其它忙碌的机器上尚有未处理的data。Spark一般会等一段时间,以等待忙碌的机器空闲下来去处理数据(由于具备较高的本地性)。当超过这个等待时间后,空间的executor会把这些数据拉过来进行处理。每一个数据本地性级别对应的等待时间能够查看配置中的 spark.locality
部分。一般默认的配置工做得蛮好的。若是你的task运行时间较长,能够增长这些值。