当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各类各样术语,好比transformation,action,RDD(resilient distributed dataset) 等等。 了解到这些是编写 Spark 代码的基础。 一样,当你任务开始失败或者你须要透过web界面去了解本身的应用为什么如此费时的时候,你须要去了解一些新的名词: job, stage, task。对于这些新术语的理解有助于编写良好 Spark 代码。这里的良好主要指更快的 Spark 程序。对于 Spark 底层的执行模型的了解对于写出效率更高的 Spark 程序很是有帮助。html
一个 Spark 应用包括一个 driver 进程和若干个分布在集群的各个节点上的 executor 进程。node
driver 主要负责调度一些高层次的任务流(flow of work)。exectuor 负责执行这些任务,这些任务以 task 的形式存在, 同时存储用户设置须要caching的数据。 task 和全部的 executor 的生命周期为整个程序的运行过程(若是使用了dynamic resource allocation 时可能不是这样的)。如何调度这些进程是经过集群管理应用完成的(好比YARN,Mesos,Spark Standalone),可是任何一个 Spark 程序都会包含一个 driver 和多个 executor 进程。程序员
在执行层次结构的最上方是一系列 Job。调用一个Spark内部的 action 会产生一个 Spark job 来完成它。 为了肯定这些job实际的内容,Spark 检查 RDD 的DAG再计算出执行 plan 。这个 plan 以最远端的 RDD 为起点(最远端指的是对外没有依赖的 RDD 或者 数据已经缓存下来的 RDD),产生结果 RDD 的 action 为结束 。web
执行的 plan 由一系列 stage 组成,stage 是 job 的 transformation 的组合,stage 对应于一系列 task, task 指的对于不一样的数据集执行的相同代码。每一个 stage 包含不须要 shuffle 数据的 transformation 的序列。shell
什么决定数据是否须要 shuffle ?RDD 包含固定数目的 partition, 每一个 partiton 包含若干的 record。对于那些经过narrow tansformation(好比 map 和 filter)返回的 RDD,一个 partition 中的 record 只须要从父 RDD 对应的partition 中的 record 计算获得。每一个对象只依赖于父 RDD 的一个对象。有些操做(好比 coalesce)可能致使一个 task处理多个输入 partition ,可是这种 transformation 仍然被认为是 narrow 的,由于用于计算的多个输入 record 始终是来自有限个数的 partition。apache
然而 Spark 也支持须要 wide 依赖的 transformation,好比 groupByKey,reduceByKey。在这种依赖中,计算获得一个 partition 中的数据须要从父 RDD 中的多个 partition 中读取数据。全部拥有相同 key 的元组最终会被聚合到同一个partition 中,被同一个 stage 处理。为了完成这种操做, Spark须要对数据进行 shuffle,意味着数据须要在集群内传递,最终生成由新的 partition 集合组成的新的 stage。api
举例,如下的代码中,只有一个 action 以及 从一个文本串下来的一系列 RDD, 这些代码就只有一个 stage,由于没有哪一个操做须要从不一样的 partition 里面读取数据。缓存
sc.textFile("someFile.txt").
map(mapFunc).
flatMap(flatMapFunc).
filter(filterFunc).
count()
跟上面的代码不一样,下面一段代码须要统计总共出现超过1000次的单词:网络
val tokenized = sc.textFile(args(0)).flatMap(_.split(' ')) val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _) val filtered = wordCounts.filter(_._2 >= 1000) val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)). reduceByKey(_ + _) charCounts.collect()
这段代码能够分红三个 stage。recudeByKey 操做是各 stage 之间的分界,由于计算 recudeByKey 的输出须要按照能够从新分配 partition。数据结构
这里还有一个更加复杂的 transfromation 图,包含一个有多路依赖的 join transformation。
粉红色的框框展现了运行时使用的 stage 图。
运行到每一个 stage 的边界时,数据在父 stage 中按照 task 写到磁盘上,而在子 stage 中经过网络按照 task 去读取数据。这些操做会致使很重的网络以及磁盘的I/O,因此 stage 的边界是很是占资源的,在编写 Spark 程序的时候须要尽可能避免的。父 stage 中 partition 个数与子 stage 的 partition 个数可能不一样,因此那些产生 stage 边界的 transformation 经常须要接受一个 numPartition 的参数来以为子 stage 中的数据将被切分为多少个 partition。
正如在调试 MapReduce 是选择 reducor 的个数是一项很是重要的参数,调整在 stage 边届时的 partition 个数常常能够很大程度上影响程序的执行效率。咱们会在后面的章节中讨论如何调整这些值。
当须要使用 Spark 完成某项功能时,程序员须要从不一样的 action 和 transformation 中选择不一样的方案以得到相同的结果。可是不一样的方案,最后执行的效率可能有云泥之别。回避常见的陷阱选择正确的方案可使得最后的表现有巨大的不一样。一些规则和深刻的理解能够帮助你作出更好的选择。
在最新的 Spark5097 文档中开始稳定 SchemaRDD(也就是 Spark 1.3 开始支持的DataFrame),这将为使用 Spark 核心API的程序员打开 Spark的 Catalyst optimizer,容许 Spark 在使用 Operator 时作出更加高级的选择。当 SchemaRDD稳定以后,某些决定将不须要用户去考虑了。
选择 Operator 方案的主要目标是减小 shuffle 的次数以及被 shuffle 的文件的大小。由于 shuffle 是最耗资源的操做,因此有 shuffle 的数据都须要写到磁盘而且经过网络传递。repartition,join,cogroup,以及任何 *By 或者 *ByKey 的transformation 都须要 shuffle 数据。不是全部这些 Operator 都是平等的,可是有些常见的性能陷阱是须要注意的。
rdd.map(kv => (kv._1, new Set[String]() + kv._2)) .reduceByKey(_ ++ _)
这段代码生成了无数的非必须的对象,由于每一个须要为每一个 record 新建一个 Set。这里使用 aggregateByKey 更加适合,由于这个操做是在 map 阶段作聚合。
val zero = new collection.mutable.Set[String]() rdd.aggregateByKey(zero)( (set, v) => set += v, (set1, set2) => set1 ++= set2)
固然了解在哪些 transformation 上不会发生 shuffle 也是很是重要的。当前一个 transformation 已经用相同的patitioner 把数据分 patition 了,Spark知道如何避免 shuffle。参考一下代码:
rdd1 = someRdd.reduceByKey(...) rdd2 = someOtherRdd.reduceByKey(...) rdd3 = rdd1.join(rdd2)
由于没有 partitioner 传递给 reduceByKey,因此系统使用默认的 partitioner,因此 rdd1 和 rdd2 都会使用 hash 进行分 partition。代码中的两个 reduceByKey 会发生两次 shuffle 。若是 RDD 包含相同个数的 partition, join 的时候将不会发生额外的 shuffle。由于这里的 RDD 使用相同的 hash 方式进行 partition,因此所有 RDD 中同一个 partition 中的 key的集合都是相同的。所以,rdd3中一个 partiton 的输出只依赖rdd2和rdd1的同一个对应的 partition,因此第三次shuffle 是没必要要的。
举个例子说,当 someRdd 有4个 partition, someOtherRdd 有两个 partition,两个 reduceByKey 都使用3个partiton,全部的 task 会按照以下的方式执行:
若是 rdd1 和 rdd2 在 reduceByKey 时使用不一样的 partitioner 或者使用相同的 partitioner 可是 partition 的个数不一样的状况,那么只有一个 RDD (partiton 数更少的那个)须要从新 shuffle。
相同的 tansformation,相同的输入,不一样的 partition 个数:
当两个数据集须要 join 的时候,避免 shuffle 的一个方法是使用 broadcast variables。若是一个数据集小到可以塞进一个executor 的内存中,那么它就能够在 driver 中写入到一个 hash table中,而后 broadcast 到全部的 executor 中。而后map transformation 能够引用这个 hash table 做查询。
尽量减小 shuffle 的准则也有例外的场合。若是额外的 shuffle 可以增长并发那么这也可以提升性能。好比当你的数据保存在几个没有切分过的大文件中时,那么使用 InputFormat 产生分 partition 可能会致使每一个 partiton 中汇集了大量的record,若是 partition 不够,致使没有启动足够的并发。在这种状况下,咱们须要在数据载入以后使用 repartiton (会致使shuffle)提升 partiton 的个数,这样可以充分使用集群的CPU。
另一种例外状况是在使用 recude 或者 aggregate action 汇集数据到 driver 时,若是数据把不少 partititon 个数的数据,单进程执行的 driver merge 全部 partition 的输出时很容易成为计算的瓶颈。为了缓解 driver 的计算压力,可使用reduceByKey 或者 aggregateByKey 执行分布式的 aggregate 操做把数据分布到更少的 partition 上。每一个 partition中的数据并行的进行 merge,再把 merge 的结果发个 driver 以进行最后一轮 aggregation。查看 treeReduce 和treeAggregate 查看如何这么使用的例子。
这个技巧在已经按照 Key 汇集的数据集上格外有效,好比当一个应用是须要统计一个语料库中每一个单词出现的次数,而且把结果输出到一个map中。一个实现的方式是使用 aggregation,在每一个 partition 中本地计算一个 map,而后在 driver中把各个 partition 中计算的 map merge 起来。另外一种方式是经过 aggregateByKey 把 merge 的操做分布到各个partiton 中计算,而后在简单地经过 collectAsMap 把结果输出到 driver 中。
还有一个重要的技能是了解接口 repartitionAndSortWithinPartitions transformation。这是一个听起来很晦涩的transformation,可是却能涵盖各类奇怪状况下的排序,这个 transformation 把排序推迟到 shuffle 操做中,这使大量的数据有效的输出,排序操做能够和其余操做合并。
举例说,Apache Hive on Spark 在join的实现中,使用了这个 transformation 。并且这个操做在 secondary sort 模式中扮演着相当重要的角色。secondary sort 模式是指用户指望数据按照 key 分组,而且但愿按照特定的顺序遍历 value。使用 repartitionAndSortWithinPartitions 再加上一部分用户的额外的工做能够实现 secondary sort。
Spark 的用户邮件邮件列表中常常会出现 “我有一个500个节点的集群,为何可是个人应用一次只有两个 task 在执行”,鉴于 Spark 控制资源使用的参数的数量,这些问题不该该出现。可是在本章中,你将学会压榨出你集群的每一分资源。推荐的配置将根据不一样的集群管理系统( YARN、Mesos、Spark Standalone)而有所不一样,咱们将主要集中在YARN 上,由于这个 Cloudera 推荐的方式。
咱们先看一下在 YARN 上运行 Spark 的一些背景。查看以前的博文:点击这里查看
Spark(以及YARN) 须要关心的两项主要的资源是 CPU 和 内存, 磁盘 和 IO 固然也影响着 Spark 的性能,可是无论是 Spark 仍是 Yarn 目前都无法对他们作实时有效的管理。
在一个 Spark 应用中,每一个 Spark executor 拥有固定个数的 core 以及固定大小的堆大小。core 的个数能够在执行 spark-submit 或者 pyspark 或者 spark-shell 时,经过参数 --executor-cores 指定,或者在 spark-defaults.conf 配置文件或者 SparkConf 对象中设置 spark.executor.cores 参数。一样地,堆的大小能够经过 --executor-memory 参数或者 spark.executor.memory 配置项。core 配置项控制一个 executor 中task的并发数。 --executor-cores 5 意味着每一个executor 中最多同时能够有5个 task 运行。memory 参数影响 Spark 能够缓存的数据的大小,也就是在 groupaggregate 以及 join 操做时 shuffle 的数据结构的最大值。
--num-executors 命令行参数或者spark.executor.instances 配置项控制须要的 executor 个数。从 CDH 5.4/Spark 1.3 开始,你能够避免使用这个参数,只要你经过设置 spark.dynamicAllocation.enabled 参数打开 动态分配 。动态分配可使的 Spark 的应用在有后续积压的在等待的 task 时请求 executor,而且在空闲时释放这些 executor。
同时 Spark 需求的资源如何跟 YARN 中可用的资源配合也是须要着重考虑的,YARN 相关的参数有:
请求5个 core 会生成向 YARN 要5个虚拟core的请求。从 YARN 请求内存相对比较复杂由于如下的一些缘由:
下面展现的是 Spark on YARN 内存结构:
若是这些还不够决定Spark executor 个数,还有一些概念还须要考虑的:
为了让以上的这些更加具体一点,这里有一个实际使用过的配置的例子,能够彻底用满整个集群的资源。假设一个集群有6个节点有NodeManager在上面运行,每一个节点有16个core以及64GB的内存。那么 NodeManager的容量:yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores 能够设为 63 * 1024 = 64512 (MB) 和 15。咱们避免使用 100% 的 YARN container 资源由于还要为 OS 和 hadoop 的 Daemon 留一部分资源。在上面的场景中,咱们预留了1个core和1G的内存给这些进程。Cloudera Manager 会自动计算而且配置。
因此看起来咱们最早想到的配置会是这样的:--num-executors 6 --executor-cores 15 --executor-memory 63G。可是这个配置可能没法达到咱们的需求,由于:
- 63GB+ 的 executor memory 塞不进只有 63GB 容量的 NodeManager;
- 应用的 master 也须要占用一个core,意味着在某个节点上,没有15个core给 executor 使用;
- 15个core会影响 HDFS IO的吞吐量。
配置成 --num-executors 17 --executor-cores 5 --executor-memory 19G 可能会效果更好,由于:
- 这个配置会在每一个节点上生成3个 executor,除了应用的master运行的机器,这台机器上只会运行2个 executor
- --executor-memory 被分红3份(63G/每一个节点3个executor)=21。 21 * (1 - 0.07) ~ 19。
咱们知道 Spark 是一套数据并行处理的引擎。可是 Spark 并非神奇得可以将全部计算并行化,它没办法从全部的并行化方案中找出最优的那个。每一个 Spark stage 中包含若干个 task,每一个 task 串行地处理数据。在调试 Spark 的job时,task的个数多是决定程序性能的最重要的参数。
那么这个数字是由什么决定的呢?在以前的博文中介绍了 Spark 如何将 RDD 转换成一组 stage。task 的个数与 stage 中上一个 RDD 的 partition 个数相同。而一个 RDD 的 partition 个数与被它依赖的 RDD 的 partition 个数相同,除了如下的状况: coalesce transformation 能够建立一个具备更少 partition 个数的 RDD,union transformation 产出的 RDD的 partition 个数是它父 RDD 的 partition 个数之和, cartesian 返回的 RDD 的 partition 个数是它们的积。
若是一个 RDD 没有父 RDD 呢? 由 textFile 或者 hadoopFile 生成的 RDD 的 partition 个数由它们底层使用的 MapReduce InputFormat 决定的。通常状况下,每读到的一个 HDFS block 会生成一个 partition。经过 parallelize 接口生成的 RDD 的 partition 个数由用户指定,若是用户没有指定则由参数 spark.default.parallelism 决定。
要想知道 partition 的个数,能够经过接口 rdd.partitions().size() 得到。
这里最须要关心的问题在于 task 的个数过小。若是运行时 task 的个数比实际可用的 slot 还少,那么程序解无法使用到全部的 CPU 资源。
过少的 task 个数可能会致使在一些汇集操做时, 每一个 task 的内存压力会很大。任何 join,cogroup,*ByKey 操做都会在内存生成一个 hash-map或者 buffer 用于分组或者排序。join, cogroup ,groupByKey 会在 shuffle 时在 fetching 端使用这些数据结构, reduceByKey ,aggregateByKey 会在 shuffle 时在两端都会使用这些数据结构。
当须要进行这个汇集操做的 record 不能彻底轻易塞进内存中时,一些问题会暴露出来。首先,在内存 hold 大量这些数据结构的 record 会增长 GC的压力,可能会致使流程停顿下来。其次,若是数据不能彻底载入内存,Spark 会将这些数据写到磁盘,这会引发磁盘 IO和排序。在 Cloudera 的用户中,这多是致使 Spark Job 慢的首要缘由。
那么如何增长你的 partition 的个数呢?若是你的问题 stage 是从 Hadoop 读取数据,你能够作如下的选项:
- 使用 repartition 选项,会引起 shuffle;
- 配置 InputFormat 用户将文件分得更小;
- 写入 HDFS 文件时使用更小的block。
若是问题 stage 从其余 stage 中得到输入,引起 stage 边界的操做会接受一个 numPartitions 的参数,好比
val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)
X 应该取什么值?最直接的方法就是作实验。不停的将 partition 的个数从上次实验的 partition 个数乘以1.5,直到性能再也不提高为止。
同时也有一些原则用于计算 X,可是也不是很是的有效是由于有些参数是很难计算的。这里写到不是由于它们很实用,而是能够帮助理解。这里主要的目标是启动足够的 task 可使得每一个 task 接受的数据可以都塞进它所分配到的内存中。
每一个 task 可用的内存经过这个公式计算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 默认值分别 0.2 和 0.8.
在内存中全部 shuffle 数据的大小很难肯定。最可行的是找出一个 stage 运行的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之间的比例。在用全部shuffle 写乘以这个比例。可是若是这个 stage 是 reduce 时,可能会有点复杂:
在往上增长一点由于大多数状况下 partition 的个数会比较多。
试试在,在有所疑虑的时候,使用更多的 task 数(也就是 partition 数)都会效果更好,这与 MapRecuce 中建议 task 数目选择尽可能保守的建议相反。这个由于 MapReduce 在启动 task 时相比须要更大的代价。
Spark 的数据流由一组 record 构成。一个 record 有两种表达形式:一种是反序列化的 Java 对象另一种是序列化的二进制形式。一般状况下,Spark 对内存中的 record 使用反序列化以后的形式,对要存到磁盘上或者须要经过网络传输的record 使用序列化以后的形式。也有计划在内存中存储序列化以后的 record。
spark.serializer 控制这两种形式之间的转换的方式。Kryo serializer,org.apache.spark.serializer.KryoSerializer 是推荐的选择。但不幸的是它不是默认的配置,由于 KryoSerializer 在早期的 Spark 版本中不稳定,而 Spark 不想打破版本的兼容性,因此没有把 KryoSerializer 做为默认配置,可是 KryoSerializer 应该在任何状况下都是第一的选择。
你的 record 在这两种形式切换的频率对于 Spark 应用的运行效率具备很大的影响。去检查一下处处传递数据的类型,看看可否挤出一点水分是很是值得一试的。
过多的反序列化以后的 record 可能会致使数据处处到磁盘上更加频繁,也使得可以 Cache 在内存中的 record 个数减小。点击这里查看如何压缩这些数据。
过多的序列化以后的 record 致使更多的 磁盘和网络 IO,一样的也会使得可以 Cache 在内存中的 record 个数减小,这里主要的解决方案是把全部的用户自定义的 class 都经过 SparkConf#registerKryoClasses 的API定义和传递的。
任什么时候候你均可以决定你的数据如何保持在磁盘上,使用可扩展的二进制格式好比:Avro,Parquet,Thrift或者Protobuf,从中选择一种。当人们在谈论在Hadoop上使用Avro,Thrift或者Protobuf时,都是认为每一个 record 保持成一个 Avro/Thrift/Protobuf 结构保存成 sequence file。而不是JSON。
每次当时试图使用JSON存储大量数据时,仍是先放弃吧...
原文地址:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/