上一篇中介绍了Spark的各类运行模式的基本流程和相关实现,这里主要分析一下各类运行模式中涉及到的一些细节问题的流程和实现java
Spark的各类运行模式虽然启动方式,运行位置,调度手段有所不一样,但它们所要完成的任务基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的须要管理和运行Task,这里粗略的列举一下在运行调度过程当中各类须要考虑的问题apache
环境变量的传递安全
Jar包和各类依赖文件的分发并发
Task的管理和序列化等app
用户参数配置异步
用户及权限控制函数
环境变量的传递oop
Spark的运行参数有很大一部分是经过环境变量来设置的,例如Executor的内存设置,Library路径等等。Local模式固然不存在环境变量的传递问题,在Cluster模式下,就须要将环境变量传递到远端JVM环境中去ui
SparkContext在初始化过程当中 须要传递给Executor的环境变量,会在executorEnvs变量中(HashMap)中收集起来spa
而具体如何将这些变量设置到Executor的环境中,取决于Executor的Launch方式
在Spark Standalone模式中,这些变量被封装在org.apache.spark.deploy.Command中,交给AppClient启动远程Executor,Command经由Spark Master经过Actor再次转发给合适的Worker,Worker经过ExecutorRunner构建Java.lang.Process运行ExecutorBackend,环境变量在ExecutorRunner中传递给java.lang.ProcessBuilder.environment完成整个传递过程
在Mesos相关模式中,这些环境变量被设置到org.apache.mesos.Protos.Environment中,在经过MesosLaunch Task时交给Mesos完成分发工做
在yarn-standalone模式中,这些环境变量首先要经过Yarn Client 设置到Spark AM的运行环境中,基本就是Client类运行环境中以SPARK开头的环境变量所有设置到ContainerLaunchContext中,AM经过WorkerRunnable进一步将它们设置到运行Executor所用的ContainerLaunchContext中
Yarn-client模式与yarn-standalone模式大体相同,虽然SparkContext运行在本地,executor所需的环境变量仍是经过ContainerLaunchContext经AM中转发给Executor
能够注意到,在Yarn相关模式中,并无使用到SparkContext收集的executorEnvs,主要是由于Yarn Standalone模式下Sparkcontext自己就是在远程运行的,所以在Yarn Client中单独实现了相关代码
Jar包和各类依赖文件的分发
Spark程序的运行依赖大体分两类, 一是Spark runtime及其依赖,二是应用程序自身的额外依赖
对于Local模式而言,不存在Jar包分发的问题
对于第一类依赖
在Spark Standalone模式中,整个环境随Spark部署到各个节点中,所以也不存在runtime Jar包分发的问题
Mesos相关模式下,Mesos自己须要部署到各个节点,SparkRuntime能够和Standalone模式同样部署到各个节点中,也能够上传到Mesos能够读取的地方好比HDFS上,而后经过配置spark.executor.uri通知Mesos相关的SchedulerBackend,它们会将该URL传递给Mesos,Mesos在Launch任务时会从指定位置获取相关文件
而Spark 应用程序所额外依赖的文件,在上述模式中能够经过参数将URL传递给SparkContext,对于本地文件SparkContext将启动一个HttpServer用于其它节点读取相关文件,其它如HDFS和外部HTTP等地址上的文件则原封不动,而后这些额外依赖文件的URL在TaskSetmanager中和Task自己一块儿被序列化后发送给Executor,Executor再反序列化获得URL并传递给ExecutorURLClassLoader使用
在Yarn相关模式中,Runtime和程序运行所依赖的文件首先经过HDFS Client API上传到Job的.sparkStaging目录下,而后将对应的文件和URL映射关系经过containerLaunchContext.setLocalResources函数通知Yarn,Yarn的NodeManager在Launch container的时候会从指定URL处下载相关文件做为运行环境的一部分。上面的步骤对于Spark AM来讲是充分的,而对于须要进一步分发到Executor的运行环境中的文件来讲,AM还须要在建立Executor的Container的时候一样调用setLocalResources函数,AM是如何得到对应的文件和URL列表的呢,其实就是SparkYarn Client将这些文件的相关属性如URL,时间戳,尺寸等信息打包成字符串,经过特定的环境变量(SPARK_YARN_CACHE_XXX )传递给AM,AM再把它们从环境变量中还原成所需文件列表
Task管理和序列化
Task的运行要解决的问题不外乎就是如何以正确的顺序,有效地管理和分派任务,如何将Task及运行所需相关数据有效地发送到远端,以及收集运行结果
Task的派发源起于DAGScheduler调用TaskScheduler.submitTasks将一个Stage相关的一组Task一块儿提交调度。
在TaskSchedulerImpl中,这一组Task被交给一个新的TaskSetManager实例进行管理,全部的TaskSetManager经由SchedulableBuilder根据特定的调度策略进行排序,在TaskSchedulerImpl的resourceOffers函数中,当前被选择的TaskSetManager的ResourceOffer函数被调用并返回包含了序列化任务数据的TaskDescription,最后这些TaskDescription再由SchedulerBackend派发到ExecutorBackend去执行
系列化的过程当中,上一节中所述App依赖文件相关属性URL等经过DataOutPutStream写出,而Task自己经过可配置的Serializer来序列化,当前可配制的Serializer包括如JavaSerializer ,KryoSerializer等
Task的运行结果在Executor端被序列化并发送回SchedulerBackend,因为受到Akka Frame Size尺寸的限制,若是运行结果数据过大,结果会存储到BlockManager中,这时候发送到SchedulerBackend的是对应数据的BlockID,TaskScheduler最终会调用TaskResultGetter在线程池中以异步的方式读取结果,TaskSetManager再根据运行结果更新任务状态(好比失败重试等)并汇报给DAGScheduler等
用户参数配置
Spark的用户参数配置途径不少,除了环境变量之外,能够经过Spark.conf文件设置,也能够经过修改系统属性设置 "spark.*"
而这些配置参数的使用环境也不少样化,有些在Sparkcontext本地使用(除了yarn-standalone模式),有些须要分发到Cluster集群中去
在SparkContext中解析和使用,好比spark.master,spark.app.names, spark.jars等等,一般用于配置SparkContext运行参数,建立Executor启动环境等
发送给Executor的参数又分两部分
一部分在ExecutorBackend初始化过程当中须要使用的系统变量,会经过SparkContext在初始化过程当中读取并设置到环境变量中去,在经过前面所述的方式,使用对应的底层资源调度系统设置到运行容器的环境变量中
另外一部分在Executor中才使用的以"spark.*"开头的参数,则经过ExecutorBackend向SchedulerBackend的注册过程,在注册确认函数中传递给ExecutorBackend再在Executor的初始化过程当中设置到SparkConf中
整体看来,这些参数配置的方式和分发途径有些不太统一,稍显混乱,大概还有改进的余地
用户及权限控制
Spark的Task在Executor中运行时,使用hadoop的UerGroupInfomation.doAs 函数将整个Task的运行环境包装起来以特定的sparkUser的身份运行。这样作的目的主要是使得Spark的task在与Hadoop交互时,使用特定的用户而不是Executor启动时所用的用户身份,这有利于在集群中区分Spark Cluster的运行用户和实际使用集群的APP用户身份,以及HDFS等权限控制
用户名在Executor中经过SPARK_USER环境变量获取
对于Local模式来讲,SPARK_USER环境变量就是当前JVM环境下设定的值,固然对Local模式来讲实际上也是不须要doAs的,Executor中若是SPARK_USER变量未设定或者与当前用户名一致,会跳过doAs直接执行task launch相关函数
传递用户身份的问题容易解决,比较麻烦的是身份的认证,例如将Spark运行在经过Kerberos管理权限的Hadoop集群中,这须要完成客户端的身份认证,Security 相关秘钥或Token的获取,分发,更新,失效等工做,在保证效率的同时,还要确保整个过程的安全性,目前的Spark代码对这一方面尚未完善的实现方案,可是有一些提案和Patch正在进行中。
转自:http://blog.csdn.net/colorant/article/details/18603965