Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据能够从许多来源(如Kafka,Flume,Kinesis或TCP套接字)中获取,而且可使用以高级函数表示的复杂算法进行处理map
,例如reduce
,join
和window
。最后,处理后的数据能够推送到文件系统,数据库和实时仪表板。实际上,您能够在数据流上应用Spark的 机器学习和 图形处理算法。html
在内部,它的工做原理以下。Spark Streaming接收实时输入数据流并将数据分红批处理,而后由Spark引擎处理,以批量生成最终结果流。java
Spark Streaming提供称为离散流或DStream的高级抽象,表示连续的数据流。DStream能够历来自Kafka,Flume和Kinesis等源的输入数据流建立,也能够经过在其余DStream上应用高级操做来建立。在内部,DStream表示为一系列 RDD。node
本指南向您展现如何使用DStreams开始编写Spark Streaming程序。您可使用Scala,Java或Python编写Spark Streaming程序(在Spark 1.2中引入),全部这些都在本指南中介绍。您能够在本指南中找到标签,让您在不一样语言的代码段之间进行选择。python
注意:有一些API在Python中不一样或不可用。在本指南中,您将找到标记Python API,突出显示这些差别。git
在咱们详细介绍如何编写本身的Spark Streaming程序以前,让咱们快速了解一下简单的Spark Streaming程序是什么样的。假设咱们想要计算从TCP套接字上侦听的数据服务器接收的文本数据中的字数。您须要作的就是以下。github
首先,咱们将Spark Streaming类的名称和StreamingContext中的一些隐式转换导入到咱们的环境中,以便将有用的方法添加到咱们须要的其余类(如DStream)。StreamingContext是全部流功能的主要入口点。咱们使用两个执行线程建立一个本地StreamingContext,批处理间隔为1秒。web
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
使用此上下文,咱们能够建立一个DStream来表示来自TCP源的流数据,指定为主机名(例如localhost
)和端口(例如9999
)。算法
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
此lines
DStream表示将从数据服务器接收的数据流。此DStream中的每条记录都是一行文本。接下来,咱们但愿将空格字符分割为单词。sql
// Split each line into words val words = lines.flatMap(_.split(" "))
flatMap
是一对多DStream操做,它经过从源DStream中的每一个记录生成多个新记录来建立新的DStream。在这种状况下,每行将被分红多个单词,单词流表示为 words
DStream。接下来,咱们要计算这些单词。shell
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
所述words
DSTREAM被进一步映射(一到一个变换)到一个DSTREAM (word, 1)
对,而后将其还原获得的单词的频率数据中的每一批。最后,wordCounts.print()
将打印每秒生成的一些计数。
请注意,执行这些行时,Spark Streaming仅设置启动时将执行的计算,而且还没有启动实际处理。要在设置完全部转换后开始处理,咱们最终调用
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
完整的代码能够在Spark Streaming示例 NetworkWordCount中找到。
若是您已经下载并构建了 Spark,则能够按以下方式运行此示例。您首先须要使用Netcat(在大多数类Unix系统中找到的小实用程序)做为数据服务器运行
$ nc -lk 9999
而后,在不一样的终端中,您可使用启动示例
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
而后,在运行netcat服务器的终端中键入的任何行将被计数并每秒在屏幕上打印。它看起来像下面这样。
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... |
# TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... |
接下来,咱们将超越简单的示例,详细介绍Spark Streaming的基础知识。
与Spark相似,Spark Streaming可经过Maven Central得到。要编写本身的Spark Streaming程序,必须将如下依赖项添加到SBT或Maven项目中。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.4.0</version> </dependency>
要从Spark Streaming核心API中不存在的Kafka,Flume和Kinesis等源中提取数据,您必须将相应的工件添加spark-streaming-xyz_2.11
到依赖项中。例如,一些常见的以下。
资源 | 神器 |
---|---|
卡夫卡 | 火花流 - 卡夫卡0-10_2.11 |
水槽 | 火花流,flume_2.11 |
室壁运动 | spark-streaming-kinesis-asl_2.11 [亚马逊软件许可证] |
有关最新列表,请参阅 Maven存储库 以获取受支持的源和工件的完整列表。
要初始化Spark Streaming程序,必须建立一个StreamingContext对象,它是全部Spark Streaming功能的主要入口点。
甲的StreamingContext对象能够从被建立SparkConf对象。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
该appName
参数是应用程序在群集UI上显示的名称。 master
是Spark,Mesos或YARN群集URL,或在本地模式下运行的特殊“local [*]”字符串。实际上,当在群集上运行时,您不但愿master
在程序中进行硬编码,而是启动应用程序spark-submit
并在那里接收它。可是,对于本地测试和单元测试,您能够传递“local [*]”以在进程中运行Spark Streaming(检测本地系统中的核心数)。请注意,这会在内部建立一个SparkContext(全部Spark功能的起点),能够访问它ssc.sparkContext
。
必须根据应用程序的延迟要求和可用的群集资源设置批处理间隔。有关 更多详细信息,请参见性能调整部分。
甲StreamingContext
目的还能够从现有的建立的SparkContext
对象。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
定义上下文后,您必须执行如下操做。
streamingContext.start()
。streamingContext.awaitTermination()
。streamingContext.stop()
。要记住的要点:
stop()
called 的可选参数设置stopSparkContext
为false。Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,能够是从源接收的输入数据流,也能够是经过转换输入流生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象(有关更多详细信息,请参阅Spark编程指南)。DStream中的每一个RDD都包含来自特定时间间隔的数据,以下图所示。
应用于DStream的任何操做都转换为底层RDD上的操做。例如,在先前将行流转换为字的示例中,flatMap
操做应用于lines
DStream中的每一个RDD 以生成DStream的 words
RDD。以下图所示。
这些底层RDD转换由Spark引擎计算。DStream操做隐藏了大部分细节,并为开发人员提供了更高级别的API以方便使用。这些操做将在后面的章节中详细讨论。
输入DStream是表示从流源接收的输入数据流的DStream。在快速示例中,lines
输入DStream是表示从netcat服务器接收的数据流。每一个输入DStream(文件流除外,本节稍后讨论)都与Receiver (Scala doc, Java doc)对象相关联,该对象从源接收数据并将其存储在Spark的内存中进行处理。
Spark Streaming提供两类内置流媒体源。
咱们将在本节后面讨论每一个类别中的一些来源。
请注意,若是要在流应用程序中并行接收多个数据流,能够建立多个输入DStream(在“ 性能调整”部分中进一步讨论)。这将建立多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker / executor是一个长期运行的任务,所以它占用了分配给Spark Streaming应用程序的其中一个核心。所以,重要的是要记住,Spark Streaming应用程序须要分配足够的内核(或线程,若是在本地运行)来处理接收的数据,以及运行接收器。
要记住的要点
在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”做为主URL。这两种方法都意味着只有一个线程将用于本地运行任务。若是您正在使用基于接收器的输入DStream(例如套接字,Kafka,Flume等),则单线程将用于运行接收器,不会留下任何线程来处理接收到的数据。所以,在本地运行时,始终使用“local [ n ]”做为主URL,其中n >要运行的接收器数量(有关如何设置主服务器的信息,请参阅Spark属性)。
将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须大于接收器数。不然系统将接收数据,但没法处理数据。
咱们已经看过快速示例中的内容ssc.socketTextStream(...)
,该示例 根据经过TCP套接字链接接收的文本数据建立DStream。除了套接字以外,StreamingContext API还提供了从文件建立DStream做为输入源的方法。
对于从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS等)上的文件读取数据,能够建立DStream做为via StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
。
文件流不须要运行接收器,所以不须要分配任何内核来接收文件数据。
对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)
。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
对于文本文件
streamingContext.textFileStream(dataDirectory)
如何监控目录
Spark Streaming将监视目录dataDirectory
并处理在该目录中建立的任何文件。
"hdfs://namenode:8040/logs/"
。直接在这种路径下的全部文件将在发现时进行处理。"hdfs://namenode:8040/logs/2017/*"
。这里,DStream将包含与模式匹配的目录中的全部文件。那就是:它是目录的模式,而不是目录中的文件。"hdfs://namenode:8040/logs/2016-*"
,重命名整个目录以匹配路径,则会将目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。FileSystem.setTimes()
时间戳是一种在稍后的窗口中拾取文件的方法,即便其内容未更改。使用对象存储做为数据源
“完整”文件系统(如HDFS)会在建立输出流后当即在其文件上设置修改时间。打开文件时,即便在数据彻底写入以前,它也可能包含在DStream
- 以后 - 将忽略同一窗口中文件的更新。即:可能会遗漏更改,并从流中省略数据。
要保证在窗口中选择更改,请将文件写入不受监视的目录,而后在关闭输出流后当即将其重命名为目标目录。若是重命名的文件在其建立窗口期间出如今扫描的目标目录中,则将拾取新数据。
相比之下,Amazon S3和Azure Storage等对象存储一般具备较慢的重命名操做,由于其实是复制了数据。此外,重命名的对象可能将rename()
操做的时间做为其修改时间,所以可能不被视为原始建立时间所暗示的窗口的一部分。
须要对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming所指望的一致。多是直接写入目标目录是经过所选对象库流式传输数据的适当策略。
有关此主题的更多详细信息,请参阅Hadoop文件系统规范。
可使用经过自定义接收器接收的数据流建立DStream。有关详细信息,请参阅自定义接收器指南。
为了测试带有测试数据的Spark Streaming应用程序,还可使用基于RDD队列建立DStream streamingContext.queueStream(queueOfRDDs)
。推入队列的每一个RDD将被视为DStream中的一批数据,并像流同样处理。
有关从套接字和文件流的详细信息,请参阅在相关函数的API单证 的StreamingContext斯卡拉,JavaStreamingContext 对Java和的StreamingContext为Python。
Python API从Spark 2.4.0开始,在这些来源中,Kafka,Kinesis和Flume在Python API中可用。
此类源须要与外部非Spark库链接,其中一些库具备复杂的依赖性(例如,Kafka和Flume)。所以,为了最大限度地减小与依赖项版本冲突相关的问题,从这些源建立DStream的功能已移至可在必要时显式连接的单独库。
请注意,Spark shell中不提供这些高级源,所以没法在shell中测试基于这些高级源的应用程序。若是您真的想在Spark shell中使用它们,则必须下载相应的Maven工件JAR及其依赖项,并将其添加到类路径中。
其中一些高级资源以下。
Kafka: Spark Streaming 2.4.0与Kafka经纪人版本0.8.2.1或更高版本兼容。有关更多详细信息,请参阅Kafka集成指南。
Flume: Spark Streaming 2.4.0与Flume 1.6.0兼容。有关详细信息,请参阅Flume集成指南。
Kinesis: Spark Streaming 2.4.0与Kinesis Client Library 1.2.1兼容。有关详细信息,请参阅Kinesis集成指南。
Python API Python尚不支持此功能。
输入DStream也能够从自定义数据源建立。您所要作的就是实现一个用户定义的接收器(参见下一节以了解它是什么),它能够从自定义源接收数据并将其推送到Spark。有关详细信息,请参阅自定义接收器指南
根据其可靠性,能够有两种数据源。来源(如Kafka和Flume)容许传输数据获得确认。若是从这些可靠来源接收数据的系统正确地确认接收到的数据,则能够确保不会因任何类型的故障而丢失数据。这致使两种接收器:
“ 自定义接收器指南”中讨论了如何编写可靠接收器的详细信息 。
与RDD相似,转换容许修改来自输入DStream的数据。DStreams支持普通Spark RDD上可用的许多转换。一些常见的以下。
转型 | 含义 |
---|---|
地图(功能) | 经过将源DStream的每一个元素传递给函数func来返回一个新的DStream 。 |
flatMap(func) | 与map相似,但每一个输入项能够映射到0个或更多输出项。 |
过滤器(功能) | 经过仅选择func返回true 的源DStream的记录来返回新的DStream 。 |
从新分区(numPartitions) | 经过建立更多或更少的分区来更改此DStream中的并行度级别。 |
union(otherStream) | 返回一个新的DStream,它包含源DStream和otherDStream中元素的并 集。 |
count() | 经过计算源DStream的每一个RDD中的元素数量,返回单元素RDD的新DStream。 |
减小(功能) | 经过使用函数func(它接受两个参数并返回一个)聚合源DStream的每一个RDD中的元素,返回单元素RDD的新DStream 。该函数应该是关联的和可交换的,以即可以并行计算。 |
countByValue() | 当在类型K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每一个键的值是其在源DStream的每一个RDD中的频率。 |
reduceByKey(func,[numTasks ]) | 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每一个键的值。注意:默认状况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性肯定spark.default.parallelism )进行分组。您能够传递可选numTasks 参数来设置不一样数量的任务。 |
join(otherStream,[ numTasks]) | 当在(K,V)和(K,W)对的两个DStream上调用时,返回(K,(V,W))对的新DStream与每一个键的全部元素对。 |
协同组(otherStream,[numTasks ]) | 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream。 |
变换(功能) | 经过将RDD-to-RDD函数应用于源DStream的每一个RDD来返回新的DStream。这能够用于在DStream上执行任意RDD操做。 |
updateStateByKey(func) | 返回一个新的“状态”DStream,其中经过在键的先前状态和键的新值上应用给定函数来更新每一个键的状态。这可用于维护每一个密钥的任意状态数据。 |
其中一些转换值得更详细地讨论。
该updateStateByKey
操做容许您在使用新信息持续更新时保持任意状态。要使用它,您必须执行两个步骤。
在每一个批处理中,Spark都会对全部现有密钥应用状态更新功能,不管它们是否在批处理中都有新数据。若是更新函数返回,None
则将删除键值对。
让咱们举一个例子来讲明这一点。假设您要维护文本数据流中看到的每一个单词的运行计数。这里,运行计数是状态,它是一个整数。咱们将更新函数定义为:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
这适用于包含单词的DStream(例如,前面示例中pairs
包含DStream (word, 1)
的对象)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
将为每一个单词调用更新函数,newValues
其序列为1(来自(word, 1)
成对)并runningCount
具备前一个计数。
请注意,使用updateStateByKey
须要配置检查点目录,这将在检查点部分中详细讨论。
该transform
操做(及其变体transformWith
)容许在DStream上应用任意RDD到RDD功能。它可用于应用未在DStream API中公开的任何RDD操做。例如,将数据流中的每一个批次与另外一个数据集链接的功能不会直接在DStream API中公开。可是,您能够轻松地使用它transform
来执行此操做。这使得很是强大的可能性。例如,能够经过将输入数据流与预先计算的垃圾邮件信息(也可使用Spark生成)链接,而后根据它进行过滤来进行实时数据清理。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
请注意,在每一个批处理间隔中都会调用提供的函数。这容许您进行时变RDD操做,即RDD操做,分区数,广播变量等能够在批次之间进行更改。
Spark Streaming还提供窗口计算,容许您在滑动数据窗口上应用转换。下图说明了此滑动窗口。
如该图所示,每个窗口时间的幻灯片在源DSTREAM,落入窗口内的源RDDS被组合及操做,以产生加窗DSTREAM的RDDS。在这种特定状况下,操做应用于最后3个时间单位的数据,并按2个时间单位滑动。这代表任何窗口操做都须要指定两个参数。
这两个参数必须是源DStream的批处理间隔的倍数(图中的1)。
让咱们举一个例子来讲明窗口操做。好比说,您但愿经过每隔10秒在最后30秒的数据中生成字数来扩展 前面的示例。为此,咱们必须在最后30秒的数据reduceByKey
上对pairs
DStream (word, 1)
对应用操做。这是使用该操做完成的reduceByKeyAndWindow
。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常见的窗口操做以下。全部这些操做都采用上述两个参数 - windowLength和slideInterval。
转型 | 含义 |
---|---|
window(windowLength,slideInterval) | 返回一个新的DStream,它是根据源DStream的窗口批次计算的。 |
countByWindow(windowLength,slideInterval) | 返回流中元素的滑动窗口数。 |
reduceByWindow(func,windowLength,slideInterval) | 返回一个新的单元素流,经过使用func在滑动间隔内聚合流中的元素而建立。该函数应该是关联的和可交换的,以即可以并行正确计算。 |
reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ]) | 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数func 在滑动窗口中的批次聚合每一个键的值。注意:默认状况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性肯定spark.default.parallelism )进行分组。您能够传递可选 numTasks 参数来设置不一样数量的任务。 |
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[ numTasks ]) | 上述更有效的版本, |
countByValueAndWindow(windowLength, slideInterval,[numTasks ]) | 当在(K,V)对的DStream上调用时,返回(K,Long)对的新DStream,其中每一个键的值是其在滑动窗口内的频率。一样reduceByKeyAndWindow ,reduce任务的数量可经过可选参数进行配置。 |
最后,值得强调的是,您能够轻松地在Spark Streaming中执行不一样类型的链接。
流链接
Streams能够很容易地与其余流链接。
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
这里,在每一个批处理间隔中,生成的RDD stream1
将与生成的RDD链接stream2
。你也能够作leftOuterJoin
,rightOuterJoin
,fullOuterJoin
。此外,在流的窗口上进行链接一般很是有用。这也很容易。
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
流数据集链接
在解释DStream.transform
操做时已经显示了这一点。这是将窗口流与数据集链接的另外一个示例。
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
实际上,您还能够动态更改要加入的数据集。提供给的函数在transform
每一个批处理间隔进行评估,所以将使用dataset
引用指向的当前数据集。
API文档中提供了完整的DStream转换列表。对于Scala API,请参阅DStream 和PairDStreamFunctions。对于Java API,请参阅JavaDStream 和JavaPairDStream。对于Python API,请参阅DStream。
输出操做容许将DStream的数据推送到外部系统,如数据库或文件系统。因为输出操做实际上容许外部系统使用转换后的数据,所以它们会触发全部DStream转换的实际执行(相似于RDD的操做)。目前,定义了如下输出操做:
输出操做 | 含义 |
---|---|
打印() | 在运行流应用程序的驱动程序节点上打印DStream中每批数据的前十个元素。这对开发和调试颇有用。 Python API这在Python API中称为 pprint()。 |
saveAsTextFiles(前缀,[ 后缀 ]) | 将此DStream的内容保存为文本文件。每一个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 |
saveAsObjectFiles(前缀,[ 后缀 ]) | 将此DStream的内容保存为SequenceFiles 序列化Java对象。每一个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 Python API这在Python API中不可用。 |
saveAsHadoopFiles(前缀,[ 后缀 ]) | 将此DStream的内容保存为Hadoop文件。每一个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS [.suffix]”。 Python API这在Python API中不可用。 |
foreachRDD(func) | 最通用的输出运算符,它将函数func应用于从流生成的每一个RDD。此函数应将每一个RDD中的数据推送到外部系统,例如将RDD保存到文件,或经过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,而且一般会在其中执行RDD操做,这将强制计算流式RDD。 |
dstream.foreachRDD
是一个功能强大的原语,容许将数据发送到外部系统。可是,了解如何正确有效地使用此原语很是重要。一些常见的错误要避免以下。
一般将数据写入外部系统须要建立链接对象(例如,与远程服务器的TCP链接)并使用它将数据发送到远程系统。为此,开发人员可能无心中尝试在Spark驱动程序中建立链接对象,而后尝试在Spark工做程序中使用它来保存RDD中的记录。例如(在Scala中),
dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }
这是不正确的,由于这须要链接对象被序列化并从驱动程序发送到worker。这种链接对象不多能够跨机器转移。此错误可能表现为序列化错误(链接对象不可序列化),初始化错误(须要在worker处初始化链接对象)等。正确的解决方案是在worker处建立链接对象。
可是,这可能会致使另外一个常见错误 - 为每条记录建立一个新链接。例如,
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
一般,建立链接对象会产生时间和资源开销。所以,为每一个记录建立和销毁链接对象可能会产生没必要要的高开销,而且可能显着下降系统的总吞吐量。更好的解决方案是使用 rdd.foreachPartition
- 建立单个链接对象并使用该链接发送RDD分区中的全部记录。
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
这会在许多记录上分摊链接建立开销。
最后,经过跨多个RDD /批处理重用链接对象,能够进一步优化这一点。因为多个批次的RDD被推送到外部系统,所以能够维护链接对象的静态池,而不是能够重用的链接对象,从而进一步减小了开销。
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
请注意,池中的链接应根据须要延迟建立,若是暂时不使用,则会超时。这实现了最有效的数据发送到外部系统。
其余要记住的要点:
DStreams由输出操做延迟执行,就像RDD由RDD操做延迟执行同样。具体而言,DStream输出操做中的RDD操做会强制处理接收到的数据。所以,若是您的应用程序没有任何输出操做,或者输出操做dstream.foreachRDD()
没有任何RDD操做,那么就不会执行任何操做。系统将简单地接收数据并将其丢弃。
默认状况下,输出操做一次执行一次。它们按照应用程序中定义的顺序执行。
您能够轻松地对流数据使用DataFrames和SQL操做。您必须使用StreamingContext正在使用的SparkContext建立SparkSession。此外,必须这样作以即可以在驱动器故障时从新启动。这是经过建立一个延迟实例化的SparkSession单例实例来完成的。这在如下示例中显示。它修改了早期的单词计数示例,以使用DataFrames和SQL生成单词计数。每一个RDD都转换为DataFrame,注册为临时表,而后使用SQL进行查询。
/** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
查看完整的源代码。
您还能够对从不一样线程(即,与正在运行的StreamingContext异步)的流数据上定义的表运行SQL查询。只需确保将StreamingContext设置为记住足够数量的流数据,以便查询能够运行。不然,不知道任何异步SQL查询的StreamingContext将在查询完成以前删除旧的流数据。例如,若是要查询最后一批,但查询可能须要5分钟才能运行,则调用streamingContext.remember(Minutes(5))
(在Scala中,或在其余语言中等效)。
有关DataFrame的详细信息,请参阅DataFrames和SQL指南。
您还能够轻松使用MLlib提供的机器学习算法。首先,有流媒体机器学习算法(例如流媒体线性回归,流媒体KMeans等),它们能够同时学习流数据以及将模型应用于流数据。除此以外,对于更大类的机器学习算法,您能够离线学习学习模型(即便用历史数据),而后在线将数据应用于流数据。有关详细信息,请参阅MLlib指南。
与RDD相似,DStreams还容许开发人员将流的数据保存在内存中。也就是说,persist()
在DStream上使用该方法将自动将该DStream的每一个RDD保留在内存中。若是DStream中的数据将被屡次计算(例如,对相同数据进行屡次操做),这将很是有用。对于像reduceByWindow
和这样的基于窗口的操做和 reduceByKeyAndWindow
基于状态的操做updateStateByKey
,这是隐含的。所以,基于窗口的操做生成的DStream会自动保留在内存中,而无需开发人员调用persist()
。
对于经过网络接收数据的输入流(例如,Kafka,Flume,套接字等),默认持久性级别设置为将数据复制到两个节点以实现容错。
请注意,与RDD不一样,DStreams的默认持久性级别使数据在内存中保持序列化。“ 性能调整”部分对此进行了进一步讨论。有关不一样持久性级别的更多信息,请参阅“ Spark编程指南”。
流应用程序必须全天候运行,所以必须可以适应与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)。为了实现这一点,Spark Streaming须要将足够的信息检查到容错存储系统,以便它能够从故障中恢复。检查点有两种类型的数据。
总而言之,元数据检查点主要用于从驱动程序故障中恢复,而若是使用状态转换,即便对于基本功能也须要数据或RDD检查点。
必须为具备如下任何要求的应用程序启用检查点:
updateStateByKey
或reduceByKeyAndWindow
使用反函数),则必须提供检查点目录以容许按期RDD检查点。请注意,能够在不启用检查点的状况下运行没有上述有状态转换的简单流应用程序。在这种状况下,驱动程序故障的恢复也将是部分的(某些已接收但未处理的数据可能会丢失)。这一般是能够接受的,而且许多以这种方式运行Spark Streaming应用程序。预计对非Hadoop环境的支持将在将来获得改善。
能够经过在容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用检查点,以便保存检查点信息。这是经过使用完成的streamingContext.checkpoint(checkpointDirectory)
。这将容许您使用上述有状态转换。此外,若是要使应用程序从驱动程序故障中恢复,则应重写流应用程序以使其具备如下行为。
使用此行为变得简单StreamingContext.getOrCreate
。其用法以下。
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
若是checkpointDirectory
存在,则将从检查点数据从新建立上下文。若是目录不存在(即,第一次运行),则将functionToCreateContext
调用该函数以建立新上下文并设置DStream。请参阅Scala示例 RecoverableNetworkWordCount。此示例将网络数据的字数附加到文件中。
除了使用以外getOrCreate
还须要确保驱动程序进程在失败时自动重启。这只能经过用于运行应用程序的部署基础结构来完成。这在“ 部署”部分中进一步讨论 。
请注意,RDD的检查点会致使节省可靠存储的成本。这可能致使RDD被检查点的那些批次的处理时间增长。所以,须要仔细设置检查点的间隔。在小批量(例如1秒)下,每批次检查点可能会显着下降操做吞吐量。相反,检查点过于频繁会致使谱系和任务大小增长,这可能会产生不利影响。对于须要RDD检查点的有状态转换,默认时间间隔是批处理间隔的倍数,至少为10秒。它能够经过使用来设置dstream.checkpoint(checkpointInterval)
。一般,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置。
没法从Spark Streaming中的检查点恢复累加器和广播变量。若是启用了检查点并使用 累加器或广播变量 ,则必须为累加器和广播变量建立延迟实例化的单例实例, 以便在驱动程序从新启动失败后从新实例化它们。这在如下示例中显示。
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts })
查看完整的源代码。
本节讨论部署Spark Streaming应用程序的步骤。
要运行Spark Streaming应用程序,您须要具有如下条件。
具备集群管理器的集群 - 这是任何Spark应用程序的通常要求,并在部署指南中进行了详细讨论。
打包应用程序JAR - 您必须将流应用程序编译为JAR。若是您正在使用spark-submit
启动应用程序,那么您将不须要在JAR中提供Spark和Spark Streaming。可是,若是您的应用程序使用高级源(例如Kafka,Flume),则必须将它们连接的额外工件及其依赖项打包在用于部署应用程序的JAR中。例如,使用的应用程序KafkaUtils
必须包含spark-streaming-kafka-0-10_2.11
应用程序JAR中的全部传递依赖项。
为执行程序配置足够的内存 - 因为接收的数据必须存储在内存中,所以必须为执行程序配置足够的内存来保存接收的数据。请注意,若是您正在进行10分钟的窗口操做,则系统必须至少将最后10分钟的数据保留在内存中。所以,应用程序的内存要求取决于其中使用的操做。
配置检查点 - 若是流应用程序须要它,则必须将Hadoop API兼容容错存储中的目录(例如HDFS,S3等)配置为检查点目录,并以检查点信息能够写入的方式编写流应用程序用于故障恢复。有关详细信息,请参阅检查点部分。
配置预写日志 - 自Spark 1.2起,咱们引入了预写日志以实现强大的容错保证。若是启用,则从接收器接收的全部数据都将写入配置检查点目录中的预写日志。这能够防止驱动程序恢复时的数据丢失,从而确保零数据丢失(在容错语义部分中详细讨论 )。这能够经过设置来启用配置参数 spark.streaming.receiver.writeAheadLog.enable
来true
。然而,这些更强的语义可能以单个接收器的接收吞吐量为代价。这能够经过并行运行更多接收器来纠正 增长总吞吐量。此外,建议在启用预写日志时禁用Spark中接收数据的复制,由于日志已存储在复制存储系统中。这能够经过将输入流的存储级别设置为来完成StorageLevel.MEMORY_AND_DISK_SER
。使用S3(或任何不支持刷新的文件系统)进行预写日志时,请记得启用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
。有关详细信息,请参阅 Spark Streaming配置。请注意,启用I / O加密时,Spark不会加密写入预写日志的数据。若是须要加密预写日志数据,则应将其存储在本机支持加密的文件系统中。
spark.streaming.receiver.maxRate
spark.streaming.kafka.maxRatePerPartition
spark.streaming.backpressure.enabled
true
若是须要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能的机制。
升级的Spark Streaming应用程序启动并与现有应用程序并行运行。一旦新的(接收与旧的数据相同的数据)已经预热并准备好黄金时间,旧的能够被放下。请注意,这能够用于支持将数据发送到两个目标(即早期和升级的应用程序)的数据源。
现有应用程序正常关闭(请参阅 StreamingContext.stop(...)
或JavaStreamingContext.stop(...)
用于正常关闭选项),确保在关闭以前彻底处理已接收的数据。而后能够启动升级的应用程序,该应用程序将从早期应用程序中止的同一点开始处理。请注意,这只能经过支持源端缓冲的输入源(如Kafka和Flume)来完成,由于在前一个应用程序关闭且升级的应用程序还没有启动时须要缓冲数据。而且没法从早期检查点从新启动升级前代码的信息。检查点信息基本上包含序列化的Scala / Java / Python对象,并尝试使用新的修改类反序列化对象可能会致使错误。在这种状况下,要么使用不一样的检查点目录启动升级的应用程序,要么删除之前的检查点目录。
除了Spark的监控功能外,还有一些特定于Spark Streaming的功能。使用StreamingContext时, Spark Web UI会显示一个附加Streaming
选项卡,其中显示有关运行接收器的统计信息(接收器是否处于活动状态,接收的记录数,接收器错误等)和已完成的批处理(批处理时间,排队延迟等)。 )。这可用于监视流应用程序的进度。
Web UI中的如下两个指标尤其重要:
若是批处理时间始终大于批处理间隔和/或排队延迟不断增长,则代表系统没法以最快的速度处理批处理而且落后。在这种状况下,请考虑 减小批处理时间。
还可使用StreamingListener接口监视Spark Streaming程序的进度,该 接口容许您获取接收器状态和处理时间。请注意,这是一个开发人员API,将来可能会对其进行改进(即报告更多信息)。
从群集上的Spark Streaming应用程序中得到最佳性能须要进行一些调整。本节介绍了许多能够调整以提升应用程序性能的参数和配置。在高层次上,您须要考虑两件事:
经过有效使用群集资源减小每批数据的处理时间。
设置正确的批量大小,以即可以像接收到的那样快速处理批量数据(即,数据处理与数据摄取保持同步)。
能够在Spark中进行许多优化,以最大限度地缩短每一个批处理的处理时间。这些已在“ 调整指南”中详细讨论过。本节重点介绍一些最重要的内容。
经过网络接收数据(如Kafka,Flume,socket等)须要将数据反序列化并存储在Spark中。若是数据接收成为系统中的瓶颈,则考虑并行化数据接收。请注意,每一个输入DStream都会建立一个接收单个数据流的接收器(在工做机器上运行)。所以,能够经过建立多个输入DStream并将它们配置为从源接收数据流的不一样分区来实现接收多个数据流。例如,接收两个数据主题的单个Kafka输入DStream能够分红两个Kafka输入流,每一个输入流只接收一个主题。这将运行两个接收器,容许并行接收数据,从而提升总体吞吐量。这些多个DStream能够组合在一块儿以建立单个DStream。而后,能够在统一流上应用在单个输入DStream上应用的转换。这样作以下。
val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print()
应考虑的另外一个参数是接收器的块间隔,它由配置参数决定 spark.streaming.blockInterval
。对于大多数接收器,接收的数据在存储在Spark的内存中以前合并为数据块。每一个批次中的块数决定了在相似地图的转换中用于处理接收数据的任务数。每批每一个接收器的任务数量大约是(批处理间隔/块间隔)。例如,200 ms的块间隔将每2秒批次建立10个任务。若是任务数量太少(即,少于每台机器的核心数),那么效率将会很低,由于全部可用的核心都不会用于处理数据。要增长给定批处理间隔的任务数,请减小块间隔。可是,块间隔的建议最小值约为50 ms,低于该值时,任务启动开销可能会成为问题。
使用多个输入流/接收器接收数据的替代方案是显式地从新分区输入数据流(使用inputStream.repartition(<number of partitions>)
)。这会在进一步处理以前将收到的批量数据分布到群集中指定数量的计算机上。
有关直接流的信息,请参阅Spark Streaming + Kafka集成指南
若是在计算的任何阶段中使用的并行任务的数量不够高,则群集资源可能未被充分利用。例如,对于像reduceByKey
和这样的分布式reduce操做reduceByKeyAndWindow
,默认的并行任务数由spark.default.parallelism
配置属性控制。您能够将并行级别做为参数传递(请参阅PairDStreamFunctions
文档),或者设置spark.default.parallelism
配置属性以更改默认值。
经过调整序列化格式能够减小数据序列化的开销。在流式传输的状况下,有两种类型的数据被序列化。
输入数据:默认状况下,经过Receiver接收的输入数据经过StorageLevel.MEMORY_AND_DISK_SER_2存储在执行程序的内存中。也就是说,数据被序列化为字节以减小GC开销,而且复制以容忍执行程序失败。此外,数据首先保存在内存中,而且仅在内存不足以保存流式计算所需的全部输入数据时才溢出到磁盘。这种序列化显然有开销 - 接收器必须反序列化接收的数据并使用Spark的序列化格式从新序列化。
流式传输操做生成的持久RDD:流式计算生成的RDD能够保留在内存中。例如,窗口操做将数据保留在内存中,由于它们将被屡次处理。可是,与StorageLevel.MEMORY_ONLY的Spark Core默认值不一样,默认状况下,经过流量计算生成的持久RDD与StorageLevel.MEMORY_ONLY_SER(即序列化)一块儿保留,以最大限度地减小GC开销。
在这两种状况下,使用Kryo序列化能够减小CPU和内存开销。有关详细信息,请参阅Spark Tuning Guide。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅“ 配置指南”中的Kryo相关配置)。
在须要为流应用程序保留的数据量不大的特定状况下,将数据(两种类型)保存为反序列化对象多是可行的,而不会产生过多的GC开销。例如,若是您使用的是几秒钟的批处理间隔而没有窗口操做,则能够尝试经过相应地显式设置存储级别来禁用持久数据中的序列化。这将减小因为序列化致使的CPU开销,可能在没有太多GC开销的状况下提升性能。
若是每秒启动的任务数量很高(例如,每秒50或更多),则向从属设备发送任务的开销可能很大,而且将难以实现亚秒级延迟。经过如下更改能够减小开销:
这些更改能够将批处理时间减小100毫秒,从而容许亚秒级批量大小可行。
要使群集上运行的Spark Streaming应用程序保持稳定,系统应该可以以接收数据的速度处理数据。换句话说,批处理数据应该在生成时尽快处理。经过监视流式Web UI中的处理时间能够找到是否适用于应用程序 ,其中批处理时间应小于批处理间隔。
根据流式计算的性质,所使用的批处理间隔可能对应用程序在固定的一组集群资源上能够维持的数据速率产生重大影响。例如,让咱们考虑一下早期的WordCountNetwork示例。对于特定数据速率,系统可能可以每2秒(即,2秒的批处理间隔)跟上报告字数,但不是每500毫秒。所以须要设置批处理间隔,以即可以维持生产中的预期数据速率。
肯定应用程序正确批量大小的好方法是使用保守的批处理间隔(例如,5-10秒)和低数据速率进行测试。要验证系统是否可以跟上数据速率,您能够检查每一个已处理批处理所遇到的端到端延迟的值(在Spark驱动程序log4j日志中查找“总延迟”,或使用 StreamingListener 接口)。若是延迟保持与批量大小至关,则系统稳定。不然,若是延迟不断增长,则意味着系统没法跟上,所以不稳定。一旦了解了稳定配置,就能够尝试提升数据速率和/或减少批量大小。注意,只要延迟减少到低值(即,小于批量大小),因为临时数据速率增长而致使的延迟的瞬时增长多是正常的。
“调优指南”中详细讨论了调整 Spark应用程序的内存使用状况和GC行为。强烈建议您阅读。在本节中,咱们将特别在Spark Streaming应用程序的上下文中讨论一些调优参数。
Spark Streaming应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,若是要在最后10分钟的数据上使用窗口操做,那么您的群集应该有足够的内存来在内存中保存10分钟的数据。或者若是你想使用updateStateByKey
大量的键,那么必要的内存将很高。相反,若是你想作一个简单的map-filter-store操做,那么必要的内存就会不多。
一般,因为经过接收器接收的数据与StorageLevel.MEMORY_AND_DISK_SER_2一块儿存储,所以不适合内存的数据将溢出到磁盘。这可能会下降流应用程序的性能,所以建议您根据流应用程序的须要提供足够的内存。最好尝试小规模地查看内存使用状况并进行相应估算。
内存调整的另外一个方面是垃圾收集。对于须要低延迟的流应用程序,不但愿由JVM垃圾收集致使大的暂停。
有一些参数能够帮助您调整内存使用和GC开销:
DStream的持久性级别:如前面数据序列化部分所述,输入数据和RDD默认持久化为序列化字节。与反序列化持久性相比,这减小了内存使用和GC开销。启用Kryo序列化可进一步减小序列化大小和内存使用量。经过压缩(参见Spark配置spark.rdd.compress
)能够实现内存使用的进一步减小,但代价是CPU时间。
清除旧数据:默认状况下,DStream转换生成的全部输入数据和持久RDD都会自动清除。Spark Streaming根据使用的转换决定什么时候清除数据。例如,若是您使用10分钟的窗口操做,那么Spark Streaming将保留最后10分钟的数据,并主动丢弃旧数据。经过设置,能够将数据保留更长的持续时间(例如,交互式查询旧数据)streamingContext.remember
。
CMS垃圾收集器:强烈建议使用并发标记和清除GC,以保持GC相关的暂停始终较低。尽管已知并发GC会下降系统的总体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用--driver-java-options
输入spark-submit
)和执行程序(使用Spark配置 spark.executor.extraJavaOptions
)上设置CMS GC 。
其余提示:为了进一步下降GC开销,这里有一些尝试的提示。
OFF_HEAP
存储级别保留RDD 。请参阅Spark编程指南中的更多详细信息。要记住的要点:
DStream与单个接收器相关联。为了得到读取并行性,须要建立多个接收器,即多个DStream。接收器在执行器内运行。它占据一个核心。确保在预订接收器插槽后有足够的核心进行处理,即spark.cores.max
应考虑接收器插槽。接收器以循环方式分配给执行器。
当从流源接收数据时,接收器建立数据块。每隔blockInterval毫秒生成一个新的数据块。在batchInterval期间建立N个数据块,其中N = batchInterval / blockInterval。这些块由当前执行程序的BlockManager分发给其余执行程序的块管理器。以后,在驱动程序上运行的网络输入跟踪器将被告知有关进一步处理的块位置。
在驱动程序上为batchInterval期间建立的块建立RDD。batchInterval期间生成的块是RDD的分区。每一个分区都是spark中的任务。blockInterval == batchinterval意味着建立了一个分区,而且可能在本地处理它。
块中的映射任务在执行器中处理(一个接收块,另外一个块复制块),具备块而无论块间隔,除非非本地调度启动。具备更大的blockinterval意味着更大的块。较高的值会spark.locality.wait
增长在本地节点上处理块的机会。须要在这两个参数之间找到平衡,以确保在本地处理更大的块。
您能够经过调用来定义分区数,而不是依赖于batchInterval和blockInterval inputDstream.repartition(n)
。这会随机从新调整RDD中的数据以建立n个分区。是的,为了更大的并行性。虽然以洗牌为代价。RDD的处理由驾驶员的jobcheduler做为工做安排。在给定的时间点,只有一个做业处于活动状态。所以,若是一个做业正在执行,则其余做业将排队。
若是你有两个dstream,那么将造成两个RDD,而且将建立两个将一个接一个地安排的做业。为了不这种状况,你能够结合两个dstreams。这将确保为dstream的两个RDD造成单个unionRDD。而后,此unionRDD被视为单个做业。可是,RDD的分区不受影响。
若是批处理时间超过批处理间隔,那么显然接收方的内存将开始填满,最终会抛出异常(最有多是BlockNotFoundException)。目前,没有办法暂停接收器。使用SparkConf配置spark.streaming.receiver.maxRate
,能够限制接收器的速率。
在本节中,咱们将讨论Spark Streaming应用程序在发生故障时的行为。
要理解Spark Streaming提供的语义,让咱们记住Spark的RDD的基本容错语义。
Spark对容错文件系统(如HDFS或S3)中的数据进行操做。所以,从容错数据生成的全部RDD也是容错的。可是,Spark Streaming不是这种状况,由于大多数状况下的数据是经过网络接收的(除非 fileStream
使用时)。要为全部生成的RDD实现相同的容错属性,接收的数据将在群集中的工做节点中的多个Spark执行程序之间进行复制(默认复制因子为2)。这致使系统中须要在发生故障时恢复的两种数据:
此外,咱们应该关注两种失败:
有了这些基础知识,让咱们了解Spark Streaming的容错语义。
流系统的语义一般根据系统处理每条记录的次数来捕获。系统能够在全部可能的操做条件下提供三种类型的保证(尽管出现故障等)
在任何流处理系统中,从广义上讲,处理数据有三个步骤。
接收数据:使用接收器或其余方式从数据源接收数据。
转换数据:使用DStream和RDD转换转换接收的数据。
推出数据:最终转换的数据被推送到外部系统,如文件系统,数据库,仪表板等。
若是流应用程序必须实现端到端的一次性保证,那么每一个步骤都必须提供一次性保证。也就是说,每条记录必须只接收一次,转换一次,而后推送到下游系统一次。让咱们理解Spark Streaming上下文中这些步骤的语义。
接收数据:不一样的输入源提供不一样的保证。这将在下一小节中详细讨论。
转换数据:因为RDD提供的保证,全部已接收的数据将只处理一次。即便存在故障,只要能够访问接收的输入数据,最终变换的RDD将始终具备相同的内容。
推出数据:默认状况下输出操做至少确保一次语义,由于它取决于输出操做的类型(幂等或不等)和下游系统的语义(支持或不支持事务)。可是用户能够实现本身的事务机制来实现一次性语义。本节稍后将对此进行更详细的讨论。
不一样的输入源提供不一样的保证,范围从至少一次到刚好一次。阅读更多详情。
若是全部输入数据都已存在于HDFS等容错文件系统中,则Spark Streaming始终能够从任何故障中恢复并处理全部数据。这给出 了一次性语义,这意味着不管失败什么,全部数据都将被处理一次。
对于基于接收器的输入源,容错语义取决于故障情形和接收器类型。正如咱们所讨论的前面,有两种类型的接收器:
根据使用的接收器类型,咱们实现如下语义。若是工做节点发生故障,则可靠接收器不会丢失数据。对于不可靠的接收器,接收但未复制的数据可能会丢失。若是驱动程序节点出现故障,那么除了这些丢失以外,在内存中接收和复制的全部过去数据都将丢失。这将影响有状态转换的结果。
为了不丢失过去收到的数据,Spark 1.2引入了预写日志,将接收到的数据保存到容错存储中。经过启用预写日志和可靠的接收器,数据丢失为零。在语义方面,它提供至少一次保证。
下表总结了失败时的语义:
部署方案 | 工人失败 | 司机失败 |
---|---|---|
Spark 1.1或更早版本,或 Spark 1.2或更高版本,没有预写日志 |
使用不可靠的接收器丢失缓冲数据使用可靠的接收器实现 零数据丢失 至少一次语义 |
使用不可靠的接收器丢失缓冲数据 过去的数据在全部接收器中丢失 未定义的语义 |
Spark 1.2或更高版本,具备预写日志 | 使用可靠的接收器实现零数据丢失 至少一次语义 |
使用可靠的接收器和文件实现零数据丢失 至少一次语义 |
在Spark 1.3中,咱们引入了一个新的Kafka Direct API,它能够确保Spark Streaming只接收一次全部Kafka数据。除此以外,若是您实现一次性输出操做,您能够实现端到端的一次性保证。“ Kafka集成指南”中进一步讨论了这种方法。
输出操做(例如foreachRDD
)具备至少一次语义,即,在工做者失败的状况下,转换后的数据可能被屡次写入外部实体。虽然这对于使用saveAs***Files
操做保存到文件系统是能够接受的 (由于文件将被简单地用相同的数据覆盖),可是可能须要额外的努力来实现精确一次的语义。有两种方法。
幂等更新:屡次尝试始终写入相同的数据。例如,saveAs***Files
始终将相同的数据写入生成的文件。
事务性更新:全部更新都是以事务方式进行的,以便以原子方式完成更新。一种方法是:
foreachRDD
)和RDD的分区索引来建立标识符。该标识符惟一地标识流应用程序中的blob数据。使用标识符以事务方式(即,一次,原子地)使用此blob更新外部系统。也就是说,若是标识符还没有提交,则以原子方式提交分区数据和标识符。不然,若是已经提交,请跳过更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }