当第一次对RDD2执行算子,获取RDD3的时候,就会从RDD1开始计算,就是读取HDFS文件,而后对RDD1执行算子,获取到RDD2,而后再计算,获得RDD3数组
默认状况下,屡次对一个RDD执行算子,去获取不一样的RDD;都会对这个RDD以及以前的父RDD,所有从新计算一次;读取HDFS->RDD1->RDD2-RDD4
这种状况,是绝对绝对,必定要避免的,一旦出现一个RDD重复计算的状况,就会致使性能急剧下降。缓存
好比,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟网络
另一种状况,从一个RDD到几个不一样的RDD,算子和计算逻辑实际上是彻底同样的,结果由于人为的疏忽,计算了屡次,获取到了多个RDD。session
因此,建议采用如下方法能够优化:数据结构
第一,RDD架构重构与优化
尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。架构
第二,公共RDD必定要实现持久化
持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),之后不管对这个RDD作多少次计算,那么都是直接取这个RDD的持久化的数据,好比从内存中或者磁盘中,直接提取一份数据。
第三,持久化,是能够进行序列化的
若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。
当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。
序列化的方式,惟一的缺点就是,在获取数据的时候,须要反序列化。
若是序列化纯内存方式,仍是致使OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。内存+磁盘,序列化。
第四,为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化
持久化的双副本机制,持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;持久化的每一个数据单元,存储一份副本,放在其余节点上面;从而进行容错;一个副本丢了,不用从新计算,还可使用另一份副本。这种方式,仅仅针对你的内存资源极度充足。
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
/**
* 持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别
*
* 若是是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就能够用cache()方法来替代
* StorageLevel.MEMORY_ONLY_SER(),第二选择
* StorageLevel.MEMORY_AND_DISK(),第三选择
* StorageLevel.MEMORY_AND_DISK_SER(),第四选择
* StorageLevel.DISK_ONLY(),第五选择
*
* 若是内存充足,要使用双副本高可靠机制
* 选择后缀带_2的策略
* StorageLevel.MEMORY_ONLY_2()
*
*/
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
广播变量,
其实就是SparkContext的broadcast()方法,传入你要广播的变量,便可
final Broadcast<Map<String, Map<String, IntList>>> broadcast = sc.broadcast(fastutilDateHourExtractMap);
使用广播变量的时候,直接调用广播变量(Broadcast类型)的value() / getValue() ,能够获取到以前封装的广播变量
Map<String, Map<String, IntList>> dateHourExtractMap = broadcast .value();
像随机抽取的map,1M,举例。还算小的。若是你是从哪一个表里面读取了一些维度数据,比方说,全部商品品类的信息,在某个算子函数中要使用到。100M。1000个task。100G的数据,网络传输。集群瞬间由于这个缘由消耗掉100G的内存。
这种默认的,task执行的算子中,使用了外部的变量,每一个task都会获取一份变量的副本,有什么缺点呢?在什么状况下,会出现性能上的恶劣的影响呢?map,自己是不小,存放数据的一个单位是Entry,还有可能会用链表的格式的来存放Entry链条。因此map是比较消耗内存的数据格式。
好比,map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,经过网络传输到各个task中去,给task使用。总计有1G的数据,会经过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark做业运行的总时间的一小部分。
map副本,传输到了各个task上以后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一会儿就耗费掉1G的内存。对性能会有什么影响呢?没必要要的内存的消耗和占用,就致使了,你在进行RDD持久化到内存,也许就无法彻底在内存中放下;就只能写入磁盘,最后致使后续的操做在磁盘IO上消耗性能;
你的task在建立对象的时候,也许会发现堆内存放不下全部对象,也许就会致使频繁的垃圾回收器的回收,GC。GC的时候,必定是会致使工做线程中止,也就是致使Spark暂停工做那么一点时间。频繁GC的话,对Spark做业的运行的速度会有至关可观的影响。
若是说,task使用大变量(1m~100m),明知道会致使性能出现恶劣的影响。那么咱们怎么来解决呢?
广播,Broadcast,将大变量广播出去。而不是直接使用。
广播变量的好处,不是每一个task一份变量副本,而是变成每一个节点的executor才一份副本。这样的话,就可让变量产生的副本大大减小。
广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的
BlockManager中,尝试获取变量副本;若是本地没有,BlockManager,也许会从远程的Driver上面去获取变量副本;也有可能从距离比较近的其余
节点的Executor的BlockManager上去获取,并保存在本地的BlockManager中;BlockManager负责管理某个Executor对应的内存和磁盘上的数据,
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
好比,50个executor,1000个task。一个map,10M:
默认状况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
若是使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,并且不必定都是从Driver传输到每一个节点,还多是就近从最近的
节点的executor的bockmanager上拉取变量副本,网络传输速度大大增长;500M的内存消耗。
在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;注册你使用到的,须要经过Kryo序列化的,
一些自定义类,SparkConf.registerKryoClasses()
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
Kryo之因此没有被做为默认的序列化类库的缘由,就要出现了:主要是由于Kryo要求,若是要达到它的最佳性能的话,那么就必定要注册你自定义的类(好比,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,不然Kryo达不到最佳性能)。
当使用了序列化的持久化级别时,在将每一个RDD partition序列化成一个大的字节数组时,就会使用Kryo进一步优化序列化的效率和性能。默认状况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。
这种默认序列化机制的好处在于,处理起来比较方便;也不须要咱们手动去作什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化便可。可是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化之后的数据,占用的内存空间相对仍是比较大。
Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10
。因此Kryo序列化优化之后,可让网络传输的数据变少;在集群中耗费的内存资源大大减小。在进行stage间的task的shuffle操做时,节点与节点之
间的task会互相大量经过网络拉取和传输文件,此时,这些数据既然经过网络传输,也是可能要序列化的,就会使用Kryo。
Kryo序列化机制,一旦启用之后,会生效的几个地方:
一、算子函数中使用到的外部变量,使用Kryo之后:优化网络传输的性能,能够优化集群中内存的占用和消耗
二、持久化RDD,StorageLevel.MEMORY_ONLY_SER优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,建立的对象,就
不至于频繁的占满内存,频繁发生GC。
三、shuffle:能够优化网络传输的性能。
本地化级别
PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好
NODE_LOCAL:节点本地化,代码和数据在同一个节点中;好比说,数据做为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不一样executor中;数据须要在进程间进行传输
NO_PREF:对于task来讲,数据从哪里获取都同样,没有好坏之分
RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据须要经过网络在节点之间进行传输
ANY:数据和task可能在集群中的任何地方,并且不在一个机架中,性能最差
Spark.locality.wait,默认是3s
Spark在Driver上,对Application的每个stage的task,进行分配以前,都会计算出每一个task要计算的是哪一个分片数据,RDD的某个partition;Spark的task分配算法,优先,会但愿每一个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;
可是可能task没有机会分配到它的数据所在的节点,由于可能那个节点的计算资源和计算能力都满了;因此呢,这种时候,一般来讲,Spark会等待一段时间,默认状况下是3s钟(不是绝对的,还有不少种状况,对不一样的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,好比说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,而后进行计算。
可是对于第二种状况,一般来讲,确定是要发生数据传输,task会经过其所在节点的BlockManager来获取数据,BlockManager发现本身本地没有数据,会经过一个getRemote()方法,经过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,经过网络传输回task所在节点。
对于咱们来讲,固然不但愿是相似于第二种状况的了。最好的,固然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;若是要经过网络传输数据的话,那么实在是,性能确定会降低的,大量网络传输,以及磁盘IO,都是性能的杀手。
时候要调节这个参数?
观察日志,spark做业的运行日志,推荐你们在测试的时候,先用client模式,在本地就直接能够看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别。
若是大多都是PROCESS_LOCAL,那就不用调节了
若是是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长
调节完,应该是要反复调节,每次调节完之后,再来运行,观察日志
看看大部分的task的本地化级别有没有提高;看看,整个spark做业的运行时间有没有缩短
可是注意别本末倒置,本地化级别却是提高了,可是由于大量的等待时长,spark做业的运行时间反而增长了,那就仍是不要调节了。
spark.locality.wait,默认是3s;能够改为6s,10s
默认状况下,下面3个的等待时长,都是跟上面那个是同样的,都是3sspark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknew SparkConf() .set("spark.locality.wait", "10")