SparkStreaming Java

Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具备可扩展性、高吞吐量、可容错性等特色。咱们能够从kafka、flume、Twitter、 ZeroMQ、Kinesis等源获取数据,也能够经过由 高阶函数map、reduce、join、window等组成的复杂算法计算出数据。最后,处理后的数据能够推送到文件系统、数据库、实时仪表盘中。html

基本概念

  • 离散流(Discretized Streams \ DStreams):Spark Streaming对内部持续的实时数据流点的抽象描述,及咱们处理的一个实时数据流,在SparkStreaming中对应于一个DStream实例。
  • 批数据(batch data):将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着时间片的推移,处理结果就造成了对应的结果数据流。
  • 时间片或者批处理间隔(batch interval):人为地将流数据进行定量的标准,以时间片做为拆分数据流的依据。一个时间片的数据对应一个RDD实例。
  • 窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。
  • 滑动时间间隔:前一个窗口到后一个窗口所经历的时间长度。必须是批处理的时间间隔的倍数。
  • Input DStream :一个input DStream是一个特殊的DStream,将Spark Stream链接到一个外部数据源来读取数据。
  • Receiver:长时间运行在Excetor。每一个Receiver负责一个Input DStream(例如一个读取Kafka消息的输入流)。每一个Receiver,加上input DSteam会占用一个core/slot。

一、项目建立java

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。node

关于Scala:工程不须要增长scala nature,即不需Add Scala Nature。若增长在java代码中调用scala library会有异常。python

关于Spark版本:使用1.6.3进行编写。git

maven 依赖github

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

例子:算法

public class NetworkWordCount {
	public static void main(String[] args) {
		networkWC();
	}
	public static void networkWC() {
		// Create a local StreamingContext with two working thread and batch
		// interval of 5 second
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
		
		// Create a DStream that will connect to hostname:port, like
		// localhost:9999
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
		// Split each line into words
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterable<String> call(String x) {
				return Arrays.asList(x.split(" "));
			}
		});
		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});
		// Print the first ten elements of each RDD generated in this DStream to
		// the console
		wordCounts.print();
		jssc.start(); // Start the computation
		jssc.awaitTermination(); // Wait for the computation to terminate
	}
}

二、SparkStreaming

2.一、初始化

为了初始化Spark Streaming程序,一个JavaStreamingContext对象必需被建立,它是SparkStreaming全部流操做的主要入口。一个JavaStreamingContext 对象能够用SparkConf对象建立。须要注意的是,它在内部建立了一个JavaSparkContext对象,你能够经过 jssc.sparkContext 访问这个SparkContext对象。sql

// Create a local StreamingContext with two working thread and batch
// interval of 5 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

当一个上下文(context)定义以后,你必须按照如下几步进行操做数据库

  • 定义输入源;
  • 准备好流计算指令;
  • 利用 streamingContext.start() 方法接收和处理数据;
  • 处理过程将一直持续,直到 streamingContext.stop() 方法被调用。

几点须要注意的地方:apache

  • 一旦一个context已经启动,就不能有新的流算子创建或者是添加到context中。
  • 一旦一个context已经中止,它就不能再从新启动
  • 在JVM中,同一时间只能有一个StreamingContext处于活跃状态
  • 在StreamingContext上调用 stop() 方法,也会关闭SparkContext对象。若是只想仅关闭StreamingContext对象,设置 stop() 的可选参数为false
  • 一个SparkContext对象能够重复利用去建立多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext建立以前关闭(不关闭SparkContext)。

2.二、Discretized Streams (DStreams)

DStreams是Spark Streaming提供的基本的抽象,它表明一个连续的数据流。它要么是从源中获取的输入流,要么是输入流经过转换算子生成的处理后的数据流。在内部,DStreams由一系列连续的 RDD组成。DStreams中的每一个RDD都包含肯定时间间隔内的数
据,以下图所示:

任何对DStreams的操做都转换成了对DStreams隐含的RDD的操做。在前面的例子中, flatMap 操做应用于 lines 这个DStreams的每一个RDD,生成 words 这个DStreams的RDD。过程以下图所示:

经过Spark引擎计算这些隐含RDD的转换算子。DStreams操做隐藏了大部分的细节,而且为了更便捷,为开发者提供了更高层的API。下面几节将具体讨论这些操做的细节。

2.三、Input DStreams and Receivers

