Spark之性能优化重点——并行流数据接收

问题导读html

一、如何减小批数据的执行时间?
二、Spark有哪些方面的性能优化?
三、有哪些错误咱们须要关心?
node

(一)减小批数据的执行时间程序员

Spark中有几个优化能够减小批处理的时间。这些能够在优化指南中做了讨论。这节重点讨论几个重要的。面试

数据接收的并行水平apache

经过网络(如kafka,flume,socket等)接收数据须要这些数据反序列化并被保存到Spark中。若是数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每一个输入DStream建立一个receiver(运行在worker机器上) 接收单个数据流。建立多个输入DStream并配置它们能够从源中接收不一样分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream能够被切分为两个kafka输入流,每一个接收一个topic。这将 在两个worker上运行两个receiver,所以容许数据并行接收,提升总体的吞吐量。多个DStream能够被合并生成单个DStream,这样运用在单个输入DStream的transformation操做能够运用在合并的DStream上。api

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
数组

另一个须要考虑的参数是receiver的阻塞时间。对于大部分的receiver,在存入Spark内存以前,接收的数据都被合并成了一个大数据块。每批数据中块的个数决定了任务的个数。这些任务是用类 似map的transformation操做接收的数据。阻塞间隔由配置参数spark.streaming.blockInterval决定,默认的值是200毫秒。缓存

多输入流或者多receiver的可选的方法是明确地从新分配输入数据流(利用inputStream.repartition()),在进一步操做以前,经过集群的机器数分配接收的批数据。性能优化

数据处理的并行水平网络

若是运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。例如,对于分布式reduce操做如reduceByKey和reduceByKeyAndWindow,默认的并发任务数经过配置属性来肯定(configuration.html#spark-properties) spark.default.parallelism。你能够经过参数(PairDStreamFunctions (api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))传递并行度,或者设置参数 spark.default.parallelism修改默认值。

数据序列化

数据序列化的总开销是日常大的,特别是当sub-second级的批数据被接收时。下面有两个相关点:

Spark中RDD数据的序列化。关于数据序列化请参照Spark优化指南。注意,与Spark不一样的是,默认的RDD会被持久化为序列化的字节数组,以减小与垃圾回收相关的暂停。

输入数据的序列化。从外部获取数据存到Spark中,获取的byte数据须要从byte反序列化,而后再按照Spark的序列化格式从新序列化到Spark中。所以,输入数据的反序列化花费多是一个瓶颈。

任务的启动开支

每秒钟启动的任务数是很是大的(50或者更多)。发送任务到slave的花费明显,这使请求很难得到亚秒(sub-second)级别的反应。经过下面的改变能够减少开支

任务序列化。运行kyro序列化任何能够减少任务的大小,从而减少任务发送到slave的时间。

执行模式。在Standalone模式下或者粗粒度的Mesos模式下运行Spark能够在比细粒度Mesos模式下运行Spark得到更短的任务启动时间。能够在在Mesos下运行Spark中获取更多信息。

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.

(二)设置正确的批容量

为了Spark Streaming应用程序可以在集群中稳定运行,系统应该可以以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这能够经过流的网络UI观察获得。批处理时间应该小于批间隔时间。

根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率能够经过应用程序维持。能够考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能能够每2秒打印一次单词计数 (批间隔时间为2秒),但没法每500毫秒打印一次单词计数。因此,为了在生产环境中维持指望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能知足数据处理速率,你能够经过检查端到端的延迟值来判断(能够在 Spark驱动程序的log4j日志中查看"Total delay"或者利用StreamingListener接口)。若是延迟维持稳定,那么系统是稳定的。若是延迟持续增加,那么系统没法跟上数据处理速率,是不稳定的。 你可以尝试着增长数据处理速率或者减小批容量来做进一步的测试。注意,由于瞬间的数据处理速度增长致使延迟瞬间的增加多是正常的,只要延迟能从新回到了低值(小于批容量)。

(三)内存调优

调整内存的使用以及Spark应用程序的垃圾回收行为已经在Spark优化指南中详细介绍。在这一节,咱们重点介绍几个强烈推荐的自定义选项,它们能够 减小Spark Streaming应用程序垃圾回收的相关暂停,得到更稳定的批处理时间。

• Default persistence level of DStreams:和RDDs不一样的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。 即便保存数据为序列化形态会增长序列化/反序列化的开销,可是能够明显的减小垃圾回收的暂停。

• Clearing persistent RDDs:默认状况下,经过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。若是spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化 RDD将会被定时的清理掉。正如前面提到的那样,这个值须要根据Spark Streaming应用程序的操做当心设置。然而,能够设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个 配置使系统找出那些不须要常常保有的RDD,而后去持久化它们。这能够减小Spark RDD的内存使用,也可能改善垃圾回收的行为。

• Concurrent garbage collector:使用并发的标记-清除垃圾回收能够进一步减小垃圾回收的暂停时间。尽管并发的垃圾回收会减小系统的总体吞吐量,可是仍然推荐使用它以得到更稳定的批处理时间。

(四)容错语义

这一节,咱们将讨论在节点错误事件时Spark Streaming的行为。为了理解这些,让咱们先记住一些Spark RDD的基本容错语义。

• 一个RDD是不可变的、肯定可重复计算的、分布式数据集。每一个RDD记住一个肯定性操做的谱系(lineage),这个谱系用在容错的输入数据集上来建立该RDD。

• 若是任何一个RDD的分区由于节点故障而丢失,这个分区能够经过操做谱系从源容错的数据集中从新计算获得。

• 假定全部的RDD transformations是肯定的,那么最终转换的数据是同样的,不论Spark机器中发生何种错误。

Spark运行在像HDFS或S3等容错系统的数据上。所以,任何从容错数据而来的RDD都是容错的。然而,这不是在Spark Streaming的状况下,由于Spark Streaming的数据大部分状况下是从 网络中获得的。为了得到生成的RDD相同的容错属性,接收的数据须要重复保存在worker node的多个Spark executor上(默认的复制因子是2),这致使了当出现错误事件时,有两类数据须要被恢复

Data received and replicated :在单个worker节点的故障中,这个数据会幸存下来,由于有另一个节点保存有这个数据的副本。

Data received but buffered for replication:由于没有重复保存,因此为了恢复数据,惟一的办法是从源中从新读取数据。

有两种错误咱们须要关心

• worker节点故障:任何运行executor的worker节点都有可能出故障,那样在这个节点中的全部内存数据都会丢失。若是有任何receiver运行在错误节点,它们的缓存数据将会丢失

• Driver节点故障:若是运行Spark Streaming应用程序的Driver节点出现故障,很明显SparkContext将会丢失,全部执行在其上的executors也会丢失。

做为输入源的文件语义(Semantics with files as input source)

若是全部的输入数据都存在于一个容错的文件系统如HDFS,Spark Streaming总能够从任何错误中恢复而且执行全部数据。这给出了一个刚好一次(exactly-once)语义,即不管发生什么故障, 全部的数据都将会刚好处理一次。

基于receiver的输入源语义

对于基于receiver的输入源,容错的语义既依赖于故障的情形也依赖于receiver的类型。正如以前讨论的,有两种类型的receiver

Reliable Receiver:这些receivers只有在确保数据复制以后才会告知可靠源。若是这样一个receiver失败了,缓冲(非复制)数据不会被源所认可。若是receiver重启,源会重发数 据,所以不会丢失数据。

Unreliable Receiver:当worker或者driver节点故障,这种receiver会丢失数据

选择哪一种类型的receiver依赖于这些语义。若是一个worker节点出现故障,Reliable Receiver不会丢失数据,Unreliable Receiver会丢失接收了可是没有复制的数据。若是driver节点 出现故障,除了以上状况下的数据丢失,全部过去接收并复制到内存中的数据都会丢失,这会影响有状态transformation的结果。

为了不丢失过去接收的数据,Spark 1.2引入了一个实验性的特征write ahead logs,它保存接收的数据到容错存储系统中。有了write ahead logs和Reliable Receiver,咱们能够 作到零数据丢失以及exactly-once语义。

下面的表格总结了错误语义:

 输出操做的语义

根据其肯定操做的谱系,全部数据都被建模成了RDD,全部的从新计算都会产生一样的结果。全部的DStream transformation都有exactly-once语义。那就是说,即便某个worker节点出现 故障,最终的转换结果都是同样。然而,输出操做(如foreachRDD)具备at-least once语义,那就是说,在有worker事件故障的状况下,变换后的数据可能被写入到一个外部实体不止一次。 利用saveAs***Files将数据保存到HDFS中的状况下,以上写屡次是可以被接受的(由于文件会被相同的数据覆盖)。

结语

感谢您的观看,若有不足之处,欢迎批评指正。

若是有对大数据感兴趣的小伙伴或者是从事大数据的老司机能够加群:

658558542    

欢迎你们交流分享,学习交流,共同进步。(里面还有大量的免费资料,帮助你们在成为大数据工程师,乃至架构师的路上披荆斩棘!)

最后祝福全部遇到瓶颈的大数据程序员们突破本身,祝福你们在日后的工做与面试中一切顺利。

相关文章
相关标签/搜索