spark第六篇:Spark Streaming Programming Guide

预览html

Spark Streaming是Spark核心API的扩展,支持高扩展,高吞吐量,实时数据流的容错流处理。数据能够从Kafka,Flume或TCP socket等许多来源获取,而且可使用复杂的算法进行处理(好比map,reduce,join,window等高级函数)。最终,处理的结果数据能够推送到文件系统,数据库或实时仪表盘上。          java

在内部,它的工做原理以下图。Spark Streaming接收实时输入数据流并将数据分红批,而后由Spark引擎处理,进而批量生成最终结果流。node

Spark Streaming提供了一个高层次的抽象,称为DStream(离散流),它表明连续数据流。DStream能够经过Kafka,Flume等来源的输入数据流建立,也能够经过在其余DStream上应用高级函数来建立。在内部,一个DStream被表示为一系列RDD。git

本指南将向你介绍如何使用DStream编写Spark Streaming程序。github

一个快速例子算法

假如咱们想统计从监听TCP套接字的数据服务器接收到的文本数据中的字数。sql

java代码示例:数据库

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:/Users/KOUSHENGRUI976/winutils-master/winutils-master/hadoop-2.6.4");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999);
        JavaDStream<String> words = lines.flatMap(p -> Arrays.asList(p.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs = words.mapToPair(p -> new Tuple2<>(p, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }

首先,咱们建立一个JavaStreamingContext对象,这是全部Spark Streaming程序的入口。使用这个StreamingContext对象建立一个DStream来表示来自TCP源的流数据,指定主机(好比127.0.0.1)和端口(好比9999)。以上lines表示从数据服务器接收的数据流,流中每条记录都是一行文本。以后,咱们把lines用空格分割成words。flatMap是一个转换操做,它经过从源DStream中的每条记录生成多个新记录来建立一个新的DStream。在这种状况下,每一行被分红多个单词,单词流被表示为words DStream。flatMap()方法的参数是一个FlatMapFunction对象(接收一个入参,返回一个Iterator对象)。接下来,咱们要统计这些单词:words DStream经过mapToPair()方法,进一步映射成pair DStream,而后调用reduceByKey()方法,求每批数据中每一个单词的出现频率。注意,执行以上行时,Spark Streaming只会设置它在启动后执行的计算,并未实际处理。最终咱们调用StreamingContext 的start()方法,在全部转换完成以后开始处理。完整的代码能够参阅JavaNetworkWordCountapache

若是你已经下载而且构建了Spark,你能够如下面方式运行这个例子。你首先须要运行Netcat(若是提示nc命令找不到的话,就yum install nc 安装便可),来做为数据服务器:编程

$ nc -lk 9999

而后,在一个不一样的终端,你能够经过以下命令启动这个例子:

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

run-example文件在SPAKR_HOME/bin目录中。

观察现象http://www.jianshu.com/p/59733597d448

基本概念

引入jar包

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

若是数据来源是Kafka或者Flume的话,还须要引入整合jar包。Kafka整合jar包版本在Spark Streaming + Kafka Integration Guide有讨论:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

初始化StreamingContext

开发Spark Streaming应用时,StreamingContext对象(java语言中是JavaStreamingContext对象)是全部Spark Streaming程序的入口。JavaStreamingContext对象能够经过SparkConf对象建立。这会在内部建立一个JavaSparkContext对象,能够经过jssc.sparkContext()来获取。批处理间隔必须根据应用程序和可用集群资源的延迟要求来设置。

JavaStreamingContext对象也可经过现有的JavaSparkConext对象来建立:

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

定义JavaStreamingContext对象后,你必须执行如下操做:

1.经过建立input DStream来定义输入源。

2.经过对DStream应用转换和输出操做定义流式计算。

3.调用JavaStreamingContext对象的start()方法开始接收数据并处理。

4.调用JavaStreamingContext对象的awaitTermination()方法来等待处理中止(人为结束或者由于错误结束)。

5.能够经过调用JavaStreamingContext对象的stop()方法来人为中止处理。

须要注意的几点:

1.一旦一个JavaStreamingContext对象启动(调用start()方法)后,就不能创建或者添加新的流式计算了。

2.一旦一个JavaStreamingContext对象中止后,就不能再从新启动了。

3.在一个JVM中,最多只有一个StreamingContext能够处于活跃状态。

4.stop()方法会在中止JavaStreamingConext的同时中止SparkContext,若是只想中止JavaStreamingContext,则须要调用有参的stop()方法,参数值为false。

5.可使用一个SparkContext对象来建立多个JavaStreamingContext对象,只要先前的JavaStreamingContext对象在下一个JavaStreamingContext对象被建立以前中止(不中止SparkContext)就能够。

离散流(DStream)

离散流(DStream)是Spark Streaming中提供的基本抽象。它表明了一个持续的数据流,或者是从源接收的输入数据流,或者是经过转换输入流而生成的处理过的数据流。在内部,DStream由连续的RDD表示,每一个RDD都包含必定时间间隔的数据,以下图所示:

在DStream上应用的任何操做都会转换为对RDD的操做。例如,在前面将lines DStream转换成words DStream的例子中,flatMap操做是应用到lines DStream中的每个RDD的,进而生成words DStream中的RDD。以下图所示:

RDD的这些转换由Spark引擎处理。DStream操做隐藏了大部分细节,并为开发人员提供了一个更高级别的API。这些操做将在后面的章节中详细讨论。

输入DStream和接收器

输入DStream表明从流数据源接收的DStream。在上面例子中,lines就是一个输入DStream,它表明了从netcat服务器接收到的流数据。每个输入DStream(除了文件流)都与一个接收器(Receiver)对象相关联,该对象从一个源接收数据并将数据存储在Spark的内存中以供处理。

Spark Streaming提供了两类内置的流数据源:

1.基本数据源:好比文件系统、socket链接。

2.高级数据源:像Kafka,Flume等其余中间件,客户端编程时须要引入额外的jar依赖。

若是但愿在流处理程序中同时接受多个数据流,则须要建立多个输入DStream。这将建立多个接收器,同时接受多个数据流。

须要注意的是,每一个接收器都会占用一个核(或者是线程,若是是以本地模式运行的话)。所以,当在本地运行Spark Streaming程序时,不要使用"local"或者"local[1]"做为master URL,这两个都意味着只有一个线程运行任务。若是你使用基于接收器的输入DStream,则单线程将用于运行接收器,而不会处理接收到的数据。一样的,以集群模式运行时,分配给Spark Streaming程序的核心数也必须大于接收器数量,不然也只是接收数据而不作处理。

基本数据源

在上面例子中,咱们已经看了jssc.socketTextStream("127.0.0.1", 9999),它经过TCP socket接收文本数据并建立一个输入DStream。除了socket外,StreamingContext API还提供了从文件建立输入DStream的方法。

文件流:为了从与HDFS API兼容的任何文件系统上(如HDFS,S3,NFS等)的文件中读取数据,能够经过如下方式建立输入DStream:

jssc.fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass);

Spark Streaming将监视directory目录,并处理在该目录中建立的全部文件(不包括子目录中的文件)。注意:

1.这些文件必须具备相同的数据格式。

2.这些文件必须经过经过原子移动或者重命名来建立。

3.一旦移动,文件不得更改。因此即便文件被连续追加,新的数据将不会被读取。

对于简单的文本文件,有一个更简单的方法:

jssc.textFileStream(String directory);

文件流不须要运行接收器,因此不须要分配内核。

高级数据源

这类数据源须要整合其余第三方中间件,好比Kafka和Flume。在编程方面,须要引入一些与第三方中间件整合使用的依赖jar,从这些源建立DStream的功能被移到了jar包中。

其中一些高级数据源以下:

Kafka:Spark Streaming2.2.1与Kafka broker0.8.2.1及更高版本兼容。有关更多详细信息,请参阅Spark Streaming + Kafka Integration Guide

Flume:Spark Streaming2.2.1与Flume1.6.0及更高版本兼容。有关更多详细信息,请参阅Spark Streaming + Flume Integration Guide

接收器可靠性

数据源根据其可靠性能够分为两类。像Kafka和Flume这种数据源,容许传输的数据被确认。若是从这些可靠的数据源接收数据的系统正确地确认接收到的数据,则能够确保没有数据因为任何故障而丢失。这致使两种接收器:

1.可靠的接收器,一个可靠的接收器在收到数据并将数据存储在Spark中时,能够正确地向可靠的数据源发送确认。

2.不可靠的接收器,不可靠的接收器不会向数据源发送确认。这能够用于不支持确认的数据源,或者不但愿或者不须要确认的数据源。

DStream上的转换操做

Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows:

map(Function func):

flatMap(FlatMapFunction func)

filter(Function<T, Boolean> func)

repartition(int numPartitions)

union(JavaDStream otherDStream)  union(JavaPairDStream otherPairDStream)

count()  获得一个JavaDStream<Long>实例

reduce(Function2 func)

countByValue()

reduceByKey(Function2 func)  仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例

join(JavaPairDStream otherPairDStream)  仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例

cogroup(JavaPairDStream otherPairDStream)  仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例

transform(Function func):在源DStream中的每一个RDD上应用任意的RDD-RDD函数,返回一个新的DStream。

updateStateByKey(Function2 updateFunc)  仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例

下面针对一些转换操做作更详细的解释。

UpdateStateByKey操做

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information。使用这个转换操做须要两个步骤:

1.定义状态。状态能够是任意的数据类型。

2.定义状态更新函数。用函数指定如何使用以前的状态和来自输入流的新值来更新状态

在每一批数据中,Spark会对全部现有的key应用状态更新函数,无论批数据是否有新数据。若是状态更新函数返回none,则键值对将被消除。

咱们来举个例子说明一下。假设你想保持在文本数据流中出现的每一个单词出现的次数。在这里,出现的次数是状态,它是一个整数。咱们将更新函数定义为:

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

这适用于包含单词的DStream,例如,上面例子中的包含(word, 1)对的pairs DStream。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

上例中的Optional类不是jdk8自带的那个,而是org.apache.spark.api.java.Optional,在spark-core.jar中。Function2是个函数式接口,全类名是org.apache.spark.api.java.function.Function2,也在spark-core.jar中,相似的函数式接口还有Function、Function0、Function三、Function4。

The updateFunction will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count。须要注意的是,使用updateStateByKey必须配置checkpoint目录,将在下面的checkpointing章节详细讨论。完整的Java代码,请参阅JavaStatefulNetworkWordCount.java

Transform操做

transform操做及其变形(如transformWith、transformToPair、transformWithToPair),容许任意RDD-RDD函数应用于DStream。它能够用于应用那些还没有在DStream API中公开的RDD操做。例如,将数据流中的每一个批次与其余数据集链接起来的功能没有直接暴露在DStream API中。可是,你可使用transform操做完成这个功能。实际应用场景:经过将输入数据流与预先计算的垃圾信息进行实时数据清理,而后基于此进行过滤。

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

Note that the supplied function gets called in every batch interval。这容许你执行随时间变化的RDD操做,便可以在批次之间更改RDD操做,更改分区数量或者广播变量等等。

窗口操做

Spark Streaming还提供了窗口化的计算,容许你在滑动的数据窗口上应用转换操做。下图阐释了滑动窗口:

如上图所示,每当窗口滑过源DStream时,窗口内的源RDD被组合并操做以产生窗口DStream的RDD。上图中,操做被应用在最后3个时间单位的数据上,而且滑动2个时间单位。这代表任何窗口操做都须要指定两个参数:

1.窗口长度。窗口的持续时间(图中是3个时间单位)

2.滑动间隔。执行窗口操做的间隔(图中是2个时间单位)

这两个参数必须是源DStream的批次间隔的整数倍。

下面以一个例子来讲明窗口操做。咱们扩展下前面的例子,如今要统计前30s内出现的单词的个数,每10s统计一次。To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow。

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

reduceByKeyAndWindow(Function2 reduceFunc, Duration windowDuration, Duration slideDuration),返回一个JavaPairDStream实例。

一些经常使用的窗口操做以下,它们都须要以上所说的两个参数:

window(Duration windowDuration, Duration slideDuration)  

countByWindow(Duration windowDuration, Duration slideDuration)

countByValueAndWindow(Duration windowDuration, Duration slideDuration)

reduceByWindow(Function2 reduceFunc, Duration windowDuration, Duration slideDuration)

reduceByKeyAndWindow(Function2 reduceFunc, [Function2 invReduceFunc,] Duration windowDuration, Duration slideDuration)

Join操做

最后,值得强调的是,能够轻松地在Spark Streaming中执行不一样类型的链接。

Stream-Stream joins(仅JavaPairDStream有join相关方法,JavaDStream没有)

JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在这里,在每一个批间隔中,由stream1生成的RDD将与由stream2生成的RDD链接。除了join(),还有leftOuterJoin()、rightOuterJoin()。

此外,在windowed stream上进行链接也是很经常使用的:

JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);

Stream-dataset joins

这在DStream的transform操做章节已经讲过了。如今咱们用一个windowed stream链接一个dataset:

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

这里解释下,DStream与RDD链接不是DStream.join(RDD),DStream与RDD维度不同,DSteam是一连串的RDD,因此DSteam与RDD链接,其实指的是DStream中的RDD与RDD链接。

DStream上的输出操做

输出操做容许将DStream的数据推送到外部系统,如数据库或者文件系统。像RDD的行为操做和转换操做的关系同样,DStream输出操做会触发转换操做的实际执行。

有如下常见输出操做:

print():在driver node上打印DStream每一批数据的前10个元素,仅用于开发和调试。

saveAsHadoopFiles(String prefix, String suffix)

saveAsNewAPIHadoopFiles(String prefix, String suffix)

foreachRDD(VoidFunction func)、foreachRDD(VoidFunction2 func):对DStream/PairDStream的每一个RDD都应用func函数。

正确使用foreachRDD

foreachRDD()功能很强大,容许将数据发送到外部系统。可是,正确且高效使用该方法却不简单,一些常见的错误以下:

一般,将数据写入外部系统须要建立链接对象,并使用它来将数据发送到远程系统。为此,开发人员可能会在Spark driver中建立链接对象,而后在Spark worker上使用它来将RDD的数据发送出去。以下代码:

dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

这是不正确的,由于这须要将链接对象序列化并从driver发送到worker。这种链接对象是很难跨机器传输的,这种错误可能表现为序列化错误(链接对象不可序列化)或初始化错误(链接对象须要在worker初始化)等。正确作法是在worker建立链接对象。然而,这将会致使另外一个常见错误,即为每个记录建立一个链接对象。代码以下:

dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

一般状况下,建立一个链接对象须要时间和资源的开销。因此,为每条记录建立和销毁链接对象可能会产生没必要要的高开销,而且会显著下降系统的吞吐量。

更好的解决方案是使用rdd.foreachPartition(),为每一个分区建立一个链接对象,并使用该链接发送分区中的全部记录。代码以下:

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

最后,经过跨批次重用链接对象,能够进一步优化。咱们能够维护一个静态的链接对象池,里面的链接对象能够被重用。代码以下:

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

至此,就是将数据发送到外部系统的最有效的解决方案。咱们再回顾一下这个方案,首先对DStream执行foreachRDD()方法,去操做每一个RDD的数据,而后,对每一个RDD执行foreachPartition()方法,去操做每一个RDD的每个分区的数据。此外,操做数据的链接要从链接池中拿,而且是懒初始化和有过时时间的。

其余须要注意的几点:

1.若是应用程序没有对DStream执行输出操做的话,那么根本不会执行任何操做。系统只会简单地接收数据并丢弃它。

2.默认状况下,输出操做是一次一个执行的,并且他们是按照在应用程序中定义的顺序执行的。

DataFrame和SQL操做

能够轻松使用DataFrame和SQL操做流数据。你必须使用StreamingContext正在使用的SparkContext来建立一个SparkSession对象。此外,必须这样作才能在驱动程序故障时重启。这是经过懒加载建立一个SparkSession单例实现的。下面的例子展现了这一点,它修改了以前的单词计数示例,使用DataFrame和SQL生成字数。每一个RDD都转换为一个DataFrame,注册为一个临时表,而后使用SQL进行查询。

/** Java Bean class */
public class JavaRow implements java.io.Serializable {
  private String word;
  public String getWord() {
    return word;
  }
  public JavaRow(String word) {
    this.word = word;
  }
}

/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ... 
words.foreachRDD((rdd, time) -> {
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> new JavaRow(word));
  Dataset<Row> wordsDF = spark.createDataFrame(rowRDD, JavaRow.class);
  wordsDF.createOrReplaceTempView("words");
  Dataset<Row> wordCountsDF = spark.sql("select word, count(*) as total from words group by word");
  wordCountsDF.show();
});

