----本节内容-------node
1.遗留问题解答web
2.Spark核心概念面试
2.1 RDD及RDD操做算法
2.2 Transformation和Actionsql
2.3 Spark程序架构shell
2.4 Spark on Yarn运行流程编程
2.5 WordCount执行原理数组
3.Spark计算引擎原理缓存
3.1 Spark内部原理网络
3.2 生成逻辑执行图
3.3 生成物理执行图
4.Spark Shuffle解析
4.1 Shuffle 简史
4.2 Spark Shuffle
·Shuffle Write
·Shuffle Read
·Shuffle Aggregate
5.参考资料
---------------------
有些东西是要常提的,由于过重要了那么有哪些概念是必须作到成竹在胸的呢?我以为有如下几点:1)RDD概念:RDD是什么,干什么的,基本原理是什么,为何要整一个RDD这样的概念,解决了什么问题,有什么特色;2)基本算子:Transformation算子,Action算子,经常使用的几10种算子,哪些是transformation,哪些是action,哪些算子会触发shuffle等;3)Spark基本架构,Spark程序基本构成,Spark程序执行的几种模式,程序提交到执行的基本过程。不须要精通全部的细节,但基本概念和原理至少作到八九不离十。若是初次接触不是很好理解,就多看看视频各类机构的视频,多记一点笔记,实在基础不行,哪怕是抄写一遍,每一次感受都会不同。这是本身的一点理解,也许每一个人的学习方法都不同,对Spark的理解也不一样。
另外提一点,写程序真是个体力活,不消耗体力,可是消耗体能,因此运动很重要,锻炼结合兴趣才能乐在其中,坚持不解。系列博客中出现不少户外图片,其实都是和车友们骑行拍的,各位将就着看吧。
补充:本章的内容很关键,反复学习了好几遍才动笔写。
1.遗留问题解答
1.Spark如何处理不能序列化的对象
将不能序列化的对象封装成object对象。
2.企业级生产平台如何搭建
说实话,关于生产集群如何搭建,方法多样,但细节特别多,不少人讲的或者写的我也不是特别满意,后面我本身结合实践写几篇对生产平台搭建的见解;有些细节问题一交流就知道是否是有参与过大型生产集群运维和开发。
3.使用Intelij开发工具链接Spark生产集群
实际开发过程当中不多这样作,Intelj编写程序在本地运行和调试,打成jar包到开发环境,编写代码,本地模式跑结果,不作分布式开发和调试,代价高
4.使用maven开发,打包
在BAT这种级别的公司,通常开发程序打包都是用maven,他们程序那么多,一个个的打包是不现实的,必须用maven这样的程序,自动化打包。
5.Spark日志问题
能够经过spark web ui查看日志,前提额是要开启historyServer,如何开启spark的history Server自行百度,Spark的historyServer依赖MR的historyServer。若是没有开启也能够经过命令拉取日志,前提也是要作一些配置,而且用命令拉回任务日志,用户名要和提交任务的用户名一致。
经过Web UI查看程序运行日志
Web UI 的executor监控界面,这个很是详细了,能够看到driver在哪里运行,消耗的资源,task在哪里运行消耗的资源,strerr那个连接能够看到更详细的日志状况,不要误认为那里只是打印错误日志,其实不是的,全部日志都会网那里输出,是标准日志输出。
若是提交一个程序啥都不指定,程序默认分配给executor2个task,一个task占用1G内存和1个CPU.固然,你也能够指定task占用的资源,可是原则上不要超过资源分配器设置的阀值,由于资源不够的时候,你指定多个task,它也不会给你启动。另外还有一个防止内存碎片的机制,程序申请2.5G,会给你3G,自动补全机制,防止产生内存碎片。
另外从Web UI还能够看出不少东西,如task执行时间,gc时间,数据是否倾斜,RDD之间的依赖关系等等。总之,Spark Web UI很牛,程序有问题,平台有问题,都能看出个大概。
6.有时候xxRDD.foreach(println)在shell窗口打印不出来内容
client在本机,是能够打印出来的,若client不是在本机运行(实际上是在driver运行节点上打印出来了),这样执行在shell所在界面是打印不出内容的,用这种方式:xxRDD.collect().foreach(println)
7.启动手报netty 4040端口已经被使用错误
正常的错误,由于又shell开启来了,被占用,会自动使用4041端口,还被占用,就4042端口
8.Spark shell启动时会启动derby
spark shell启动会启动spark sql,spark sql默认使用derby保存元数据,可是尽可能不要用derby,它是单实例,不利于开发。会在本地生成一个文件metastore_db,若是启动报错,就把那个文件给删了
9.KeyValue格式数据如何获取Key和value
属于scala基础了
key._1:拿到kv的key
key._2:拿到kv的value
2.Spark核心概念
2.1 RDD及RDD操做
· RDD是什么数据集,他是一个描述数据在哪里,对数据作什么操做,以及操做之间的依赖关系的一个数据集
· 为何是弹性,主要是说他的存储,既能够在内存,也能够在磁盘
· 分布式:分布在集群上
· 自动重构:失效够能够自动重构
2.2 Transformation和Action
rdd是数据,有数据,就有计算操做,基本操做分红2类(为何分红2类),
transformation:一类算子的简称,完成转换功能,函数和算子一个意思,当作一个大的数组,里面有元素,被切分放到各个节点上。
action:把rdd变换成一个或者一组值,这些是单机的,前面transformation都是分布式的值,
2.3 Spark程序架构
2个组件组成,application = driver(1个)+ executor(多个)
driver:main函数,2g内存1个cpu,运行指定将相应的jar包和文件传给work node
application:driver+executor,spark应用程序,2个应用程序是没有任何关联,若是共享数据只能hdfs或者tacyon. executor运行是指定,能够同时跑几个task,一个application转为多个task(driver转化),task扔给executor执行,
2.4 Spark on Yarn运行流程
1)client发送资源申请请求
2)RM发送通知NodeManger要调用资源,
3)NodeManger启动AppAplicationMaster
4)AppAplicationMaster通知nodeManager启动各个Executor
5)nodeManager启动Executor
6)nodeManager向Driver回报实时执行状况,也会告知AppAplicationMaster
2.5 WordCount执行原理
WordCount,分布式计算HelloWord,读取数据,切分数据,将数据转为KeyValue对,根据Key进行规约求和,输出规约结果,这就是WordCout的过程。就像1千个读者有一千个哈姆雷特同样,不一样的工具设计者,对WordCount的底层实现不同。
MapReduce对WordCount分为Map和Reduce两个阶执行,这个是很明智的,我以为MapReduce编程模型就是分布式计算模型的汇编语言,写起来很罗嗦,可是霸道啊。Spark再优化,在牛逼,还不是脱离不了Map和Reduce这个编程思想,只不过是人家封装好了,不要你再写那么繁杂的MR代码罢。WordCount在Spark中执行大概经历了这么几个过程,
每个环节都会产生一个RDD,生成一个RDD的依赖关系,这种依赖关系图就是逻辑查询计划,不涉及物理查询,根据逻辑关系,算出数据在那里,考虑数据特征,知道处理什么数据,怎么处理,生成一个物理查询计划,根据物理查询计划,将rdd划分不少stage,每一个stage之间会有依赖关系,每一个stage内部会划分多个task,这些操做都是在driver生成的,driver将生成的可执行物理任务分发到各个executor执行。
问题1:spark中task的类型有几种?
只有2中,分别是
shuffleMapTask:非最后一轮的task都叫这个shuffleMapTask
ResultTask:最后一轮的task
问题2:每一个stage task数目如何决定?
第一个stage,由hdfs中block的个数决定,可使用命令查看,若是源数据是hbase,region个数决定
其余stage的task,能够本身设置,若是没有设置,和前面task个数同样
总结:
1)spark中一个action生成一个job。
2)每一个job生成多个stage ,spark有个优化机制一些已经执行的stage会自动跳过
3)每一个stage会有多个task在跑
因此结合WordCount,能够得出一个做业提交之后在Spark中执行的流程是: driver生成逻辑执行计划->driver生成物理执行计划->driver任务调度->executor任务执行;前三个在driver执行,后面一个在节点上分布式执行。
3.Spark计算引擎原理
上图再次描述一个程序执行的过程,driver生成逻辑执行计划->driver生成物理执行计划->driver任务调度->executor任务执行 。假如面试被问到请你简要介绍spark计算引擎的原理?我会这样回答,(个人我的经验,被面过无数次,也面过别人N次,面试被问到不会答的不要着急,想办法把你懂的知识点都带出来,一样能够达到效果了)
1)四个阶段
逻辑执行计划-》成物理执行计划-》任务调度-》任务执行
2)四个对象
driver-》DAGScheduler-》TaskScheduler-》Executor
3)两种模式
任务解析、优化和提交单机模式-》任务执行分布式模式
再上一个图,好好理解,后面都是基于这个展开了。
3.1 生成逻辑执行图
逻辑执行计划 , 描述RDD之间的依赖关系,这个是逻辑查询计划,但不知道join怎么算,也不知道groupby该怎么group,rdd选择什么样的类型,不清楚,说白了就是知道大方向,具体如何作,不知道,纸上谈兵。应用提交后,造成RDD Graph,而且在后台建立DAG对象(spark不只仅用DAG建模,并且还会执行它,而且里面不是用对象表示,而是用RDD对象之间的关系)
举例: map->mapedRDD->compute()
这里还有个重要的知识点,就是RDD之间的关系:宽依赖和窄依赖
前面提到过 RDD 被分红几个分区,分散在多台机器上。当咱们把一个 RDD A 转化成下一个 RDD B 时,这里有两种状况:
窄依赖:有时候只须要一个 A 里面的一个分区,就能够产生 B 里的一个分区了,好比 map 的例子:A 和 B 之间每一个分区是一一对应的关系,这就是 narrow transofmration.【一对一】
宽依赖: 须要 A 里面全部的分区,才能产生 B 里的一个分区,好比 reduceByKey的例子,这就是 wide transformation【多对一】.
为何要分宽依赖和窄依赖,理解这个很重要,很重要,很重要,董先生竟然没有和各位同胞讲解?
大胆设想一下,若是每一个分区里的数据就待在那台机器的内存里,咱们逐一的调用 map, filter, map 函数到这些分区里,Job 就很好的完成。
更重要的是,因为数据没有转移到别的机器,咱们避免了 Network IO 或者 Disk IO. 惟一的任务就是把 map / filter 的运行环境搬到这些机器上运行,这对现代计算机来讲,overhead 几乎能够忽略不计。
这种把多个操做合并到一块儿,在数据上一口气运行的方法在 Spark 里叫 pipeline (其实 pipeline 被普遍应用的不少领域,好比 CPU)。这时候不一样就出现了:只有 narrow transformation 才能够进行 pipleline 操做。对于 wide transformation, RDD 转换须要不少分区运算,包括数据在机器间搬动,因此失去了 pipeline 的前提。
总结起来一句话:数据和算是否在一块儿,计算的性能是不同的,为了区分,就有了宽依赖和窄依赖。
3.2 生成物理执行图
具体的物理查询计划是在,选择什么样的算法,根据rdd数据量大小。
Spark 会把这个 RDD逻辑计划DAG 交给一个叫 DAG scheduler 的模块,DAG scheduler 会优先使用 pipeline 方法,把 RDD 的 transformation 压缩;当咱们遇到 wide transformation 时,因为以前的 narrow transformation 没法和 wide transformation pipeline, 那 DAG scheduler 会把前面的 transformation 定义成一个 stage.
重要的事情说三遍:DAG scheduler 会分析 Spark Job 全部的 transformation, 用 wide transformation 做为边界,把全部 transformation 分红若干个stages. 一个 stage 里的一个分区就被 Spark 叫作一个task. 因此一个 task 是一个分区的数据和数据上面的操做,这些操做可能包括一个 transformation,也多是多个,但必定是 narrow transformation.
DAG scheduler 工做的结果就是产生一组 stages. 这组 stages 被传到 Spark 的另外一个组件 task scheduler, task scheduler 会使用集群管理器依次执行 task, 当全部的 task 执行完毕,一个 stage 标记完成;再运行下一个 stage …… 直到整个 Spark job 完成。
3.3 调度并执行task
将DAG Scheduler产生的stages传送给task scheduler,task scheduler使用集群管理器依次执行task,task被分配到各个work下执行,当全部的task执行完毕,一个stage标记完成,再运行下一个stage,直到整个spark job完成。
做业调度
FIFO或Fair
优化机制:数据本地性和推测执行
任务执行
Task被序列化后,发送到executor上执行
ShuffleMapTask将中间数据写到本地,ResultTask远程读取数据
数据用的时候再算,并且数据是流到要计算的位置的
4.Spark Shuffle解析
4.1 Shuffle简史
在MapReduce框架中,shuffle是链接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须通过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark做为MapReduce框架的一种实现,天然也实现了shuffle的逻辑。shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果须要按key哈希,而且分发到每个Reducer上去,这个过程就是shuffle。因为shuffle涉及到了磁盘的读写和网络的传输,所以shuffle性能的高低直接影响到了整个程序的运行效率。下面这幅图清晰地描述了MapReduce算法的整个流程,其中shuffle phase是介于Map phase和Reduce phase之间。概念上shuffle就是一个沟通数据链接的桥梁。
4.1 Spark Shuffle
以图为例简单描述一下Spark中shuffle的整一个流程:
Shuffle 过程本质上都是将 Map 端得到的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。首先每个Mapper会根据Reducer的数量建立出相应的bucket,bucket的数量是M×RM×R,其中MM是Map的个数,RR是Reduce的个数。
其次Mapper产生的结果会根据设置的partition算法填充到每一个bucket中去。这里的partition算法是能够自定义的,固然默认的算法是根据key哈希到不一样的bucket中去。
当Reducer启动时,它会根据本身task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket做为Reducer的输入进行处理。
4.1Shuffle Write
在Spark 0.6和0.7的版本中,对于shuffle数据的存储是以文件的方式存储在block manager中,与rdd.persist(StorageLevel.DISk_ONLY)采起相同的策略,能够参看:
我已经将一些干扰代码删去。能够看到Spark在每个Mapper中为每一个Reducer建立一个bucket,并将RDD计算结果放进bucket中。须要注意的是每一个bucket是一个ArrayBuffer,也就是说Map的输出结果是会先存储在内存。
Map的输出必须先所有存储到内存中,而后写入磁盘。这对内存是一个很是大的开销,当内存不足以存储全部的Map output时就会出现OOM。
每个Mapper都会产生Reducer number个shuffle文件,若是Mapper个数是1k,Reducer个数也是1k,那么就会产生1M个shuffle文件,这对于文件系统是一个很是大的负担。同时在shuffle数据量不大而shuffle文件又很是多的状况下,随机写也会严重下降IO的性能。
在Spark 0.8版本中,shuffle write采用了与RDD block write不一样的方式,同时也为shuffle write单首创建了ShuffleBlockManager,部分解决了0.6和0.7版本中遇到的问题。首先咱们来看一下Spark 0.8的具体实现:
在这个版本中为shuffle write添加了一个新的类ShuffleBlockManager,由ShuffleBlockManager来分配和管理bucket。同时ShuffleBlockManager为每个bucket分配一个DiskObjectWriter,每一个write handler拥有默认100KB的缓存,使用这个write handler将Map output写入文件中。能够看到如今的写入方式变为buckets.writers(bucketId).write(pair),也就是说Map output的key-value pair是逐个写入到磁盘而不是预先把全部数据存储在内存中在总体flush到磁盘中去。
Spark 0.8显著减小了shuffle的内存压力,如今Map output不须要先所有存储在内存中,再flush到硬盘,而是record-by-record写入到磁盘中。同时对于shuffle文件的管理也独立出新的ShuffleBlockManager进行管理,而不是与rdd cache文件在一块儿了。
可是这一版Spark 0.8的shuffle write仍然有两个大的问题没有解决:
首先依旧是shuffle文件过多的问题,shuffle文件过多一是会形成文件系统的压力过大,二是会下降IO的吞吐量。
其次虽然Map output数据再也不须要预先在内存中evaluate显著减小了内存压力,可是新引入的DiskObjectWriter所带来的buffer开销也是一个不容小视的内存开销。假定咱们有1k个Mapper和1k个Reducer,那么就会有1M个bucket,于此同时就会有1M个write handler,而每个write handler默认须要100KB内存,那么总共须要100GB的内存。这样的话仅仅是buffer就须要这么多的内存,内存的开销是惊人的。固然实际状况下这1k个Mapper是分时运行的话,所需的内存就只有cores * reducer numbers * 100KB大小了。可是reducer数量不少的话,这个buffer的内存开销也是蛮厉害的。
为了解决shuffle文件过多的状况,Spark 0.8.1引入了新的shuffle consolidation,以期显著减小shuffle文件的数量。
首先咱们以图例来介绍一下shuffle consolidation的原理
假定该job有4个Mapper和4个Reducer,有2个core,也就是能并行运行两个task。咱们能够算出Spark的shuffle write共须要16个bucket,也就有了16个write handler。在以前的Spark版本中,每个bucket对应的是一个文件,所以在这里会产生16个shuffle文件。
而在shuffle consolidation中每个bucket并不是对应一个文件,而是对应文件中的一个segment,同时shuffle consolidation所产生的shuffle文件数量与Spark core的个数也有关系。在上面的图例中,job的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到以前的8个文件后面,这样一共就只有8个shuffle文件,而在文件内部这有16个不一样的segment。所以从理论上讲shuffle consolidation所产生的shuffle文件数量为C×R,其中C是Spark集群的core number,R是Reducer的个数。
须要注意的是当 M=C时shuffle consolidation所产生的文件数和以前的实现是同样的。
Shuffle consolidation显著减小了shuffle文件的数量,解决了以前版本一个比较严重的问题,可是writer handler的buffer开销过大依然没有减小,若要减小writer handler的buffer开销,咱们只能减小Reducer的数量,可是这又会引入新的问题,下文将会有详细介绍。
4.2 Shuffle Read
Shuffle write写出去的数据要被Reducer使用,就须要shuffle fetcher将所需的数据fetch过来,这里的fetch包括本地和远端,由于shuffle数据有可能一部分是存储在本地的。Spark对shuffle fetcher实现了两套不一样的框架:NIO经过socket链接去fetch数据;OIO经过netty server去fetch数据。分别对应的类是BasicBlockFetcherIterator和NettyBlockFetcherIterator。
在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle数据量比较大的状况下performance始终不是很好,没法充分利用网络带宽,为了解决这个问题,添加了新的shuffle fetcher来试图取得更好的性能。对于早期shuffle性能的评测能够参看Spark usergroup。固然如今BasicBlockFetcherIterator的性能也已经好了不少,使用的时候能够对这两种实现都进行测试比较。
4.3 Shuffle Aggregate
咱们都知道在Hadoop MapReduce的shuffle过程当中,shuffle fetch过来的数据会进行merge sort,使得相同key下的不一样value按序归并到一块儿供Reducer使用,这个过程能够参看下图:
全部的merge sort都是在磁盘上进行的,有效地控制了内存的使用,可是代价是更多的磁盘IO。
那么Spark是否也有merge sort呢,仍是以别的方式实现,下面咱们就细细说明。
首先虽然Spark属于MapReduce体系,可是对传统的MapReduce算法进行了必定的改变。Spark假定在大多数用户的case中,shuffle数据的sort不是必须的,好比word count,强制地进行排序只会使性能变差,所以Spark并不在Reducer端作merge sort。既然没有merge sort那Spark是如何进行reduce的呢?这就要说到aggregator了。
aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。当咱们在作word count reduce计算count值的时候,它会将shuffle fetch到的每个key-value pair更新或是插入到hashmap中(若在hashmap中没有查找到,则插入其中;若查找到则更新value值)。这样就不须要预先把全部的key-value进行merge sort,而是来一个处理一个,省下了外部排序这一步骤。但同时须要注意的是reducer的内存必须足以存放这个partition的全部key和count值,所以对内存有必定的要求。
在上面word count的例子中,由于value会不断地更新,而不须要将其所有记录在内存中,所以内存的使用仍是比较少的。考虑一下若是是group by key这样的操做,Reducer须要获得key对应的全部value。在Hadoop MapReduce中,因为有了merge sort,所以给予Reducer的数据已是group by key了,而Spark没有这一步,所以须要将key和对应的value所有存放在hashmap中,并将value合并成一个array。能够想象为了可以存放全部数据,用户必须确保每个partition足够小到内存可以容纳,这对于内存是一个很是严峻的考验。所以Spark文档中建议用户涉及到这类操做的时候尽可能增长partition,也就是增长Mapper和Reducer的数量。
增长Mapper和Reducer的数量当然能够减少partition的大小,使得内存能够容纳这个partition。可是咱们在shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增长的更多,由此带来write handler所需的buffer也会更多。在一方面咱们为了减小内存的使用采起了增长task数量的策略,另外一方面task数量增多又会带来buffer开销更大的问题,所以陷入了内存使用的两难境地。
为了减小内存的使用,只能将aggregator的操做从内存移到磁盘上进行,Spark社区也意识到了Spark在处理数据规模远远大于内存大小时所带来的问题。所以PR303提供了外部排序的实现方案,相信在Spark 0.9 release的时候,这个patch应该能merge进去,到时候内存的使用量能够显著地减小。
5.参考资料
1).https://my.oschina.net/repine/blog/545695详细探究Spark的shuffle实现
2)董先生ppt