输入DStreams表示从数据源获取输入数据流的DStreams。在快速例子中, lines 表示输入DStream,它表明从netcat服务器获取的数据流。每个输入流DStream 和一个 Receiver 对象相关联,这个 Receiver 从源中获取数据,并将数据存入内存中用于处理。

输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源

  • 基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字链接、Akka的actor等。
  • 高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。它们须要经过额外的类来使用。咱们在关联那一节讨论了类依赖。

须要注意的是,若是你想在一个流应用中并行地建立多个输入DStream来接收多个数据流,你可以建立多个输入流(这将在性能调优那一节介绍) 。它将建立多个Receiver同时接收多个数据流。可是, receiver 做为一个长期运行的任务运行在Spark worker或executor中。所以,它占有一个核,这个核是分配给Spark Streaming应用程序的全部 核中的一个(itoccupies one of the cores allocated to the Spark Streaming application)。因此,为SparkStreaming应用程序分配足够的核(若是是本地运行,那么是线程) 用以处理接收的数据而且运行 receiver 是很是重要的。

几点须要注意的地方:

  • 若是分配给应用程序的核的数量少于或者等于输入DStreams或者receivers的数量,系统只可以接收数据而不能处理它们。
  • 当运行在本地,若是你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来讲是不足的,由于做为 receiver 的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。

2.3.一、Basic sources

咱们已经在快速例子中看到, ssc.socketTextStream(...) 方法用来把从TCP套接字获取的文本数据建立成DStream。除了套接字,StreamingContext API也支持把文件 以及Akka actors做为输入源建立DStream。

套接字:从一个ip:port监控套接字。

文件流(File Streams):从任何与HDFS API兼容的文件系统中读取数据,一个DStream能够经过以下方式建立

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

Spark Streaming将会监控 dataDirectory 目录,而且处理目录下生成的任何文件(嵌套目录不被支持)。须要注意一下三点:

  1. 全部文件必须具备相同的数据格式
  2. 全部文件必须在`dataDirectory`目录下建立,文件是自动的移动和重命名到数据目录下
  3. 一旦移动,文件必须被修改。因此若是文件被持续的附加数据,新的数据不会被读取。

对于简单的文本文件,有一个更简单的方法 streamingContext.textFileStream(dataDirectory) 能够被调用。文件流不须要运行一个receiver,因此不须要分配核。

2.3.二、Advanced sources

(待补充)

2.3.三、Custom Sources

(待补充)

2.四、Transformations on DStreams

和RDD相似,transformation容许从输入DStream来的数据被修改。DStreams支持不少在RDD中可用的transformation算子。一些经常使用的算子以下所示:

Transformation Meaning
map(func) Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.
countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

 

2.4.一、UpdateStateByKey Operation

updateStateByKey操做容许不断用新信息更新它的同时保持任意状态。你须要经过两步来使用它

  • 定义状态-状态能够是任何的数据类型
  • 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态

让咱们举个例子说明。在例子中,你想保持一个文本数据流中每一个单词的运行次数,运行次数用一个state表示,它的类型是整数

import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
      Integer newSum = ...  // add the new values with the previous running count to get the new count
      return Optional.of(newSum);
    }
  };

这被应用在一个包含word( pairs DStream 含有(word,1)对)的DStream上。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

这个update函数被每一个单词word调用,newValues 拥有一系列的1(从 (词, 1)对而来),runningCount拥有以前的次数。要看完整的代码,见例子JavaStatefulNetworkWordCount.java. 备注使用updateStateByKey时须要定义checkpoint目录,更多细节在checkpointing部分。

2.4.二、Transform Operation

(略)

2.4.三、Window Operations

Spark Streaming也支持窗口计算,它容许你在一个滑动窗口数据上应用transformation算子。下图阐明了这个滑动窗口。

如上图显示,窗口在源DStream上滑动,合并和操做落入窗内的源RDDs,产生窗口化的DStream的RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操做,而且每两个时间单元滑动一次。 这说明,任何一个窗口操做都须要指定两个参数:

  • 窗口长度:窗口的持续时间
  • 滑动的时间间隔:窗口操做执行的时间间隔

这两个参数必须是源DStream的批时间间隔的倍数。
下面举例说明窗口操做。例如,你想扩展前面的例子用来计算过去30秒的词频,间隔时间是10秒。为了达到这个目的,咱们必须在过去30秒的 pairs DStream上应用 reduceByKey 操做。用方法 reduceByKeyAndWindow 实现。

// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
  @Override public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));

一些经常使用的窗口操做以下所示,这些操做都须要用到上文提到的两个参数:窗口长度和滑动的时间间隔

