大数据计算平台Spark内核全面解读

1Spark介绍html

Spark是起源于美国加州大学伯克利分校AMPLab的大数据计算平台,在2010年开源,目前是Apache软件基金会的顶级项目。随着Spark在大数据计算领域的暂露头角,愈来愈多的企业开始关注和使用。201411月,SparkDaytona Gray Sort 100TB Benchmark竞赛中打破了由Hadoop MapReduce保持的排序记录。Spark利用1/10的节点数,100TB数据的排序时间从72分钟提升到了23分钟算法

Spark在架构上包括内核部分和4个官方子模块--Spark SQLSpark Streaming、机器学习库MLlib和图计算库GraphX。图1所示为Spark在伯克利的数据分析软件栈BDASBerkeley Data Analytics Stack)中的位置。可见Spark专一于数据的计算,而数据的存储在生产环境中每每仍是由Hadoop分布式文件系统HDFS承担。shell

1 SparkBDAS中的位置 apache

Spark被设计成支持多场景的通用大数据计算平台,它能够解决大数据计算中的批处理,交互查询及流式计算等核心问题。Spark能够从多数据源的读取数据,而且拥有不断发展的机器学习库和图计算库供开发者使用。数据和计算在Spark内核及Spark的子模块中是打通的,这就意味着Spark内核和子模块之间成为一个总体。Spark的各个子模块以Spark内核为基础,进一步支持更多的计算场景,例如使用Spark SQL读入的数据能够做为机器学习库MLlib的输入。表1列举了一些在Spark平台上的计算场景。数组

1 Spark的应用场景举例缓存

在本文写做是,Spark的最新版本为1.2.0,文中的示例代码也来自于这个版本。网络

2Spark内核介绍 架构

相信大数据工程师都很是了解Hadoop MapReduce一个最大的问题是在不少应用场景中速度很是慢,只适合离线的计算任务。这是因为MapReduce须要将任务划分红mapreduce两个阶段,map阶段产生的中间结果要写回磁盘,而在这两个阶段之间须要进行shuffle操做。Shuffle操做须要从网络中的各个节点进行数据拷贝,使其每每成为最为耗时的步骤,这也是Hadoop MapReduce慢的根本缘由之一,大量的时间耗费在网络磁盘IO中而不是用于计算。在一些特定的计算场景中,例如像逻辑回归这样的迭代式的计算,MapReduce的弊端会显得更加明显。机器学习

Spark是若是设计分布式计算的呢?首先咱们须要理解Spark中最重要的概念--弹性分布数据集(Resilient Distributed Dataset),也就是RDD。 分布式

2.1 弹性分布数据集RDD

RDDSpark中对数据和计算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可变的并可以被并行操做的数据集合。对RDD的操做分为两种transformationactionTransformation操做是经过转换从一个或多个RDD生成新的RDDAction操做是从RDD生成最后的计算结果。在Spark最新的版本中,提供丰富的transformationaction操做,比起MapReduce计算模型中仅有的两种操做,会大大简化程序开发的难度。

RDD的生成方式只有两种,一是从数据源读入,另外一种就是从其它RDD经过transformation操做转换。一个典型的Spark程序就是经过Spark上下文环境(SparkContext)生成一个或多个RDD,在这些RDD上经过一系列的transformation操做生成最终的RDD,最后经过调用最终RDDaction方法输出结果。

每一个RDD均可以用下面5个特性来表示,其中后两个为可选的:

  • 分片列表(数据块列表)

  • 计算每一个分片的函数

  • 对父RDD的依赖列表

  • key-value类型的RDD的分片器(Partitioner)(可选)

  • 每一个数据分片的预约义地址列表(如HDFS上的数据块的地址)(可选)

