本文基于Spark Streaming Programming Guide原文翻译, 加上一些本身的理解和小实验的结果。
php
Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具备容错能力的用于处理实时数据流的一个组件。Spark Streaming能够接收各类数据源传递来的数据,好比Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可使用一些用高阶函数(好比map, reduce, join
及window
)进行封装的复杂算法作进一步的处理。最后,处理好的数据能够写入到文件系统,数据库,或者直接用于实时展现。除此以外,还能够在数据流上应用一些机器学习或者图计算等算法。
html
上图展现了Spark Streaming的总体数据流转状况。在Spark Streaming中的处理过程能够参考下图,Spark Streaming接收实时数据,而后把这些数据分割成一个个batch,而后经过Spark Engine分别处理每个batch并输出。
java
Spark Streaming中一个最重要的概念是DStream,即离散化数据流(discretized stream),DStream由一系列连续的数据集组成。DStream的建立有两种办法,一种是从数据源接收数据生成初始DStream,另外一种是由DStream A经过转换生成DStream B。一个DStream实质上是由一系列的RDDs组成。
本文介绍了如何基于DStream
写出Spark Streaming程序。Spark Streaming提供了Scala, Java以及Python接口,在官方文档中对这三种语言都有示例程序的实现,在这里只分析Scala写的程序。python
在深刻分析Spark Streaming的特性和原理以前,以写一个简单的Spark Streaming程序并运行起来为入口先了解一些相关的基础知识。这个示例程序从TCP socket中接收数据,进行Word Count操做。git
首先须要导入Spark Streaming相关的类,其中StreamingContext是全部Streaming程序的主要入口。接下来的代码中建立一个local StreamingContext
,batch时间为1秒,execution线程数为2。github
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 建立一个local StreamingContext batch时间为1秒,execution线程数为2
// master的线程数数最少为2,后面会详细解释
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, econds(1))
使用上面这个ssc
对象,就能够建立一个lines
变量用来表示从TCP接收的数据流了,指定机器名为localhost
端口号为9999
web
// 建立一个链接到hostname:port的DStream, 下面代码中使用的是localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
lines
中的每一条记录都是TCP中的一行文本信息。接下来,使用空格将每一行语句进行分割。算法
// 将每一行分割成单词
val words = lines.flatMap(_.split(" "))
上面使用的flatMap
操做是一个一对多的DStream
操做,在这里表示的是每输入一行记录,会根据空格生成多个单词,这些单词造成一个新的DStream words
。接下来统计单词个数。sql
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 统计每一个batch中的不一样单词个数
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 打印出其中前10个单词出现的次数
wordCounts.print()
上面代码中,将每个单词使用map
方法映射成(word, 1)
的形式,即paris变量。而后调用reduceByKey
方法,将相同单词出现的次数进行叠加,最终打印出统计的结果。数据库
写完上面的代码,Spark Streaming程序尚未运行起来,须要写入如下两行代码使Spark Streaming程序可以真正的开始执行。
ssc.start() // 开始计算
ssc.awaitTermination() // 等待计算结束
(1)运行Netcat
使用如下命令启动一个Netcat
nc -lk 9999
接下来就能够在命令行中输入任意语句了。
(2)运行Spark Streaming程序
./bin/run-example streaming.NetworkWordCount localhost 9999
程序运行起来后Netcat中输入的任何语句,都会被统计每一个单词出现的次数,例如
这一部分详细介绍Spark Streaming中的基本概念。
Spark Streaming相关jar包的依赖也可使用Maven来管理,写一个Spark Streaming程序的时候,须要将下面的内容写入到Maven项目中
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
对于从Kafka,Flume,Kinesis这些数据源接收数据的状况,Spark Streaming core API中不提供这些类和接口,须要添加下面这些依赖。
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
Spark Streaming程序的主要入口是一个StreamingContext
对象,在程序的开始,须要初始化该对象,代码以下
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
其中的参数appName
是当前应用的名称,能够在Cluster UI上进行显示。master
是Spark的运行模式,能够参考 Spark, Mesos or YARN cluster URL,或者设置成local[*]
的形式在本地模式下运行。在生产环境中运行Streaming应用时,通常不会将master参数写死在代码中,而是在使用spark-submit
命令提交时动态传入--master
参数,具体能够参考 launch the application with spark-submit 。
至于batch时间间隔的设置,须要综合考虑程序的性能要求以及集群可提供的资源状况。
也能够基于SparkContext
对象,生成一个StreamingContext
对象,使用以下代码
import org.apache.spark.streaming._
val sc = ... // 已有的SparkContext对象
val ssc = new StreamingContext(sc, Seconds(1))
当context初始化后,还须要作的工做有:
DStreams
transformation
以及输出操做处理输入的DStreams
streamingContext.start()
启动程序,开始接收并处理数据streamingContext.awaitTermination()
等待程序运行终止(包括手动中止,或者遇到Error
后退出应用)streamingContext.stop()
手动中止应用须要注意的点:
context
开始运行后,不能再往其中添加新的计算逻辑context
被中止后,不能restart
StreamingContext
对象处于运行状态StreamingContext
中的stop()
方法一样会终止SparkContext
。若是只须要中止StreamingContext
,将stop()
方法的可选参数设置成false
,避免SparkContext
被终止SparkContext
对象,能够用于构造多个StreamingContext
对象,只要在新的StreamingContext
对象被建立前,旧的StreamingContext
对象被中止便可。 DStream
是Spark Streaming中最基本最重要的一个抽象概念。DStream
由一系列的数据组成,这些数据既能够是从数据源接收到的数据,也能够是从数据源接收到的数据通过transform
操做转换后的数据。从本质上来讲一个DStream
是由一系列连续的RDDs
组成,DStream
中的每个RDD
包含了一个batch的数据。
DStream
上的每个操做,最终都反应到了底层的RDDs
上。好比,在前面那个Word Count代码中将lines
转化成words
的逻辑,lines
上的flatMap
操做就如下图中所示的形式,做用到了每个底层的RDD
上。
这些底层RDDs
上的转换操做会有Spark Engine进行计算。对于开发者来讲,DStream
提供了一个更方便使用的高阶API,从而开发者无需过多的关注每个转换操做的细节。
DStream
上能够执行的操做后续文章中会有进一步的介绍。
(1)基本数据源
在前面Word Count的示例程序中,已经使用到了ssc.socketTextStream(...)
,这个会根据TCP socket中接收到的数据建立一个DStream
。除了sockets以外,StreamingContext API还支持以文件为数据源生成DStream
。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming程序会监控用户输入的dataDirectory
路径,接收并处理该路径中的全部文件,不过不支持子文件夹中的文件。
须要注意的地方有:
a、全部的文件数据格式必须相同
b、该路径下的文件应该是原子性的移动到该路径,或者重命名到该路径
c、文件进入该路径后不可再发生变化,因此这种数据源不支持数据连续写入的形式
对于简单的text文件,有一个简单的StreamingContext.textFileStream(dataDirectory)
方法来进行处理。而且文件数据源的形式不须要运行一个receiver进程,因此对Execution的核数没有要求。
streamingContext.queueStream(queueOfRDDs)
,能够将一系列的RDDs转化成一个DStream。该queue中的每个RDD会被当作DStream
中的一个batcn,而后以Streaming的形式处理这些数据。(2)高阶数据源
(3)自定义数据源
除了上面两类数据源以外,也能够支持自定义数据源。自定义数据源时,须要实现一个能够从自定义数据源接收数据并发送到Spark中的用户自定义receiver。具体能够参考 Custom Receiver Guide。
(4)数据接收的可靠性
相似于RDDs,transformations
可使输入DStream
中的数据内容根据特定逻辑发生转换。DStreams
上支持不少RDDs
上相同的一些transformations
。
具体含义和使用方法可参考另外一篇博客:Spark Streaming中的操做函数分析
在上面这些transformations
中,有一些须要进行进一步的分析
(1)UpdateStateByKey操做
(2)Transform操做
transform
操做及其相似的一些transformwith
操做,可使DStream
中的元素可以调用任意的RDD-to-RDD的操做。可使DStream
调用一些只有RDD才有而DStream API没有提供的算子。例如,DStream API就不支持一个data DStream中的每个batch数据能够直接和另外的一个数据集作join
操做,可是使用transform
就能够实现这一功能。这个操做能够说进一步丰富了DStream
的操做功能。
再列举一个这个操做的使用场景,将某处计算到的重复信息与实时数据流中的记录进行join,而后进行filter操做,能够当作一种数据清理的方法。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 一个包含重复信息的RDD
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // 将重复信息与实时数据作join,而后根据指定规则filter,用于数据清洗
...})
这里须要注意的是,transform
传入的方法是被每个batch调用的。这样能够支持在RDD上作一些时变的操做,即RDD,分区数以及广播变量能够在不一样的batch之间发生变化。
(3)Window操做
Spark Streaming提供一类基于窗口的操做,这类操做能够在一个滑动窗口中的数据集上进行一些transformations
操做。下图展现了窗口操做的示例
上图中,窗口在一个DStream
源上滑动,DStream
源中暴露在该窗口中的RDDs
可让这个窗口进行相关的一些操做。在上图中能够看到,该窗口中任一时刻都只能看到3个RDD,而且这个窗口每2秒中往前滑动一次。这里提到的两个参数,正好是任意一个窗口操做都必须指定的。
滑动间隔:指窗口多长时间往前滑动一次,上图中为2。
须要注意的一点是,上面这两个参数,必须是batch时间的整数倍,上图中的batch时间为1。
接下来展现一个简单的窗口操做示例。好比说,在前面那个word count示例程序的基础上,我但愿每隔10秒钟就统计一下当前30秒时间内的每一个单词出现的次数。这一功能的简单描述是,在paris DStream
的当前30秒的数据集上,调用reduceByKey
操做进行统计。为了实现这一功能,可使用窗口操做reduceByKeyAndWindow
。
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
更多的窗口操做能够参考:Spark Streaming中的操做函数分析
DStream
上的输出操做,可使DStream
中的数据发送到外部系统,好比数据库或文件系统中。DStream
只有通过输出操做,其中的数据才能被外部系统使用。而且下面这些输出操做才真正的触发DStream对象上调用的transformations
操做。这一点相似于RDDs上的Actions
算子。
输出操做的使用和功能请参考:Spark Streaming中的操做函数分析
下面主要进一步分析foreachRDD
操做往外部数据库写入数据的一些注意事项。
dstream.foreachRDD
是DStream输出操做中最经常使用也最重要的一个操做。关于这个操做如何正确高效的使用,下面会列举出一些使用方法和案例,能够帮助读者在使用过程当中避免踩到一些坑。
一般状况下,若是想把数据写入到某个外部系统中时,须要为之建立一个链接对象(好比提供一个TCP链接工具用于链接远程服务器),使用这个链接工具才能将数据发送到远程系统。在Spark Streaming中,开发者极可能会在Driver端建立这个对象,而后又去Worker
端使用这个对象处理记录。好比下面这个例子
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // 在driver端执行
rdd.foreach { record =>
connection.send(record) // 在wroker端执行
}}
上面这个使用方法实际上是错误的,当在driver端建立这个链接对象后,须要将这个链接对象序列化并发送到wroker端。一般状况下,链接对象都是不可传输的,即wroker端没法获取该链接对象,固然也就没法将记录经过这个链接对象发送出去了。这种状况下,应用系统的报错提示多是序列化错误(链接对象没法序列化),或者初始化错误(链接对象须要在wroker端完成初始化),等等。
正确的作法是在worker端建立这个链接对象。
可是,即便是在worker建立这个对象,又可能会犯如下错误。
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}}
上面代码会为每一条记录建立一个链接对象,致使链接对象太多。 链接对象的建立个数会受到时间和系统资源状况的限制,所以为每一条记录都建立一个链接对象会致使系统出现没必要要的高负载,进一步致使系统吞吐量下降。
一个好的办法是使用rdd.foreachPartition
操做,而后为RDD的每个partition
,使一个partition
中的记录使用同一个链接对象。以下面代码所示
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}}
最后,能够经过使用链接对象池进一步对上面的代码进行优化。使用链接对象池能够进一步提升链接对象的使用效率,使得多个RDDs/batches
之间能够重复使用链接对象。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// 链接对象池是静态的,而且创建对象只有在真正使用时才被建立
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // 使用完以后,将链接对象归还到池中以便下一次使用
}}
须要注意的是,链接对象池中的对象最好设置成懒生成模式,即在真正使用时才去建立链接对象,而且给链接对象设置一个生命周期,必定时间不使用则注销该链接对象。
总结一下关键点:
DStreams
的transformations
操做是由输出操做触发的,相似于RDDs
中的actions
操做。上面列举出某些DStream
的输出操做中能够将其中的元素转化成RDD
,进而能够调用RDD提供的一些API操做,这时若是对RDD
调用actions
操做会当即强制对接收到的数据进行处理。所以,若是用户应用程序中DStream不须要任何的输出操做,或者仅仅对DStream
使用一些相似于dstream.foreachRDD
操做可是在这个操做中不调用任何的RDD action
操做时,程序是不会进行任何实际运算的。系统只会简单的接收数据,任何丢弃数据。 Spark Streaming的累加器和广播变量没法从checkpoint
恢复。若是在应用中既使用到checkpoint
又使用了累加器和广播变量的话,最好对累加器和广播变量作懒实例化操做,这样才可使累加器和广播变量在driver失败重启时可以从新实例化。参考下面这段代码
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}}
object DroppedWordsCounter {
@volatile private var instance: Accumulator[Long] = null
def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}}
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += count
false
} else {
true
}
}.collect()
val output = "Counts at time " + time + " " + counts})
查看完整代码请移步 source code
在streaming数据上也能够很方便的使用到DataFrames
和SQL操做。为了支持这种操做,须要用StreamingContext对象使用的SparkContext
对象初始化一个SQLContext
对象出来,SQLContext
对象设置成一个懒初始化的单例对象。下面代码对前面的Word Count进行一些修改,经过使用DataFrames
和SQL
来实现Word Count的功能。每个RDD都被转化成一个DataFrame
对象,而后注册成一个临时表,最后就能够在这个临时表上进行SQL查询了。
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// 获取单例SQLContext对象
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
// 将RDD[String]转化成DataFrame
val wordsDataFrame = rdd.toDF("word")
// 注册表
wordsDataFrame.registerTempTable("words")
// 在该临时表上执行sql语句操做
val wordCountsDataFrame =
sqlContext.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()}
查看完整代码请移步 source code.
也能够在另外一线程获取到的Streaming
数据上进行SQL操做(这里涉及到异步运行StreamingContext
)。StreamingContext
对象没法感知到异步SQL查询的存在,所以有StreamingContext
对象有可能在SQL查询完成以前把历史数据删除掉。为了保证StreamingContext
不删除须要用到的历史数据,须要告诉StreamingContext
保留必定量的历史数据。例如,若是你想在某一个batch的数据上执行SQL查询操做,可是你这个SQL须要执行5分钟的时间,那么,须要执行streamingContext.remember(Minutes(5))
语句告诉StreamingContext
将历史数据保留5分钟。
有关DataFrames的更多介绍,参考另外一篇博客:Spark-SQL之DataFrame操做大全
相似于RDDs
,DStreams
也容许开发者将stream
中的数据持久化到内存中。在DStream
对象上使用persist()
方法会将DStream
对象中的每个RDD
自动持久化到内存中。这个功能在某个DStream的数据须要进行屡次计算时特别有用。对于窗口操做好比reduceByWindow
,以及涉及到状态的操做好比updateStateByKey
,默认会对DStream
对象执行持久化。所以,程序在运行时会自动将窗口操做和涉及到状态的这些操做生成的DStream对象持久化到内存中,不须要开发者显示的执行persist()
操做。
对那些经过网络接收到的streams
数据(好比Kafka, Flume, Socket等),默认的持久化等级是将数据持久化到两个节点上,以保证其容错能力。
注意,不一样于RDDs
,默认状况下DStream
的持久化等级是将数据序列化保存在内存中。这一特性会在后面的性能调优中进一步分析。有关持久化级别的介绍,能够参考rdd-persistence
当Streaming
应用运行起来时,基本上须要7 * 24的处于运行状态,因此须要有必定的容错能力。检查点的设置就是可以支持Streaming
应用程序快速的从失败状态进行恢复的。检查点保存的数据主要有两种:
1 . 元数据(Metadata
)检查点:保存Streaming
应用程序的定义信息。主要用于恢复运行Streaming
应用程序的driver节点上的应用。元数据包括:
a、配置信息:建立Streaming应用程序的配置信息
b、DStream
操做:在DStream
上进行的一系列操做方法
c、未处理的batch:记录进入等待队列可是还未处理完成的批次
2 . 数据(Data)检查点:将计算获得的RDD保存起来。在一些跨批次计算并保存状态的操做时,必须设置检查点。由于在这些操做中基于其余批次数据计算获得的RDDs,随着时间的推移,计算链路会愈来愈长,若是发生错误重算的代价会特别高。
元数据检查点信息主要用于恢复driver端的失败,数据检查点主要用于计算的恢复。
(1)何时须要使用检查点
当应用程序出现如下两种状况时,须要配置检查点。
- 使用到状态相关的操做算子-好比updateStateByKey
或者reduceByKeyAndWindow
等,这种状况下必须为应用程序设置检查点,用于按期的对RDD进行检查点设置。
- Driver端应用程序恢复-当应用程序从失败状态恢复时,须要从检查点中读取相关元数据信息。
(2)检查点设置
通常是在具备容错能力,高可靠的文件系统上(好比HDFS, S3等)设置一个检查点路径,用于保存检查点数据。设置检查点能够在应用程序中使用streamingContext.checkpoint(checkpointDirectory)
来指定路径。
若是想要应用程序在失败重启时使用到检查点存储的元数据信息,须要应用程序具备如下两个特性,须要使用StreamingContext.getOrCreate
代码在失败时从新建立StreamingContext
对象:
StreamingContext
对象,而后开始执行程序处理DStream。当应用程序失败重启时,能够从设置的检查点路径获取元数据信息,建立一个StreamingContext
对象,并恢复到失败前的状态。
下面用Scala代码实现上面的要求。
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // 建立一个新的StreamingContext对象
val lines = ssc.socketTextStream(...) // 获得DStreams
...
ssc.checkpoint(checkpointDirectory) // 设置checkpoint路径
ssc
}
// 用checkpoint元数据建立StreamingContext对象或根据上面的函数建立新的对象
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 设置context的其余参数
context. ...
// 启动context
context.start()
context.awaitTermination()
若是checkpointDirectory
路径存在,会使用检查点元数据恢复一个StreamingContext
对象。若是路径不存在,或者程序是第一次运行,则会使用functionToCreateContext
来建立一个新的StreamingContext
对象。
RecoverableNetWorkWordCount示例代码演示了一个从检查点恢复应用程序的示例。
须要注意的是,想要用到上面的getOrCreate
功能,须要在应用程序运行时使其支持失败自动重跑的功能。这一功能,在接下来一节中有分析。
另外,在往检查点写入数据这一过程,是会增长系统负荷的。所以,须要合理的设置写入检查点数据的时间间隔。对于小批量时间间隔(好比1秒)的应用,若是每个batch都执行检查点写入操做,会显著的下降系统的吞吐性能。相反的,若是写入检查点数据间隔过久,会致使lineage过长。对那些状态相关的须要对RDD进行检查点写入的算子,检查点写入时间间隔最好设置成batch时间间隔的整数倍。好比对于1秒的batch间隔,设置成10秒。有关检查点时间间隔,可使用dstream.checkpoint(checkpointInterval)
。通常来讲,检查点时间间隔设置成5~10
倍滑动时间间隔是比较合理的。
这一节主要讨论如何将一个Spark Streaming应用程序部署起来。
(1)需求
运行一个Spark Streaming应用程序,须要知足一下要求。
KafkaUtils
的话,须要将spark-streaming-kafka-0.8_2.11
以及其依赖都打入到应用程序JAR包中。spark.streaming.receiver.writeAheadLog.enable=true
来开启这一功能。然而,这一功能的开启会下降数据接收的吞吐量。这是能够经过同时并行运行多个接收进程(这一点在后面的性能调优部分会有介绍)进行来抵消该负面影响。另外,若是已经设置了输入数据流的存储级别为Storagelevel.MEMORY_AND_DISK_SET
,因为接收到的数据已经会在文件系统上保存一份,这样就能够关闭WAL功能了。当使用S3以及其余任何不支持flushng功能的文件系统来write ahead logs时,要记得设置spark.streaming.driver.writeAheadLog.closeFileAfterWrite
以及spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
两个参数。spark.streaming.receiver.maxRate
,对于Direct Kafka模式,设置spark.streaming.kafka.maxRatePerPartition
限制从每一个Kafka的分区读取数据的速率。假如某个Topic有8个分区,spark.streaming.kafka.maxRatePerpartition=100
,那么每一个batch最大接收记录为800
。从Spark-1.5版本开始,引入了一个backpressure
的机制来避免设置这个限制阈值。Spark Streaming会自动算出当前的速率限制,而且动态调整这个阈值。经过将spark.streaming.backpressure.enabled
为true
开启backpressure
功能。(2)升级应用代码
若是运行中的应用程序有更新,须要运行更新后的代码,有如下两种机制。
SparkStreamingContext
对象再也不合适,由于检查点中的信息可能不包含更新的代码逻辑,这样会致使程序出现错误。在这种状况下,要么从新指定一个检查点,要么删除以前的检查点。 在Spark Streaming应用程序运行时,Spark Web UI页面上会多出一个Streaming
的选项卡,在这里面能够显示一些Streaming相关的参数,好比Receiver是否在运行,接收了多少记录,处理了多少记录等。以及Batch相关的信息,包括batch的执行时间,等待时间,完成的batch数,运行中的batch数等等。这里面有两个时间参数须要注意理解一些:
Scheduling Delay - 当前batch从进入队列到开始执行的延迟时间
若是处理时间一直比batch时间跨度要长,或者延迟时间逐渐增加,表示系统已经没法处理当前的数据量了,这时候就须要考虑如何去下降每个batch的处理时间。如何下降batch处理时间,能够参考第四节。
除了监控页面以外,Spark还提供了StreamingListener接口,经过这个接口能够获取到receiver以及batch的处理时间等信息。
为了使Spark Streaming应用可以更好的运行,须要进行一些调优设置,这一节会分析一些性能调优中的参数和设置规则。在性能调优方面,主要须要考虑如下两个问题:
接下来的内容在Spark性能调优中已有介绍,这里再次强调一下在Streaming中须要注意的一些地方。
(1)接收数据进程的并行度
经过网络(好比Kafka, Flume, socket等)接收到的数据,首先须要反序列化而后保存在Spark中。当数据接收成为系统的瓶颈时,就须要考虑如何提升系统接收数据的能力了。每个输入的DStream会在一个Worker节点上运行一个接收数据流的进程。若是建立了多个接收数据流进程,就能够生成多个输入DStream了。好比说,对于Kafka数据源,若是使用的是一个DStream接收来自两个Topic中的数据的话,就能够将这两个Topic拆开,由两个数据接收进程分开接收。当用两个receiver接收到DStream后,能够在应用中将这两个DStream再进行合并。好比下面代码中所示
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
须要注意一个参数spark.streaming.blockInterval
。对于Receiver来讲,接收到的数据在保存到Spark内存中以前,会以block的形式汇聚到一块儿。每一个Batch中block的个数决定了程序运行时处理这些数据的task的个数。每个receiver的每个batck对应的task个数大体为(batch时间间隔 / block时间间隔)。好比说对于一个2m
的batch,若是block时间间隔为200ms
那么,将会有10个task。若是task的数量太少,对数据的处理就不会很高效。在batch时间固定的状况下,若是想增大task个数,那么就须要下降blockInterval
参数了,这个参数默认值为200ms
,官方建议的该参数下限为50ms
,若是低于50ms
可能会引发其余问题。
另外一个提升数据并发处理能力的方法是显式的对接收数据从新分区,inputStream.repartition(<number of partitions>)
。
(2)数据处理的并行度
对于reduceByKey
和reduceByKeyAndWindow
操做来讲,并行task个数由参数spark.default.parallelism
来控制。若是想要提升数据处理的并行度,能够在调用这类方法时,指定并行参数,或者将spark.default.parallelism
参数根据集群实际状况进行调整。
(3)数据序列化
能够经过调整序列化相关的参数,来提升数据序列化性能。在Streaming应用中,有两类数据须要序列化操做。
StorageLevel.MEMORY_AND_DIS_SER_2
的形式保存在Executor的内存中。也就是说,为了下降GC开销,这些数据会被序列化成bytes形式,而且还考虑到executor失败的容错。这些数据首先会保存在内存中,当内存不足时会spill到磁盘上。使用这种方式的一个明显问题是,Spark接收到数据后,首先须要反序列化这些数据,而后再按照Spark的方式对这些数据从新序列化。Streaming操做中持久化的RDD:Streaming计算产生的RDD可能也会持久化到内存中。好比窗口操做函数会将数据缓存起来以便后续屡次使用。而且Streaming应用中,这些数据的存储级别是StorageLevel.MEMORY_ONLY_SET
(Spark Core的默认方式是StorageLevel.MEMORY_ONLY
)。Streaming对这些数据多了一个序列化操做,这主要也是为了下降GC开销。
在上面这两种状况中,可使用Kyro
方式对数据进行序列化,同时下降CPU和内存的开销。有关序列化能够进一步参考Spark调优。对于Kyro
方式的参数设置,请参考Spark Kyro参数设置。
通常状况下,若是须要缓存的数据量不大,能够直接将数据以非序列化的形式进行存储,这样不会明显的带来GC的开销。好比说,batch时间只有若干秒,而且没有使用到窗口函数操做,那么能够在持久化时显示的指定存储级别,避免持久化数据时对数据的序列化操做。
(4)提升task启动性能
若是每秒启动的task个数太多(通常指50个以上),那么对task的频繁启动也是一个不容忽视的损耗。遇到这种状况时,须要考虑一下Execution模式了。通常来讲,在Spark的Standalone模式以及coarse-grained Mesos模式下task的启动时间会比fine-grained Mesos模式要低。
为了使一个Spark Streaming应用在集群上稳定运行,须要保证应用在接收到数据时可以及时处理。若是处理速率不匹配,随着时间的积累,等待处理的数据将会愈来愈多,最终致使应用没法正常运行。最好的状况是batch的处理时间小于batch的间隔时间。因此,正确合理的设置Batch时间间隔是很重要的。
有关Spark内存的使用以及Spark应用的GC性能调节的更多细节在Spark调优中已经有了更加详细的描述。这里简单分析一些Spark Streaming应用程序会用到的参数。
一个Spark Streaming应用程序须要使用集群多少内存资源,很大程度上是由该应用中的具体逻辑来决定的,即须要看应用程序中的transformations
的类型。好比代码中使用到长达10分钟的窗口操做时,就须要使用到可以把10分钟的数据都保存到内存中的内存量。若是使用updateStateByKey
这种操做,而数据中不一样key
特别多,也会使用更多的内存。若是应用的逻辑比较简单,仅仅是接收-过滤-存储等一系列操做时,消耗的内存量会明显减小。
默认状况下,receivers接收到的数据会以StorageLevel.MEMORY_AND_DISK_SER_2
级别进程存储,当内存中容纳不下时会spill到磁盘上,可是这样会下降应用的处理性能,因此为了应用可以更高效的运行,最好仍是多分配一些内存以供使用。通常能够经过在少许数据的状况下,评估一下数据使用的内存量,继而计算出应用正式部署时须要分配的总内存量大小。
内存调节的另外一方面是垃圾回收的设置。对一个低延迟的应用系统来讲,JVM在垃圾回收时致使应用长时间暂停运行是一个很讨厌的场景。
下面有一些可用于调节内存使用量和GC性能的方面:
Kryo
方式进行序列化可以进一步下降序列化后数据大小和内存的使用。想要进一步下降内存的使用量,能够在数据上再增长一个压缩功能,经过参数spark.rdd.compress
来设置。transformations
持久化的数据都会自动进行清理。Spark Streaming根据transformations
的不一样来决定哪些数据须要被清理掉。例如,当使用10分钟的窗口函数时,Spark Streaming会保存最少10分钟的数据。想要数据保存更长时间,能够设置streamingContext.remenber
参数。spark-submit
命令的--driver-java-options
参数来指定,executor上经过设置spark.executor.extraJavaOptions
参数来指定。OFF_HEAP
存储级别来持久化RDDs,能够参考RDD Persistence本节主要讨论Spark Streaming应用程序失败后的处理办法。
(1)Files输入
(2)基于Receiverd 数据源
(3)Kafka Direct输入方式