Transformation Meaning
window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.

countByValueAndWindow(windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

2.4.四、Join Operations

最后,值得强调的是,您能够轻松地在Spark Streaming中执行不一样类型的join。

Stream-stream joins

Streams 可以方便地和其余streams进行join操做.

JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在这里,每批间隔,stream1产生的 RDD将与stream2产生的 RDD进行join操做。你也能够作leftouterjoin,rightouterjoin,fullouterjoin。此外,在stream的窗口上进行join一般是很是有用的。这也很容易。

JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);

Stream-dataset joins

下面是另外一个将窗口流与数据集join的例子。

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(
    new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
        @Override 
        public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
            return rdd.join(dataset);
        }
    }
);

事实上,您还能够动态地更改你想要join的数据集。提供转换的函数在每一个批处理间隔中进行计算求值,所以将使用数据集引用指向的当前数据集。

The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.

2.五、Output Operations on DStreams

输出操做容许DStream的操做推到如数据库、文件系统等外部系统中。由于输出操做其实是容许外部系统消费转换后的数据,它们触发的实际操做是DStream转换。目前,定义了下面几种输出操做:

Output Operation Meaning
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
Python API This is called pprint() in the Python API.
saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.
foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

Design Patterns for using foreachRDD

利用foreachRDD的设计模式

dstream.foreachRDD是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是很是重要的。下面几点介绍了如何避免通常错误。常常写数据到外部系统须要建一个链接对象(例如到远程服务器的TCP链接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中建立一个链接对象,可是在Spark worker中 尝试调用这个链接对象保存记录到RDD中,以下:

dstream.foreachRDD(rdd	=>	{
						val	connection	=	createNewConnection()		//	executed	at	the	driver
						rdd.foreach(record	=>	{
										connection.send(record)	//	executed	at	the	worker
						})
		})

这是不正确的,由于这须要先序列化链接对象,而后将它从driver发送到worker中。这样的链接对象在机器之间不能传送。它可能表现为序列化错误(链接对象不可序列化)或者初始化错误(链接对象应该 在worker中初始化)等等。正确的解决办法是在worker中建立链接对象。

然而,这会形成另一个常见的错误-为每个记录建立了一个链接对象。例如:

dstream.foreachRDD(rdd	=>	{
						rdd.foreach(record	=>	{
										val	connection	=	createNewConnection()
										connection.send(record)
										connection.close()
						})
		})

一般,建立一个链接对象有资源和时间的开支。所以,为每一个记录建立和销毁链接对象会致使很是高的开支,明显的减小系统的总体吞吐量。一个更好的解决办法是利用 rdd.foreachPartition 方法。 为RDD的partition建立一个链接对象,用这个两件对象发送partition中的全部记录。

 

dstream.foreachRDD(rdd	=>	{
						rdd.foreachPartition(partitionOfRecords	=>	{
										val	connection	=	createNewConnection()
										partitionOfRecords.foreach(record	=>	connection.send(record))
										connection.close()
						})
		})

这就将链接对象的建立开销分摊到了partition的全部记录上了。

最后,能够经过在多个RDD或者批数据间重用链接对象作更进一步的优化。开发者能够保有一个静态的链接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。

dstream.foreachRDD(rdd	=>	{
						rdd.foreachPartition(partitionOfRecords	=>	{
										//	ConnectionPool	is	a	static,	lazily	initialized	pool	of	connections
										val	connection	=	ConnectionPool.getConnection()
										partitionOfRecords.foreach(record	=>	connection.send(record))
										ConnectionPool.returnConnection(connection)		//	return	to	the	pool	for	future	reuse
						})
		})

须要注意的是,池中的链接对象应该根据须要延迟建立,而且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。

其它须要注意的地方:

  • 输出操做经过懒执行的方式操做DStreams,正如RDD action经过懒执行的方式操做RDD。具体地看,RDD actions和DStreams输出操做接收数据的处理。所以,若是你的应用程序没有任何输出操做或者 用于输出操做 dstream.foreachRDD() ,可是没有任何RDD action操做在 dstream.foreachRDD() 里面,那么什么也不会执行。系统仅仅会接收输入,而后丢弃它们。
  • 默认状况下,DStreams输出操做是分时执行的,它们按照应用程序的定义顺序按序执行。

2.六、DataFrame and SQL Operations

你能够很容易地使用DataFrames and SQL操做数据流。你必须使用StreamingContext已用的SparkContext,以建立一个SQLContext。此外,这必须这样作,它能够在驱动程序故障的时候从新启动。这是经过sqlcontext惰性实例化一个单例实例来完成的。下面的例子说明了这一点。它改变了早期的word count example 。每一个RDD转换为一个DataFrame,做为一个临时表注册并经过SQL进行查询。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD(
  new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {

      // Get the singleton instance of SQLContext
      SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

      // Convert RDD[String] to RDD[case class] to DataFrame
      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
        public JavaRow call(String word) {
          JavaRow record = new JavaRow();
          record.setWord(word);
          return record;
        }
      });
      DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

      // Register as table
      wordsDataFrame.registerTempTable("words");

      // Do word count on table using SQL and print it
      DataFrame wordCountsDataFrame =
          sqlContext.sql("select word, count(*) as total from words group by word");
      wordCountsDataFrame.show();
      return null;
    }
  }
);