虽然Spark是基于内存的计算,但RDD不光能够存储在内存中,根据useDiskuseMemoryuseOffHeap, deserializedreplication五个参数的组合Spark提供了12种存储级别,在后面介绍RDD的容错机制时,咱们会进一步理解。值得注意的是当StorageLevel设置成OFF_HEAP时,RDD实际被保存到Tachyon中。Tachyon是一个基于内存的分布式文件系统,目前正在快速发展,本文不作详细介绍,能够经过其官方网站进一步了解。

  1. class StorageLevel private(

  2.     private var _useDisk: Boolean,

  3.     private var _useMemory: Boolean,

  4.     private var _useOffHeap: Boolean,

  5.     private var _deserialized: Boolean

  6.     private var _replication: Int = 1)

  7.   extends Externalizable { //… }

  8.  

  9. val NONE = new StorageLevel(false, false, false, false)

  10.   val DISK_ONLY = new StorageLevel(true, false, false, false)

  11.   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  12.   val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  13.   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  14.   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  15.   val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  16.   val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  17.   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  18.   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  19.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  20.   val OFF_HEAP = new StorageLevel(false, false, true, false)

2.2 DAGStage与任务的生成

Spark的计算发生在RDDaction操做,而对action以前的全部transformationSpark只是记录下RDD生成的轨迹,而不会触发真正的计算。

Spark内核会在须要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。举个例子,在图2中,从输入中逻辑上生成AC两个RDD,通过一系列transformation操做,逻辑上生成了F,注意,咱们说的是逻辑上,由于这时候计算没有发生,Spark内核作的事情只是记录了RDD的生成和依赖关系。当F要进行输出时,也就是F进行了action操做,Spark会根据RDD的依赖生成DAG,并从起点开始真正的计算。

2 逻辑上的计算过程:DAG 

有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分红任务集,也就是Stage,这样能够将任务提交到计算节点进行真正的计算。Spark计算的中间结果默认是保存在内存中的,Spark在划分Stage的时候会充分考虑在分布式计算中可流水线计算(pipeline)的部分来提升计算的效率,而在这个过程当中,主要的根据就是RDD的依赖类型。根据不一样的transformation操做,RDD的依赖能够分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency,在代码中为ShuffleDependency)两种类型。窄依赖指的是生成的RDD中每一个partition只依赖于父RDD(s) 固定的partition。宽依赖指的是生成的RDD的每个partition都依赖于父 RDD(s) 全部partition。窄依赖典型的操做有map, filter, union等,宽依赖典型的操做有groupByKey, sortByKey等。能够看到,宽依赖每每意味着shuffle操做,这也是Spark划分stage的主要边界。对于窄依赖,Spark会将其尽可能划分在同一个stage中,由于它们能够进行流水线计算。

3 RDD的宽依赖和窄依赖

咱们再经过图4详细解释一下Spark中的Stage划分。咱们从HDFS中读入数据生成3个不一样的RDD,经过一系列transformation操做后再将计算结果保存回HDFS。能够看到这幅DAG中只有join操做是一个宽依赖,Spark内核会以此为边界将其先后划分红不一样的Stage. 同时咱们能够注意到,在图中Stage2中,从mapunion都是窄依赖,这两步操做能够造成一个流水线操做,经过map操做生成的partition能够不用等待整个RDD计算结束,而是继续进行union操做,这样大大提升了计算的效率。

4 Spark中的Stage划分 

Spark在运行时会把Stage包装成任务提交,有父StageSpark会先提交父Stage。弄清楚了Spark划分计算的原理,咱们再结合源码看一看这其中的过程。下面的代码是DAGScheduler中的获得一个RDDStage的函数,能够看到宽依赖为划分Stage的边界。

  1. /**

  2.    * Get or create the list of parent stages for a given RDD. The stages will be assigned the

  3.    * provided jobId if they haven't already been created with a lower jobId.

  4.    */

  5.  

  6.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {

  7.     val parents = new HashSet[Stage]

  8.     val visited = new HashSet[RDD[_]]

  9.     // We are manually maintaining a stack here to prevent StackOverflowError

  10.     // caused by recursively visiting

  11.     val waitingForVisit = new Stack[RDD[_]]

  12.     def visit(r: RDD[_]) {

  13.       if (!visited(r)) {

  14.         visited += r

  15.         // Kind of ugly: need to register RDDs with the cache here since

  16.         // we can't do it in its constructor because # of partitions is unknown

  17.         for (dep <- r.dependencies) {

  18.           dep match {

  19.             case shufDep: ShuffleDependency[_, _, _] =>

  20.               parents += getShuffleMapStage(shufDep, jobId)

  21.             case _ =>

  22.               waitingForVisit.push(dep.rdd)

  23.           }

  24.         }

  25.       }

  26.     }

  27.  

  28.     waitingForVisit.push(rdd)

  29.     while (!waitingForVisit.isEmpty) {

  30.       visit(waitingForVisit.pop())

  31.     }

  32.     parents.toList

  33.   }

上面提到Spark的计算是从RDD调用action操做时候触发的,咱们来看一个action的代码

RDDcollect方法是一个action操做,做用是将RDD中的数据返回到一个数组中。能够看到,在此action中,会触发Spark上下文环境SparkContext中的runJob方法,这是一系列计算的起点。

  1. abstract class RDD[T: ClassTag](

  2.     @transient private var sc: SparkContext,

  3.     @transient private var deps: Seq[Dependency[_]]

  4.   ) extends Serializable with Logging {

  5.   //….

  6. /**

  7.    * Return an array that contains all of the elements in this RDD.

  8.    */

  9.   def collect(): Array[T] = {

  10.     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

  11.     Array.concat(results: _*)

  12.   }

  13. }

