基于Spark 2.0 Preview的材料翻译,原英文地址:
http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.html
Streaming应用实战,参考:http://my.oschina.net/u/2306127/blog/635518
Spark Streaming编程指南
概览
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's machine learning and graph processing algorithms on data streams.

Spark Streaming 是基于Spark核心API的扩展,使高伸缩性、高吞吐量、可容错的流式数据处理成为可能。数据可以来自多种源,如Kafka、Flume、Kinesis或者TCP sockets等,并且可以使用map、reduce、join和window等高级函数实现复杂算法的处理。最终,处理后的数据可以被推送到文件系统、数据库以及实时仪表盘。实际上,你还可以将Spark的机器学习(machine learning)和图计算(graph processing)算法用于数据流的处理。
Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

内部工作流程如下:Spark Streaming接收实时输入的数据流,然后将数据切分成一个个批次,再由Spark引擎分批处理,生成最终的分批结果流。
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
Spark Streaming提供一个高级别的抽象-离散数据流(DStream),表明一个连续的数据流。DStreams能够从Kafka, Flume, and Kinesis等源中建立,或者在其它的DStream上执行高级操做。在内部,DStream表明一系列的 RDDs。
This guide shows you how to start writing Spark Streaming programs with DStreams. You can write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2), all of which are presented in this guide. You will find tabs throughout this guide that let you choose between code snippets of different languages.
本指南将演示如何通过DStreams开始编写一个Spark Streaming程序。你可以使用Scala、Java或者Python(自Spark 1.2引入)编写,本指南对这几种语言都有介绍,可以通过相应的标签切换去查看相应语言的代码片段。
Note: There are a few APIs that are either different or not available in Python. Throughout this guide, you will find the tag Python API highlighting these differences.
注意:有少数API在Python中暂时没有或者有所不同,本指南中用 Python API 标记对这些差异做了标注。
快速例程
Before we go into the details of how to write your own Spark Streaming program, let’s take a quick look at what a simple Spark Streaming program looks like. Let’s say we want to count the number of words in text data received from a data server listening on a TCP socket. All you need to do is as follows.
在开始Spark Streaming编程以前咱们先看看一个简单的Spark Streaming程序将长什么样子。咱们从基于TCP socket的数据服务器接收一个文本数据,而后对单词进行计数。看起来像下面这个样子。
First, we import the names of the Spark Streaming classes and some implicit conversions from StreamingContext into our environment in order to add useful methods to other classes we need (like DStream). StreamingContext is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
首先,咱们导入Spark Streaming的类命名空间和一些StreamingContext的转换工具。 StreamingContext 是全部的Spark Streaming功能的主入口点。咱们建立StreamingContext,指定两个执行线程和分批间隔为1秒钟。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 9999).

使用这个context,咱们能够建立一个DStream,这是来自于TCP数据源的流数据,咱们经过hostname(如 localhost)和端口(如 9999)来指定这个数据源。
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space characters into words.

这里 lines 是一个DStream对象,表明从数据服务器收到的流数据。DStream中的每条记录是一行文本。下一步,咱们将每一行中以空格分开的单词分离出来。
// Split each line into words val words = lines.flatMap(_.split(" "))
flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the words DStream. Next, we want to count these words.
flatMap是“一对多”的DStream操做,经过对源DStream的每个记录产生多个新的记录建立新DStream。这里,每一行将被分解多个单词,而且单词流表明了words DStream。下一步,咱们对这些单词进行计数统计。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
The words DStream is further mapped (one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. Finally, wordCounts.print() will print a few of the counts generated every second.

words DStream接着被映射(一对一转换)为(word, 1)键值对的DStream,而后用于统计每批数据中单词出现的频度。最后,wordCounts.print()打印出每秒钟计算出的部分计数值。
Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call
注意,上面这些代码行执行的时候,仅仅是设定了计算执行的逻辑,并无真正的处理数据。在全部的设定完成后,为了启动处理,须要调用:
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
The complete code can be found in the Spark Streaming example NetworkWordCount.
完整的代码能够Spark Streaming 的例程 NetworkWordCount 中找到。
If you have already downloaded and built Spark, you can run this example as follows. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
若是已经下载和构建了Spark,你能够按照下面的方法运行这个例子。首先运行Netcat(一个Unix风格的小工具)做为数据服务器,以下所示:
$ nc -lk 9999
Then, in a different terminal, you can start the example by using
而后,到一个控制台窗口,启动例程:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
而后,任何在netcat服务器运行控制台键入的行都会被计数而后每隔一秒钟在屏幕上打印出来,以下所示:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... |
# TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... |
基本概念
Next, we move beyond the simple example and elaborate on the basics of Spark Streaming.
下一步,咱们将离开这个简单的例子,详细阐述Spark Streaming的基本概念和功能。
连接
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
与Spark相似,Spark Streaming也能够经过Maven中心库访问。为了编写你本身的Spark Streaming程序,您将加入下面的依赖到你的SBT或者Maven工程文件。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.0-preview</version> </dependency>
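译注:如果你的工程使用SBT而不是Maven,与上面依赖等价的写法大致如下(仅为示意,artifact 名称与版本号需按实际使用的Scala和Spark版本调整):

// build.sbt 片段:按 Scala 2.11 构建时对应 spark-streaming_2.11
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0-preview"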
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.11
to the dependencies. For example, some of the common ones are as follows.
为了从Kafka/Flume/Kinesis等非Spark Streaming核心API等数据源注入数据,咱们须要添加对应的spark-streaming-xyz_2.11到依赖中。例如,像下面的这样:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
For an up-to-date list, please refer to the Maven repository for the full list of supported sources and artifacts.
对于最新的列表,请参考 Maven repository,以获得所支持的数据源和相关部件(artifact)的完整列表。
初始化StreamingContext
To initialize a Spark Streaming program, a StreamingContext object has to be created which is the main entry point of all Spark Streaming functionality.
为了初始化Spark Streaming程序,StreamingContext 对象必须首先建立做为总入口。
A StreamingContext object can be created from a SparkConf object.
StreamingContext 对象能够经过 SparkConf 对象建立,以下所示。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special "local[*]" string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming in-process (detects the number of cores in the local system). Note that this internally creates a SparkContext (starting point of all Spark functionality) which can be accessed as ssc.sparkContext.

这里 appName 参数是应用显示在集群UI上的名称,master 是 Spark、Mesos 或 YARN 的集群URL,或者是表示本地模式运行的特殊字符串 "local[*]"。实践中,当应用运行在集群上时,你不应该在程序里硬编码 master 参数,而应该用 spark-submit 启动应用并在那里接收该参数。不过,对于本地测试和单元测试,你可以传入 "local[*]" 让 Spark Streaming 在进程内运行(自动检测本地系统的CPU内核数量)。注意,这里内部创建了 SparkContext(所有Spark功能的入口点),可以通过 ssc.sparkContext 访问它。
The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.
分批间隔时间基于应用延迟需求和可用的集群资源进行设定(译注:设定间隔要大于应用数据的最小延迟需求,同时不能设置过小以致于系统没法在给定的周期内处理完毕),参考 Performance Tuning 部分得到更多信息。
A StreamingContext object can also be created from an existing SparkContext object.

StreamingContext 对象也能够从已有的 SparkContext 对象中建立。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
After a context is defined, you have to do the following.

1. Define the input sources by creating input DStreams.
2. Define the streaming computations by applying transformation and output operations to DStreams.
3. Start receiving data and processing it using streamingContext.start().
4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
5. The processing can be manually stopped using streamingContext.stop().

在context创建之后,还需要完成以下工作:

1. 通过创建输入DStream来定义输入源。
2. 对DStream施加transformation和输出算子,以定义流式计算逻辑。
3. 调用 streamingContext.start() 开始接收并处理数据。
4. 调用 streamingContext.awaitTermination() 等待处理结束(手动停止或因错误退出)。
5. 可以调用 streamingContext.stop() 手动停止处理。

Points to remember:

- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

记住:

- context 一旦启动,就不能再添加新的流式计算逻辑。
- context 一旦停止,就不能重新启动。
- 同一个 JVM 中同一时间只能有一个活跃的 StreamingContext。
- 对 StreamingContext 调用 stop() 也会同时停止 SparkContext。如果只想停止 StreamingContext,把 stop() 的可选参数 stopSparkContext 设为 false 即可(见下面的示意代码)。
- 只要前一个 StreamingContext 已经停止(且没有停止 SparkContext),同一个 SparkContext 就可以被复用来创建多个 StreamingContext。
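译注:下面是一个最小示意,演示"只停止 StreamingContext、保留 SparkContext 以便复用"的写法(假设 ssc 是当前的 StreamingContext,sc 是其底层的 SparkContext):

// 只停止流式计算,不停止底层的 SparkContext
ssc.stop(stopSparkContext = false)

// 之后可以用同一个 SparkContext 再创建一个新的 StreamingContext
val ssc2 = new StreamingContext(sc, Seconds(5))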
离散数据流
Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.
离散数据流(DStream)是Spark Streaming最基本的抽象。它表明了一种连续的数据流,要么从某种数据源提取数据,要么从其余数据流映射转换而来。DStream内部是由一系列连 续的RDD组成的,每一个RDD都是不可变、分布式的数据集(详见Spark编程指南 – Spark Programming Guide)。每一个RDD都包含了特定时间间隔内的一批数据,以下图所示:
Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the earlier example of converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. This is shown in the following figure.
任何做用于DStream的算子,其实都会被转化为对其内部RDD的操做。例如,在前面的例子中,咱们将 lines 这个DStream转成words DStream对象,其实做用于lines上的flatMap算子,会施加于lines中的每一个RDD上,并生成新的对应的RDD,而这些新生成的RDD 对象就组成了words这个DStream对象。其过程以下图所示:
These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. These operations are discussed in detail in later sections.
底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。后续会详细讨论这些高级算子。
输入DStream和接收器
Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark's memory for processing.
输入DStream表明从某种流式数据源流入的数据流。在以前的例子里,lines 对象就是输入DStream,它表明从netcat server收到的数据流。每一个输入DStream(除文件数据流外)都和一个接收器(Receiver – Scala doc, Java doc)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。
Spark Streaming provides two categories of built-in streaming sources.

- Basic sources: Sources directly available in the StreamingContext API, such as file systems and socket connections.
- Advanced sources: Sources like Kafka, Flume, Kinesis, etc., which are available through extra utility classes and require linking against extra dependencies.

Spark Streaming主要提供两种内建的流式数据源:

- 基础数据源(Basic sources):StreamingContext API 直接提供的数据源,如:文件系统、socket 连接。
- 高级数据源(Advanced sources):如 Kafka、Flume、Kinesis 等,需要借助额外的工具类并链接额外的依赖。
We are going to discuss some of the sources present in each category later in this section.
本节中,咱们将会从每种数据源中挑几个继续深刻讨论。
Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers which will simultaneously receive multiple data streams. But note that a Spark worker/executor is a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receiver(s).
注意,若是你须要同时从多个数据源拉取数据,那么你就须要建立多个DStream对象(详见后续的性能调优这一小节)。多个DStream对象其实也就同 时建立了多个数据流接收器。可是请注意,Spark的worker/executor 都是长期运行的,所以它们都会各自占用一个分配给Spark Streaming应用的CPU。因此,在运行Spark Streaming应用的时候,须要注意分配足够的CPU core(本地运行时,须要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。
When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
基础数据源
We have already taken a look at the ssc.socketTextStream(...) in the quick example which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.

前面的快速入门例子中,咱们已经看到,使用ssc.socketTextStream(...) 能够从一个TCP链接中接收文本数据。而除了TCP套接字外,StreamingContext API 还支持以文件做为输入源建立DStream。
File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported). Note that

- the files must have the same data format;
- the files must be created in the dataDirectory by atomically moving or renaming them into the data directory;
- once moved, the files must not be changed, so data appended to a file after it has been moved will not be read.

For simple text files, there is an easier method streamingContext.textFileStream(dataDirectory). And file streams do not require running a receiver, hence do not require allocating cores.

Python API: fileStream is not available in the Python API; only textFileStream is available.
文件数据流(File Streams): 能够从任何兼容HDFS API(包括:HDFS、S三、NFS等)的文件系统,建立方式以下:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming将监视该dataDirectory目录,并处理该目录下任何新建的文件(暂不支持嵌套目录)。注意:

- 这些文件必须具有相同的数据格式;
- 这些文件必须以原子地移动或重命名的方式放入dataDirectory目录;
- 文件一旦移入目录就不能再修改,后续追加的内容不会被读取。

对于简单的文本文件,更简单的方式是调用 streamingContext.textFileStream(dataDirectory)。另外,文件数据流不是基于接收器的,所以不需要为其单独分配CPU core。

Python API:fileStream目前暂时不可用,Python目前只支持textFileStream。
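译注:下面是一个监视目录的文本文件数据流的最小示意(其中的HDFS目录路径为假设值):

// 监视一个HDFS目录,把新移入该目录的文本文件作为新批次数据处理
val fileLines = ssc.textFileStream("hdfs://namenode:9000/user/spark/streaming-in")
val fileWordCounts = fileLines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
fileWordCounts.print()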
Streams based on Custom Receivers: DStreams can be created with data streams received through custom receivers. See the Custom Receiver Guide and DStream Akka for more details.
基于自定义接收器的数据流(Streams based on Custom Receivers):DStream 可以通过自定义接收器接收的数据流来创建。详见自定义接收器指南(Custom Receiver Guide)和 DStream Akka。
Queue of RDDs as a Stream: For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using streamingContext.queueStream(queueOfRDDs). Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
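译注:下面是一个用RDD队列做测试数据源的示意写法(假设 ssc 已经按前文创建好):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// 每个被压入队列的RDD都会被当作DStream中的一个批次来处理
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)   // 手动往队列里塞一个批次的数据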
For more details on streams from sockets and files, see the API documentations of the relevant functions in StreamingContext for Scala, JavaStreamingContext for Java, and StreamingContext for Python.
关于套接字、文件以及Akka actor数据流更详细信息,请参考相关文档:StreamingContext for Scala,JavaStreamingContext for Java, and StreamingContext for Python。
高级数据源
Python API As of Spark 2.0.0, out of these sources, Kafka, Kinesis and Flume are available in the Python API.
Python API:自 Spark 2.0.0 起(译注:1.6.1 就已经支持了),上述数据源中的 Kafka、Kinesis 和 Flume 可以在 Python API 中使用。
This category of sources require interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate libraries that can be linked to explicitly when necessary.
使用这类数据源须要依赖一些额外的代码库,有些依赖还挺复杂的(如:Kafka、Flume)。所以为了减小依赖项版本冲突问题,各个数据源 DStream的相关功能被分割到不一样的代码包中,只有用到的时候才须要连接打包进来。
例如,如果你需要使用Twitter的tweets作为数据源,你需要完成以下步骤:

1. 链接:将 spark-streaming-twitter 对应的 artifact 及其依赖加入到 SBT/Maven 工程的依赖中。
2. 编程:导入 TwitterUtils 类,并用 TwitterUtils.createStream 创建一个 DStream。
3. 部署:将应用连同全部依赖(包括上述 artifact 及其传递依赖)打成 JAR 包,再部署应用。
Note that these advanced sources are not available in the Spark shell, hence applications based on these advanced sources cannot be tested in the shell. If you really want to use them in the Spark shell you will have to download the corresponding Maven artifact’s JAR along with its dependencies and add it to the classpath.
Some of these advanced sources are as follows.
Kafka: Spark Streaming 2.0.0 is compatible with Kafka 0.8.2.1. See the Kafka Integration Guide for more details.
Flume: Spark Streaming 2.0.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
Kinesis: Spark Streaming 2.0.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.
注意,高级数据源在spark-shell中不可用,所以不能用spark-shell来测试基于高级数据源的应用。若是真有须要的话,你须要自行下载相应数据源的Maven工件及其依赖项,并将这些Jar包部署到spark-shell的classpath中。
下面列举了一些高级数据源:

- Kafka:Spark Streaming 2.0.0 兼容 Kafka 0.8.2.1,详见 Kafka Integration Guide。
- Flume:Spark Streaming 2.0.0 兼容 Flume 1.6.0,详见 Flume Integration Guide。
- Kinesis:Spark Streaming 2.0.0 兼容 Kinesis Client Library 1.2.1,详见 Kinesis Integration Guide。
自定义数据源
Python API This is not yet supported in Python.
Python API 自定义数据源目前还不支持Python。
Input DStreams can also be created out of custom data sources. All you have to do is implement a user-defined receiver (see next section to understand what that is) that can receive data from the custom sources and push it into Spark. See the Custom Receiver Guide for details.
输入DStream也能够用自定义的方式建立。你须要作的只是实现一个自定义的接收器(receiver),以便从自定义的数据源接收数据,而后将数据推入Spark中。详情请参考自定义接收器指南(Custom Receiver Guide)。
接收器可靠性
There can be two kinds of data sources based on their reliability. Sources (like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from these reliable sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receivers:

1. Reliable Receiver - A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication.
2. Unreliable Receiver - An unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or for reliable sources when one does not want to go into the complexity of acknowledgment.

从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源的数据后再发出确认信息,这样就能够确保任何失败情况下都不会丢数据。因此我们可以将接收器也相应地分为两类:

1. 可靠接收器(Reliable Receiver):可靠接收器会在数据接收完成并带副本保存到Spark之后,向可靠数据源发送确认信息。
2. 不可靠接收器(Unreliable Receiver):不可靠接收器不会向数据源发送确认。它可用于不支持确认的数据源,也可用于不想引入确认复杂性的可靠数据源。
The details of how to write a reliable receiver are discussed in the Custom Receiver Guide.
自定义接收器指南(Custom Receiver Guide)中详细讨论了如何写一个可靠接收器。
DStream支持的transformation算子
Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows.
和RDD相似,DStream也支持从输入DStream通过各类transformation算子映射成新的DStream。DStream支持不少RDD上常见的transformation算子,一些经常使用的见下表:
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
A few of these transformations are worth discussing in more detail.
Transformation算子 | 用途 |
---|---|
map(func) | 返回会一个新的DStream,并将源DStream中每一个元素经过func映射为新的元素 |
flatMap(func) | 和map相似,不过每一个输入元素再也不是映射为一个输出,而是映射为0到多个输出 |
filter(func) | 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素 |
repartition(numPartitions) | 更改DStream的并行度(增长或减小分区数) |
union(otherStream) | 返回新的DStream,包含源DStream和otherDStream元素的并集 |
count() | 返回一个包含单元素RDDs的DStream,其中每一个元素是源DStream中各个RDD中的元素个数 |
reduce(func) | 返回一个包含单元素RDDs的DStream,其中每一个元素是经过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合获得的结果。func必须知足结合律,以便支持并行计算。 |
countByValue() | 若是源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。 |
reduceByKey(func, [numTasks]) | 若是源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合获得的。注意:默认状况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由 spark.default.parallelism 决定)。你能够经过可选参数numTasks来指定并发任务个数。 |
join(otherStream, [numTasks]) | 若是源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每一个K都对应一个 (K, (V, W))键值对元素。 |
cogroup(otherStream, [numTasks]) | 若是源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每一个元素类型为包含(K, Seq[V], Seq[W])的tuple。 |
transform(func) | 返回一个新的DStream,其包含的RDD为源RDD通过func操做后获得的结果。利用该算子能够对DStream施加任意的操做。 |
updateStateByKey(func) | 返回一个包含新”状态”的DStream。源DStream中每一个key及其对应的values会做为func的输入,而func能够用于对每一个key的“状态”数据做任意的更新操做。 |
下面咱们会挑几个transformation算子深刻讨论一下。
updateStateByKey算子
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

1. Define the state - The state can be an arbitrary data type.
2. Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.

In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns None then the key-value pair will be eliminated.

updateStateByKey 算子支持在不断用新信息更新的同时维护任意状态。要实现这一点,只需要两步:

1. 定义状态:状态可以是任意的数据类型。
2. 定义状态更新函数:用一个函数指定如何根据之前的状态和输入流中的新值来更新状态。

在每一个批次数据到达后,Spark都会对所有已有的key调用状态更新函数(不管该key在本批次中是否有新数据)。如果状态更新函数返回None,则对应的键值对会被删除。
Let’s illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state and it is an integer. We define the update function as:
举例以下。假设你须要维护一个流式应用,统计数据流中每一个单词的出现次数。这里将各个单词的出现次数这个整型数定义为状态。咱们接下来定义状态更新函数以下:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
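译注:上面代码中的省略号可以按如下思路补全,这只是一个最小示意(把本批次的新值累加到之前的计数上):

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // 之前没有该key时计数取0,再加上本批次所有新值之和
  Some(runningCount.getOrElse(0) + newValues.sum)
}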
This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).
该状态更新函数能够做用于一个包括(word, 1) 键值对的DStream上(见本文开头的例子)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
The update function will be called for each word, with newValues having a sequence of 1's (from the (word, 1) pairs) and the runningCount having the previous count.
该状态更新函数会为每一个单词调用一次,且相应的newValues是一个包含不少个”1″的数组(这些1来自于(word,1)键值对),而runningCount包含以前该单词的计数。本例的完整代码请参考 StatefulNetworkWordCount.scala。
Note that using updateStateByKey
requires the checkpoint directory to be configured, which is discussed in detail in the checkpointing section.
注意,调用updateStateByKey前须要配置检查点目录,后续对此有详细的讨论,见检查点(checkpointing)这节。
transform算子
The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
transform算子(及其变体transformWith)能够支持任意的RDD到RDD的映射操做。也就是说,你能够用tranform算子来包装 任何DStream API所不支持的RDD算子。例如,将DStream每一个批次中的RDD和另外一个Dataset进行关联(join)操做,这个功能DStream API并无直接支持。不过你能够用transform来实现这个功能,可见transform其实为DStream提供了很是强大的功能支持。好比说, 你能够用事先算好的垃圾信息,对DStream进行实时过滤。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
Note that the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches.
注意,这里transform包含的算子,其调用时间间隔和批次间隔是相同的。因此你能够基于时间改变对RDD的操做,如:在不一样批次,调用不一样的RDD算子,设置不一样的RDD分区或者广播变量等。
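译注:下面给出上述数据清洗思路的一个具体示意(假设 spamInfoRDD 的元素类型为 (String, Double),表示每个单词的垃圾信息得分;数据内容和阈值0.5都只是示意):

// 事先算好的垃圾信息得分表(这里用 parallelize 构造示意数据)
val spamInfoRDD = ssc.sparkContext.parallelize(Seq(("spamword", 0.9), ("hello", 0.1)))

val cleanedDStream = wordCounts.transform { rdd =>
  // 与垃圾信息得分表做join,过滤掉得分过高的单词
  rdd.join(spamInfoRDD).filter { case (word, (count, spamScore)) => spamScore < 0.5 }
}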
基于窗口(window)的算子
Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window.
Spark Streaming一样也提供基于时间窗口的计算,也就是说,你能够对某一个滑动时间窗内的数据施加特定tranformation算子。以下图所示:
As shown in the figure, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time units of data, and slides by 2 time units. This shows that any window operation needs to specify two parameters.

- window length - The duration of the window (3 in the figure).
- sliding interval - The interval at which the window operation is performed (2 in the figure).

如上图所示,每次窗口滑动时,源DStream中落入窗口的RDDs就会被合并起来进行计算,生成windowed DStream中的RDD。在上图的例子中,窗口操作作用于最近3个时间单位的数据,并且每次滑动2个时间单位。由此可以得出,任何窗口相关操作都需要指定以下两个参数:

- 窗口长度(window length):窗口覆盖的时间长度(上图中为3)。
- 滑动间隔(sliding interval):窗口操作执行的间隔(上图中为2)。
These two parameters must be multiples of the batch interval of the source DStream (1 in the figure).
注意,这两个参数都必须是DStream批次间隔(上图中为1)的整数倍.
Let's illustrate the window operations with an example. Say, you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow.
下面我们举个例子。假设,你须要扩展前面的那个小栗子,你须要每隔10秒统计一下前30秒内的单词计数。为此,咱们须要在包含(word, 1)键值对的DStream上,对最近30秒的数据调用reduceByKey算子。不过这些均可以简单地用一个 reduceByKeyAndWindow搞定。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
如下列出了经常使用的窗口算子。全部这些算子都有前面提到的那两个参数 – 窗口长度 和 滑动距离。
Transformation窗口算子 | 用途 |
---|---|
window(windowLength, slideInterval) | 将源DStream窗口化,并返回转化后的DStream |
countByWindow(windowLength,slideInterval) | 返回数据流在一个滑动窗口内的元素个数 |
reduceByWindow(func, windowLength,slideInterval) | 基于数据流在一个滑动窗口内的元素,用func作聚合,返回一个单元素数据流。func必须知足结合律,以便支持并行计算。 |
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) | 基于(K, V)键值对DStream,将一个滑动窗口内的数据进行聚合,返回一个新的包含(K,V)键值对的DStream,其中每一个value都是各个key通过func聚合后的结果。 注意:若是不指定numTasks,其值将使用Spark的默认并行任务数(本地模式下为2,集群模式下由 spark.default.parallelism决定)。固然,你也能够经过numTasks来指定任务个数。 |
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) | 和前面的reduceByKeyAndWindow() 相似,只是这个版本会用以前滑动窗口计算结果,递增地计算每一个窗口的归约结果。当新的数据进入窗口时,这些values会被输入func作归约计算,而这 些数据离开窗口时,对应的这些values又会被输入 invFunc 作”反归约”计算。举个简单的例子,就是把新进入窗口数据中各个单词个数“增长”到各个单词统计结果上,同时把离开窗口数据中各个单词的统计个数从相应的 统计结果中“减掉”。不过,你的本身定义好”反归约”函数,即:该算子不只有归约函数(见参数func),还得有一个对应的”反归约”函数(见参数中的 invFunc)。和前面的reduceByKeyAndWindow() 相似,该算子也有一个可选参数numTasks来指定并行任务数。注意,这个算子须要配置好检查点(checkpointing)才能用。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 基于包含(K, V)键值对的DStream,返回新的包含(K, Long)键值对的DStream。其中的Long value都是滑动窗口内key出现次数的计数。 和前面的reduceByKeyAndWindow() 相似,该算子也有一个可选参数numTasks来指定并行任务数。 |
Join相关算子
Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.
最后,值得一提的是,你在Spark Streaming中作各类关联(join)操做很是简单。
Streams can be very easily joined with other streams.
一个数据流能够和另外一个数据流直接关联。
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
上面代码中,stream1的每一个批次中的RDD会和stream2相应批次中的RDD进行join。一样,你能够相似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你还能够基于窗口来join不一样的数据流,其实现也很简单,以下;)
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
This has already been shown earlier while explaining the DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.
其实这种状况已经在前面的DStream.transform算子中介绍过了,这里再举个基于滑动窗口的例子。
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that the dataset reference points to.
实际上,在上面代码里,你还可以动态地改变要join的数据集(dataset)。传给transform算子的操作函数会在每个批次重新求值,因此每次都会使用dataset引用当前所指向的数据集。
The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.
完整的DStream transformation算子列表见API文档。Scala请参考 DStream 和 PairDStreamFunctions. Java请参考 JavaDStream 和 JavaPairDStream. Python见 DStream。
DStream输出算子
Output operations allow DStream’s data to be pushed out to external systems like a database or a file systems. Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API This is called pprint() in the Python API. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".Python API This is not available in the Python API. |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
输出算子能够将DStream的数据推送到外部系统,如:数据库或者文件系统。由于输出算子会将最终完成转换的数据输出到外部系统,所以只有输出算 子调用时,才会真正触发DStream transformation算子的真正执行(这一点相似于RDD 的action算子)。目前所支持的输出算子以下表:
输出算子 | 用途 |
---|---|
print() | 在驱动器(driver)节点上打印DStream每一个批次中的头十个元素。 Python API 对应的Python API为 pprint() |
saveAsTextFiles(prefix, [suffix]) | 将DStream的内容保存到文本文件。 每一个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]” |
saveAsObjectFiles(prefix, [suffix]) | 将DStream内容以序列化Java对象的形式保存到顺序文件中。 每一个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python |
saveAsHadoopFiles(prefix, [suffix]) | 将DStream内容保存到Hadoop文件中。 每一个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python |
foreachRDD(func) | 这是最通用的输出算子了,该算子接收一个函数func,func将做用于DStream的每一个RDD上。 func应该实现将每一个RDD的数据推到外部系统中,好比:保存到文件或者写到数据库中。 注意,func函数是在streaming应用的驱动器进程中执行的,因此若是其中包含RDD的action算子,就会触发对DStream中RDDs的实际计算过程。 |
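译注:下面是输出算子的一个简单用法示意(其中的输出路径前缀为假设值):

// 每个批次生成一组文本文件,文件名形如 prefix-TIME_IN_MS.txt
wordCounts.saveAsTextFiles("hdfs://namenode:9000/user/spark/wordcounts/batch", "txt")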
使用foreachRDD的设计模式
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.
DStream.foreachRDD是一个很是强大的原生工具函数,用户能够基于此算子将DStream数据推送到外部系统中。不过用户须要了解如何正确而高效地使用这个工具。如下列举了一些常见的错误。
Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example (in Scala),
一般,对外部系统写入数据须要一些链接对象(如:远程server的TCP链接),以便发送数据给远程系统。所以,开发人员可能会不经意地在Spark驱动器(driver)进程中建立一个链接对象,而后又试图在Spark worker节点上使用这个链接。以下例所示:
dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }
This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
这段代码是错误的,由于它须要把链接对象序列化,再从驱动器节点发送到worker节点。而这些链接对象一般都是不能跨节点(机器)传递的。好比,链接对 象一般都不能序列化,或者在另外一个进程中反序列化后再次初始化(链接对象一般都须要初始化,所以从驱动节点发到worker节点后可能须要从新初始化) 等。解决此类错误的办法就是在worker节点上建立链接对象。
However, this can lead to another common mistake - creating a new connection for every record. For example,
然而,有些开发人员可能会走到另外一个极端 – 为每条记录都建立一个链接对象,例如:
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition - create a single connection object and send all the records in a RDD partition using that connection.
通常来讲,链接对象是有时间和资源开销限制的。所以,对每条记录都进行一次链接对象的建立和销毁会增长不少没必要要的开销,同时也大大减少了系统的吞吐量。 一个比较好的解决方案是使用 rdd.foreachPartition – 为RDD的每一个分区建立一个单独的链接对象,示例以下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
This amortizes the connection creation overheads over many records.
这样一来,链接对象的建立开销就摊到不少条记录上了。
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
最后,还有一个更优化的办法,就是在多个RDD批次之间复用链接对象。开发者能够维护一个静态链接池来保存链接对象,以便在不一样批次的多个RDD之间共享同一组链接对象,示例以下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
注意,链接池中的链接应该是懒惰建立的,而且有肯定的超时时间,超时后自动销毁。这个实现应该是目前发送数据最高效的实现方式。
DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
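译注:下面用一个小示意说明"必须有RDD action才会真正触发计算"这一点:

// 只做transformation、没有任何action:数据会被接收,但不会真正参与计算
dstream.foreachRDD { rdd => rdd.map(_.toString) }

// 加上一个action(例如 count)之后,该批次的计算才会被真正触发
dstream.foreachRDD { rdd => println(s"records in this batch: ${rdd.count()}") }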
By default, output operations are executed one-at-a-time. And they are executed in the order they are defined in the application.
累加器和广播变量
Accumulators and Broadcast variables cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you’ll have to create lazily instantiated singleton instances for Accumulators and Broadcast variables so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
首先须要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是没法从Spark Streaming的检查点中恢复回来的。因此若是你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,由于这样累加器和广播变量才能在驱动器(driver)故障恢复后从新实例化。代码示例以下:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts })
See the full source code.
这里有完整代码:source code。
DataFrame和SQL相关算子
You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
在Streaming应用中能够调用DataFrames and SQL来 处理流式数据。开发者能够用经过StreamingContext中的SparkContext对象来建立一个SQLContext,而且,开发者须要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能从新建立出来。一样,你仍是可使用懒惰建立的单例模式来实例化 SQLContext,以下面的代码所示,这里咱们将最开始的那个例子作了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每一个 RDD都转化成一个DataFrame,而后注册成临时表,再用SQL查询这些临时表。
/** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SQLContext val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
See the full source code.
You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of the any asynchronous SQL queries, will delete off old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5))
(in Scala, or equivalent in other languages).
See the DataFrames and SQL guide to learn more about DataFrames.
这里有完整代码:source code。
你也能够在其余线程里执行SQL查询(异步查询,即:执行SQL查询的线程和运行StreamingContext的线程不一样)。不过这种状况下, 你须要确保查询的时候 StreamingContext 没有把所需的数据丢弃掉,不然StreamingContext有可能已将老的RDD数据丢弃掉了,那么异步查询的SQL语句也可能没法获得查询结果。举 个栗子,若是你须要查询上一个批次的数据,可是你的SQL查询可能要执行5分钟,那么你就须要StreamingContext至少保留最近5分钟的数 据:streamingContext.remember(Minutes(5)) (这是Scala为例,其余语言差很少)
更多DataFrame和SQL的文档见这里: DataFrames and SQL
MLlib算子
You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans, etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a learning model offline (i.e. using historical data) and then apply the model online on streaming data. See the MLlib guide for more details.
MLlib 提供了不少机器学习算法。首先,你须要关注的是流式计算相关的机器学习算法(如:Streaming Linear Regression, Streaming KMeans),这些流式算法能够在流式数据上一边学习训练模型,一边用最新的模型处理数据。除此之外,对更多的机器学习算法而言,你须要离线训练这些模型,而后将训练好的模型用于在线的流式数据。详见MLlib。
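译注:下面是一个流式K-means的最小示意(假设 lines 是一个 DStream[String],每行是逗号分隔的两维数值;参数取值仅为示意):

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors

val trainingData = lines.map(line => Vectors.dense(line.split(',').map(_.toDouble)))

val model = new StreamingKMeans()
  .setK(3)                    // 聚成3类
  .setDecayFactor(1.0)        // 新旧数据等权
  .setRandomCenters(2, 0.0)   // 2维特征,随机初始中心

model.trainOn(trainingData)               // 持续用新批次数据更新模型
model.predictOn(trainingData).print()     // 对同一数据流输出所属簇编号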
缓存/持久化
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().
For input streams that receive data over the network (such as, Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on different persistence levels can be found in the Spark Programming Guide.
和RDD相似,DStream也支持将数据持久化到内存中。只须要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每一个RDD的persist方法进而将数据持久化到内存中。这对于可能须要计算不少次的DStream很是有 用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或 者有状态的算子,如:updateStateByKey,数据持久化就更重要了。所以,滑动窗口算子产生的DStream对象默认会自动持久化到内存中 (不须要开发者调用persist)。
对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不一样的节点上互为备份副本,以便支持容错。
注意,与RDD不一样的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。关于持久化级别(或者存储级别)的更详细说明见Spark编程指南(Spark Programming Guide)。
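译注:如果确实需要手动缓存某个会被多个下游算子使用的DStream,写法大致如下(存储级别仅为示意):

import org.apache.spark.storage.StorageLevel

// 显式缓存一个会被多次使用的DStream
words.persist(StorageLevel.MEMORY_ONLY_SER)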
检查点
A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.

- Metadata checkpointing - Saving of the information defining the streaming computation (configuration, DStream operations, incomplete batches) to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application.
- Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches, in order to cut off ever-growing dependency chains.

To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used.

一般来说Streaming应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。为此,Spark Streaming需要在检查点保存足够的信息到可容错的外部存储系统中,以便能够随时从故障中恢复。检查点需要保存以下两种数据:

- 元数据检查点(Metadata checkpointing):将定义流式计算的信息(配置信息、DStream操作、未完成的批次等)保存到HDFS等可容错的存储中,用于在运行驱动器的节点发生故障时恢复。
- 数据检查点(Data checkpointing):将生成的RDD保存到可靠存储中。对于一些需要跨多个批次合并数据的有状态转换算子来说,这是必需的,可以避免依赖链无限增长。
总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操做的恢复。
Checkpointing must be enabled for applications with any of the following requirements:

- Usage of stateful transformations - If either updateStateByKey or reduceByKeyAndWindow (with inverse function) is used in the application, then the checkpoint directory must be provided to allow for periodic RDD checkpointing.
- Recovering from failures of the driver running the application - Metadata checkpoints are used to recover with progress information.

Note that simple streaming applications without the aforementioned stateful transformations can be run without enabling checkpointing. The recovery from driver failures will also be partial in that case (some received but unprocessed data may be lost). This is often acceptable and many run Spark Streaming applications in this way. Support for non-Hadoop environments is expected to improve in the future.

如果有以下任一情况,你就必须启用检查点:

- 使用了有状态转换算子:如果应用中用到了 updateStateByKey 或 reduceByKeyAndWindow(带逆函数的版本),就必须提供检查点目录,以便周期性地对RDD做检查点。
- 需要从运行应用的驱动器故障中恢复:此时会用元数据检查点来恢复处理进度。

注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。通常这点丢失是可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。
Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved. This is done by using streamingContext.checkpoint(checkpointDirectory). This will allow you to use the aforementioned stateful transformations. Additionally, if you want to make the application recover from driver failures, you should rewrite your streaming application to have the following behavior.

- When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().
- When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

This behavior is made simple by using StreamingContext.getOrCreate. This is used as follows.

检查点的启用,只需要设置好保存检查点信息的目录即可,一般会将这个目录设在可容错、可靠性较高的文件系统上(如:HDFS、S3等),做法是调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。另外,如果你需要你的应用能够从驱动器故障中恢复,你需要重写部分代码,实现以下行为:

- 程序首次启动时,创建一个新的StreamingContext,定义好所有的数据流,然后调用start()。
- 程序因故障重启时,根据检查点目录中的数据重新创建StreamingContext。
不过这个行为能够用StreamingContext.getOrCreate来实现,示例以下:
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
If the checkpointDirectory
exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the function functionToCreateContext
will be called to create a new context and set up the DStreams. See the Scala example RecoverableNetworkWordCount. This example appends the word counts of network data into a file.
In addition to using getOrCreate
one also needs to ensure that the driver process gets restarted automatically on failure. This can only be done by the deployment infrastructure that is used to run the application. This is further discussed in the Deployment section.
Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may cause an increase in the processing time of those batches where RDDs get checkpointed. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which may have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval)
. Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
若是 checkpointDirectory 目录存在,则context对象会从检查点数据从新构建出来。若是该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,建立一个新的StreamingContext对象并定义好DStream数据流。完整的示例请参见RecoverableNetworkWordCount,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。
除了使用getOrCreate以外,开发者还须要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。进一步的讨论见部署(Deployment)这一节。
另外须要注意的是,RDD检查点会增长额外的保存数据的开销。这可能会致使数据流的处理时间变长。所以,你必须仔细的调整检查点间隔时间。若是批次 间隔过小(好比:1秒),那么对每一个批次保存检查点数据将大大减少吞吐量。另外一方面,检查点保存过于频繁又会致使血统信息和任务个数的增长,这一样会影响 系统性能。对于须要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员能够这样来自定义这个间 隔:dstream.checkpoint(checkpointInterval)。通常推荐设为批次间隔时间的5~10倍。
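译注:自定义RDD检查点间隔的写法大致如下(这里的 runningCounts 沿用前面 updateStateByKey 例子中的DStream,间隔取值仅为示意):

// 将RDD检查点间隔设为批次间隔的5~10倍,例如批次间隔1秒时设为10秒
runningCounts.checkpoint(Seconds(10))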
部署应用
This section discusses the steps to deploy a Spark Streaming application.
本节中将主要讨论一下如何部署Spark Streaming应用。
To run a Spark Streaming applications, you need to have the following.
Cluster with a cluster manager - This is the general requirement of any Spark application, and discussed in detail in the deployment guide.
Package the application JAR - You have to compile your streaming application into a JAR. If you are using spark-submit
to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, if your application uses advanced sources (e.g. Kafka, Flume), then you will have to package the extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application. For example, an application using KafkaUtils
will have to include spark-streaming-kafka-0-8_2.11
and all its transitive dependencies in the application JAR.
Configuring sufficient memory for the executors - Since the received data must be stored in memory, the executors must be configured with sufficient memory to hold the received data. Note that if you are doing 10 minute window operations, the system has to keep at least last 10 minutes of data in memory. So the memory requirements for the application depends on the operations used in it.
Configuring checkpointing - If the stream application requires it, then a directory in the Hadoop API compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the checkpoint directory and the streaming application written in a way that checkpoint information can be used for failure recovery. See the checkpointing section for more details.
Configuring write ahead logs - Since Spark 1.2, we have introduced write ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write ahead log in the configuration checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss (discussed in detail in the Fault-tolerance Semantics section). This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLog.enable
to true
. However, these stronger semantics may come at the cost of the receiving throughput of individual receivers. This can be corrected by running more receivers in parallel to increase aggregate throughput. Additionally, it is recommended that the replication of the received data within Spark be disabled when the write ahead log is enabled as the log is already stored in a replicated storage system. This can be done by setting the storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER
. While using S3 (or any file system that does not support flushing) for write ahead logs, please remember to enable spark.streaming.driver.writeAheadLog.closeFileAfterWrite
and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
. See Spark Streaming Configuration for more details.
Setting the max receiving rate - If the cluster resources are not large enough for the streaming application to process data as fast as it is being received, the receivers can be rate limited by setting a maximum rate limit in terms of records / sec. See the configuration parameters spark.streaming.receiver.maxRate for receivers and spark.streaming.kafka.maxRatePerPartition for the Direct Kafka approach. In Spark 1.5, we have introduced a feature called backpressure that eliminates the need to set this rate limit, as Spark Streaming automatically figures out the rate limits and dynamically adjusts them if the processing conditions change. This backpressure can be enabled by setting the configuration parameter spark.streaming.backpressure.enabled to true.

要运行一个Spark Streaming应用,你首先需要具备以下条件:

- 带有集群管理器的集群:这是所有Spark应用的通用要求,详见部署指南(deployment guide)。
- 给应用打JAR包:你需要将应用编译打包成JAR。如果使用 spark-submit 提交应用,那么你不需要提供Spark和Spark Streaming的相关JAR包。但是,如果你使用了高级数据源(advanced sources,如:Kafka、Flume),那么你需要将这些高级数据源相关的JAR包及其依赖一起打包并部署。例如,如果你使用了 KafkaUtils,那么就必须将 spark-streaming-kafka-0-8_2.11 及其相关依赖都打到应用的JAR包中。(写前日志、反压、接收速率等相关配置的设置方式,可参考本列表之后的代码草图。)
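译注:上面提到的写前日志、反压、最大接收速率等设置都是普通的Spark配置项,可以像下面这样在 SparkConf 里设置(取值仅为示意):

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // 启用写前日志
  .set("spark.streaming.backpressure.enabled", "true")            // 启用反压,自动调节接收速率
  .set("spark.streaming.receiver.maxRate", "10000")               // 每个接收器每秒最多接收的记录数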
升级应用代码

If a running Spark Streaming application needs to be upgraded with new application code, then there are two possible mechanisms.
The upgraded Spark Streaming application is started and run in parallel to the existing application. Once the new one (receiving the same data as the old one) has been warmed up and is ready for prime time, the old one be can be brought down. Note that this can be done for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications).
The existing application is shutdown gracefully (see StreamingContext.stop(...)
or JavaStreamingContext.stop(...)
for graceful shutdown options) which ensure data that has been received is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka, and Flume) as data needs to be buffered while the previous application was down and the upgraded application is not yet up. And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors. In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory.
升级 Spark Streaming 应用程序代码,可以使用以下两种方式:
平滑关闭现有应用(参考 StreamingContext.stop(...) 或 JavaStreamingContext.stop(...) 的平滑关闭选项),即:确保所收到的数据都已经处理完毕后再退出;然后再启动升级后的新程序,新程序将接着在老程序退出的位置继续处理数据。注意,这种方式需要数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),因为在新旧程序交接的这段空档时间内,数据需要在数据源处缓存。目前还不支持从老程序的检查点重启,因为检查点中存储的信息包含老程序中的序列化对象,在新程序中将其反序列化可能会出错。这种情况下,只能要么为新程序指定一个新的检查点目录,要么删除老的检查点目录。
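For the graceful-shutdown option described above, a minimal sketch might look like the following; the socket source and the timeout are illustrative only. The key call is stop(stopSparkContext = true, stopGracefully = true), which waits for all received data to be processed before exiting.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("GracefulUpgrade")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.socketTextStream("localhost", 9999).count().print()
ssc.start()

// Run until an external signal (e.g. an admin command or marker file) says the new version is ready.
ssc.awaitTerminationOrTimeout(60000)

// Finish processing everything that has already been received, then stop the SparkContext as well.
ssc.stop(stopSparkContext = true, stopGracefully = true)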
应用监控

Beyond Spark’s monitoring capabilities, there are additional capabilities specific to Spark Streaming. When a StreamingContext is used, the Spark web UI shows an additional Streaming tab which shows statistics about running receivers (whether receivers are active, number of records received, receiver error, etc.) and completed batches (batch processing times, queueing delays, etc.). This can be used to monitor the progress of the streaming application.
The following two metrics in web UI are particularly important:
If the batch processing time is consistently more than the batch interval and/or the queueing delay keeps increasing, then it indicates that the system is not able to process the batches as fast as they are being generated and is falling behind. In that case, consider reducing the batch processing time.
The progress of a Spark Streaming program can also be monitored using the StreamingListener interface, which allows you to get receiver status and processing times. Note that this is a developer API and it is likely to be improved upon (i.e., more information reported) in the future.
除了 Spark 自身的监控能力(monitoring capabilities)之外,Spark Streaming 还有一些额外的监控功能可用。如果使用了 StreamingContext,那么你可以在 Spark web UI 上看到多出一个 Streaming 标签页,上面显示了正在运行的接收器的统计信息(接收器是否活跃、接收记录的条数、接收器错误等)和已完成批次的信息(批次处理时间、排队延迟等)。这些信息都可以用来监控 streaming 应用的进度。
web UI上有两个度量特别重要:
若是批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。
Spark Streaming程序的处理进度能够用StreamingListener接口来监听,这个接口能够监听到接收器的状态和处理时间。不过须要注意的是,这是一个developer API接口,换句话说这个接口将来极可能会变更(可能会增长更多度量信息)。
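If you prefer programmatic monitoring over the web UI, the sketch below (assuming an already created StreamingContext named ssc, which is not shown here) registers a StreamingListener that prints the scheduling delay, processing time, and total delay of each completed batch, i.e. the same metrics highlighted above.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted, StreamingListenerReceiverError}

// Logs per-batch delays and receiver errors as they are reported by the scheduler.
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing time = ${info.processingDelay.getOrElse(-1L)} ms, " +
      s"total delay = ${info.totalDelay.getOrElse(-1L)} ms")
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
  }
}

// Register the listener before ssc.start().
ssc.addStreamingListener(new BatchStatsListener)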
Getting the best performance out of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
Reducing the processing time of each batch of data by efficiently using cluster resources.
Setting the right batch size such that the batches of data can be processed as fast as they are received (that is, data processing keeps up with the data ingestion).
要得到Spark Streaming应用的最佳性能须要一点点调优工做。本节将深刻解释一些可以改进Streaming应用性能的配置和参数。整体上来讲,你须要考虑这两方面的事情:
There are a number of optimizations that can be done in Spark to minimize the processing time of each batch. These have been discussed in detail in the Tuning Guide. This section highlights some of the most important ones.
有很多优化手段均可以减小Spark对每一个批次的处理时间。细节将在优化指南(Tuning Guide)中详谈。这里仅列举一些最重要的。
Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers, allowing data to be received in parallel, thus increasing overall throughput. These multiple DStreams can be unioned together to create a single DStream. Then the transformations that were being applied on a single input DStream can be applied on the unified stream. This is done as follows.
跨网络接收数据(如:从 Kafka、Flume、socket 等接收数据)需要先将数据反序列化,再存储到 Spark 中。
如果数据接收成为系统的瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入 DStream 只包含一个单独的接收器(receiver,运行于 worker 节点上),每个接收器单独接收一路数据流。因此,配置多个输入 DStream 就能从数据源的不同分区分别接收多个数据流。例如,可以把从 Kafka 拉取两个 topic 的单个输入流拆分成两个 Kafka 输入流,每个流各拉取一个 topic,这样就会有两个接收器并行地接收数据,从而提高总体吞吐量。随后可以把这些 DStream 合并成一个 DStream,再在合并后的 DStream 上应用原本作用于单个输入流的 transformation 算子。示例代码如下:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
Another parameter that should be considered is the receiver’s block interval, which is determined by the configuration parameter spark.streaming.blockInterval. For most receivers, the received data is coalesced together into blocks of data before storing inside Spark’s memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem.

An alternative to receiving data with multiple input streams / receivers is to explicitly repartition the input data stream (using inputStream.repartition(<number of partitions>)). This distributes the received batches of data across the specified number of machines in the cluster before further processing.
另一个可以考虑优化的参数是接收器的块间隔(block interval),由配置参数 spark.streaming.blockInterval 决定。大多数接收器都会先把数据合并成一个个数据块,然后再保存到 Spark 内存中。对于 map 类算子来说,每个批次中数据块的个数决定了处理这批数据的并行任务数,每个接收器每个批次的任务数约等于(批次间隔 / 块间隔)。例如,批次间隔为 2 秒、块间隔为 200ms 时,每个批次会创建 10 个并行任务。如果任务数太少(少于单机 CPU core 个数),资源利用就不够充分。要在给定批次间隔下增加任务数,只需减小块间隔即可。不过,建议块间隔最小不低于 50ms,否则任务的启动开销占比就太高了。
另一种切分接收数据流的方法是,显式地将输入数据流重新分区(使用 inputStream.repartition(<number of partitions>))。该操作会在进一步处理之前,把接收到的批次数据重新分发到集群中指定数量的节点上。
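The sketch below combines the two knobs just described, purely for illustration: a smaller spark.streaming.blockInterval to get more tasks per batch, plus an explicit repartition() of the input stream before the heavier processing. The socket source, the 100ms interval, and the 8 partitions are example values, not recommendations; in practice you would typically tune one of the two rather than both.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("ReceiverParallelism")
  .set("spark.streaming.blockInterval", "100ms")   // ~20 tasks per receiver for a 2 s batch

val ssc = new StreamingContext(conf, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999)

// Spread the received blocks across more partitions before the expensive work.
val repartitioned = lines.repartition(8)
repartitioned.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()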
数据处理并发度
Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like reduceByKey and reduceByKeyAndWindow, the default number of parallel tasks is controlled by the spark.default.parallelism configuration property. You can pass the level of parallelism as an argument (see the PairDStreamFunctions documentation), or set the spark.default.parallelism configuration property to change the default.
在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能形成集群资源利用率低。例如,对于reduce类的算子, 如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既能够修改这个默认值(spark.default.parallelism),也能够经过参数指定这个并发数量(见PairDStreamFunctions)。
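As a small illustration of the two ways to raise processing parallelism, the sketch below sets spark.default.parallelism globally and also passes an explicit partition count to reduceByKey. The value 16 is arbitrary and should be sized to the cores available in your cluster.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("ProcessingParallelism")
  .set("spark.default.parallelism", "16")          // cluster-wide default for shuffle operations

val ssc = new StreamingContext(conf, Seconds(2))
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Override the parallelism for a single operation by passing the number of partitions explicitly.
val counts = words.map((_, 1)).reduceByKey(_ + _, 16)
counts.print()

ssc.start()
ssc.awaitTermination()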
The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data that are being serialized.
Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads – the receiver must deserialize the received data and re-serialize it using Spark’s serialization format.
Persisted RDDs generated by Streaming Operations: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as they would be processed multiple times. However, unlike the Spark Core default of StorageLevel.MEMORY_ONLY, persisted RDDs generated by streaming computations are persisted with StorageLevel.MEMORY_ONLY_SER (i.e. serialized) by default to minimize GC overheads.
In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the Spark Tuning Guide for more details. For Kryo, consider registering custom classes, and disabling object reference tracking (see Kryo-related configurations in the Configuration Guide).
In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overheads.
调整数据的序列化格式能够大大减小数据序列化的开销。在spark Streaming中主要有两种类型的数据须要序列化:
无论是上面哪种数据,均可以使用Kryo序列化来减小CPU和内存开销,详见Spark Tuning Guide。另,对于Kryo,你能够考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见Configuration Guide)。
在一些特定的场景下,如果需要保留的数据量不是很大,那么可以考虑以反序列化对象的形式持久化数据(两类数据皆可),而不会带来过高的 GC 开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下可以尝试通过显式设置存储级别来禁用持久化数据的序列化。这样可以减少序列化带来的 CPU 开销,在 GC 压力增加不多的前提下提升性能。
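The Kryo-related settings mentioned above can be wired up roughly as follows. MyRecord is a placeholder for whatever classes actually flow through your DStreams, not anything from the guide.

import org.apache.spark.SparkConf

case class MyRecord(id: Long, payload: String)   // placeholder class to register with Kryo

val conf = new SparkConf()
  .setAppName("KryoTunedStreaming")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.referenceTracking", "false")       // cheaper if your objects contain no cycles
  .registerKryoClasses(Array(classOf[MyRecord]))      // avoid writing full class names into the output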
If the number of tasks launched per second is high (say, 50 or more per second), then the overhead of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second latencies. The overhead can be reduced by the following changes:

Task Serialization: Using Kryo serialization for serializing tasks can reduce the task sizes, and therefore reduce the time taken to send them to the slaves.

Execution mode: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the Running on Mesos guide for more details.

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.
如果每秒启动的任务数过多(比如每秒 50 个以上),那么将任务发送给 slave 节点的开销会明显增加,也就很难达到亚秒级(sub-second)的延迟。不过可以通过以下两个方法减少任务的启动开销:

任务序列化(Task Serialization):使用 Kryo 序列化任务可以减小任务的大小,从而减少把任务发送到 slave 节点所需的时间。

执行模式(Execution mode):以 Standalone 模式或粗粒度 Mesos 模式运行 Spark,任务的启动时间会比细粒度 Mesos 模式更短。详情请参考 Running on Mesos 指南。
这些调整有可能可以减小100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。
For a Spark Streaming application running on a cluster to be stable, the system should be able to process data as fast as it is being received. In other words, batches of data should be processed as fast as they are being generated. Whether this is true for an application can be found by monitoring the processing times in the streaming web UI, where the batch processing time should be less than the batch interval.
Depending on the nature of the streaming computation, the batch interval used may have significant impact on the data rates that can be sustained by the application on a fixed set of cluster resources. For example, let us consider the earlier WordCountNetwork example. For a particular data rate, the system may be able to keep up with reporting word counts every 2 seconds (i.e., batch interval of 2 seconds), but not every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in production can be sustained.
A good approach to figure out the right batch size for your application is to test it with a conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with the data rate, you can check the value of the end-to-end delay experienced by each processed batch (either look for “Total delay” in Spark driver log4j logs, or use the StreamingListener interface). If the delay is maintained to be comparable to the batch size, then the system is stable. Otherwise, if the delay is continuously increasing, it means that the system is unable to keep up and is therefore unstable. Once you have an idea of a stable configuration, you can try increasing the data rate and/or reducing the batch size. Note that a momentary increase in the delay due to temporary data rate increases may be fine as long as the delay reduces back to a low value (i.e., less than the batch size).
要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度同样快。对于特定的应用来讲,能够从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。
根据 Spark Streaming 计算的性质,在一定的集群资源条件下,批次间隔的设置会极大地影响系统所能维持的数据处理速率。例如,在前面的 WordCountNetwork 示例中,对于特定的数据速率,系统也许能够在批次间隔为 2 秒时跟上数据接收速度,但把批次间隔改为 500 毫秒后可能就处理不过来了。所以,批次间隔需要设置成能够支撑生产环境中预期数据速率的值。
要为你的应用找出合适的批次间隔,比较好的办法是先用一个比较保守的批次间隔(如 5~10 秒)和较低的数据速率进行测试。要验证系统是否能跟上当前的数据速率,可以检查每个批次的端到端处理延迟(可以在 Spark 驱动器的 log4j 日志中查找 “Total delay”,也可以用 StreamingListener 接口来获取)。如果这个延迟能稳定地保持在与批次间隔相当的水平,那么系统基本就是稳定的;反之,如果延迟持续增长,就说明系统跟不上数据接收速度,也就是不稳定的。一旦系统稳定下来,你就可以尝试提高数据速率或者减小批次间隔。注意,只要延迟随后能降回到较低水平(如小于批次间隔),由数据速率瞬时上升导致的短暂延迟增长是可以接受的。
Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail in the Tuning Guide. It is strongly recommended that you read that. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.
The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on the last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes worth of data in memory. Or if you want to use updateStateByKey
with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then the necessary memory will be low.
In general, since the data received through receivers is stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to the disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. It’s best to try and see the memory usage on a small scale and estimate accordingly.
Spark 应用的内存占用和 GC 调优已经在调优指南(Tuning Guide)中有详细的讨论,强烈建议你读一读那篇文档。本节中,我们只讨论几个专门针对 Spark Streaming 的调优参数。
Spark Streaming 应用在集群中占用的内存量在很大程度上取决于具体使用了哪些 transformation 算子。例如,如果想用一个窗口算子操作最近 10 分钟的数据,那么你的集群就需要有足够的内存保留 10 分钟的数据;又如 updateStateByKey,如果 key 很多,相应保存的 key 的 state 也会很多,这些都需要占用内存。而如果你的应用只是做简单的“映射-过滤-存储”(map-filter-store)操作,那么需要的内存就很少。
一般情况下,接收器接收到的数据会以 StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别保存到 Spark 中,也就是说,如果内存装不下,数据就会溢写到磁盘上。溢写到磁盘会显著降低 streaming 应用的性能,因此还是建议根据应用实际处理的数据量提供充足的内存。最好是先在小规模场景下观察和评估内存使用情况,再据此估算正式场景所需的内存。
Another aspect of memory tuning is garbage collection. For a streaming application that requires low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.
There are a few parameters that can help you tune the memory usage and GC overheads:
Persistence Level of DStreams: As mentioned earlier in the Data Serialization section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration spark.rdd.compress), at the cost of CPU time.

Clearing old data: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data. Data can be retained for a longer duration (e.g. interactively querying older data) by setting streamingContext.remember.

CMS Garbage Collector: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times. Make sure you set the CMS GC on both the driver (using --driver-java-options in spark-submit) and the executors (using the Spark configuration spark.executor.extraJavaOptions). A configuration sketch for these settings follows at the end of this subsection.
Other tips: To further reduce GC overheads, here are some more tips to try. Persist RDDs using the OFF_HEAP storage level. See more detail in the Spark Programming Guide.

另一个内存调优的方向是垃圾回收。由于 streaming 应用往往需要低延迟,所以肯定不希望出现大量的或耗时较长的 JVM 垃圾回收暂停。
以上这些参数和手段(DStream 的持久化级别、及时清理旧数据、CMS 垃圾回收器、OFF_HEAP 存储级别等)都能帮助你减少内存占用和 GC 开销。
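A minimal sketch of the GC and persistence related settings from this subsection, expressed as SparkConf entries; the values are illustrative, not recommendations. Note that the driver's own GC flags cannot be set from application code and must be passed at launch time, e.g. spark-submit --driver-java-options "-XX:+UseConcMarkSweepGC".

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MemoryTunedStreaming")
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")  // CMS GC on the executors
  .set("spark.rdd.compress", "true")                                  // trade CPU time for memory

val ssc = new StreamingContext(conf, Seconds(2))
// Keep the last 10 minutes of generated RDDs around, e.g. for interactively querying older batches.
ssc.remember(Minutes(10))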
容错语义
In this section, we will discuss the behavior of Spark Streaming applications in the event of failures.
本节中,咱们将讨论Spark Streaming应用在出现失败时的具体行为。
To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark’s RDDs.
Spark operates on data in fault-tolerant file systems like HDFS or S3. Hence, all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not the case for Spark Streaming as the data in most cases is received over the network (except when fileStream is used). To achieve the same fault-tolerance properties for all of the generated RDDs, the received data is replicated among multiple Spark executors in worker nodes in the cluster (default replication factor is 2). This leads to two kinds of data in the system that need to be recovered in the event of failures:
Furthermore, there are two kinds of failures that we should be concerned about:
With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
要理解Spark Streaming所提供的容错语义,咱们首先须要回忆一下Spark RDD所提供的基本容错语义。
Spark主要操做一些可容错文件系统的数据,如:HDFS或S3。所以,全部从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并不是如此,由于多数状况下Streaming须要从网络远端接收数据,这回致使Streaming的数据源并不可靠(尤为是对于使用了 fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不一样worker节点上的Spark执行器来实现(默认副本因子是 2)。所以一旦出现故障,系统须要恢复两种数据:
此外,还有两种可能的故障类型须要考虑:
有了以上这些基本知识,下面咱们就进一步了解一下Spark Streaming的容错语义。
The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.): at-most once (each record is either processed once or not at all), at-least once (each record is processed one or more times, so no data is lost but there may be duplicates), and exactly once (each record is processed exactly once, with no data lost and no duplicates).
流式系统的可靠性语义通常按单条记录被系统处理的次数来划分。在各种可能的运行条件下(无论是否出现故障),系统能够提供的保证一定是以下三种之一:至多一次(at-most once,每条记录要么被处理一次,要么不被处理)、至少一次(at-least once,每条记录会被处理一次或多次,不丢数据但可能重复)、精确一次(exactly once,每条记录精确地只被处理一次,不丢也不重)。
In any stream processing system, broadly speaking, there are three steps in processing the data.
Receiving the data: The data is received from sources using Receivers or otherwise.
Transforming the data: The received data is transformed using DStream and RDD transformations.
Pushing out the data: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.
If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let’s understand the semantics of these steps in the context of Spark Streaming.
Receiving the data: Different input sources provide different guarantees. This is discussed in detail in the next subsection.
Transforming the data: All data that has been received will be processed exactly once, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.
Pushing out the data: Output operations by default ensure at-least once semantics because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). But users can implement their own transaction mechanisms to achieve exactly-once semantics. This is discussed in more details later in the section.
任何流式处理系统通常都会包含如下三个数据处理步骤:
如果 Streaming 应用需要做到端到端的“精确一次”保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须只被接收一次、处理一次、推送一次。下面让我们在 Spark Streaming 的上下文中理解一下这三个步骤的语义:
Different input sources provide different guarantees, ranging from at-least once to exactly once. Read for more details.
If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data. This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.
For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. As we discussed earlier, there are two types of receivers:
Depending on what type of receivers are used we achieve the following semantics. If a worker node fails, then there is no data loss with reliable receivers. With unreliable receivers, data received but not replicated can get lost. If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.
To avoid this loss of past received data, Spark 1.2 introduced write ahead logs which save the received data to fault-tolerant storage. With the write ahead logs enabled and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.
不一样的输入源提供不一样的数据可靠性级别,从“至少一次”到“精确一次”。
如果所有的输入数据都来自可容错的文件系统(如 HDFS),那么 Spark Streaming 就能从任何故障中恢复并处理所有的数据。这种情况下可以保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只被处理一次,不多也不少。
对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器:
对于不同的接收器,我们可以获得如下不同的语义。如果一个 worker 节点故障了,对可靠接收器来说不会有数据丢失;对不可靠接收器来说,已接收但尚未保存副本的数据可能会丢失。如果 driver 节点故障了,那么除了上述数据丢失之外,所有已接收并在内存中保存了副本的历史数据也都会丢失,这将影响有状态算子的计算结果。
为了避免丢失这些已接收的历史数据,Spark 从 1.2 开始引入了 WAL(write ahead log,预写日志),将接收到的数据保存到可容错的存储中。只要启用了 WAL 并使用可靠接收器,就不会有数据丢失;此时在语义上可以提供“至少一次”的保证。
The following table summarizes the semantics under failures:
Deployment Scenario | Worker Failure | Driver Failure |
---|---|---|
Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs | Buffered data lost with unreliable receivers; Zero data loss with reliable receivers; At-least once semantics | Buffered data lost with unreliable receivers; Past data lost with all receivers; Undefined semantics |
Spark 1.2 or later with write ahead logs | Zero data loss with reliable receivers; At-least once semantics | Zero data loss with reliable receivers and files; At-least once semantics |
下表总结了故障状况下的各类语义:
部署场景 | Worker 故障 | Driver 故障 |
---|---|---|
Spark 1.1 及以前版本,或者 Spark 1.2 及以后版本但未开启 WAL | 若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)的数据;若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义 | 若使用不可靠接收器,则缓存数据和已保存数据均可能丢失;若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证 |
Spark 1.2 及以后版本,并启用 WAL | 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 | 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证 |
In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 2.0.0) is further discussed in the Kafka Integration Guide.
从 Spark 1.3 开始,我们引入了 Kafka Direct API,该 API 能确保所有 Kafka 数据被 Spark Streaming 精确一次地接收。在此基础上,如果再实现精确一次的输出操作,就能实现端到端的精确一次语义保证。(截至 Spark 2.0.0,该功能仍是实验性的。)更详细的说明见:Kafka Integration Guide。
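For reference, here is a minimal sketch of creating a direct Kafka stream with the 0.8 integration; it assumes the spark-streaming-kafka-0-8 artifact is on the classpath, an existing StreamingContext named ssc, a broker at localhost:9092, and a topic called "events", none of which come from the guide itself.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("events")

// No receiver is involved: each RDD partition maps to a Kafka topic partition,
// and offsets are tracked by Spark Streaming itself, giving exactly-once on the input side.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.map(_._2).count().print()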
Output operations (like foreachRDD) have at-least once semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to file systems using the saveAs***Files operations (as the file will simply get overwritten with the same data), additional effort may be necessary to achieve exactly-once semantics. There are two approaches.
Idempotent updates: Multiple attempts always write the same data. For example, saveAs***Files always writes the same data to the generated files.
Transactional updates: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.

Use the batch time (available in foreachRDD) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application. Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果 worker 发生故障,同一份计算结果可能会被多次写入外部系统。对于用 saveAs***Files 保存到文件系统的情况,这是可以接受的(重复写入只是用相同的数据覆盖之前的文件);但要实现“精确一次”语义,则需要一些额外的工作。主要有两种实现方式:
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // 使用 uniqueId 作为事务的唯一标识,基于 uniqueId 实现 partitionIterator 所指向数据的原子事务提交
  }
}
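To make the skip-on-retry logic of the transactional approach explicit, here is a hedged variant of the snippet above. ExternalStore.alreadyCommitted and ExternalStore.commitAtomically are hypothetical stand-ins for your own database or key-value store API, and dstream is assumed to be an existing DStream.

import org.apache.spark.TaskContext

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = s"${time.milliseconds}-$partitionId"
    if (!ExternalStore.alreadyCommitted(uniqueId)) {
      // Commit the partition's data together with its identifier in one atomic operation
      // (hypothetical API standing in for a real transactional store).
      ExternalStore.commitAtomically(uniqueId, partitionIterator.toSeq)
    }
    // If the identifier was already committed, this is a re-executed task: skip the write
    // so the external system sees each partition's output exactly once.
  }
}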
Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. This section elaborates the steps required to migrate your existing code to 1.0.
Input DStreams: All operations that create an input stream (e.g., StreamingContext.socketStream, FlumeUtils.createStream, etc.) now return InputDStream / ReceiverInputDStream (instead of DStream) for Scala, and JavaInputDStream / JavaPairInputDStream / JavaReceiverInputDStream / JavaPairReceiverInputDStream (instead of JavaDStream) for Java. This ensures that functionality specific to input streams can be added to these classes in the future without breaking binary compatibility. Note that your existing Spark Streaming applications should not require any change (as these new classes are subclasses of DStream/JavaDStream) but may require recompilation with Spark 1.0.
Custom Network Receivers: Since the earliest releases of Spark Streaming, custom network receivers could be defined in Scala using the class NetworkReceiver. However, the API was limited in terms of error handling and reporting, and could not be used from Java. Starting from Spark 1.0, this class has been replaced by Receiver, which has the following advantages.
Methods stop and restart have been added for better control of the lifecycle of a receiver. See the custom receiver guide for more details.

在 Spark 0.9.1 和 Spark 1.0 之间,有一些 API 接口变更,目的是为了保障未来版本 API 的稳定。本节将详细说明把基于已有版本的代码迁移升级到 1.0 所需的工作。
输入DStream(Input DStreams): 全部建立输入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值再也不是DStream(对Java来讲是JavaDStream),而是 InputDStream / ReceiverInputDStream(对Java来讲是JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream)。这样才能确保特定输入流的功能可以在将来持续增长到这些class中,而不会打破二进制兼容性。注意,已有的Spark Streaming应用应该不须要任何代码修改(新的返回类型都是DStream的子类),只不过须要基于Spark 1.0从新编译一把。
定制网络接收器(Custom Network Receivers):自从 Spark Streaming 发布以来,就可以在 Scala 中基于 NetworkReceiver 来定制网络接收器。但由于错误处理和上报等 API 方面的限制,该类无法在 Java 中使用。所以从 Spark 1.0 开始,用 Receiver 替换了 NetworkReceiver,主要的好处如下:
To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have to do the following.
Make your custom receiver class extend org.apache.spark.streaming.receiver.Receiver instead of org.apache.spark.streaming.dstream.NetworkReceiver.

Earlier, received data had to be pushed into a BlockGenerator object that was explicitly started and stopped from the onStart() and onStop() methods. The new Receiver class makes this unnecessary as it adds a set of methods named store(<data>) that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any BlockGenerator object (it does not exist any more in Spark 1.0 anyway), and use the store(...) methods on received data. (A sketch of a receiver written against the new API follows this list.)

为了将已有的基于 NetworkReceiver 的自定义接收器迁移到 Receiver 上来,你需要完成以下工作:
让自定义接收器类改为继承 org.apache.spark.streaming.receiver.Receiver,而不再是 org.apache.spark.streaming.dstream.NetworkReceiver。
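The migration steps above boil down to subclassing Receiver and calling store() from your own receiving thread. The sketch below is an illustrative socket receiver in that style; the host, port, and blocking read loop are example choices, not part of the migration guide.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SimpleSocketReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    // Receive on a separate thread so that onStart() returns immediately.
    new Thread("Simple Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to do here: the reading thread stops itself once isStopped() returns true.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                 // replaces the old BlockGenerator-based push
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

// Usage: val lines = ssc.receiverStream(new SimpleSocketReceiver("localhost", 9999))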
Actor-based Receivers: The Actor-based Receiver APIs have been moved to DStream Akka. Please refer to the project for more details.

基于 Actor 的接收器(Actor-based Receivers):从 actor class 继承并实现相应接口后,即可从 Akka Actors 中获取数据。用于获取数据的类被重命名为 org.apache.spark.streaming.receiver.ActorHelper,保存数据的 pushBlocks(...) 方法也被重命名为 store(...)。其他位于 org.apache.spark.streaming.receivers 包中的工具类也被移到 org.apache.spark.streaming.receiver 包下并重命名,新的类名应该比之前更加清晰。
更多的参考资料