Spark 知识点java
RDD全称是resilient distributed dataset(具备弹性的分布式数据集)。一个RDD仅仅是一个分布式的元素集合。在Spark中,全部工做都表示为建立新的RDDs、转换现有的RDDs,或者调用RDDs上的操做来计算结果。python
在Spark中,一个RDD仅仅是一个不可变的分布式对象集合.每一个RDD被切分红多个能够在不一样集群节点上进行计算的partitions.RDDs能够包含Python,Java和Scala任何类型的对象,包括用户自定义的class文件。web
用户建立RDD的两种方法:经过加载外部数据集或者经过在使用者运行的driver程序中分配一个对象集合。算法
一旦建立,RDD提供两种操做类型:转换(transformations)和执行(actions)。Transformations会根据以前的RDD构造一个新的RDDsql
Transformations和actions的区别在于Spark计算RDD的方式.尽管你能够随时定义新的RDD,可是Spark只在他们第一次被action使用的时候进行计算。shell
最终,每当你在Spark的RDD上运行一个action,默认会从新计算。若是你想在多个action上从新使用一个RDD,RDD.persist()方法能够进行保存。api
总的来说,每一个Spark项目或者shell对话都包含如下步骤:数组
特征:缓存
RDD上的transformation(转换)是惰性求值,意味着Spark除非看到一个action(动做),不然不会开始执行。安全
Spark统一内存管理主要是指Executor 端的内存模型,分为堆内内存和堆外内存两大区域。
(1),堆内内存
默认状况下,executor仅仅使用堆内内存(参数spark.memory.offHeap.enabled设置为true能够开启堆外内存),分为4个部分:
Execution memory:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程当中的临时数据;
Storage memory:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;
User Memory:主要用于存储 RDD 转换操做所须要的数据,例如 RDD 依赖等信息。
Reserved Memory:系统预留内存,会用来存储Spark内部对象。
能够归纳为下图:
说明以下:
systemMemory: executor的内存总和,经过spark.executor.memory 或--executor-memory 配置
reservedMemory :默认为300M,通常不可修改,测试场景下能够经过spark.testing.reservedMemory进行指定。
usableMemory
= systemMemory - reservedMemory
,这个就是 Spark任务的可用内存。
经过参数spark.memory.fraction(默认0.6)和spark.memory.storageFraction(默认0.5)参数能够调整User Memory,Execution memory,Storage memory的大小。
Execution memory和 Storage memory的内存具备共享特性,概述以下:
程序提交时咱们都会设定基本的 Execution 内存和 Storage 内存区域;
在程序运行时,若是双方的空间都不足时,则存储到硬盘;
当Execution memory的空间充足,而storage memory不足时,storage能够占用execution memory。若是后续execution须要用到被占用的内存时,能够要求storage将相关数据转存到硬盘来归还内存
当Storage memory的空间充足,而Execution memory不足时,Execution能够占用storage内存,可是在storage须要内存而execution仍然在占用时,则没法要求归还,只能等待execution主动释放。
(2),堆外内存
这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API 直接向操做系统申请内存,因为这种方式不进过 JVM 内存管理,因此能够避免频繁的 GC,这种内存申请的缺点是必须本身编写内存申请和释放的逻辑。
默认状况下堆外内存是关闭的,能够经过spark.memory.offHeap.enabled参数开启,经过spark.memory.offHeap.size
参数设置堆外内存大小。
堆外内存只分为:Execution memory 和Storage memory,以下图
若是堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,二者的使用互补影响,这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也同样。
RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;都有惰性机制,都会根据spark的内存状况自动缓存运算,三者都有partition的概念
区别以下:
RDD:
编译时类型安全;支持非结构化数据;直接经过类名点的方式来操做数据;默认采用的是java序列化方式方式,序列化结果较大效率较低;数据存储在java堆内存中,有可能致使频繁gc;
DataFrame:
编译时不能进行类型转化安全检查;DataFrame每一行的类型固定为Row,只有经过解析才能获取各个字段的值,每一列的值无法用类点名直接访问;支持结构化数据;采用Kryo进行序列化,并且序列化时不须要带上元信息,大大的减小了序列化大小;数据能够保存在堆外内存中,减小了gc次数;支持R和python;
Dataset:
支持结构化和非结构化数据;编译时类型安全;能够采用堆外内存进行存储;Dataset中,每一行是什么类型是不必定的
Spark经过action切分出job,每一个job里面经过shuffle切分stage,而每个stage里面有若干task,task与RDD的partition对应,一个RDD有若干个partition,stage对输入RDD的处理时,每一个partition就对应了一个task。
在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得愈来愈先进。
在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个很是严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操做影响了性能。
所以在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,可是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。
下面咱们详细分析一下HashShuffleManager和SortShuffleManager的原理。
(1)HashShuffleManager运行原理
未经优化的HashShuffleManager
下图说明了未经优化的HashShuffleManager的原理。这里咱们先明确一个假设前提:每一个Executor只有1个CPU core,也就是说,不管这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。
咱们先从shuffle write开始提及。shuffle write阶段,主要就是在一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子(好比reduceByKey),而将每一个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘以前,会先将数据写入内存缓冲中,当内存缓冲填满以后,才会溢写到磁盘文件中去。
那么每一个执行shuffle write的task,要为下一个stage建立多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每一个task就要建立多少份磁盘文件。好比下一个stage总共有100个task,那么当前stage的每一个task都要建立100份磁盘文件。若是当前stage有50个task,总共有10个Executor,每一个Executor执行5个Task,那么每一个Executor上总共就要建立500个磁盘文件,全部Executor上会建立5000个磁盘文件。因而可知,未经优化的shuffle write操做所产生的磁盘文件的数量是极其惊人的。
接着咱们来讲说shuffle read。shuffle read,一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个节点上经过网络都拉取到本身所在的节点上,而后进行key的聚合或链接等操做。因为shuffle write的过程当中,task给下游stage的每一个task都建立了一个磁盘文件,所以shuffle read的过程当中,每一个task只要从上游stage的全部task所在节点上,拉取属于本身的那一个磁盘文件便可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每一个shuffle read task都会有一个本身的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,而后经过内存中的一个Map进行聚合等操做。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操做。以此类推,直到最后将全部数据到拉取完,并获得最终的结果。
优化后的HashShuffleManager
下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指咱们能够设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true便可开启优化机制。一般来讲,若是咱们使用HashShuffleManager,那么都建议开启这个选项。
开启consolidate机制以后,在shuffle write过程当中,task就不是为下游stage的每一个task建立一个磁盘文件了。此时会出现shuffleFileGroup的概念,每一个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就能够并行执行多少个task。而第一批并行执行的每一个task都会建立一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用以前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。所以,consolidate机制容许不一样的task复用同一批磁盘文件,这样就能够有效将多个task的磁盘文件进行必定程度上的合并,从而大幅度减小磁盘文件的数量,进而提高shuffle write的性能。
假设第二个stage有100个task,第一个stage有50个task,总共仍是有10个Executor,每一个Executor执行5个task。那么本来使用未经优化的HashShuffleManager时,每一个Executor会产生500个磁盘文件,全部Executor会产生5000个磁盘文件的。可是此时通过优化以后,每一个Executor建立的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每一个Executor此时只会建立100个磁盘文件,全部Executor只会建立1000个磁盘文件。
(2)SortShuffleManager运行原理
SortShuffleManager的运行机制主要分红两种,一种是普通运行机制,另外一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
普通运行机制
下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不一样的shuffle算子,可能选用不一样的数据结构。若是是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边经过Map进行聚合,一边写入内存;若是是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构以后,就会判断一下,是否达到了某个临界阈值。若是达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。
在溢写到磁盘文件以前,会先根据key对内存数据结构中已有的数据进行排序。排序事后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是经过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢以后再一次写入磁盘文件中,这样能够减小磁盘IO次数,提高性能。
一个task将全部数据写入内存数据结构的过程当中,会发生屡次磁盘溢写操做,也就会产生多个临时文件。最后会将以前全部的临时磁盘文件都进行合并,这就是merge过程,此时会将以前全部临时磁盘文件中的数据读取出来,而后依次写入最终的磁盘文件之中。此外,因为一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,所以还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager因为有一个磁盘文件merge的过程,所以大大减小了文件数量。好比第一个stage有50个task,总共有10个Executor,每一个Executor执行5个task,而第二个stage有100个task。因为每一个task最终只有一个磁盘文件,所以此时每一个Executor上只有5个磁盘文件,全部Executor只有50个磁盘文件。
bypass运行机制
下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件以下:
• shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
• 不是聚合类的shuffle算子(好比reduceByKey)。
此时task会为每一个下游task都建立一个临时磁盘文件,并将数据按key进行hash而后根据key的hash值,将key写入对应的磁盘文件之中。固然,写入磁盘文件时也是先写入内存缓冲,缓冲写满以后再溢写到磁盘文件的。最后,一样会将全部临时磁盘文件都合并成一个磁盘文件,并建立一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是如出一辙的,由于都要建立数量惊人的磁盘文件,只是在最后会作一个磁盘文件的合并而已。所以少许的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来讲,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不一样在于:第一,磁盘写机制不一样;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程当中,不须要进行数据的排序操做,也就节省掉了这部分的性能开销。
spark的容错性依赖于Lineage机制与Checkpoint机制。RDD的Lineage记录的是粗颗粒度的特定数据Transformation操做(如filter、map、join等)行为。当这个RDD的部分分区数据丢失时,它能够经过Lineage获取足够的信息来从新运算和恢复丢失的数据分区;RDD在Lineage依赖方面分为两种:窄依赖与宽依赖,用来解决数据容错的高效性。
第一,窄依赖能够在某个计算节点上直接经过计算父RDD的某块数据计算获得子RDD对应的某块数据;宽依赖则要等到父RDD全部数据都计算完成以后,而且父RDD的计算结果进行hash并传到对应节点上以后才能计算子RDD。
第二,数据丢失时,对于窄依赖只须要从新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的全部数据块所有从新计算来恢复。因此在长“血统”链特别是有宽依赖的时候,须要在适当的时机设置数据检查点。也是这两个特性要求对于不一样依赖关系要采起不一样的任务调度机制和容错恢复机制
在RDD计算中,经过检查点机制进行容错,传统作检查点有两种方式:经过冗余数据和日志记录更新操做。在RDD中的doCheckPoint方法至关于经过冗余数据来缓存数据,而以前介绍的血统就是经过至关粗粒度的记录更新操做来实现容错的。
检查点(本质是经过将RDD写入Disk作检查点)是为了经过lineage作容错的辅助,lineage过长会形成容错成本太高,这样就不如在中间阶段作检查点容错,若是以后有节点出现问题而丢失分区,从作检查点的RDD开始重作Lineage,就会减小开销。
Spark广播变量能够解决闭包函数引用外部大变量引发的性能问题。Spark使用高效的广播算法分发广播变量,以下降通讯成本。广播变量会以只读形式缓存在每一个机器的本地,可使用它们以高效的方式为每一个节点提供大型输入数据集的副本。
Broadcast的实现有两种:HTTPBroadcast与TorrentBroadcast;HTTPBroadcast是经过http来传输,该方式可能会形成Driver所在的节点网络堵塞。TorrentBroadcast。相似于常见的BitTorrent技术,基本思想是将data切分为一组blocks存储于Driver的BlockManager中。假设如今有一些Executors获取到了一些blocks,那么这些Executors就能够成为data server。随着fetch的Executor愈来愈多,也就意味着更多的data server加入,那么data就很快可以传输到全部Executor中
TorrentBroadcast具体实现的原理是:
Driver端:
1. Driver先把data序列化为byte array,而后再切割成blockSize大小的data blocks(Array[ByteBuffer])。
2. 完成切割后,将每一个分块数据存储到driver本身的BlockManager中,StorageLevel为MEMORY_AND_DISK_SER,当存储完毕后会向BlockManagerMaster进行汇报。
Executor端:
1. 首先从本地查询是否缓存了完整的data,若是查询到了,则当即返回;不然调用readBlocks从Driver或者其余Executor拉取 Torrent 块。
2. 新建一组Array[BlockData](numBlocks)用于存储从远程拉取过来的Block,接着按照随机的索引顺序(假设有5个块,那么打乱以后,拉取的索引顺序可能为3-1-4-2-5)一个个的去fetch block data。
3. 每一个block data都有一个惟一id(e.g. broadcast_xx_piece_0),首先会根据这个id向BlockManager查询本地是否有数据,若是有则将数据放到数组中索引对应的位置;不然,根据id向BlockManager从远程拉取数据。
4. BlockManager首先会向Driver查询该id对应的块在哪些locations上存在,当前Executor就会随机选择一个location进行下载,下载完成后,会报告给BlockManagerMaster。随着下载的Executor越多,那么这个location也就越多,data block服务器也就越多。
5. 当获取到全部的BlockData以后(全部的BlockData都将存储在Array[BlockData]中),接着会对该array中的块数据进行反序列化获得原始的data。最后,会将data放到当前Executor的BlockManager中,那么之后再次获取data时,就能够直接从本地获得。
广播变量的一个最重要的特色就是,在同一个执行器上的全部任务均可以共享此Broadcast,而不是每一个任务使用一个变量副本;还有在使用广播变量作join操做时,不会产生shuffle
spark的调度模式分为两种:FIFO(先进先出)和FAIR(公平调度)。默认是FIFO,即谁先提交谁先执行,而FAIR支持在调度池中再进行分组,能够有不一样的权重,根据权重、资源等来决定谁先执行, 例如能够为重要的jobs建立高优先级的池,或者把不一样用户的job放到不一样的组,而后给用户配置相同的资源量从而不至于某些用户的做业少而获得更少的资源。spark的调度模式能够经过spark.scheduler.mode进行设置。
在DAGScheluer对job划分好stage并以TaskSet的形式提交给TaskScheduler后,TaskScheduler的实现类会为每一个TaskSet建立一个TaskSetMagager对象,并将该对象添加到调度池中。schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在建立TaskSchedulerImpl的时候经过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。程序会根据配置来建立不一样的调度池,schedulableBuilder有两种实现,分别是FIFOSchedulableBuilder和FairSchedulableBuilder。
FIFOSchedulableBuilder啥也没干。FairSchedulableBuilder的buildPools方法中会先去读取FAIR模式的配置文件默认位于SPARK_HOME/conf/fairscheduler.xml,也能够经过参数spark.scheduler.allocation.file设置用户自定义配置文件。FAIR能够配置多个调度池,即rootPool里面仍是一组Pool,Pool中包含了TaskSetMagager。 FairSchedulableBuilder会根据配置文件建立buildFairSchedulerPool。根据每一个字段值(未设置则为默认值)来实例化一个Pool对象,并添加到rootPool中。
一个spark应用程序包含一个TaskScheduler,一个TaskScheduler包含一个惟一的RootPool,FIFO只有一层Pool,包含TaskSetMagager,而FARI包含两层Pool,RootPool包含子Pool,子Pool包含TaskSetMagager,RootPool都是在实例化SchedulableBuilder的时候建立的。两种调度模式的最终实现都是同样,不过FAIR会在添加以前会获取须要使用的调度池,默认为名字为default的调度池。对于FIFO而言,parentPool都是RootPool,而FAIR,TaskSetMagager的parentPool都是RootPool的子Pool。
FIFO模式的算法类是FIFOSchedulingAlgorithm:
一、先比较priority,在FIFO中该优先级其实是Job ID,越早提交的job的jobId越小,priority越小,优先级越高。
二、若priority相同,则说明是同一个job里的TaskSetMagager,则比较StageId,StageId越小优先级越高。
FAIR模式的算法实现类是FairSchedulingAlgorithm:
一、调度池运行的task数小于minShare的优先级比不小于的优先级要高。
二、若二者运行的task个数都比minShare小,则比较minShare使用率,使用率约低优先级越高。
三、若二者的minShare使用率相同,则比较权重使用率,使用率约低优先级越高。
四、若权重也相同,则比较名字。
reduce(binary_function)
reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
val c = sc.parallelize(1 to 4)
c.reduce((x, y) => x + y)
//结果10
具体过程,RDD有1 2 3 4个元素,
1+2=3
3+3=6
6+4=10
reduceByKey(binary_function)
reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操做,所以,Key相同的多个元素的值被reduce为一个值,而后与原RDD中的Key组成一个新的KV对。
val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))
a.reduceByKey((x,y) => x + y).collect
//结果 Array((1,5), (3,10))
具体过程,
(1,2),(1,3)被分到一组进行求和获得(1,5)
(3,4),(3,6) 被分到一组进行求和(3,10)
答: 在Yarn中,每一个application都有一个Application Master进程,负责从ResourceManager中申请资源。
在Yarn-cluster模式下,driver运行在被YARN管理的Appliaction Master进程中。Client将其Application交给RM后能够当即关闭而没必要持续到Application的生命周期。
在Yarn-client中,driver运行在client进程中,Application Master仅仅从Yarn中申请资源给Executor,以后client会跟container通讯进行做业的调度。因此client得一直运行直到Application结束。
In cluster
mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client
mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
通常来讲,若是提交任务的节点和Worker集群在同一个网络内,此时client mode比较合适。若是提交任务的节点和Worker集群相隔比较远,就会采用cluster mode来最小化Driver和Executor之间的网络延迟。
Diagnostic Messages for this Task: Container [pid=28020,containerID=container_1459428698943_31555_01_004570] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.0 GB of 16.8 GB virtual memory used. Killing container。 |
答:
1) map过程产生大量对象致使内存溢出:
这种溢出的缘由是在单个map中产生了大量的对象致使的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操做在rdd中,每一个对象都产生了10000个对象,这确定很容易产生内存溢出的问题。针对这种问题,在不增长内存的状况下,能够经过减小每一个Task的大小,以便达到每一个Task即便产生大量的对象Executor的内存也可以装得下。具体作法能够在会产生大量对象的map操做以前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
2)shuffle后内存溢出:
shuffle内存溢出的状况能够说都是shuffle后,单个文件过大致使的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,须要传入一个partitioner,大部分Spark中的shuffle操做,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数经过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,因此若是是别的Partitioner或者本身实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。若是是别的partitioner致使的shuffle内存溢出,就须要从partitioner的代码增长partitions的数量。
3) standalone模式下资源分配不均匀致使内存溢出:
在standalone的模式下若是配置了--total-executor-cores 和 --executor-memory 这两个参数,可是没有配置--executor-cores这个参数的话,就有可能致使,每一个Executor的memory是同样的,可是cores的数量不一样,那么在cores数量多的Executor中,因为可以同时执行多个Task,就容易致使内存溢出的状况。这种状况的解决方法就是同时配置--executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
4) 在内存不足的使用,使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。
Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler |
根据时间实例产生RDDs,和batchDuration对齐的,如:timer实例就是1秒,1秒生成一个RDD,
每一个RDD对应一个Job,由于RDD就是DStream操做的时间间隔的最后一个RDD,后面的RDD对前面
的RDD有依赖关系,后面对前面有依赖能够推到出整个依赖链条。
1、有两种方式:
①Receiver-base
②Direct
Receiver-base这种方式是先把数据从kafka中读取出来,而后缓存在内存,再定时处理。Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围能够由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行
Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式再也不须要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其余Executor的数据计算过程当中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。今后过程咱们能够发现Direct方式无需Receiver读取数据,而是须要计算时再读取数据,因此Direct方式的数据消费对内存的要求不高,只须要考虑批量计算所须要的内存便可;另外batch任务堆积时,也不会影响数据堆积
2、两种方式的优缺点以下:
Direct方式具备如下方面的优点:
一、简化并行(Simplified Parallelism)。不现须要建立以及union多输入源,Kafka topic的partition与RDD的partition一一对应
二、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)须要配置spark.streaming.receiver.writeAheadLog.enable,此种方式须要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。
三、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,可是Offsets则是由Zookeeper保存。经过参数配置,能够实现at-least once消费,此种状况有重复消费数据的可能。
四、下降资源。Direct不须要Receivers,其申请的Executors所有参与到计算任务中;而Receiver-based则须要专门的Receivers来读取Kafka数据且不参与计算。所以相同的资源申请,Direct 可以支持更大的业务。
五、下降内存。Receiver-based的Receiver与其余Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,若是遇到大业务量时,须要提升Receiver的内存,可是参与计算的Executor并没有需那么多的内存。而Direct 由于没有Receiver,而是在计算时读取数据,而后直接计算,因此对内存的要求很低。实际应用中咱们能够把原先的10G降至如今的2-4G左右。
六、鲁棒性更好。Receiver-based方法须要Receivers来异步持续不断的读取数据,所以遇到网络、存储负载等因素,致使实时任务出现堆积,但Receivers却还在持续读取数据,此种状况很容易致使计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引发程序的失败。
Direct方式的缺点:
提升成本。Direct须要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,经过ZooKeeper来维护Offsets,此提升了用户的开发成本。
监控可视化。Receiver-based方式指定topic指定consumer的消费状况均能经过ZooKeeper来监控,而Direct则没有这种便利,若是作到监控并可视化,则须要投入人力开发。
Receive-base优势:
一、Kafka的high-level数据读取方式让用户能够专一于所读数据,而不用关注或维护consumer的offsets,这减小用户的工做量以及代码量并且相对比较简单。
Receive-base的缺点:
一、防数据丢失。作checkpoint操做以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理以前须要将该batch内的日志备份到checkpoint目录中,这下降了数据处理效率,反过来又加剧了Receiver端的压力;另外因为数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,致使应用崩溃。
二、单Receiver内存。因为receiver也是属于Executor的一部分,那么为了提升吞吐量,提升Receiver的内存。可是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,致使资源严重浪费。
三、在程序失败恢复时,有可能出现数据部分落地,可是程序失败,未更新offsets的状况,这致使数据重复消费。
四、提升并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。若是开较高的并行度来平衡吞吐量很不划算。五、Receiver和计算的Executor的异步的,那么遇到网络等因素缘由,致使计算出现延迟,计算队列一直在增长,而Receiver则在一直接收数据,这很是容易致使程序崩溃。
六、采用MEMORY_AND_DISK_SER下降对内存的要求。可是在必定程度上影响计算的速度
SparkthirfServer是一个基于HiveServer2开发的用于多用户访问Spark的服务器,提供了类JDBC接口使得任何使用BI工具的用户可以方便接入Spark.
配置参数spark.sql.thriftServer.incrementalCollect的取值为boolean类型,默认取值为false
当值为false的时候,采用collect取数机制,这种机制会一次性从spark获取全部的数据返回给JDBC,这就意味着一个JDBC请求返回的大量数据集所有放在Spark sql thriftServer的堆内存里面,这样可能会致使内存溢出,可经过增长堆内存和手动分页来规避风险。
当取值为true时,采用叫作tolocalIterator的取数机制,具体为一次只返回一个Spark分区的数值,这样会对性能表现有所影响,可是会减小thriftSerer堆内存消耗。
总共有7个步骤:
SalParser匹配select 语句,获取Distinct语句,投影字段projection,表relation,依次将匹配的字符串层层封装,最终造成一颗LogicPlan的Tree
答:产生小文件缘由:Spark默认在执行聚合(即shuffle)时,会多线程并行往hdfs写数据(由于每一个DataFrame/RDD分红若干个Partition,这些partition能够被并行处理,默认有200个分区,由conf变量“spark.sql.shuffle.partitions”定义)。其结果就是一个存下来的文件,实际上是hdfs中一个目录,在这个目录下才是众多partition对应的文件,最坏的状况是出现好多size为0的文件。
调优/解决办法:调节参数spark.sql.shuffle.partitions,适当下降它的并行度;使用reparation(num)=coalesce(num, true)函数重分区coalesce和repartition,合并小文件
①num-executors
参数说明:该参数用于设置Spark做业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽量按照你的设置来在集群的各个工做节点上,启动相应数量的Executor进程。这个参数很是之重要,若是不设置的话,默认只会给你启动少许的Executor进程,此时你的Spark做业的运行速度是很是慢的。
参数调优建议:每一个Spark做业的运行通常设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都很差。设置的太少,没法充分利用集群资源;设置的太多的话,大部分队列可能没法给予充分的资源。
②executor-memory
参数说明:该参数用于设置每一个Executor进程的内存。Executor内存的大小,不少时候直接决定了Spark做业的性能,并且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每一个Executor进程的内存设置4G~8G较为合适。可是这只是一个参考值,具体的设置仍是得根据不一样部门的资源队列来定。能够看看本身团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就表明了你的Spark做业申请到的总内存量(也就是全部Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,若是你是跟团队里其余人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你本身的Spark做业占用了队列全部的资源,致使别的同窗的做业没法运行。
③executor-cores
参数说明:该参数用于设置每一个Executor进程的CPU core数量。这个参数决定了每一个Executor进程并行执行task线程的能力。由于每一个CPU core同一时间只能执行一个task线程,所以每一个Executor进程的CPU core数量越多,越可以快速地执行完分配给本身的全部task线程。
参数调优建议:Executor的CPU core数量设置为2~4个较为合适。一样得根据不一样部门的资源队列来定,能够看看本身的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每一个Executor进程能够分配到几个CPU core。一样建议,若是是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其余同窗的做业运行。
④driver-memory
参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存一般来讲不设置,或者设置1G左右应该就够了。惟一须要注意的一点是,若是须要使用collect算子将RDD的数据所有拉取到Driver上进行处理,那么必须确保Driver的内存足够大,不然会出现OOM内存溢出的问题。
⑤spark.default.parallelism
参数说明:该参数用于设置每一个stage的默认task数量。这个参数极为重要,若是不设置可能会直接影响你的Spark做业性能。
参数调优建议:Spark做业的默认task数量为500~1000个较为合适。不少同窗常犯的一个错误就是不去设置这个参数,那么此时就会致使Spark本身根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。一般来讲,Spark默认设置的数量是偏少的(好比就几十个task),若是task数量偏少的话,就会致使你前面设置好的Executor的参数都前功尽弃。试想一下,不管你的Executor进程有多少个,内存和CPU有多大,可是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!所以Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,好比Executor的总CPU core数量为300个,那么设置1000个task是能够的,此时能够充分地利用Spark集群的资源。
⑥spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,能够用来保存持久化的RDD数据。根据你选择的不一样的持久化策略,若是内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:若是Spark做业中,有较多的RDD持久化操做,该参数的值能够适当提升一些,保证持久化的数据可以容纳在内存中。避免内存不够缓存全部的数据,致使数据只能写入磁盘中,下降了性能。可是若是Spark做业中的shuffle类操做比较多,而持久化操做比较少,那么这个参数的值适当下降一些比较合适。此外,若是发现做业因为频繁的gc致使运行缓慢(经过spark web ui能够观察到做业的gc耗时),意味着task执行用户代码的内存不够用,那么一样建议调低这个参数的值。
⑦spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程当中一个task拉取到上个stage的task的输出后,进行聚合操做时可以使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操做。shuffle操做在进行聚合时,若是发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地下降性能。
参数调优建议:若是Spark做业中的RDD持久化操做较少,shuffle操做较多时,建议下降持久化操做的内存占比,提升shuffle操做的内存占比比例,避免shuffle过程当中数据过多时内存不够用,必须溢写到磁盘上,下降了性能。此外,若是发现做业因为频繁的gc致使运行缓慢,意味着task执行用户代码的内存不够用,那么一样建议调低这个参数的值。