SparkContext拥有DAGScheduler的实例,在runJob方法中会进一步调用DAGSchedulerrunJob方法。在此时,DAGScheduler会生成DAGStage,将Stage提交给TaskSchedulerTaskSchdulerStage包装成TaskSet,发送到Worker节点进行真正的计算,同时还要监测任务状态,重试失败和长时间无返回的任务。整个过程如图5所示。

 

5 Spark中任务的生成 

2.3 RDD的缓存与容错

上文提到,Spark的计算是从action开始触发的,若是在action操做以前逻辑上不少transformation操做,一旦中间发生计算失败,Spark会从新提交任务,这在不少场景中代价过大。还有一些场景,若有些迭代算法,计算的中间结果会被重复使用,重复计算一样增长计算时间和形成资源浪费。所以,在提升计算效率和更好支持容错,Spark提供了基于RDDcache机制和checkpoint机制。

咱们能够经过RDDtoDebugString来查看其递归的依赖信息,图6展现了在spark shell中经过调用这个函数来查看wordCount RDD的依赖关系,也就是它的Lineage.

6 RDD wordCountlineage 

若是发现Lineage过长或者里面有被屡次重复使用的RDD,咱们就能够考虑使用cache机制或checkpoint机制了。

咱们能够经过在程序中直接调用RDDcache方法将其保存在内存中,这样这个RDD就能够被多个任务共享,避免重复计算。另外,RDD还提供了更为灵活的persist方法,能够指定存储级别。从源码中能够看到RDD.cache就是简单的调用了RDD.persist(StorageLevel.MEMORY_ONLY)

  1. /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

  2.   def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  3.   def cache(): this.type = persist()

一样,咱们能够调用RDDcheckpoint方法将其保存到磁盘。咱们须要在SparkContext中设置checkpoint的目录,不然调用会抛出异常。值得注意的是,在调用checkpoint以前建议先调用cache方法将RDD放入内存,不然将RDD保存到文件的时候须要从新计算。 

  1.   /**

  2.    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

  3.    * directory set with SparkContext.setCheckpointDir() and all references to its parent

  4.    * RDDs will be removed. This function must be called before any job has been

  5.    * executed on this RDD. It is strongly recommended that this RDD is persisted in

  6.    * memory, otherwise saving it on a file will require recomputation.

  7.    */

  8.   def checkpoint() {

  9.     if (context.checkpointDir.isEmpty) {

  10.       throw new SparkException("Checkpoint directory has not been set in the SparkContext")

  11.     } else if (checkpointData.isEmpty) {

  12.       checkpointData = Some(new RDDCheckpointData(this))

  13.       checkpointData.get.markForCheckpoint()

  14.     }

  15.   }

Cache机制和checkpoint机制的差异在于cacheRDD保存到内存,并保留Lineage,若是缓存失效RDD还能够经过Lineage重建。而checkpointRDD落地到磁盘并切断Lineage,由文件系统保证其重建。

2.4 Spark任务的部署

Spark的集群部署分为StandaloneMesosYarn三种模式,咱们以Standalone模式为例,简单介绍Spark程序的部署。如图7示,集群中的Spark程序运行时分为3种角色,driver, masterworkerslave)。在集群启动前,首先要配置masterworker节点。启动集群后,worker节点会向master节点注册本身,master节点会维护worker节点的心跳。Spark程序都须要先建立Spark上下文环境,也就是SparkContext。建立SparkContext的进程就成为了driver角色,上一节提到的DAGSchedulerTaskScheduler都在driver中运行。Spark程序在提交时要指定master的地址,这样能够在程序启动时向master申请worker的计算资源。Drivermasterworker之间的通讯由Akka支持。Akka 也使用 Scala 编写,用于构建可容错的、高可伸缩性的Actor 模型应用。关于Akka,能够访问其官方网站进行进一步了解,本文不作详细介绍。

7 Spark任务部署

3、更深一步了解Spark内核

了解了Spark内核的基本概念和实现后,更深一步理解其工做原理的最好方法就是阅读源码。最新的Spark源码能够从Spark官方网站下载。源码推荐使用IntelliJ IDEA阅读,会自动安装Scala插件。读者能够从core工程,也就是Spark内核工程开始阅读,更能够设置断点尝试跟踪一个任务的执行。另外,读者还能够经过分析Spark的日志来进一步理解Spark的运行机制,Spark使用log4j记录日志,能够在启动集群前修改log4j的配置文件来配置日志输出和格式。

【编辑推荐】

  1. Spark:利用Eclipse构建Spark集成开发环境

  2. Spark实战:单节点本地模式搭建Spark运行环境

  3. Spark:为大数据处理点亮一盏明灯

  4. 专访Spark亚太研究院王家林:从技术的角度探索Spark

  5. StormSpark:谁才是咱们的实时处理利器

相关文章
相关标签/搜索