spark记录(5)Spark运行流程及在不一样集群中的运行过程

摘自:https://www.cnblogs.com/qingyunzong/p/8945933.htmlhtml

1、Spark中的基本概念

(1)Application:表示你的应用程序node

(2)Driver:表示main()函数,建立SparkContext。由SparkContext负责与ClusterManager通讯,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContextweb

(3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,而且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每一个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。算法

(4)Worker:集群中能够运行Application代码的节点。在Standalone模式中指的是经过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。shell

(5)Task:在Executor进程中执行任务的工做单元,多个Task组成一个Stageapache

(6)Job:包含多个Task组成的并行计算,是由Action行为触发的安全

(7)Stage:每一个Job会被拆分不少组Task,做为一个TaskSet,其名称为Stage网络

(8)DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系多线程

(9)TaskScheduler:将TaskSet提交给Worker(集群)运行,每一个Executor运行什么Task就是在此处分配的。架构

2、Spark的运行流程

2.1 Spark的基本运行流程

一、说明

(1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(能够是Standalone、Mesos或YARN)注册并申请运行Executor资源;

(2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行状况将随着心跳发送到资源管理器上;

(3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task

(4)Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。

(5)Task在Executor上运行,运行完毕释放全部资源。

二、图解

三、Spark运行架构特色

(1)每一个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。这种Application隔离机制有其优点的,不管是从调度角度看(每一个Driver调度它本身的任务),仍是从运行角度看(来自不一样Application的Task运行在不一样的JVM中)。固然,这也意味着Spark Application不能跨应用程序共享数据,除非将数据写入到外部存储系统。

(2)Spark与资源管理器无关,只要可以获取executor进程,并能保持相互通讯就能够了。

(3)提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,由于Spark Application运行过程当中SparkContext和Executor之间有大量的信息交换;若是想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。

(4)Task采用了数据本地性和推测执行的优化机制。数据本地性是尽可能将计算移到数据所在的节点上进行,即“计算向数据靠拢”,由于移动计算比移动数据所占的网络资源要少得多。并且,Spark采用了延时调度机制,能够在更大的程度上实现执行过程优化。好比,拥有数据的节点当前正被其余的任务占用,那么,在这种状况下是否须要将数据移动到其余的空闲节点呢?答案是不必定。由于,若是通过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么,调度就会等待,直到当前节点可用。

四、DAGScheduler

Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency

面向stage的切分,切分依据为宽依赖

维护waiting jobs和active jobs,维护waiting stages、active stages和failed stages,以及与jobs的映射关系

主要职能:

一、接收提交Job的主入口,submitJob(rdd, ...)runJob(rdd, ...)。在SparkContext里会调用这两个方法。 

  • 生成一个Stage并提交,接着判断Stage是否有父Stage未完成,如有,提交并等待父Stage,以此类推。结果是:DAGScheduler里增长了一些waiting stage和一个running stage。
  • running stage提交后,分析stage里Task的类型,生成一个Task描述,即TaskSet。
  • 调用TaskScheduler.submitTask(taskSet, ...)方法,把Task描述提交给TaskScheduler。TaskScheduler依据资源量和触发分配条件,会为这个TaskSet分配资源并触发执行。
  • DAGScheduler提交job后,异步返回JobWaiter对象,可以返回job运行状态,可以cancel job,执行成功后会处理并返回结果

二、处理TaskCompletionEvent 

  • 若是task执行成功,对应的stage里减去这个task,作一些计数工做: 
    • 若是task是ResultTask,计数器Accumulator加一,在job里为该task置true,job finish总数加一。加完后若是finish数目与partition数目相等,说明这个stage完成了,标记stage完成,从running stages里减去这个stage,作一些stage移除的清理工做
    • 若是task是ShuffleMapTask,计数器Accumulator加一,在stage里加上一个output location,里面是一个MapStatus类。MapStatusShuffleMapTask执行完成的返回,包含location信息和block size(能够选择压缩或未压缩)。同时检查该stage完成,向MapOutputTracker注册本stage里的shuffleId和location信息。而后检查stage的output location里是否存在空,若存在空,说明一些task失败了,整个stage从新提交;不然,继续从waiting stages里提交下一个须要作的stage
  • 若是task是重提交,对应的stage里增长这个task
  • 若是task是fetch失败,立刻标记对应的stage完成,从running stages里减去。若是不容许retry,abort整个stage;不然,从新提交整个stage。另外,把这个fetch相关的location和map任务信息,从stage里剔除,从MapOutputTracker注销掉。最后,若是此次fetch的blockManagerId对象不为空,作一次ExecutorLost处理,下次shuffle会换在另外一个executor上去执行。
  • 其余task状态会由TaskScheduler处理,如Exception, TaskResultLost, commitDenied等。

三、其余与job相关的操做还包括:cancel job, cancel stage, resubmit failed stage等

其余职能:

 cacheLocations 和 preferLocation

五、TaskScheduler

维护task和executor对应关系,executor和物理资源对应关系,在排队的task和正在跑的task。

内部维护一个任务队列,根据FIFO或Fair策略,调度任务。

TaskScheduler自己是个接口,spark里只实现了一个TaskSchedulerImpl,理论上任务调度能够定制。

主要功能:

一、submitTasks(taskSet),接收DAGScheduler提交来的tasks 

  • 为tasks建立一个TaskSetManager,添加到任务队列里。TaskSetManager跟踪每一个task的执行情况,维护了task的许多具体信息。
  • 触发一次资源的索要。 
    • 首先,TaskScheduler对照手头的可用资源和Task队列,进行executor分配(考虑优先级、本地化等策略),符合条件的executor会被分配给TaskSetManager
    • 而后,获得的Task描述交给SchedulerBackend,调用launchTask(tasks),触发executor上task的执行。task描述被序列化后发给executor,executor提取task信息,调用task的run()方法执行计算。

二、cancelTasks(stageId),取消一个stage的tasks 

  • 调用SchedulerBackendkillTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler里一直维护着。

三、resourceOffer(offers: Seq[Workers]),这是很是重要的一个方法,调用者是SchedulerBacnend,用途是底层资源SchedulerBackend把空余的workers资源交给TaskScheduler,让其根据调度策略为排队的任务分配合理的cpu和内存资源,而后把任务描述列表传回给SchedulerBackend 

  • 从worker offers里,搜集executor和host的对应关系、active executors、机架信息等等
  • worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序
  • 遍历每一个taskSet,按照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每一个taskSet提供可用的cpu核数,看是否知足 
    • 默认一个task须要一个cpu,设置参数为"spark.task.cpus=1"
    • 为taskSet分配资源,校验是否知足的逻辑,最终在TaskSetManagerresourceOffer(execId, host, maxLocality)方法里
    • 知足的话,会生成最终的任务描述,而且调用DAGSchedulertaskStarted(task, info)方法,通知DAGScheduler,这时候每次会触发DAGScheduler作一次submitMissingStage的尝试,即stage的tasks都分配到了资源的话,立刻会被提交执行

四、statusUpdate(taskId, taskState, data),另外一个很是重要的方法,调用者是SchedulerBacnend,用途是SchedulerBacnend会将task执行的状态汇报给TaskScheduler作一些决定 

  • TaskLost,找到该task对应的executor,从active executor里移除,避免这个executor被分配到其余task继续失败下去。
  • task finish包括四种状态:finished, killed, failed, lost。只有finished是成功执行完成了。其余三种是失败。
  • task成功执行完,调用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),不然调用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)TaskResultGetter内部维护了一个线程池,负责异步fetch task执行结果并反序列化。默认开四个线程作这件事,可配参数"spark.resultGetter.threads"=4

 TaskResultGetter取task result的逻辑

一、对于success task,若是taskResult里的数据是直接结果数据,直接把data反序列出来获得结果;若是不是,会调用blockManager.getRemoteBytes(blockId)从远程获取。若是远程取回的数据是空的,那么会调用TaskScheduler.handleFailedTask,告诉它这个任务是完成了的可是数据是丢失的。不然,取到数据以后会通知BlockManagerMaster移除这个block信息,调用TaskScheduler.handleSuccessfulTask,告诉它这个任务是执行成功的,而且把result data传回去。

二、对于failed task,从data里解析出fail的理由,调用TaskScheduler.handleFailedTask,告诉它这个任务失败了,理由是什么。

六、SchedulerBackend

TaskScheduler下层,用于对接不一样的资源管理系统,SchedulerBackend是个接口,须要实现的主要方法以下:

复制代码
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把本身手头上的可用资源交给TaskScheduler,TaskScheduler根据调度策略分配给排队的任务吗,返回一批可执行的任务描述,SchedulerBackend负责launchTask,即最终把task塞到了executor模型上,executor里的线程池会执行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
复制代码

粗粒度:进程常驻的模式,典型表明是standalone模式,mesos粗粒度模式,yarn

细粒度:mesos细粒度模式

这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend

维护executor相关信息(包括executor的地址、通讯端口、host、总核数,剩余核数),手头上executor有多少被注册使用了,有多少剩余,总共还有多少核是空的等等。

主要职能

一、Driver端主要经过actor监听和处理下面这些事件: 

  • RegisterExecutor(executorId, hostPort, cores, logUrls)。这是executor添加的来源,一般worker拉起、重启会触发executor的注册。CoarseGrainedSchedulerBackend把这些executor维护起来,更新内部的资源信息,好比总核数增长。最后调用一次makeOffer(),即把手头资源丢给TaskScheduler去分配一次,返回任务描述回来,把任务launch起来。这个makeOffer()的调用会出如今任何与资源变化相关的事件中,下面会看到。
  • StatusUpdate(executorId, taskId, state, data)。task的状态回调。首先,调用TaskScheduler.statusUpdate上报上去。而后,判断这个task是否执行结束了,结束了的话把executor上的freeCore加回去,调用一次makeOffer()
  • ReviveOffers。这个事件就是别人直接向SchedulerBackend请求资源,直接调用makeOffer()
  • KillTask(taskId, executorId, interruptThread)。这个killTask的事件,会被发送给executor的actor,executor会处理KillTask这个事件。
  • StopExecutors。通知每个executor,处理StopExecutor事件。
  • RemoveExecutor(executorId, reason)。从维护信息中,那这堆executor涉及的资源数减掉,而后调用TaskScheduler.executorLost()方法,通知上层我这边有一批资源不能用了,你处理下吧。TaskScheduler会继续把executorLost的事件上报给DAGScheduler,缘由是DAGScheduler关心shuffle任务的output location。DAGScheduler会告诉BlockManager这个executor不可用了,移走它,而后把全部的stage的shuffleOutput信息都遍历一遍,移走这个executor,而且把更新后的shuffleOutput信息注册到MapOutputTracker上,最后清理下本地的CachedLocationsMap。

二、reviveOffers()方法的实现。直接调用了makeOffers()方法,获得一批可执行的任务描述,调用launchTasks

三、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。 

  • 遍历每一个task描述,序列化成二进制,而后发送给每一个对应的executor这个任务信息 
    • 若是这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K),会出错,abort整个taskSet,并打印提醒增大akka frame size
    • 若是二进制数据大小可接受,发送给executor的actor,处理LaunchTask(serializedTask)事件。

 七、Executor

Executor是spark里的进程模型,能够套用到不一样的资源管理系统上,与SchedulerBackend配合使用。

内部有个线程池,有个running tasks map,有个actor,接收上面提到的由SchedulerBackend发来的事件。

事件处理

    1. launchTask。根据task描述,生成一个TaskRunner线程,丢尽running tasks map里,用线程池执行这个TaskRunner
    2. killTask。从running tasks map里拿出线程对象,调它的kill方法。

3、Spark在不一样集群中的运行架构

Spark注重创建良好的生态系统,它不只支持多种外部文件存储系统,提供了多种多样的集群运行模式。部署在单台机器上时,既能够用本地(Local)模式运行,也可使用伪分布式模式来运行;当以分布式集群部署的时候,能够根据本身集群的实际状况选择Standalone模式(Spark自带的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各类运行模式虽然在启动方式、运行位置、调度策略上各有不一样,但它们的目的基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的须要运行和管理Task。

3.1 Spark on Standalone运行过程

3.1.1 Standalone-Client

Standalone-client模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既能够运行在Master节点上中,也能够运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用spark-submit工具提交Job或者在Eclips、IDEA等开发平台上使用”new SparkConf().setMaster(“spark://master:7077”)”方式运行Spark任务时,Driver是运行在本地Client端上的。

运行过程文字说明

一、咱们提交一个任务,任务就叫Application
二、初始化程序的入口SparkContext, 
  2.1 初始化DAG Scheduler
  2.2 初始化Task Scheduler
三、Task Scheduler向master去进行注册并申请资源(CPU Core和Memory)
四、Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪一个Worker上分配资源,而后在该Worker上获取资源,而后启动StandaloneExecutorBackend;顺便初
      始化好了一个线程池
五、StandaloneExecutorBackend向Driver(SparkContext)注册,这样Driver就知道哪些Executor为他进行服务了。
   到这个时候其实咱们的初始化过程基本完成了,咱们开始执行transformation的代码,可是代码并不会真正的运行,直到咱们遇到一个action操做。生产一个job任务,进行stage的划分
六、SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;而且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操做        时,就会催生Job;每一个Job中含有1个或多个Stage,Stage通常在获取外部数据和shuffle以前产生)。
七、将Stage(或者称为TaskSet)提交给Task Scheduler。Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
八、对task进行序列化,并根据task的分配算法,分配task
九、对接收过来的task进行反序列化,把task封装成一个线程
十、开始执行Task,并向SparkContext报告,直至Task完成。
十一、资源注销

运行过程图形说明

测试代码:

./spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  • 总结

client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端能够看到task执行的状况。生产环境下不能使用client模式,是由于:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会致使客户端100次网卡流量暴增的问题

3.1.2 Standalone-Cluster

运行过程文字说明

  1. cluster模式提交应用程序后,会向Master请求启动Driver.
  2. Master接受请求,随机在集群一台节点启动Driver进程。
  3. Driver启动后为当前的应用程序申请资源。
  4. Driver端发送task到worker节点上执行。
  5. worker将执行状况和执行结果返回给Driver端。

运行过程图形说明

  • 总结

Driver进程是在集群某一台Worker上启动的,在客户端是没法查看task的执行状况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。

测试代码:

./spark-submit --master spark://node01:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

¬  总结Standalone两种方式提交任务,Driver与集群的通讯包括:

1. Driver负责应用程序资源的申请

2. 任务的分发。

3. 结果的回收。

4. 监控task执行状况。

3.2 Spark on YARN运行过程

YARN是一种统一资源管理机制,在其上面能够运行多套计算框架。目前的大数据技术世界,大多数公司除了使用Spark来进行数据计算,因为历史缘由或者单方面业务处理的性能考虑而使用着其余的计算框架,好比MapReduce、Storm等计算框架。Spark基于此种状况开发了Spark on YARN的运行模式,因为借助了YARN良好的弹性资源管理机制,不只部署Application更加方便,并且用户在YARN集群中运行的服务和Application的资源也彻底隔离,更具实践应用价值的是YARN能够经过队列的方式,管理同时运行在集群中的多个服务。

Spark on YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另外一种是YARN-Cluster(或称为YARN-Standalone模式)。

3.2.1 YARN框架流程

任何框架与YARN的结合,都必须遵循YARN的开发模式。在分析Spark on YARN的实现细节以前,有必要先分析一下YARN框架的一些基本原理。

参考:http://www.cnblogs.com/qingyunzong/p/8615096.html

3.2.2 YARN-Client

Yarn-Client模式中,Driver在客户端本地运行,这种模式可使得Spark Application和客户端进行交互,由于Driver在客户端,因此能够经过webUI访问Driver的状态,默认是http://hadoop1:4040访问,而YARN经过http:// hadoop1:8088访问。

YARN-client的工做流程分为如下几个步骤:

文字说明

1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将建立DAGScheduler和TASKScheduler等,因为咱们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;

2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;

3.Client中的SparkContext初始化完毕后,与ApplicationMaster创建通信,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);

4.一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通讯,要求它在得到的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;

5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而能够在任务失败时从新启动任务;

6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭本身。

图片说明

 

测试代码:

./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  • 总结

Yarn-client模式一样是适用于测试,由于Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通讯,会形成客户机网卡流量的大量增长.

3.2.3 YARN-Cluster

在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是把Spark的Driver做为一个ApplicationMaster在YARN集群中先启动;第二个阶段是由ApplicationMaster建立应用程序,而后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成。

YARN-cluster的工做流程分为如下几个步骤:

文字说明

1.   Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、须要在Executor中运行的程序等;

2.   ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;

3.   ApplicationMaster向ResourceManager注册,这样用户能够直接经过ResourceManage查看应用程序的运行状态,而后它将采用轮询的方式经过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;

4.   一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通讯,要求它在得到的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式同样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增长了对Executor的等待逻辑等;

5.   ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而能够在任务失败时从新启动任务;

6.   应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭本身。

 图片说明

 

测试代码:

./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  • 总结

Yarn-Cluster主要用于生产环境中,由于Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能经过yarn查看日志。

3.2.4 YARN-Client 与 YARN-Cluster 区别

理解YARN-Client和YARN-Cluster深层次的区别以前先清楚一个概念:Application Master。在YARN中,每一个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源以后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别。

一、YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督做业的运行情况。当用户提交了做业以后,就能够关掉Client,做业会继续在YARN上运行,于是YARN-Cluster模式不适合运行交互类型的做业;

二、YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通讯来调度他们工做,也就是说Client不能离开。

 

 

相关文章
相关标签/搜索