查看完整的源代码。JavaSqlNetworkWordCount.java

MLib操做

用的时候再学。

缓存/持久化 Caching/Persistence

同RDD相似,DStream也容许开发人员将流数据保存在内存中。也就是说,调用DStream的persist()方法会自动将该DStream中的每一个RDD都保存在内存中。若是DStream中的数据被屡次计算的话,这将很是有用。对于像reduceByWindow()和reduceByKeyAndWindow()这样的基于窗口的操做以及像updateStateByKey()这样的基于状态的操做,这是隐含的。所以,基于窗口的操做生成的DStream会自动持久化到内存中,开发人员无需显式调用persist()方法。

对于经过网络接收数据的输入流,例如Kafka,Flume,sockets等,默认持久化级别是将数据复制到两个节点以实现容错。

请注意,与RDD不一样,DStream默认持久化级别使数据在内存中保持序列化。这在Performance Tuning进一步讨论。

检查点 Checkpointing

流应用必须全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)具备恢复能力。为了作到这一点,Spark Streaming须要存储足够的信息到容错的存储系统,以便从故障中恢复。有两种类型的检查点数据:

1.元数据检查点。将定义流式计算的信息保存到HDFS等容错系统中。This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later)。元数据包括:

①配置--用于建立流应用程序的配置