查看完整代码 source code.

You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of the any asynchronous SQL queries, will delete off old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or equivalent in other languages).

2.七、Input DStreams and Receivers

和RDD类似,DStreams也容许开发者持久化流数据到内存中。在DStream上使用 persist() 方法能够自动地持久化DStream中的RDD到内存中。若是DStream中的数据须要计算屡次,这是很是有用的。像 reduceByWindow 和 reduceByKeyAndWindow 这种窗口操做、 updateStateByKey 这种基于状态的操做,持久化是默认的,不须要开发者调用 persist() 方法。例如经过网络(如kafka,flume等)获取的输入数据流,默认的持久化策略是复制数据到两个不一样的节点以容错。
注意,与RDD不一样的是,DStreams默认持久化级别是存储序列化数据到内存中,这将在Performance Tuning 章节介绍。更多的信息请看rdd持久化 Spark Programming Guide.

2.八、Checkpointing

一个流应用程序必须全天候运行,全部必须可以解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming须要checkpoint足够的信息到容错存储系统中, 以使系统从故障中恢复。

Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括

  • Configuration :建立Spark Streaming应用程序的配置信息
  • DStream operations :定义Streaming应用程序的操做集合
  • Incomplete batches:操做存在队列中的未完成的批

Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于以前 批的RDD,随着时间的推移,这个依赖链的长度会持续增加。在恢复的过程当中,为了不这种无限增加。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。元数据checkpoint主要是为了从driver故障中恢复数据。若是transformation操做被用到了,数据checkpoint即便在简单的操做中都是必须的。

什么时候checkpoint

应用程序在下面两种状况下必须开启checkpoint

  • 使用有状态的transformation。若是在应用程序中用到了 updateStateByKey 或者 reduceByKeyAndWindow ,checkpoint目录必需提供用以按期checkpoint RDD。
  • 从运行应用程序的driver的故障中恢复过来。使用元数据checkpoint恢复处理信息。

注意,没有前述的有状态的transformation的简单流应用程序在运行时能够不开启checkpoint。在这种状况下,从driver故障的恢复将是部分恢复(接收到了可是尚未处理的数据将会丢失)。 这一般是能够接受的,许多运行的Spark Streaming应用程序都是这种方式。

怎样配置Checkpointing

在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。着能够经过 streamingContext.checkpoint(checkpointDirectory) 方法来作。这运行你用以前介绍的 有状态transformation。另外,若是你想从driver故障中恢复,你应该如下面的方式重写你的Streaming应用程序。

  • 当应用程序是第一次启动,新建一个StreamingContext,启动全部Stream,而后调用 start() 方法
  • 当应用程序由于故障从新启动,它将会从checkpoint目录checkpoint数据从新建立StreamingContext
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

若是 checkpointDirectory 存在,上下文将会利用checkpoint数据从新建立。若是这个目录不存在,将会调用 functionToCreateContext 函数建立一个新的上下文,创建DStreams。 请看RecoverableNetworkWordCount例子。

除了使用 getOrCreate ,开发者必须保证在故障发生时,driver处理自动重启。只能经过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。

注意,RDD的checkpointing有存储成本。这会致使批数据(包含的RDD被checkpoint)的处理时间增长。所以,须要当心的设置批处理的时间间隔。在最小的批容量(包含1秒的数据)状况下,checkpoint每批数据会显著的减小 操做的吞吐量。相反,checkpointing太少会致使谱系以及任务大小增大,这会产生有害的影响。由于有状态的transformation须要RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它能够经过 dstream.checkpoint 来设置。典型的状况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个好的尝试。

相关文章
相关标签/搜索