《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市
《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》
《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》
由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现。本文展现第3章第一部分的内容:
“道生一,一生二,二生三,三生万物。”——《道德经》
本章导读:
SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码。读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径。已经熟练使用Spark的开发人员可以选择跳过本章内容。
本章将在介绍SparkContext初始化过程的同时,向读者介绍各个组件的作用,为阅读后面的章节打好基础。Spark中的组件很多,就其功能而言涉及到网络通信、分布式、消息、存储、计算、缓存、测量、清理、文件服务、Web UI的方方面面。
Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端。了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程。
Spark Driver的初始化始终围绕着SparkContext的初始化。SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。SparkContext初始化完毕,才能向Spark集群提交任务。在平坦的公路上,发动机只需以较低的转速,较低的功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机,这样才能满足你转山的体验。这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而SparkContext的配置参数则由SparkConf负责,SparkConf就是你的操作面板。
SparkConf的构造很简单,主要是通过ConcurrentHashMap来维护各种Spark的配置属性。SparkConf代码结构见代码清单3-1。Spark的配置属性都是以“spark.”开头的字符串。
代码清单3-1 SparkConf代码结构
现在开始介绍SparkContext,SparkContext的初始化步骤如下:
1) 创建Spark执行环境SparkEnv;
2) 创建RDD清理器metadataCleaner;
3) 创建并初始化SparkUI;
4) Hadoop相关配置及Executor环境变量的设置
5) 创建任务调度TaskScheduler;
6) 创建和启动DAGScheduler;
7) TaskScheduler的启动;
8) 初始化块管理器BlockManager(BlockManager是存储体系的主要组件之一,将在第4章介绍);
9) 启动测量系统MetricsSystem;
10) 创建和启动Executor分配管理器ExecutorAllocationManager;
11) ContextCleaner的创建与启动;
12) Spark环境更新;
13) 创建DAGSchedulerSource和BlockManagerSource;
14) 将SparkContext标记为激活。
SparkContext的主构造器参数为SparkConf,其实现如下。
上面代码中的CallSite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的Scala或者Spark核心类信息。Utils.getCallSite的详细信息见附录A。SparkContext默认只有一个实例(由属性spark.driver.allowMultipleContexts来控制,用户需要多个SparkContext实例时,可以将其设置为true),方法markPartiallyConstructed用来确保实例的唯一性,并将当前SparkContext标记为正在构建中。
接下来会对SparkConf进行拷贝,然后对各种配置信息进行校验,代码如下。
从上面校验的代码看到必须指定属性spark.master 和spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name指定应用程序名称。
SparkEnv是Spark的执行环境对象,其中包括众多与Executor执行相关的对象。由于在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor,所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。创建SparkEnv 主要使用SparkEnv的createDriverEnv,createDriverEnv方法有三个参数,conf、isLocal和 listenerBus。
上面代码中的conf是对SparkConf的拷贝,isLocal标识是否是单机模式,listenerBus采用监听器模式维护各类事件的处理,在3.14节会详细介绍。
SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:
1) 创建安全管理器SecurityManager;
2) 创建基于Akka的分布式消息系统ActorSystem;
3) 创建Map任务输出跟踪器mapOutputTracker;
4) 实例化ShuffleManager;
5) 创建ShuffleMemoryManager;
6) 创建块传输服务BlockTransferService;
7) 创建BlockManagerMaster;
8) 创建块管理器BlockManager;
9) 创建广播管理器BroadcastManager;
10) 创建缓存管理器CacheManager;
11) 创建HTTP文件服务器HttpFileServer;
12) 创建测量系统MetricsSystem;
13) 创建SparkEnv;
SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成 secret key登录,最后给当前系统设置默认的口令认证实例,此实例采用匿名内部类实现,参见代码清单3-2。
代码清单3-2 SecurityManager的实现
ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息,又用它实现并发编程。怎么,消息系统可以实现并发?要解释清楚这个问题,首先应该简单的介绍下Scala语言的Actor并发编程模型:Scala认为Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,而且线程的上下文切换会带来不少开销,降低并发程序的性能,甚至会引入死锁的问题。在Scala中只需要自定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(Scala发送消息是异步的),传递数据。如:
Actor ! message
Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。ActorSystem便是Akka提供的用于创建分布式消息通信系统的基础类。Akka的具体信息见附录B。
正式因为Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。
SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,见代码清单3-3。AkkaUtils.createActorSystem方法用于启动ActorSystem,见代码清单3-4。AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort最终会回调方法startService: Int => (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具体实现细节请见附录B。Spark的Driver中Akka的默认访问地址是akka://sparkDriver,Spark的Executor中Akka的默认访问地址是akka://sparkExecutor。如果不指定ActorSystem的端口,那么所有节点的ActorSystem端口在每次启动时随机产生。关于startServiceOnPort的实现,请见附录A。
代码清单3-3 使用AkkaUtils工具类创建和启动[计算机3] [初霖4] ActorSystem
代码清单3-4 ActorSystem的创建和启动
mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每个map任务或者reduce任务都会有其唯一标识,分别为mapId和reduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务的所在节点上拉取Block,这一过程叫做shuffle。每批shuffle过程都有唯一的标识shuffleId。
这里先介绍下MapOutputTrackerMaster。MapOutputTrackerMaster内部使用mapStatuses:TimeStampedHashMap[Int,Array[MapStatus]]来维护跟踪各个map任务的输出状态。其中key对应shuffleId,Array存储各个map任务对应的状态信息MapStatus。由于MapStatus维护了map输出Block的地址BlockManagerId,所以reduce任务知道从何处获取map任务的中间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses:TimeStampedHashMap[Int, Array[Byte]]维护序列化后的各个map任务的输出状态。其中key对应shuffleId,Array存储各个序列化MapStatus生成的字节数组。
Driver和Executor处理MapOutputTrackerMaster的方式有所不同:
无论是Driver还是Executor,最后都由mapOutputTracker的属性trackerActor持有MapOutputTrackerMasterActor的引用,参见代码清单3-5。
代码清单3-5 registerOrLookup方法用于查找或者注册Actor的实现
在后面章节大家会知道map任务的状态正是由Executor向持有的MapOutputTrackerMasterActor发送消息,将map任务状态同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup方法通过调用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,实际正是利用ActorSystem提供的分布式消息机制实现的,具体细节参见附录B。这里第一次使用到了Akka提供的功能,以后大家会渐渐感觉到使用Akka的便捷。
ShuffleManager负责管理本地及远程的block数据的shuffle操作。ShuffleManager默认为通过反射方式生成的SortShuffleManager的实例,可以修改属性spark.shuffle.manager为hash来显式[计算机5] [初霖6] 使用HashShuffleManager。SortShuffleManager通过持有的IndexShuffleBlockManager间接操作BlockManager中的DiskBlockManager将map结果写入本地,并根据shuffleId、mapId写入索引文件,也能通过MapOutputTrackerMaster中维护的mapStatuses从本地或者其他远程节点读取文件。有读者可能会问,为什么需要shuffle?Spark作为并行计算框架,同一个作业会被划分为多个任务在多个节点上并行执行,reduce的输入可能存在于多个节点上,因此需要通过“洗牌”将所有reduce的输入汇总起来,这个过程就是shuffle。这个问题以及对ShuffleManager的具体使用会在第5章和第6章详述。ShuffleManager的实例化见代码清单3-6。代码清单3-6最后创建的ShuffleMemoryManager,将在3.2.5节介绍。
代码清单3-6 ShuffleManager的实例化及ShuffleMemoryManager的创建
ShuffleMemoryManager负责管理shuffle线程占有内存的分配与释放,并通过threadMemory:mutable.HashMap[Long, Long]缓存每个线程的内存字节数,见代码清单3-7。
代码清单3-7 ShuffleMemoryManager的数据结构
getMaxMemory方法用于获取shuffle所有线程占用的最大内存,实现如下。
从上面代码可以看出,shuffle所有线程占用的最大内存的计算公式为:
从上面代码可以看出,shuffle所有线程占用的最大内存的计算公式为:
Java运行时最大内存 * Spark的shuffle最大内存占比 * Spark的安全内存占比
可以配置属性spark.shuffle.memoryFraction修改Spark的shuffle最大内存占比,配置属性spark.shuffle.safetyFraction修改Spark的安全内存占比。
注意:ShuffleMemoryManager通常运行在Executor中, Driver中的ShuffleMemoryManager 只有在local模式下才起作用。
BlockTransferService默认为NettyBlockTransferService(可以配置属性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的异步事件驱动的网络应用框架,提供web服务及客户端,获取远程节点上Block的集合。
NettyBlockTransferService的具体实现将在第4章详细介绍。这里大家可能觉得奇怪,这样的网络应用为何也要放在存储体系?大家不妨先带着疑问,直到你真正了解存储体系。
BlockManagerMaster负责对Block的管理和协调,具体操作依赖于BlockManagerMasterActor。Driver和Executor处理BlockManagerMaster的方式不同:
无论是Driver还是Executor,最后BlockManagerMaster的属性driverActor将持有对BlockManagerMasterActor的引用。BlockManagerMaster的创建代码如下。
registerOrLookup已在3.2.3节介绍过了,不再赘述。BlockManagerMaster及BlockManagerMasterActor的具体实现将在第4章详细介绍。
BlockManager负责对Block的管理,只有在BlockManager的初始化方法initialize被调用后,它才是有效的。BlockManager作为存储系统的一部分,具体实现见第4章。BlockManager的创建代码如下。