基于Spark 2.0 Preview的材料翻译,原[英]文地址: html
http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.htmljava
Streaming应用实战,参考:http://my.oschina.net/u/2306127/blog/635518python
Spark Streaming 是基于Spark 核心API的扩展,使高伸缩性、高带宽、容错的流式数据处理成为可能。数据能够来自于多种源,如Kafka、Flume、Kinesis、或者TCP sockets等,并且可使用map、reduce
、join
和 window等高级接口实现复杂算法的处理。最终,处理的数据能够被推送到数据库、文件系统以及动态布告板。实际上,
你还能够将Spark的机器学习( machine learning) 和图计算 (graph processing )算法用于数据流的处理。git
内部工做流程以下。Spark Streaming接收数据流的动态输入,而后将数据分批,每一批数据经过Spark建立一个结果数据集而后进行处理。github
Spark Streaming提供一个高级别的抽象-离散数据流(DStream),表明一个连续的数据流。DStreams能够从Kafka, Flume, and Kinesis等源中建立,或者在其它的DStream上执行高级操做。在内部,DStream表明一系列的 RDDs。web
本指南将岩石如何经过DStreams开始编写一个Spark Streaming程序。你可使用Scala、Java或者Python。能够经过相应的链接切换去查看相应语言的代码。算法
注意:这在Python里有一些不一样,不多部分API暂时没有,本指南进行了Python API标注。sql
在开始Spark Streaming编程以前咱们先看看一个简单的Spark Streaming程序将长什么样子。咱们从基于TCP socket的数据服务器接收一个文本数据,而后对单词进行计数。看起来像下面这个样子。shell
首先,咱们导入Spark Streaming的类命名空间和一些StreamingContext的转换工具。 StreamingContext 是全部的Spark Streaming功能的主入口点。咱们建立StreamingContext,指定两个执行线程和分批间隔为1秒钟。数据库
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
使用这个context,咱们能够建立一个DStream,这是来自于TCP数据源 的流数据,咱们经过hostname (e.g. localhost
) 和端口 (e.g. 9999
)来指定这个数据源。
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
这里line是一个DStream对象,表明从服务器收到的流数据。每个DStream中的记录是一个文本行。下一步,咱们将每一行中以空格分开的单词分离出来。
// Split each line into words val words = lines.flatMap(_.split(" "))
flatMap是“一对多”的DStream操做,经过对源DStream的每个记录产生多个新的记录建立新DStream。这里,每一行将被分解多个单词,而且单词流表明了words DStream。下一步,咱们对这些单词进行计数统计。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
words
DStream而后映射为(word, 1)的键值对的Dstream,而后用于统计单词出现的频度。最后,wordCounts.print()打印出每秒钟建立出的计数值。
注意,上面这些代码行执行的时候,仅仅是设定了计算执行的逻辑,并无真正的处理数据。在全部的设定完成后,为了启动处理,须要调用:
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
完整的代码能够Spark Streaming 的例程 NetworkWordCount 中找到。
若是已经下载和构建了Spark,你能够按照下面的方法运行这个例子。首先运行Netcat(一个Unix风格的小工具)做为数据服务器,以下所示:
$ nc -lk 9999
而后,到一个控制台窗口,启动例程:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
而后,任何在netcat服务器运行控制台键入的行都会被计数而后每隔一秒钟在屏幕上打印出来,以下所示:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... |
# TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... |
下一步,咱们将离开这个简单的例子,详细阐述Spark Streaming的基本概念和功能。
与Spark相似,Spark Streaming也能够经过Maven中心库访问。为了编写你本身的Spark Streaming程序,您将加入下面的依赖到你的SBT或者Maven工程文件。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.0-preview</version> </dependency>
为了从Kafka/Flume/Kinesis等非Spark Streaming核心API等数据源注入数据,咱们须要添加对应的spark-streaming-xyz_2.11到依赖中。例如,像下面的这样:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
对于最新的列表,参考Maven repository 得到全面的数据源河访问部件的列表。
为了初始化Spark Streaming程序,StreamingContext 对象必须首先建立做为总入口。
StreamingContext 对象能够经过 SparkConf 对象建立,以下所示。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
这里 appName参数是应用在集群中的名称。 master
是 Spark, Mesos 或 YARN cluster URL, 或者“local[*]” 字符串指示运行在 local 模式下。实践中,当运行一个集群, 您不该该硬编码 master
参数在集群中, 而是经过 launch the application with spark-submit
接收其参数。可是, 对于本地测试和单元测试, 你能够传递“local[*]” 来运行 Spark Streaming 在进程内运行(自动检测本地系统的CPU内核数量)。 注意,这里内部建立了 SparkContext (全部的Spark 功能的入口点) ,能够经过 ssc.sparkContext
进行存取。
分批间隔时间基于应用延迟需求和可用的集群资源进行设定(译注:设定间隔要大于应用数据的最小延迟需求,同时不能设置过小以致于系统没法在给定的周期内处理完毕),参考 Performance Tuning 部分得到更多信息。
StreamingContext
对象也能够从已有的 SparkContext
对象中建立。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
在context建立以后,能够接着开始以下的工做:
streamingContext.start()
。streamingContext.awaitTermination()
.streamingContext.stop()
。
记住:
stopSparkContext的
Stop时设置选项为false。离散数据流(DStream)是Spark Streaming最基本的抽象。它表明了一种连续的数据流,要么从某种数据源提取数据,要么从其余数据流映射转换而来。DStream内部是由一系列连 续的RDD组成的,每一个RDD都是不可变、分布式的数据集(详见Spark编程指南 – Spark Programming Guide)。每一个RDD都包含了特定时间间隔内的一批数据,以下图所示:
任何做用于DStream的算子,其实都会被转化为对其内部RDD的操做。例如,在前面的例子中,咱们将 lines 这个DStream转成words DStream对象,其实做用于lines上的flatMap算子,会施加于lines中的每一个RDD上,并生成新的对应的RDD,而这些新生成的RDD 对象就组成了words这个DStream对象。其过程以下图所示:
底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。后续会详细讨论这些高级算子。
输入DStream表明从某种流式数据源流入的数据流。在以前的例子里,lines 对象就是输入DStream,它表明从netcat server收到的数据流。每一个输入DStream(除文件数据流外)都和一个接收器(Receiver – Scala doc, Java doc)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。
Spark Streaming主要提供两种内建的流式数据源:
本节中,咱们将会从每种数据源中挑几个继续深刻讨论。
注意,若是你须要同时从多个数据源拉取数据,那么你就须要建立多个DStream对象(详见后续的性能调优这一小节)。多个DStream对象其实也就同 时建立了多个数据流接收器。可是请注意,Spark的worker/executor 都是长期运行的,所以它们都会各自占用一个分配给Spark Streaming应用的CPU。因此,在运行Spark Streaming应用的时候,须要注意分配足够的CPU core(本地运行时,须要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。
前面的快速入门例子中,咱们已经看到,使用ssc.socketTextStream(…) 能够从一个TCP链接中接收文本数据。而除了TCP套接字外,StreamingContext API 还支持从文件或者Akka actor中拉取数据。
文件数据流(File Streams): 能够从任何兼容HDFS API(包括:HDFS、S三、NFS等)的文件系统,建立方式以下:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming将监视该dataDirectory目录,并处理该目录下任何新建的文件(目前还不支持嵌套目录)。注意:
另外,文件数据流不是基于接收器的,因此不须要为其单独分配一个CPU core。
Python API fileStream目前暂时不可用,Python目前只支持textFileStream。
对于简单的文本文件,更简单的方式是调用 streamingContext.textFileStream(dataDirectory)。
基于自定义Actor的数据流(Streams based on Custom Actors): DStream能够由Akka actor建立获得,只需调用 streamingContext.actorStream(actorProps, actor-name)。详见自定义接收器(Custom Receiver Guide)。actorStream暂时不支持Python API。
关于套接字、文件以及Akka actor数据流更详细信息,请参考相关文档:StreamingContext for Scala,JavaStreamingContext for Java, and StreamingContext for Python。
Python API自 Spark 2.0.0(译注:1.6.1就已经支持了) 起,Kafka、Kinesis、Flume和MQTT这些数据源将支持Python。
使用这类数据源须要依赖一些额外的代码库,有些依赖还挺复杂的(如:Kafka、Flume)。所以为了减小依赖项版本冲突问题,各个数据源 DStream的相关功能被分割到不一样的代码包中,只有用到的时候才须要连接打包进来。
例如,若是你须要使用Twitter的tweets做为数据源,你 须要如下步骤:
注意,高级数据源在spark-shell中不可用,所以不能用spark-shell来测试基于高级数据源的应用。若是真有须要的话,你须要自行下载相应数据源的Maven工件及其依赖项,并将这些Jar包部署到spark-shell的classpath中。
下面列举了一些高级数据源:
Python API自定义数据源目前还不支持Python。
输入DStream也能够用自定义的方式建立。你须要作的只是实现一个自定义的接收器(receiver),以便从自定义的数据源接收数据,而后将数据推入Spark中。详情请参考自定义接收器指南(Custom Receiver Guide)。
从可靠性角度来划分,大体有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,而后发出确认信息,这样就可以确保任何失败状况下,都不会丢数据。所以咱们能够将接收器也相应地分为两类:
自定义接收器指南(Custom Receiver Guide)中详细讨论了如何写一个可靠接收器。
和RDD相似,DStream也支持从输入DStream通过各类transformation算子映射成新的DStream。DStream支持不少RDD上常见的transformation算子,一些经常使用的见下表:
Transformation算子 | 用途 |
---|---|
map(func) | 返回会一个新的DStream,并将源DStream中每一个元素经过func映射为新的元素 |
flatMap(func) | 和map相似,不过每一个输入元素再也不是映射为一个输出,而是映射为0到多个输出 |
filter(func) | 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素 |
repartition(numPartitions) | 更改DStream的并行度(增长或减小分区数) |
union(otherStream) | 返回新的DStream,包含源DStream和otherDStream元素的并集 |
count() | 返回一个包含单元素RDDs的DStream,其中每一个元素是源DStream中各个RDD中的元素个数 |
reduce(func) | 返回一个包含单元素RDDs的DStream,其中每一个元素是经过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合获得的结果。func必须知足结合律,以便支持并行计算。 |
countByValue() | 若是源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。 |
reduceByKey(func, [numTasks]) | 若是源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合获得的。注意:默认状况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由 spark.default.parallelism 决定)。你能够经过可选参数numTasks来指定并发任务个数。 |
join(otherStream, [numTasks]) | 若是源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每一个K都对应一个 (K, (V, W))键值对元素。 |
cogroup(otherStream, [numTasks]) | 若是源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每一个元素类型为包含(K, Seq[V], Seq[W])的tuple。 |
transform(func) | 返回一个新的DStream,其包含的RDD为源RDD通过func操做后获得的结果。利用该算子能够对DStream施加任意的操做。 |
updateStateByKey(func) | 返回一个包含新”状态”的DStream。源DStream中每一个key及其对应的values会做为func的输入,而func能够用于对每一个key的“状态”数据做任意的更新操做。 |
下面咱们会挑几个transformation算子深刻讨论一下。
updateStateByKey 算子支持维护一个任意的状态。要实现这一点,只须要两步:
在每个批次数据到达后,Spark都会调用状态更新函数,来更新全部已有key(无论key是否存在于本批次中)的状态。若是状态更新函数返回None,则对应的键值对会被删除。
举例以下。假设你须要维护一个流式应用,统计数据流中每一个单词的出现次数。这里将各个单词的出现次数这个整型数定义为状态。咱们接下来定义状态更新函数以下:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
该状态更新函数能够做用于一个包括(word, 1) 键值对的DStream上(见本文开头的例子)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
该状态更新函数会为每一个单词调用一次,且相应的newValues是一个包含不少个”1″的数组(这些1来自于(word,1)键值对),而runningCount包含以前该单词的计数。本例的完整代码请参考 StatefulNetworkWordCount.scala。
注意,调用updateStateByKey前须要配置检查点目录,后续对此有详细的讨论,见检查点(checkpointing)这节。
transform算子(及其变体transformWith)能够支持任意的RDD到RDD的映射操做。也就是说,你能够用tranform算子来包装 任何DStream API所不支持的RDD算子。例如,将DStream每一个批次中的RDD和另外一个Dataset进行关联(join)操做,这个功能DStream API并无直接支持。不过你能够用transform来实现这个功能,可见transform其实为DStream提供了很是强大的功能支持。好比说, 你能够用事先算好的垃圾信息,对DStream进行实时过滤。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
注意,这里transform包含的算子,其调用时间间隔和批次间隔是相同的。因此你能够基于时间改变对RDD的操做,如:在不一样批次,调用不一样的RDD算子,设置不一样的RDD分区或者广播变量等。
Spark Streaming一样也提供基于时间窗口的计算,也就是说,你能够对某一个滑动时间窗内的数据施加特定tranformation算子。以下图所示:
如上图所示,每次窗口滑动时,源DStream中落入窗口的RDDs就会被合并成新的windowed DStream。在上图的例子中,这个操做会施加于3个RDD单元,而滑动距离是2个RDD单元。由此能够得出任何窗口相关操做都须要指定一下两个参数:
注意,这两个参数都必须是DStream批次间隔(上图中为1)的整数倍.
下面我们举个例子。假设,你须要扩展前面的那个小栗子,你须要每隔10秒统计一下前30秒内的单词计数。为此,咱们须要在包含(word, 1)键值对的DStream上,对最近30秒的数据调用reduceByKey算子。不过这些均可以简单地用一个 reduceByKeyAndWindow搞定。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
如下列出了经常使用的窗口算子。全部这些算子都有前面提到的那两个参数 – 窗口长度 和 滑动距离。
Transformation窗口算子 | 用途 |
---|---|
window(windowLength, slideInterval) | 将源DStream窗口化,并返回转化后的DStream |
countByWindow(windowLength,slideInterval) | 返回数据流在一个滑动窗口内的元素个数 |
reduceByWindow(func, windowLength,slideInterval) | 基于数据流在一个滑动窗口内的元素,用func作聚合,返回一个单元素数据流。func必须知足结合律,以便支持并行计算。 |
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) | 基于(K, V)键值对DStream,将一个滑动窗口内的数据进行聚合,返回一个新的包含(K,V)键值对的DStream,其中每一个value都是各个key通过func聚合后的结果。 注意:若是不指定numTasks,其值将使用Spark的默认并行任务数(本地模式下为2,集群模式下由 spark.default.parallelism决定)。固然,你也能够经过numTasks来指定任务个数。 |
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) | 和前面的reduceByKeyAndWindow() 相似,只是这个版本会用以前滑动窗口计算结果,递增地计算每一个窗口的归约结果。当新的数据进入窗口时,这些values会被输入func作归约计算,而这 些数据离开窗口时,对应的这些values又会被输入 invFunc 作”反归约”计算。举个简单的例子,就是把新进入窗口数据中各个单词个数“增长”到各个单词统计结果上,同时把离开窗口数据中各个单词的统计个数从相应的 统计结果中“减掉”。不过,你的本身定义好”反归约”函数,即:该算子不只有归约函数(见参数func),还得有一个对应的”反归约”函数(见参数中的 invFunc)。和前面的reduceByKeyAndWindow() 相似,该算子也有一个可选参数numTasks来指定并行任务数。注意,这个算子须要配置好检查点(checkpointing)才能用。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 基于包含(K, V)键值对的DStream,返回新的包含(K, Long)键值对的DStream。其中的Long value都是滑动窗口内key出现次数的计数。 和前面的reduceByKeyAndWindow() 相似,该算子也有一个可选参数numTasks来指定并行任务数。 |
最后,值得一提的是,你在Spark Streaming中作各类关联(join)操做很是简单。
一个数据流能够和另外一个数据流直接关联。
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
上面代码中,stream1的每一个批次中的RDD会和stream2相应批次中的RDD进行join。一样,你能够相似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你还能够基于窗口来join不一样的数据流,其实现也很简单,以下;)
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
其实这种状况已经在前面的DStream.transform算子中介绍过了,这里再举个基于滑动窗口的例子。
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
实际上,在上面代码里,你能够动态地该表join的数据集(dataset)。传给tranform算子的操做函数会在每一个批次从新求值,因此每次该函数都会用最新的dataset值,因此不一样批次间你能够改变dataset的值。
完整的DStream transformation算子列表见API文档。Scala请参考 DStream 和 PairDStreamFunctions. Java请参考 JavaDStream 和 JavaPairDStream. Python见 DStream。
输出算子能够将DStream的数据推送到外部系统,如:数据库或者文件系统。由于输出算子会将最终完成转换的数据输出到外部系统,所以只有输出算 子调用时,才会真正触发DStream transformation算子的真正执行(这一点相似于RDD 的action算子)。目前所支持的输出算子以下表:
输出算子 | 用途 |
---|---|
print() | 在驱动器(driver)节点上打印DStream每一个批次中的头十个元素。 Python API 对应的Python API为 pprint() |
saveAsTextFiles(prefix, [suffix]) | 将DStream的内容保存到文本文件。 每一个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]” |
saveAsObjectFiles(prefix, [suffix]) | 将DStream内容以序列化Java对象的形式保存到顺序文件中。 每一个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python |
saveAsHadoopFiles(prefix, [suffix]) | 将DStream内容保存到Hadoop文件中。 每一个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python |
foreachRDD(func) | 这是最通用的输出算子了,该算子接收一个函数func,func将做用于DStream的每一个RDD上。 func应该实现将每一个RDD的数据推到外部系统中,好比:保存到文件或者写到数据库中。 注意,func函数是在streaming应用的驱动器进程中执行的,因此若是其中包含RDD的action算子,就会触发对DStream中RDDs的实际计算过程。 |
DStream.foreachRDD是一个很是强大的原生工具函数,用户能够基于此算子将DStream数据推送到外部系统中。不过用户须要了解如何正确而高效地使用这个工具。如下列举了一些常见的错误。
一般,对外部系统写入数据须要一些链接对象(如:远程server的TCP链接),以便发送数据给远程系统。所以,开发人员可能会不经意地在Spark驱动器(driver)进程中建立一个链接对象,而后又试图在Spark worker节点上使用这个链接。以下例所示:
dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }
这段代码是错误的,由于它须要把链接对象序列化,再从驱动器节点发送到worker节点。而这些链接对象一般都是不能跨节点(机器)传递的。好比,链接对 象一般都不能序列化,或者在另外一个进程中反序列化后再次初始化(链接对象一般都须要初始化,所以从驱动节点发到worker节点后可能须要从新初始化) 等。解决此类错误的办法就是在worker节点上建立链接对象。
然而,有些开发人员可能会走到另外一个极端 – 为每条记录都建立一个链接对象,例如:
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
通常来讲,链接对象是有时间和资源开销限制的。所以,对每条记录都进行一次链接对象的建立和销毁会增长不少没必要要的开销,同时也大大减少了系统的吞吐量。 一个比较好的解决方案是使用 rdd.foreachPartition – 为RDD的每一个分区建立一个单独的链接对象,示例以下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
这样一来,链接对象的建立开销就摊到不少条记录上了。
最后,还有一个更优化的办法,就是在多个RDD批次之间复用链接对象。开发者能够维护一个静态链接池来保存链接对象,以便在不一样批次的多个RDD之间共享同一组链接对象,示例以下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
注意,链接池中的链接应该是懒惰建立的,而且有肯定的超时时间,超时后自动销毁。这个实现应该是目前发送数据最高效的实现方式。
其余要点:
首先须要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是没法从Spark Streaming的检查点中恢复回来的。因此若是你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,由于这样累加器和广播变量才能在驱动器(driver)故障恢复后从新实例化。代码示例以下:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts })
这里有完整代码:source code。
在Streaming应用中能够调用DataFrames and SQL来 处理流式数据。开发者能够用经过StreamingContext中的SparkContext对象来建立一个SQLContext,而且,开发者须要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能从新建立出来。一样,你仍是可使用懒惰建立的单例模式来实例化 SQLContext,以下面的代码所示,这里咱们将最开始的那个例子作了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每一个 RDD都转化成一个DataFrame,而后注册成临时表,再用SQL查询这些临时表。
/** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SQLContext val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
这里有完整代码:source code。
你也能够在其余线程里执行SQL查询(异步查询,即:执行SQL查询的线程和运行StreamingContext的线程不一样)。不过这种状况下, 你须要确保查询的时候 StreamingContext 没有把所需的数据丢弃掉,不然StreamingContext有可能已将老的RDD数据丢弃掉了,那么异步查询的SQL语句也可能没法获得查询结果。举 个栗子,若是你须要查询上一个批次的数据,可是你的SQL查询可能要执行5分钟,那么你就须要StreamingContext至少保留最近5分钟的数 据:streamingContext.remember(Minutes(5)) (这是Scala为例,其余语言差很少)
更多DataFrame和SQL的文档见这里: DataFrames and SQL
MLlib 提供了不少机器学习算法。首先,你须要关注的是流式计算相关的机器学习算法(如:Streaming Linear Regression, Streaming KMeans),这些流式算法能够在流式数据上一边学习训练模型,一边用最新的模型处理数据。除此之外,对更多的机器学习算法而言,你须要离线训练这些模型,而后将训练好的模型用于在线的流式数据。详见MLlib。
和RDD相似,DStream也支持将数据持久化到内存中。只须要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每一个RDD的persist方法进而将数据持久化到内存中。这对于可能须要计算不少次的DStream很是有 用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或 者有状态的算子,如:updateStateByKey,数据持久化就更重要了。所以,滑动窗口算子产生的DStream对象默认会自动持久化到内存中 (不须要开发者调用persist)。
对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不一样的节点上互为备份副本,以便支持容错。
注意,与RDD不一样的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。关于持久化级别(或者存储级别)的更详细说明见Spark编程指南(Spark Programming Guide)。
通常来讲Streaming 应用都须要7*24小时长期运行,因此必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便可以随时从故障中恢复回来。因此,检查点须要保存如下两种数据:
总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操做的恢复。
什么时候启用检查点
若是有如下状况出现,你就必须启用检查点了:
注意,一些简单的流式应用,若是没有用到前面所说的有状态转换算子,则彻底能够不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有 可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过一般这点丢失时可接受的,不少Spark Streaming应用也是这样运行的。对非Hadoop环境的支持将来还会继续改进。
如何配置检查点
检查点的启用,只须要设置好保存检查点信息的检查点目录便可,通常会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3 等)。开发者只须要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可使用前面提到的有状态转换 算子了。另外,若是你须要你的应用可以支持从驱动器故障中恢复,你可能须要重写部分代码,实现如下行为:
不过这个行为能够用StreamingContext.getOrCreate来实现,示例以下:
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
若是 checkpointDirectory 目录存在,则context对象会从检查点数据从新构建出来。若是该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,建立一个新的StreamingContext对象并定义好DStream数据流。完整的示例请参见RecoverableNetworkWordCount,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。
除了使用getOrCreate以外,开发者还须要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。进一步的讨论见部署(Deployment)这一节。
另外须要注意的是,RDD检查点会增长额外的保存数据的开销。这可能会致使数据流的处理时间变长。所以,你必须仔细的调整检查点间隔时间。若是批次 间隔过小(好比:1秒),那么对每一个批次保存检查点数据将大大减少吞吐量。另外一方面,检查点保存过于频繁又会致使血统信息和任务个数的增长,这一样会影响 系统性能。对于须要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员能够这样来自定义这个间 隔:dstream.checkpoint(checkpointInterval)。通常推荐设为批次间隔时间的5~10倍。
本节中将主要讨论一下如何部署Spark Streaming应用。
要运行一个Spark Streaming 应用,你首先须要具有如下条件:
spark-submit
提交应用,那么你不须要提供Spark和Spark Streaming的相关JAR包。可是,若是你使用了高级数据源(advanced sources – 如:Kafka、Flume、Twitter等),那么你须要将这些高级数据源相关的JAR包及其依赖一块儿打包并部署。例如,若是你使用了 TwitterUtils,那么就必须将spark-streaming-twitter_2.10及其相关依赖都打到应用的JAR包中。升级Spark Streaming应用程序代码,可使用如下两种方式:
StreamingContext.stop(...)
or JavaStreamingContext.stop(...)
), 即:确保所收到的数据都已经处理完毕后再退出。而后再启动新的Streaming程序,而新程序将接着在老程序退出点上继续拉取数据。注意,这种方式须要 数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),由于在新旧程序交接的这个空档时间,数据须要在数据源处缓存。目前还不能支持从检查 点重启,由于检查点存储的信息包含老程序中的序列化对象信息,在新程序中将其反序列化可能会出错。这种状况下,只能要么指定一个新的检查点目录,要么删除 老的检查点目录。除了Spark自身的监控能力(monitoring capabilities)以外,对Spark Streaming还有一些额外的监控功能可用。若是实例化了StreamingContext,那么你能够在Spark web UI上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息均可以用来监控streaming应用。
web UI上有两个度量特别重要:
若是批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。
Spark Streaming程序的处理进度能够用StreamingListener接口来监听,这个接口能够监听到接收器的状态和处理时间。不过须要注意的是,这是一个developer API接口,换句话说这个接口将来极可能会变更(可能会增长更多度量信息)。
要得到Spark Streaming应用的最佳性能须要一点点调优工做。本节将深刻解释一些可以改进Streaming应用性能的配置和参数。整体上来讲,你须要考虑这两方面的事情:
有很多优化手段均可以减小Spark对每一个批次的处理时间。细节将在优化指南(Tuning Guide)中详谈。这里仅列举一些最重要的。
跨网络接收数据(如:从Kafka、Flume、socket等接收数据)须要在Spark中序列化并存储数据。
若是接收数据的过程是系统瓶颈,那么能够考虑增长数据接收的并行度。注意,每一个输入DStream只包含一个单独的接收器(receiver,运行 约worker节点),每一个接收器单独接收一路数据流。因此,配置多个输入DStream就能从数据源的不一样分区分别接收多个数据流。例如,能够将从 Kafka拉取两个topic的数据流分红两个Kafka输入数据流,每一个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数 据,于是增长了整体的吞吐量。同时,另外一方面咱们又能够把这些DStream数据流合并成一个,而后能够在合并后的DStream上使用任何可用的 transformation算子。示例代码以下:
val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print()
另外一个能够考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数(configuration parameter)spark.streaming.blockInterval 决定。大多数接收器都会将数据合并成一个个数据块,而后再保存到spark内存中。对于map类算子来讲,每一个批次中数据块的个数将会决定处理这批数据并 行任务的个数,每一个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,若是数据块间隔为200ms,则建立的并发任务数为10。若是任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增长这个任务数,对于给定的批次间隔来讲,只须要减小数据块间隔便可。不过,咱们仍是建议数据块间隔至少要 50ms,不然任务的启动开销占比就过高了。
另外一个切分接收数据流的方法是,显示地将输入数据流划分为多个分区(使用 inputStream.repartition(<number of partitions>))。该操做会在处理前,将数据散开从新分发到集群中多个节点上。
在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能形成集群资源利用率低。例如,对于reduce类的算子, 如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既能够修改这个默认值(spark.default.parallelism),也能够经过参数指定这个并发数量(见PairDStreamFunctions)。
调整数据的序列化格式能够大大减小数据序列化的开销。在spark Streaming中主要有两种类型的数据须要序列化:
无论是上面哪种数据,均可以使用Kryo序列化来减小CPU和内存开销,详见Spark Tuning Guide。另,对于Kryo,你能够考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见Configuration Guide)。
在一些特定的场景下,若是数据量不是很大,那么你能够考虑不用序列化格式,不过你须要注意的是取消序列化是否会致使大量的GC开销。例如,若是你的 批次间隔比较短(几秒)而且没有使用基于窗口的算子,这种状况下你能够考虑禁用序列化格式。这样能够减小序列化的CPU开销以优化性能,同时GC的增加也 很少。
若是每秒启动的任务数过多(好比每秒50个以上),那么将任务发送给slave节点的开销会明显增长,那么你也就很难达到亚秒级(sub-second)的延迟。不过如下两个方法能够减小任务的启动开销:
这些调整有可能可以减小100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。
要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度同样快。对于特定的应用来讲,能够从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。
根据spark streaming计算的性质,在必定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例 中,对于特定的数据速率,一个系统可能可以在批次间隔为2秒时跟上数据接收速度,但若是把批次间隔改成500毫秒系统可能就处理不过来了。因此,批次间隔 须要谨慎设置,以确保生产系统可以处理得过来。
要找出适合的批次间隔,你能够从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能须要检查一下端到端的批次处理延迟(能够看看Spark驱动器log4j日志中的Total delay,也能够用StreamingListener接 口来检测)。若是这个延迟能保持和批次间隔差很少,那么系统基本就是稳定的。不然,若是这个延迟持久在增加,也就是说系统跟不上数据接收速度,那也就意味 着系统不稳定。一旦系统文档下来后,你就能够尝试提升数据接收速度,或者减小批次间隔值。不过须要注意,瞬间的延迟增加能够只是暂时的,只要这个延迟后续 会自动降下来就没有问题(如:降到小于批次间隔值)
Spark应用内存占用和GC调优已经在调优指南(Tuning Guide)中有详细的讨论。墙裂建议你读一读那篇文档。本节中,咱们只是讨论一下几个专门用于Spark Streaming的调优参数。
Spark Streaming应用在集群中占用的内存量严重依赖于具体所使用的tranformation算子。例如,若是想要用一个窗口算子操纵最近10分钟的数 据,那么你的集群至少须要在内存里保留10分钟的数据;另外一个例子是updateStateByKey,若是key不少的话,相对应的保存的key的 state也会不少,而这些都须要占用内存。而若是你的应用只是作一个简单的 “映射-过滤-存储”(map-filter-store)操做的话,那须要的内存就不多了。
通常状况下,streaming接收器接收到的数据会以 StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别存到spark中,也就是说,若是内存装不下,数据将被吐到磁盘上。数据吐到磁盘上会大大下降streaming应用的性能,所以仍是建议根 据你的应用处理的数据量,提供充足的内存。最好就是,一边小规模地放大内存,再观察评估,而后再放大,再评估。
另外一个内存调优的方向就是垃圾回收。由于streaming应用每每都须要低延迟,因此确定不但愿出现大量的或耗时较长的JVM垃圾回收暂停。
如下是一些可以帮助你减小内存占用和GC开销的参数或手段:
本节中,咱们将讨论Spark Streaming应用在出现失败时的具体行为。
要理解Spark Streaming所提供的容错语义,咱们首先须要回忆一下Spark RDD所提供的基本容错语义。
Spark主要操做一些可容错文件系统的数据,如:HDFS或S3。所以,全部从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并不是如此,由于多数状况下Streaming须要从网络远端接收数据,这回致使Streaming的数据源并不可靠(尤为是对于使用了 fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不一样worker节点上的Spark执行器来实现(默认副本因子是 2)。所以一旦出现故障,系统须要恢复两种数据:
此外,还有两种可能的故障类型须要考虑:
有了以上这些基本知识,下面咱们就进一步了解一下Spark Streaming的容错语义。
流式系统的可靠度语义能够据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证一定是如下三种之一(无论系统是否出现故障):
任何流式处理系统通常都会包含如下三个数据处理步骤:
若是Streaming应用须要作到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让咱们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义:
不一样的输入源提供不一样的数据可靠性级别,从“至少一次”到“精确一次”。
若是全部的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理全部的数据。这种状况下就能保证精确一次语义,也就是说无论出现什么故障,全部的数据老是精确地只处理一次,很少也很多。
对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器:
对于不一样的接收器,咱们能够得到以下不一样的语义。若是一个worker节点故障了,对于可靠接收器来书,不会有数据丢失。而对于不可靠接收器,缓存 的(接收但还没有保存副本)数据可能会丢失。若是driver节点故障了,除了接收到的数据以外,其余的已经接收且已经保存了内存副本的数据都会丢失,这将 会影响有状态算子的计算结果。
为了不丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write ahead logs enabled),那么久不再用为数据丢失而担忧了。而且这时候,还能提供“至少一次”的语义保证。
下表总结了故障状况下的各类语义:
部署场景 | Worker 故障 | Driver 故障 |
---|---|---|
Spark 1.1及之前版本 或者 Spark 1.2及之后版本,且未开启WAL |
若使用不可靠接收器,则可能丢失缓存(已接收但还没有保存副本)数据; 若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义 |
若使用不可靠接收器,则缓存数据和已保存数据均可能丢失; 若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证 |
Spark 1.2及之后版本,并启用WAL | 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 | 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证 |
从Spark 1.3开始,咱们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确 一次”语义保证。(改功能截止Spark 1.6.1仍是实验性的)更详细的说明见:Kafka Integration Guide。
输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,若是worker故障,单条输出数据可能会被屡次写入外部实体中。不过这对于文件系统来讲是 能够接受的(使用saveAs***Files 屡次保存文件会覆盖以前的),因此咱们须要一些额外的工做来实现“精确一次”语义。主要有两种实现方式:
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用uniqueId做为事务的惟一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交 } }
在Spark 0.9.1和Spark 1.0之间,有一些API接口变动,变动目的是为了保障将来版本API的稳定。本节将详细说明一下从已有版本迁移升级到1.0所需的工做。
输入DStream(Input DStreams): 全部建立输入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值再也不是DStream(对Java来讲是JavaDStream),而是 InputDStream / ReceiverInputDStream(对Java来讲是JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream)。这样才能确保特定输入流的功能可以在将来持续增长到这些class中,而不会打破二进制兼容性。注意,已有的Spark Streaming应用应该不须要任何代码修改(新的返回类型都是DStream的子类),只不过须要基于Spark 1.0从新编译一把。
定制网络接收器(Custom Network Receivers): 自从Spark Streaming发布以来,Scala就能基于NetworkReceiver来定制网络接收器。但因为错误处理和汇报API方便的限制,该类型不能在Java中使用。因此Spark 1.0开始,用 Receiver 来替换掉这个NetworkReceiver,主要的好处以下:
为了将已有的基于NetworkReceiver的自定义接收器迁移到Receiver上来,你须要以下工做:
org.apache.spark.streaming.receiver.Receiver继承,而再也不是
org.apache.spark.streaming.dstream.NetworkReceiver。
基于Actor的接收器(Actor-based Receivers): 从actor class继承后,并实现了
org.apache.spark.streaming.receiver.Receiver 后,
便可从Akka Actors中获取数据。获取数据的类被重命名为 org.apache.spark.streaming.receiver.ActorHelper
,而保存数据的pushBlocks(…)方法也被重命名为 store(…)。其余org.apache.spark.streaming.receivers包中的工具类也被移到 org.apache.spark.streaming.receiver
包下并重命名,新的类名应该比以前更加清晰。