Spark Streaming性能调优

数据接收并行度调优(一)
经过网络接收数据时(好比Kafka、Flume),会将数据反序列化,并存储在Spark的内存中。若是数据接收称为系统的瓶颈,那么能够考虑并行化数据接收。 每个输入DStream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。所以能够经过建立多个输入DStream,而且配置它们接收数据源不一样的分区数据,达到接收多个数据流的效果。好比说 ,一个接收两个Kafka Topic的输入DStream,能够被拆分为两个输入DStream,每一个分别接收一个topic的数据。这样就会建立两个Receiver,从而并行地接收数据,进而提高吞吐量。多个DStream可使用 union算子进行聚合,从而造成一个DStream。而后后续的transformation算子操做都针对该一个聚合后的DStream便可。
 
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
 
 
 
数据接收并行度调优(二)
数据接收并行度调优,除了建立更多输入DStream和Receiver之外,还能够考虑调节 block interval。经过参数,spark.streaming.blockInterval,能够设置block interval,默认是 200ms。对于大多数Receiver来讲,在将接收到的数据保存到Spark的BlockManager以前,都会将数据切分为一个一个的block。 而每一个batch中的block数量,则决定了该batch对应的RDD的partition的数量,以及针对该RDD执行transformation操做时,建立的task的数量。每一个batch对应的task数量是大约估计的,即batch interval / block interval。
 
例如说,batch interval为2s,block interval为200ms,会建立10个task。若是你认为每一个batch的task数量太少,即低于每台机器的cpu core数量,那么就说明batch的task数量是不够的,由于全部的cpu资源没法彻底被利用起来。要为batch增长block的数量,那么就减少block interval。然而, 推荐的block interval最小值是50ms,若是低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。
 
 
 
数据接收并行度调优(三)
除了上述说的两个提高数据接收并行度的方式,还有一种方法,就是显式地对输入数据流进行重分区。使用inputStream.repartition(<number of partitions>)便可。这样就能够将接收到的batch,分布到指定数量的机器上,而后再进行进一步的操做。
 
 
 
任务启动调优
若是每秒钟启动的task过于多,好比每秒钟启动50个,那么发送这些task去Worker节点上的Executor的性能开销,会比较大,并且此时基本就很难达到毫秒级的延迟了。使用下述操做能够减小这方面的性能开销:
 
一、Task序列化:使用 Kryo序列化机制来序列化task,能够减少task的大小,从而减小发送这些task到各个Worker节点上的Executor的时间。
二、执行模式: 在Standalone模式下运行Spark,能够达到更少的task启动时间
 
上述方式,也许能够将每一个batch的处理时间减小100毫秒。从而从秒级降到毫秒级。
 
 
 
数据处理并行度调优
若是在计算的任何stage中使用的并行task的数量没有足够多,那么集群资源是没法被充分利用的。举例来讲,对于分布式的reduce操做,好比reduceByKey和reduceByKeyAndWindow, 默认的并行task的数量是由spark.default.parallelism参数决定的。你能够在reduceByKey等操做中,传入第二个参数,手动指定该操做的并行度,也能够调节全局的spark.default.parallelism参数。
 
 
 
数据序列化调优(一)
数据序列化形成的系统开销能够由序列化格式的优化来减少。在流式计算的场景下,有两种类型的数据须要序列化。
 
一、输入数据:默认状况下,接收到的输入数据,是存储在Executor的内存中的,使用的持久化级别是 StorageLevel.MEMORY_AND_DISK_SER_2。这意味着,数据被序列化为字节从而减少GC开销,而且会复制以进行executor失败的容错。所以,数据首先会存储在内存中,而后在内存不足时会溢写到磁盘上,从而为流式计算来保存全部须要的数据。这里的序列化有明显的性能开销——Receiver必须反序列化从网络接收到的数据,而后再使用Spark的序列化格式序列化数据。
 
二、流式计算操做生成的持久化RDD:流式计算操做生成的持久化RDD,可能会持久化到内存中。例如,窗口操做默认就会将数据持久化在内存中,由于这些数据后面可能会在多个窗口中被使用,并被处理屡次。然而,不像Spark Core的默认持久化级别, StorageLevel.MEMORY_ONLY,流式计算操做生成的RDD的默认持久化级别是StorageLevel.MEMORY_ONLY_SER ,默认就会减少GC开销。
 
 
 
 
数据序列化调优(二)
在上述的场景中, 使用Kryo序列化类库能够减少CPU和内存的性能开销。使用Kryo时,必定要考虑注册自定义的类,而且禁用对应引用的tracking(spark.kryo.referenceTracking)。
 