②DStream操做--定义流应用程序的操做集合

③incomplete batches--Batches whose jobs are queued but have not completed yet

2.数据检查点。将生成的RDD保存到可靠的存储系统中。这在将多个批次的数据组合在一块儿的有状态转换中是必需的。在这样的转换中,生成的RDD依赖于以前批次的RDD,致使依赖链的长度随着时间的推移不断的增长。为了不恢复时间的这种无限增加(与依赖链成比例),有状态转换的中间RDD被周期性地保存到可靠存储系统中(例如HDFS),以切断依赖链。

什么时候启用检查点

必须为具备如下任何要求的应用程序启用检查点:

1.程序中有状态转换操做--若是在程序中使用了updateStateByKey()或者reduceByKeyAndWindow(with inverse function),那么就必须提供检查点目录以容许周期性地保存中间RDD。

2.从运行应用程序的驱动程序的故障中恢复--Metadata checkpoints are used to recover with progress information。

请注意,没有应用上述状态转换操做的简单的流应用程序能够不启用检查点。在这种状况下,从驱动程序故障中恢复也将是部分的(一些接收到但未处理的数据可能会丢失)。这一般是能够接受的,许多都以这种方式运行Spark Streaming应用程序。预计对非Hadoop环境的支持将来将获得改善。

