不废话,直接进入正题!面试
在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,以下图所示:算法
对上图中的RDD计算架构进行修改,获得以下图所示的优化结果:数据库
获取到初始RDD后,应该考虑尽早地过滤掉不须要的数据,进而减小对内存的占用,从而提高Spark做业的运行效率。apache
本文首发于公众号:五分钟学大数据,欢迎围观!回复【书籍】便可得到上百本大数据书籍数组
当咱们将一个文本文件读取为 RDD 时,输入的每一行都会成为RDD的一个元素。缓存
也能够将多个完整的文本文件一次性读取为一个pairRDD,其中键是文件名,值是文件内容。网络
val input:RDD[String] = sc.textFile("dir/*.log")
若是传递目录,则将目录下的全部文件读取做为RDD。文件路径支持通配符。架构
可是这样对于大量的小文件读取效率并不高,应该使用 wholeTextFiles返回值为RDD[(String, String)],其中Key是文件的名称,Value是文件的内容。koa
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
wholeTextFiles读取小文件:ide
val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
map(_....) 表示每个元素
mapPartitions(_....) 表示每一个分区的数据组成的迭代器
普通的map算子对RDD中的每个元素进行操做,而mapPartitions算子对RDD中每个分区进行操做。
若是是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每一个元素进行操做。
若是是mapPartition算子,因为一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收全部的partition数据,效率比较高。
好比,当要把RDD中的全部数据经过JDBC写入数据,若是使用map算子,那么须要对RDD中的每个元素都建立一个数据库链接,这样对资源的消耗很大,若是使用mapPartitions算子,那么针对一个分区的数据,只须要创建一个数据库链接。
mapPartitions算子也存在一些缺点:对于普通的map操做,一次处理一条数据,若是在处理了2000条数据后内存不足,那么能够将已经处理完的2000条数据从内存中垃圾回收掉;可是若是使用mapPartitions算子,但数据量很是大时,function一次处理一个分区的数据,若是一旦内存不足,此时没法回收内存,就可能会OOM,即内存溢出。
所以,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提高效果仍是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)
在项目中,应该首先估算一下RDD的数据量、每一个partition的数据量,以及分配给每一个Executor的内存资源,若是资源容许,能够考虑使用mapPartitions算子代替map。
rrd.foreache(_....) 表示每个元素
rrd.forPartitions(_....) 表示每一个分区的数据组成的迭代器
在生产环境中,一般使用foreachPartition算子来完成数据库的写入,经过foreachPartition算子的特性,能够优化写数据库的性能。
若是使用foreach算子完成数据库的操做,因为foreach算子是遍历RDD的每条数据,所以,每条数据都会创建一个数据库链接,这是对资源的极大浪费,所以,对于写数据库操做,咱们应当使用foreachPartition算子。
与mapPartitions算子很是类似,foreachPartition是将RDD的每一个分区做为遍历对象,一次处理一个分区的数据,也就是说,若是涉及数据库的相关操做,一个分区的数据只须要建立一次数据库链接,以下图所示:
使用了foreachPartition 算子后,能够得到如下的性能提高:
在生产环境中,所有都会使用foreachPartition算子完成数据库操做。foreachPartition算子存在一个问题,与mapPartitions算子相似,若是一个分区的数据量特别大,可能会形成OOM,即内存溢出。
在Spark任务中咱们常常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,可是一旦进过filter过滤后,每一个分区的数据量有可能会存在较大差别,以下图所示:
根据上图咱们能够发现两个问题:
每一个partition的数据量变小了,若是还按照以前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;
每一个partition的数据量不同,会致使后面的每一个task处理每一个partition数据的时候,每一个task要处理的数据量不一样,这颇有可能致使数据倾斜问题。
如上图所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会致使运行速度可能存在数倍的差距,这也就是数据倾斜问题。
针对上述的两个问题,咱们分别进行分析:
针对第一个问题,既然分区的数据量变小了,咱们但愿能够对分区数据进行从新分配,好比将原来4个分区的数据转化到2个分区中,这样只须要用后面的两个task进行处理便可,避免了资源的浪费。
针对第二个问题,解决方法和第一个问题的解决方法很是类似,对分区数据从新分配,让每一个partition中的数据量差很少,这就避免了数据倾斜问题。
那么具体应该如何实现上面的解决思路?咱们须要coalesce算子。
repartition与coalesce均可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认状况下不进行shuffle,可是能够经过参数进行设置。
假设咱们但愿将本来的分区个数A经过从新分区变为B,那么有如下几种状况:
A > B(多数分区合并为少数分区)
A与B相差值不大
此时使用coalesce便可,无需shuffle过程。
A与B相差值很大
此时可使用coalesce而且不启用shuffle过程,可是会致使合并过程性能低下,因此推荐设置coalesce的第二个参数为true,即启动shuffle过程。
A < B(少数分区分解为多数分区)
此时使用repartition便可,若是使用coalesce须要将shuffle设置为true,不然coalesce无效。
咱们能够在filter操做以后,使用coalesce算子针对每一个partition的数据量各不相同的状况,压缩partition的数量,并且让每一个partition的数据量尽可能均匀紧凑,以便于后面的task进行计算操做,在某种程度上可以在必定程度上提高性能。
注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了必定的内部优化,所以不用去设置并行度和分区数量。
Spark做业中的并行度指各个stage的task的数量。
若是并行度设置不合理而致使并行度太低,会致使资源的极大浪费,例如,20个Executor,每一个Executor分配3个CPU core,而Spark做业有40个task,这样每一个Executor分配到的task个数是2个,这就使得每一个Executor有一个CPU core空闲,致使资源的浪费。
理想的并行度设置,应该是让并行度与资源相匹配,简单来讲就是在资源容许的前提下,并行度要设置的尽量大,达到能够充分利用集群资源。合理的设置并行度,能够提高整个Spark做业的性能和运行速度。
Spark官方推荐,task数量应该设置为Spark做业总CPU core数量的2~3倍。之因此没有推荐task数量与CPU core总数相等,是由于task的执行时间不一样,有的task执行速度快而有的task执行速度慢,若是task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的状况。若是task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会马上执行下一个task,下降了资源的浪费,同时提高了Spark做业运行的效率。
Spark做业并行度的设置以下:
val conf = new SparkConf().set("spark.default.parallelism", "500")
原则:让 cpu 的 Core(cpu 核心数) 充分利用起来, 若有100个 Core,那么并行度能够设置为200~300。
咱们知道 Spark 中有并行度的调节策略,可是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL之外的全部Spark的stage生效。
Spark SQL的并行度不容许用户本身指定,Spark SQL本身会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个stage的并行度,用户本身通 spark.default.parallelism 参数指定的并行度,只会在没Spark SQL的stage中生效。
因为Spark SQL所在stage的并行度没法手动设置,若是数据量较大,而且此stage中后续的transformation操做有着复杂的业务逻辑,而Spark SQL自动设置的task数量不多,这就意味着每一个task要处理为数很多的数据量,而后还要执行很是复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,然后续的没有Spark SQL的stage运行速度很是快。
为了解决Spark SQL没法设置并行度和task数量的问题,咱们可使用repartition算子。
repartition 算子使用先后对比图以下:
Spark SQL这一步的并行度和task数量确定是没有办法去改变了,可是,对于Spark SQL查询出来的RDD,当即使用repartition算子,去从新进行分区,这样能够从新分区为多个partition,从repartition以后的RDD操做,因为再也不涉及Spark SQL,所以stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少许的task去处理大量数据并执行复杂的算法逻辑。使用repartition算子的先后对好比上图所示。
reduceByKey相较于普通的shuffle操做一个显著的特色就是会进行map端的本地聚合,map端会先对本地的数据进行combine操做,而后将数据写入给下个stage的每一个task建立的文件中,也就是在map端,对每个key对应的value,执行reduceByKey算子函数。
reduceByKey算子的执行过程以下图所示:
使用reduceByKey对性能的提高以下:
基于reduceByKey的本地聚合特征,咱们应该考虑使用reduceByKey代替其余的shuffle算子,例如groupByKey。
groupByKey与reduceByKey的运行原理以下图1和图2所示:
根据上图可知,groupByKey不会进行map端的聚合,而是将全部map端的数据shuffle到reduce端,而后在reduce端进行数据的聚合操做。因为reduceByKey有map端聚合的特性,使得网络传输的数据量减少,所以效率要明显高于groupByKey。
Spark持久化在大部分状况下是没有问题的,可是有时数据可能会丢失,若是数据一旦丢失,就须要对丢失的数据从新进行计算,计算完后再缓存和使用,为了不数据的丢失,能够选择对这个RDD进行checkpoint,也就是将数据持久化一份到容错的文件系统上(好比HDFS)。
一个RDD缓存并checkpoint后,若是一旦发现缓存丢失,就会优先查看checkpoint数据存不存在,若是有,就会使用checkpoint数据,而不用从新计算。也便是说,checkpoint能够视为cache的保障机制,若是cache失败,就使用checkpoint的数据。
使用checkpoint的优势在于提升了Spark做业的可靠性,一旦缓存出现问题,不须要从新计算数据,缺点在于,checkpoint时须要将数据写入HDFS等文件系统,对性能的消耗较大。
持久化设置以下:
sc.setCheckpointDir(‘HDFS’)
rdd.cache/persist(memory_and_disk)
rdd.checkpoint
默认状况下,task中的算子中若是使用了外部的变量,每一个task都会获取一份变量的复本,这就形成了内存的极大消耗。一方面,若是后续对RDD进行持久化,可能就没法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另外一方面,task在建立对象的时候,也许会发现堆内存没法存放新建立的对象,这就会致使频繁的GC,GC会致使工做线程中止,进而致使Spark暂停工做一段时间,严重影响Spark性能。
假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被全部task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,若是使用了广播变量, 那么每一个Executor保存一个副本,一共消耗400M内存,内存消耗减小了5倍。
广播变量在每一个Executor保存一个副本,此Executor的全部task共用此广播变量,这让变量产生的副本数量大大减小。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的BlockManager中尝试获取变量,若是本地没有,BlockManager就会从Driver或者其余节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;以后此Executor的全部task都会直接从本地的BlockManager中获取变量。
对于多个Task可能会共用的数据能够广播到每一个Executor上:
val 广播变量名= sc.broadcast(会被各个Task用到的变量,即须要广播的变量)
广播变量名.value//获取广播变量
默认状况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不须要额外的配置,在算子中使用的变量实现Serializable接口便可,可是,Java序列化机制的效率不高,序列化速度慢而且序列化后的数据所占用的空间依然较大。
Spark官方宣称Kryo序列化机制比Java序列化机制性能提升10倍左右,Spark之因此没有默认使用Kryo做为序列化类库,是由于它不支持全部对象的序列化,同时Kryo须要用户在使用前注册须要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。
Kryo序列化注册方式的代码以下:
public class MyKryoRegistrator implements KryoRegistrator{
@Override
public void registerClasses(Kryo kryo){
kryo.register(StartupReportLogs.class);
}
}
配置Kryo序列化方式的代码以下:
//建立SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化库中注册自定义的类集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator");
本文首发于公众号:五分钟学大数据,回复【666】便可得到全套大数据笔面试教程