Spark 架构

本文转之Pivotal的一个工程师的博客。以为极好。node

 
做者本人常常在StackOverflow上回答一个关系Spark架构的问题,发现整个互联网都没有一篇文章能对Spark整体架构进行很好的描述,做者可怜咱们这些菜鸟,写了这篇文章,太感动了。 本文读者须要必定的Spark的基础知识,至少了解Spark的RDD和DAG。
Spark-Architecture-Official.png (607×284)
上图引入了不少术语:"Executor","Task","Cache","Worker Node"等等,当我开始学习Spark的时候,这几乎是整个互联网上惟一一张关于Spark架构的图了,我我的以为该图缺失了一些很重要的概念或者是描述的
 
任何Spark的进程都是一个JVM进程,既然是一个JVM进程,那么就能够配置它的堆大小(-Xmx和-Xms),可是进程怎么使用堆内存和为何须要它呢?下面是一个JVM堆空间下Spark的内存分配状况
Spark-Heap-Usage.png
默认状况下,Spark进程的堆空间是512mb,为了安全考虑同时避免OOM,Spark只容许利用90%的堆空间,spark中使用spark.storage.safetyFraction用来配置该值(默认是0.9). Spark做为一个内存计算工具,Spark能够在内存中存储数据,若是读过 http://0x0fff.com/spark-misconceptions/ 就会明白Spark不是一个真的内存工具,它只是把内存做为他的LRU缓存,这样大量的内存被用来缓存正在计算的数据,该部分占safe堆的60%,Spark使用spark.storage.memoryFraction控制该值,若是想知道Spark中能缓存了多少数据,能够统计全部Executor的堆大小,乘上safeFraction和memoryFraction,默认是54%,这就是Spark可用缓存数据使用的堆大小,
 
该部分介绍shuffle的内存使用状况,它经过 堆大小 * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。  spark.shuffle.safetyFraction的默认值是0.8,  spark.shuffle.memoryFraction的默认值是0.2,因此最终只能最多使堆空间的16%用于shuffle,关于怎么使用这块内存,参考https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala ,可是一般spark会使用这块内存用于shuffle中一些别的任务,当执行shuffle时,有时对数据进行排序,当进行排序时,须要缓冲排完序后的数据(注意不能改变LRU缓冲中的数据,由于后面可能要重用),这样就须要大量的RAM存储排完序后的数据块,当没有足够的内存用于排序,参考外排的实现,能够一块一块的排序,而后最终合并。
 
最后要讲到的一块内存是"unroll",该快内存用于unroll计算以下:spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction 。当咱们须要在内存展开数据块的时候使用,那么为何须要展开呢?由于spark容许以序列化和非序列化两种方式存储数据,序列化后的数据没法直接使用,因此使用时必需要展开。该部份内存占用缓存的内存,因此若是须要内存用于展开数据时,若是这个时候内存不够,那么Spark LRU缓存中的数据会删除一些快。
 
此时应该清楚知道spark怎么使用JVM中堆内存了,如今切换到集群模式,当你启动一个spark集群,如何看待它,下图是YARN模式下的
Spark-Architecture-On-YARN.png
当运行在yarn集群上时,Yarn的ResourceMananger用来管理集群资源,集群上每一个节点上的NodeManager用来管控所在节点的资源,从yarn的角度来看,每一个节点看作可分配的资源池,当向ResourceManager请求资源时,它返回一些NodeManager信息,这些NodeManager将会提供execution container给你,每一个 execution container就是知足请求的堆大小的JVM进程,JVM进程的位置是由ResourceMananger管理的,不能本身控制,若是一个节点有64GB的内存被yarn管理(经过yarn.nodemanager.resource.memory-mb配置),当请求10个4G内存的executors时,这些 executors可能运行在同一个节点上。
 
当在yarn上启动spark集群上,能够指定executors的数量(-num-executors或者spark.executor.instances),能够指定每一个executor使用的内存(-executor-memory或者spark.executor.memory),能够指定每一个executor使用的cpu核数(-executor-cores或者spark.executor.cores),指定每一个task执行使用的core数(spark.task.cpus),也能够指定driver应用使用的内存(-driver-memory和spark.driver.memory)
 
当在集群上执行应用时,job会被切分红stages,每一个stage切分红task,每一个task单独调度,能够把executor的jvm进程看作task执行池,每一个executor有 spark.executor.cores / spark.task.cpus execution 个执行槽,这里有个例子:集群有12个节点运行Yarn的NodeManager,每一个节点有64G内存和32的cpu核,每一个节点能够启动2个executor,每一个executor的使用26G内存,剩下的内用系统和别的服务使用,每一个executor有12个cpu核用于执行task,这样整个集群有12 machines * 2 executors per machine * 12 cores per executor / 1 core = 288 个task执行槽,这意味着spark集群能够同时跑288个task,整个集群用户缓存数据的内存有0.9 spark.storage.safetyFraction * 0.6 spark.storage.memoryFraction * 12 machines * 2 executors per machine * 26 GB per executor = 336.96 GB.
 
到目前为止,咱们已经了解了spark怎么使用JVM的内存以及集群上执行槽是什么,目前为止尚未谈到task的一些细节,这将在另外一个文章中提升,基本上就是spark的一个工做单元,做为exector的jvm进程中的一个线程执行,这也是为何spark的job启动时间快的缘由,在jvm中启动一个线程比启动一个单独的jvm进程块(在hadoop中执行mapreduce应用会启动多个jvm进程)
 
下面将关注spark的另外一个抽象:partition, spark处理的全部数据都会切分红partion,一个parition是什么以及怎么肯定,partition的大小彻底依赖数据源,spark中大部分用于读取数据的方法均可以指定生成的RDD中partition的个数,当从hdfs上读取一个文件时,会使用Hadoop的InputFormat来处理,默认状况下InputFormat返回每一个InputSplit都会映射RDD中的一个Partition,大部分存储在HDFS上的文件每一个数据块会生成一个InputSplit,每一个数据块大小为64mb和128mb,由于HDFS上面的数据的块边界是按字节来算的(64mb一个块),可是当数据被处理是,它又要按记录进行切分,对于文本文件来讲切分的字符就是换行符,对于sequence文件来讲,他是块结束,若是是压缩文件,整个文件都被压缩了,它不能按行进行切分了,整个文件只有一个inputsplit,这样spark中也会只有一个parition,在处理的时候须要手动的repatition。
 
 
 
 
 



相关文章
相关标签/搜索