如何配置检查点

检查点能够经过设置一个在容错的、可靠的文件系统(例如HDFS、S3)中的目录来启用,检查点信息将被保存到该文件系统中。这是经过调用StreamingContext实例的checkpoint(String directory)方法来实现的。这将容许你使用上述的状态转换操做。此外,若是你想让应用程序从驱动程序故障中恢复,则应该重写你的流应用程序以具备如下行为:

当程序第一次启动时,它会建立一个新的StreamingContext实例,设置全部的流,而后调用start()方法。

程序在失败后重启时,将会根据检查点目录中的检查点数据从新建立一个StreamingContext实例。

这种行为能够经过JavaStreamingContext.getOrCreate(String checkpointPath, Function0<JavaStreamingContext> creatingFunc)方法很简单地实现:

String checkpointDirectory = "hdfs://192.168.100.100:9000/checkpoint/application1";
Function0<JavaStreamingContext> creatingFunc = () -> {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingText2");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    // set checkpoint directory
    jssc.checkpoint(checkpointDirectory);
    return jssc;
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, creatingFunc);

// Do additional setup on context that needs to be done, irrespective of whether it is being started or restarted
//        context. ...

// Start the context
context.start();
try {
    context.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
}

JavaStreamingContext.getOrCreate(String checkpointPath, Function0<JavaStreamingContext> creatingFunc)方法,若是checkpointPath存在,则将根据检查点数据从新建立JavaStreamingContext实例。若是checkpointPath不存在(即第一次运行),那么creatingFunc函数将被调用,来建立一个新的JavaStreamingContext实例并设置输入DStream。

除了使用getOrCreate()方法外,还须要确保驱动程序进程在失败时自动从新启动。这只能经过运行应用程序的部署基础结构来完成,在下面的部署应用章节有详细的讨论。