在一些特殊的场景中,好比须要为流式应用保持的数据总量并非不少,也许能够将数据以非序列化的方式进行持久化,从而减小序列化和反序列化的CPU开销,并且又不会有太昂贵的GC开销。举例来讲,若是你数秒的batch interval,而且没有使用window操做,那么你能够考虑经过显式地设置持久化级别,来禁止持久化时对数据进行序列化。这样就能够减小用于序列化和反序列化的CPU性能开销,而且不用承担太多的GC开销。
 
 
batch interval调优(最重要)
若是想让一个运行在集群上的Spark Streaming应用程序能够稳定,它就必须尽量快地处理接收到的数据。换句话说,batch应该在生成以后,就尽量快地处理掉。对于一个应用来讲,这个是否是一个问题,能够经过观察Spark UI上的batch处理时间来定。batch处理时间必须小于batch interval时间。
 
基于流式计算的本质,batch interval对于,在固定集群资源条件下,应用能保持的数据接收速率,会有巨大的影响。例如,在WordCount例子中,对于一个特定的数据接收速率,应用业务能够保证每2秒打印一次单词计数,而不是每500ms。所以batch interval须要被设置得,让预期的数据接收速率能够在生产环境中保持住。
 
为你的应用计算正确的batch大小的比较好的方法,是在一个很保守的batch interval,好比5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,能够检查每一个batch的处理时间的延迟,若是处理时间与batch interval基本吻合,那么应用就是稳定的。不然,若是batch调度的延迟持续增加,那么就意味应用没法跟得上这个速率,也就是不稳定的。所以你要想有一个稳定的配置,能够尝试提高数据处理的速度,或者增长batch interval。记住,因为临时性的数据增加致使的暂时的延迟增加,能够合理的,只要延迟状况能够在短期内恢复便可。
 
 
 
内存调优(一)
优化Spark应用的内存使用和GC行为,在Spark Core的调优中,已经讲过了。这里讲一下与Spark Streaming应用相关的调优参数。
 
Spark Streaming应用须要的集群内存资源,是由使用的transformation操做类型决定的。举例来讲, 若是想要使用一个窗口长度为10分钟的window操做,那么集群就必须有足够的内存来保存10分钟内的数据。若是想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来讲,若是想要作一个简单的map-filter-store操做,那么须要使用的内存就不多。
 
一般来讲,经过Receiver接收到的数据,会使用StorageLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,所以没法保存在内存中的数据会溢写到磁盘上。 而溢写到磁盘上,是会下降应用的性能的。所以,一般是建议为应用提供它须要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。
 
 
内存调优(二)
内存调优的另一个方面是垃圾回收。对于流式应用来讲,若是要得到低延迟,确定不想要有由于JVM垃圾回收致使的长时间延迟。有不少参数能够帮助下降内存使用和GC开销:
 
一、 DStream的持久化:正如在“数据序列化调优”一节中提到的,输入数据和某些操做生产的中间RDD,默认持久化时都会序列化为字节。与非序列化的方式相比,这会下降内存和GC开销。使用Kryo序列化机制能够进一步减小内存使用和GC开销。 进一步下降内存使用率,能够对数据进行压缩,由spark.rdd.compress参数控制(默认false)。
 
二、 清理旧数据:默认状况下,全部输入数据和经过DStream transformation操做生成的持久化RDD,会自动被清理。Spark Streaming会决定什么时候清理这些数据,取决于transformation操做类型。例如, 你在使用窗口长度为10分钟内的window操做,Spark会保持10分钟之内的数据,时间过了之后就会清理旧数据。可是在某些特殊场景下,好比Spark SQL和Spark Streaming整合使用时,在异步开启的线程中,使用Spark SQL针对batch RDD进行执行查询。那么就须要让Spark保存更长时间的数据,直到Spark SQL查询结束。可使用 streamingContext.remember()方法来实现。
 
三、 CMS垃圾回收器:使用并行的mark-sweep垃圾回收机制,被推荐使用,用来保持GC低开销。虽然并行的GC会下降吞吐量,可是仍是建议使用它,来减小batch的处理时间(下降处理过程当中的gc开销)。若是要使用,那么要在driver端和executor端都开启。 在spark-submit中使用--driver-java-options设置;使用spark.executor.extraJavaOptions参数设置。-XX:+UseConcMarkSweepGC。
相关文章
相关标签/搜索