请注意,RDD的检查点致使了存储到可靠存储系统的成本。这可能会致使RDD检查点的批处理时间变长。所以,检查点须要当心设置。在批间隔很小的状况下(例如1s),检查点可能会显著下降吞吐量。并且,若是检查点过于频繁,则会致使谱系和任务规模增加,这可能有不利影响。

对于须要RDD检查点的有状态转换操做,默认的间隔是批间隔的整数倍。它能够经过调用DStream的checkpoint(Duration checkpointInterval)方法来设置。一般状况下,检查点间隔设置为滑动间隔(若是有的话)的5-10倍是一个很好的尝试。

累加器,广播变量和检查点

累加器和广播变量是不能从检查点恢复的。若是你启用了检查点,并使用了累加器和广播变量,那么你必须建立一个懒实例化的累加器和广播变量单例,以便在驱动程序从新启动时从新实例化这些实例。

见下面例子:

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the blacklist Broadcast
  Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use blacklist to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (blacklist.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

所有代码请参阅JavaRecoverableNetworkWordCount.

部署应用

本节讨论部署Spark Streaming应用程序的步骤。

要求

要运行一个Spark Streaming应用程序,你须要如下:

1.具备集群管理器的集群--这是任何Spark应用程序的通常要求。

2.打包应用程序jar--你必须将你的流应用程序编译成jar包。

3.为executor节点配置足够的内存--因为接收到的数据必须存储在内存中,executor节点必须配置足够的内存以保存接收到的数据。请注意,若是你正在进行10分钟的窗口操做,则系统必须在内存中保留至少10分钟的数据。因此应用程序须要的内存取决于在其中使用的操做。

4.配置检查点 - 若是流应用程序须要配置检查点,那么必须将Hadoop API兼容的容错存储系统(HDFS)中的一个目录设置为检查点目录,使得流应用能以检查点信息来进行故障恢复。

5.配置驱动程序的自动重启 - 为了能自动从驱动程序故障中恢复,运行流应用程序的部署基础架构必须监视驱动程序进程,并在驱动程序失败时从新启动驱动程序。不一样的集群管理器有不一样的工具来实现:

①Spark Standalone - Spark驱动应用程序能够提交到Spark Standalone集群中运行。也就是说,驱动应用程序自己在其中一个工做节点上运行。此外,Standalone集群管理器能够被指示监督驱动应用程序,并在驱动程序因为非零退出代码或者因为所在的节点故障而失败时从新启动它。查看Spark Standalone来获取更多详情。

②YARN - YARN支持相似的机制来重启应用程序。

6.配置预写入日志 - Spark能够启用预写入日志来实现强大的容错保证。启用以后,从接收器接收到的全部数据将被写入到检查点目录中的预写入日志中。这能够避免驱动程序恢复时数据丢失,从而确保数据零丢失(在下面容错语义部分详细讨论)。启用的方式是设置配置参数spark.streaming.receiver.writeAheadLog.enable为true。然而,这些更强语义多是以单个接收器的接收吞吐量为代价的。这能够经过并行运行更多的接收器来增长总吞吐量来纠正。此外,建议在启用预写入日志时,Spark接收数据不要存储副本,由于至关于一个副本的日志已存储在容错存储系统中。这能够经过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER来实现。查看Spark Streaming Configuration获取更多详情。请注意,启用I/O加密时,Spark不会加密写入预写入日志的数据。若是须要对预写入日志的数据进行加密,则应将其存储在原生支持加密的文件系统中。

7,.设置最大接收速率 - 若是集群资源不足以使流式应用程序处理数据的速度与接收速度同样快,从1.5开始,Spark提供了一个叫作backpressure的特性,启用此特性后,Spark Streaming会自动调整速率限制。启用方式是设置配置参数spark.streaming.backpressure.enabled为true。

升级应用程序代码

若是正在运行的Spark Streaming应用程序须要使用新的应用程序代码升级,则有两种机制可供选择:

1.新的Spark Streaming应用程序启动并与旧应用程序并行运行。一旦新应用程序(接收的数据与旧应用程序相同)被预热好,旧应用程序就能够中止。请注意,在数据源支持发送数据到两个目的地的状况下,这种机制才可使用。

2.旧应用程序平滑关闭(调用JavaStreamingContext的stop()方法),以确保已接收到的数据在关闭完被处理。而后启用新应用程序,该应用程序从旧应用程序中止的同一点开始处理。请注意,只有使用支持源端缓冲的数据源(如Kafka和Flume)才能完成此操做,由于数据须要在旧应用程序关闭、新应用程序启动前进行缓冲。同时,还须要使用不一样的检查点目录来启动新应用程序,或者删除以前的检查点目录。由于检查点信息本质上包含序列化的Scala/Java/Python对象,试图用新应用程序的类来反序列化这些对象可能会出错。

监控应用程序

除了Spark自己的监控功能外,Spark Streaming还提供了一些其余特定功能。当使用StreamingContext时,Spark Web UI会显示一个额外的Streaming选项卡,它显示有关正在运行的接收器(接收器是否处于活动状态,接收到的记录数量,接收器错误等)和已完成的批次(批处理时间,排队延迟等),这可用来监控流应用程序的进度。

Web UI中的如下两个指标尤其重要:

1.处理时间 - 处理每批数据的时间。

2.Scheduling Delay - 批次在队列中等待处理先前批次的时间(the time a batch waits in a queue for the processing of previous batches to finish)。

若是批处理时间一直比批处理间隔大,或者scheduling delay持续增大,则表示系统处理数据的速度比不上接收数据的速度。在这种状况下,考虑如何减小批处理时间。

Spark Streaming程序的进度也可使用StreamingListener接口进行监控,该接口容许你获取接收器状态和处理时间等信息。

性能调整

使集群上的Spark Streaming应用程序表现出最佳的性能须要进行一些调整。本节介绍可调整的参数和配置,以提升应用程序的性能。从较高层次考虑,你须要考虑两件事情:

1.经过有效利用集群资源减小批处理时间。

2.设置正确的批次大小,以使得批处理的速度能跟得上接收数据的数据。

减小批处理时间

Spark有不少优化可使每一个批次的处理时间最少。这些在Tuning Guide有详细讨论。本节介绍一些最重要的内容:

数据接收的并行度

经过网络接收数据(如Kafka,Flume,socket等)须要将数据反序列化并存储到Spark中。若是数据接收成为系统中的瓶颈,则能够考虑并行化数据接收。请注意,每一个输入DStream都会建立一个接收器。所以能够经过建立多个输入DStream并配置它们接收流数据源的不一样分区。例如,单个接收两个主题数据的输入DStream能够被切分为两个输入DStream,每一个输入DStream接收一个主题的数据。这将建立两个接收器,并行接收数据,从而提升总体吞吐量。多个DStream能够经过union操做链接成一个DStream。以下;

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

另外一个应该考虑的参数是接收器的块间隔,它由配置参数spark.streaming.blockInterval决定。对于大多数接收器来讲,接收到的数据在存储在Spark的内存以前会被合并成块。每一个批次的块数决定了将用于处理数据的任务数量。块数=batch interval / block interval。例如,批间隔设置为2s,块间隔设置为200ms,那么会建立10个任务。若是任务数太少(少于每台机器的内核数),那么效率将会很低,由于不是全部的内核都用于处理数据。要增长任务数量,就须要减小块间隔。可是,建议的块间隔最少为50ms,低于此值,任务启动开销可能会成为问题。

用多个输入DStream、多个接收器去接收数据的另外一种方法是显示从新分配输入DStream的分区(调用inputStream.repartition(int numPartitions))。

数据处理的并行度

若是在计算的某些阶段使用的并行任务数量不够高,则集群资源可能未被充分利用。例如,对于像reduceByKey和reduceByKeyAndWindow这样的分布式reduce操做,并行任务数量的默认值由spark.default.parallelism配置属性控制。你能够将并行级别看成一个参数传递,或者设置上述属性以更改默认值。

数据序列化

数据序列化的开销能够经过调整序列化格式来减小。在Spark Streaming中,有两种类型的数据被序列化;

1.输入数据。默认状况下,经过接收器接收到的输入数据会以StorageLevel.MEMORY_AND_DISK_SER_2级别存储在executor的内存中。useDisk,useMemory,not useOffHeap,not deserialized,2 replication。也就是说,将数据序列化为字节以减小GC开销,并有2个副本用于容错。此外,数据首先保存在内存中,而且只有当内存不足以保存流式计算所需的全部输入数据时才会溢出到磁盘。这个序列化显然有CPU开销 - 接收器必须反序列化接收的数据,并使用Spark的序列化格式从新序列化它。

2.持久化经过流操做生成的RDD。由流计算生成的RDD可能会持久化到内存中。例如,窗口操做会将数据存储到内存中,由于它们将被屡次处理。可是,与Spark Core默认的StorageLevel.MEMORY_ONLY级别不一样,流式计算生成的RDD持久化级别是StorageLevel.MEMORY_ONLY_SER,以最大限度地减小GC开销。

在以上两种状况下,使用Kryo序列化既能够减小CPU又能够减小内存开销。有关更多详细信息,查看Spark Tuning Guide。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪。查看Spark Configuration

若是须要为流应用程序保存的数据量不大的话,能够将数据保存为反序列化对象而不会致使过分的GC开销。例如,若是使用几秒钟的批间隔而且没有窗口操做,那么你能够经过显式设置存储级别来禁用持久化数据的序列化。这能够减小因为序列化形成的CPU开销,并可能在不增长太多GC开销的状况下提升性能。综上,序列化会形成很较大的CPU开销,可是减小GC开销。

任务启动开销

若是每秒启动的任务不少(好比说50或者更多),那么向slaves发送任务的开销就会很大,这使得很难达到亚秒级的延迟。能够经过如下更改来减小开销:

Execution mode:以Spark Standalone模式运行Spark或者以粗粒度的Mesos模式运行Spark会比以细粒度的Mesos模式运行Spark有更好的任务启动时间。查看Running Spark on Mesos获取更多详情。

这种更改可能会使批处理时间减小100ms,从而容许亚秒批大小可行。亚秒:没有达到秒,也就是说不到一秒。

设置合适的批间隔

为了让Spark Streaming应用程序稳定地运行在集群上,系统应该有能力像接收数据同样快速处理数据。换句话说,数据处理应该像数据生成同样快。经过Web UI,咱们能够看到批处理时间是否比批间隔小,进而判断程序是否稳定。

为你的应用程序找出合适的批大小的一个好方法是用一个保守的批间隔(比方说,5-10s)和较低的数据速率进行测试。要验证系统是否可以跟上数据速率,能够检查每一个处理过的批次所经历的端到端延迟的值(能够在Spark驱动程序log4j日志中查找"Total delay",或者使用StremingListener接口)。若是延迟保持与批大小至关,那么系统是稳定的。不然,若是延迟不断增长,则意味着系统没法跟上,不稳定。一旦你有了一个稳定的配置,你能够尝试提升数据速率或者减少批大小。注意,只要延迟小于批大小,因为数据速率增长而引发的瞬时延迟增长是正常的。

内存调整

Tune Spark中详细讨论了调整Spark应用程序的内存使用状况和GC行为,必须阅读。在本节中,咱们将专门讨论Spark Streaming应用程序中的一些调优参数。

Spark Streaming应用程序须要的内存大小很大程度上取决于所使用的转换操做。例如,若是你想在最后10分钟的数据上使用窗口操做,那么你的集群应该有足够的内存来存储10分钟的数据。或者你想在有不少key的状况下使用updateStateByKey操做,那么就须要不少内存。相反,若是你想作一个简单的map-filter-store操做,那么就不须要不少内存。

一般,由于经过接收器接收的数据是以StorageLevel.MEMORY_AND_DISK_SER_2级别存储的,内存中放不下的数据会溢出到磁盘,这可能会下降流应用程序的性能,所以建议根据流应用程序的须要提供足够的内存。最好尝试一下小规模的内存使用状况并作相应的评估。

内存调整的另外一个方面是GC(垃圾回收)。对于须要低延迟的流应用程序,不但愿由于JVM垃圾回收而形成大量暂停。有几个参数能够帮你调整内存使用状况和GC开销:

1.DStream的持久化级别:如在上面数据序列化部分所说,输入数据和RDD默认以序列化的字节被持久化。与反序列化的持久化相比,这同时减小了内存使用和GC开销。启用Kryo序列化进一步减小了序列化的大小和内存使用。进一步减小内存使用还能够经过压缩来实现(查看Spark的配置参数spark.rdd.compress),代价是cpu时间。

2.清理旧数据:默认状况下,全部的输入数据和持久化的RDD是自动清理的。Spark Streaming根据使用的转换操做来决定什么时候清理数据。例如,你正在使用10分钟的窗口操做,则Spark Streaming将保留最近10分钟的数据,并主动丢弃旧数据。经过设置streamingcontext.remember,数据能够保留更长的时间。

3.CMS垃圾回收器:强烈建议使用CMS(concurrent mark-and sweep,并发标记扫描)GC,以使GC相关的暂停持续低水平。尽管并发GC会下降系统的总体吞吐量,但仍推荐使用并行GC来实现更加一致的批处理时间。确保在驱动程序上(在spark-submit中使用 --driver-java-options )和executor上(使用Spark配置参数spark.executor.extraJavaOptions)设置了CMS GC。

4.其余技巧:为了进一步减小GC开销,这里有更多的技巧值得尝试。

①用StorageLevel.OFF_HEAP存储级别持久化RDD。(useDisk,useMemory,useOffHeap,not deserialized,1 replication)

②使用更多的executor和较小的堆大小。这会下降每一个JVM 堆内的GC压力。

须要记住的几点:

1.每一个DStream都与一个接收器相关联(除了文件流)。为了并行读取数据,须要建立多个接收器,即多个DStream。接收器在executor上运行,它占据一个内核。确保除去接收器所占内核以后,还有足够的内核来处理数据。spark.cores.max应该把接收器的内核也算在内。接收器以循环的方式分配给executor。

2.当从数据源接收数据时,接收器建立数据块。每隔块间隔(blockInterval)都会生成一个新的数据块。在批间隔中会生成N个数据块,N=批间隔/块间隔。这些块由当前executor的块管理器分发给其余executor的块管理器。以后,驱动程序上运行的的网络输入跟踪器(Network Input Tracker)将被通知块的位置以供进一步处理。

3.An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval== batchinterval would mean that a single partition is created and probably it is processed locally.

4.The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in. Having bigger blockinterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found out between these two parameters to ensure that the bigger blocks are processed locally.

5.Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling inputDstream.repartition(n). This reshuffles the data in RDD randomly to create n number of partitions. Yes, for greater parallelism. Though comes at the cost of a shuffle. An RDD’s processing is scheduled by driver’s jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued.

6.If you have two dstreams there will be two RDDs formed and there will be two jobs created which will be scheduled one after the another. To avoid this, you can union two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted.

容错语义

在本节中,咱们将讨论Spark Streaming应用程序在发生故障时的行为。

背景:

为了理解Spark Streaming提供的语义,让咱们记住Spark RDD的基本容错语义:

1.RDD是一个不可变的,能够从新计算的,分布式的数据集。Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.

2.若是RDD的任何分区因为工做节点故障而丢失,那么这个分区能够由原始数据集经过操做谱系从新计算出来。

3.假定RDD的全部转换操做都是肯定的,最终转换的RDD中的数据老是相同的,而无论Spark集群中的故障如何。

Spark操做容错文件系统(好比说HDFS)中的数据。因此,全部由容错数据生成的RDD也应该是容错的。然而,对于Spark Streaming,状况并不是如此,由于大多数状况下是经过网络接收的数据(除了文件流)。

要为全部生成的RDD实现相同的容错属性,须要为接收的数据在集群工做节点的多个executor上建立副本(默认是2个副本)。这致使系统中有两种数据在发生故障时须要恢复:

1.接收到以及建立了副本的数据。这些数据在单个工做节点故障时仍然存在,由于它在另外一个工做节点上有副本。

2.Data received but buffered for replication。由于这些数据没有副本,恢复这些数据惟一的方法是从数据源再接收一次。

此外,有两种失败咱们应该关注:

1.工做节点失败。运行executor的任意一个工做节点均可能会失败,此时这些节点上全部的内存中的数据都会丢失。若是在失败节点上有接收器运行,那么它们缓冲的数据将会丢失。

2.驱动程序节点失败。若是运行Spark Streaming应用的驱动程序节点失败,那么显然SparkContext会丢失,全部executor及其内存中的数据都会丢失。

定义

关于每一个记录可能会被系统处理多少次,有三种可能:

1.At most once。每一个记录将被处理一次或者根本不处理。

2.At least once。每一个记录将被处理一到屡次。这比最多一次要强一些,由于它确保不会丢失任何数据,可是可能有重复。

3.Exactly once。每一个记录将被处理一次,没有数据丢失,也没有数据被重复处理。这是三个最好的。

基本语义

在任何流处理系统中,广义来说,处理数据有三个步骤。

1.接收数据:数据经过接收器或其余方式从源接收数据。

2.转换数据:使用DStream和RDD的转换操做来转换接收到的数据。

3.输出数据:最终转换的数据被输出到外部系统,好比文件系统,数据库或者仪表盘等。

若是一个流应用程序必须实现端到端的exactly-once保证,那么每一个步骤都必须提供exactly-once保证。也就是说,每一个记录只能被接收一次,只能被转换一次,并被输出到下游系统一次。让咱们在Spark Streaming的上下文中理解这些步骤的语义。

1.接收数据:不一样的输入源提供不一样的保证。在下面会讨论。

2.转换数据:因为RDD提供的保证,全部接收的数据都会被正好处理一次。即便有故障,只要接收到的输入数据是可访问的,最终转换的RDD将老是具备相同的内容。

3.输出数据:输出操做默认保证至少一次,由于它依赖于输出操做的类型(是否幂等性idempotent)以及下游系统(是否支持事务)。可是用户能够实现本身的事务机制来实现exactly-once。在下面会讨论。

接收数据的语义

不一样的输入源提供不一样的保证,从最少一次到恰好一次。

文件流

若是全部输入数据都在容错系统中,如HDFS,那么Spark Streaming能够从任意故障中恢复并处理全部数据。这就是恰好一次,意味着全部的数据都会被恰好处理一次,无论什么失败。

基于接收器的源

对于基于接收器的输入源,容错语义取决于故障状况和接收器的类型。正如咱们以前讨论过的,有两种类型的接收器;

1.可靠接收器。这些接收器仅在接收到的数据建立完副本后才去确认通知可靠的数据源。若是这样的接收器失败了,那么数据源不会收到缓冲(未复制)数据的确认通知,因此,若是接收器重启的话,数据源将从新发送数据,而且没有数据会因为失败而丢失。

2.不可靠接收器。这些接收器不会发送确认通知,因此在工做节点或者驱动程序故障时可能会丢失数据。

若是一个工做节点失败,可靠的接收器不会丢失数据。对于不可靠的接收器,接收可是没有建立副本的数据会丢失。若是驱动程序故障,那么无论是可靠接收器仍是不可靠接收器,除了上面这些丢失外,全部在内存中接收和复制的数据都会丢失,这会影响有状态转换操做的结果。 If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.???

为了不以前接收到的数据丢失,从1.2版本开始,Spark引入了预写入日志,将接收到的数据保存到容错存储系统中。预写入日志的启用和可靠的接收器能够确保数据零丢失(即便是驱动程序故障状况下)。就语义而言,它提供了至少一次的保证。

Kafka直接API

从1.3版本开始,咱们引入了一个新的Kafka Direct API,它能够确保全部的Kafka数据只被Spark Streaming接收一次。除此以外,若是你执行的是exactly-once的输入操做,那么能够实现端到端的exactly-once。参阅Kafka Integration Guide查看详情。

输出操做的语义

输出操做(好比foreachRDD)有至少一次的语义。也就是说,转换后的数据可能在工做节点失败的状况下不止一次地被写入外部实体。虽然这对于使用saveAs*HadoopFiles操做保存到文件系统是能够接受的(由于文件将被相同的数据简单地覆盖),可是可要额外的努力来实现exactly-once。有两种方法:

1.幂等更新(Idempotent updates):屡次尝试老是写相同的数据。例如,saveAs*HadoopFiles老是将相同的数据写到生成的文件中。

2.事务更新(Transctional updates):全部的更新都以事务方式进行,所以更新只能以原子方式进行一次。下面是一个具体实现方法:

①使用批次时间(在foreachRDD中可用)和RDD的分区索引建立一个标识符,该标识符惟一标识流应用程序中的blob data。

②使用这个标识符事务性地更新外部系统。Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. 也就是说,若是这个标识符还没提交,就以原子方式提交分区数据和这个标识符。不然,若是该标识符已经提交的话,就跳过更新。

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
相关文章
相关标签/搜索