Spark Streaming之一:总体介绍

提到Spark Streaming,咱们不得不说一下BDAS(Berkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈。从它的视角来看,目前的大数据处理能够分为如如下三个类型。 html

  • 复杂的批量数据处理(batch data processing),一般的时间跨度在数十分钟到数小时之间。
  • 基于历史数据的交互式查询(interactive query),一般的时间跨度在数十秒到数分钟之间。
  • 基于实时数据流的数据处理(streaming data processing),一般的时间跨度在数百毫秒到数秒之间。 

目前已有不少相对成熟的开源软件来处理以上三种情景,咱们能够利用MapReduce来进行批量数据处理,能够用Impala来进行交互式查询,对于流式数据处理,咱们能够采用Storm。对于大多数互联网公司来讲,通常都会同时遇到以上三种情景,那么在使用的过程当中这些公司可能会遇到以下的不便。 node

  • 三种情景的输入输出数据没法无缝共享,须要进行格式相互转换。
  • 每个开源软件都须要一个开发和维护团队,提升了成本。
  • 在同一个集群中对各个系统协调资源分配比较困难。 

BDAS就是以Spark为基础的一套软件栈,利用基于内存的通用计算模型将以上三种情景一网打尽,同时支持Batch、Interactive、Streaming的处理,且兼容支持HDFS和S3等分布式文件系统,能够部署在YARN和Mesos等流行的集群资源管理器之上。算法

一、Spark Streaming简介

1.1 概述

Spark Streaming 是Spark核心API的一个扩展,能够实现高吞吐量的、具有容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据以后,可使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还能够将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可使用Spark的其余子框架,如集群学习、图计算等,对流数据进行处理。shell

Spark Streaming处理的数据流图:数据库

 

Spark的各个子框架,都是基于核心Spark的,Spark Streaming在内部的处理机制是,接收实时流的数据,并根据必定的时间间隔拆分红一批批的数据,而后经过Spark Engine处理这些批数据,最终获得处理后的一批批结果数据。apache

对应的批数据,在Spark内核对应一个RDD实例,所以,对应流数据的DStream能够当作是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分红一批一批后,经过一个先进先出的队列,而后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,而后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产速率和消费速率。编程

1.2 术语定义

l离散流(discretized stream)或DStream:这是Spark Streaming对内部持续的实时数据流的抽象描述,即咱们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。缓存

l批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就造成了对应的结果数据流了。服务器

l时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片做为咱们拆分流数据的依据。一个时间片的数据对应一个RDD实例。网络

l窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,

l滑动时间间隔:前一个窗口到后一个窗口所通过的时间长度。必须是批处理时间间隔的倍数

lInput DStream :一个input DStream是一个特殊的DStream,将Spark Streaming链接到一个外部数据源来读取数据。

1.3 Storm与Spark Streming比较

l处理模型以及延迟

虽然两框架都提供了可扩展性(scalability)和可容错性(fault tolerance),可是它们的处理模型从根本上说是不同的。Storm能够实现亚秒级时延的处理,而每次只处理一条event,而Spark Streaming能够在一个短暂的时间窗口里面处理多条(batches)Event。因此说Storm能够实现亚秒级时延的处理,而Spark Streaming则有必定的时延。

l容错和数据保证

然而二者的代价都是容错时候的数据保证,Spark Streaming的容错为有状态的计算提供了更好的支持。在Storm中,每条记录在系统的移动过程当中都须要被标记跟踪,因此Storm只能保证每条记录最少被处理一次,可是容许从错误状态恢复时被处理屡次。这就意味着可变动的状态可能被更新两次从而致使结果不正确。

任一方面,Spark Streaming仅仅须要在批处理级别对记录进行追踪,因此他能保证每一个批处理记录仅仅被处理一次,即便是node节点挂掉。虽说Storm的 Trident library能够保证一条记录被处理一次,可是它依赖于事务更新状态,而这个过程是很慢的,而且须要由用户去实现。

l实现和编程API

Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。若是你想看看这两个框架是如何实现的或者你想自定义一些东西你就得记住这一点。Storm是由BackType和 Twitter开发,而Spark Streaming是在UC Berkeley开发的。

Storm提供了Java API,同时也支持其余语言的API。 Spark Streaming支持Scala和Java语言(其实也支持Python)。

l批处理框架集成

Spark Streaming的一个很棒的特性就是它是在Spark框架上运行的。这样你就能够想使用其余批处理代码同样来写Spark Streaming程序,或者是在Spark中交互查询。这就减小了单独编写流批量处理程序和历史数据处理程序。

l生产支持

Storm已经出现好多年了,并且自从2011年开始就在Twitter内部生产环境中使用,还有其余一些公司。而Spark Streaming是一个新的项目,而且在2013年仅仅被Sharethrough使用(据做者了解)。

Storm是 Hortonworks Hadoop数据平台中流处理的解决方案,而Spark Streaming出如今 MapR的分布式平台和Cloudera的企业数据平台中。除此以外,Databricks是为Spark提供技术支持的公司,包括了Spark Streaming。

虽说二者均可以在各自的集群框架中运行,可是Storm能够在Mesos上运行, 而Spark Streaming能够在YARN和Mesos上运行。

二、运行原理

2.1 Streaming架构

SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,能够对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行相似Map、Reduce和Join等复杂操做,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。

l计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),而后将Spark Streaming中对DStream的Transformation操做变为针对Spark中对RDD的Transformation操做,将RDD通过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。

 

图Spark Streaming构架

l容错性:对于流式计算来讲,容错性相当重要。首先咱们要明确一下Spark中RDD的容错机制。每个RDD都是一个不可变的分布式可重算的数据集,其记录着肯定性的操做继承关系(lineage),因此只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是能够利用原始输入数据经过转换操做而从新算出的。  

对于Spark Streaming来讲,其RDD的传承关系以下图所示,图中的每个椭圆形表示一个RDD,椭圆形中的每一个圆形表明一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每个Batch Size所产生的中间结果RDD。咱们能够看到图中的每个RDD都是经过lineage相链接的,因为Spark Streaming输入数据能够来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每个数据流拷贝两份到其余的机器)都能保证容错性,因此RDD中任意的Partition出错,均可以并行地在其余机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。

 

Spark Streaming中RDD的lineage关系图

l实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会通过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),因此Spark Streaming可以知足除对实时性要求很是高(如高频实时交易)以外的全部流式准实时计算场景。

l扩展性与吞吐量:Spark目前在EC2上已可以线性扩展到100个节点(每一个节点4Core),能够以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图4是Berkeley利用WordCount和Grep两个用例所作的测试,在Grep这个测试中,Spark Streaming中的每一个节点的吞吐量是670k records/s,而Storm是115k records/s。

 

Spark Streaming与Storm吞吐量比较图

2.2 编程模型

DStream(Discretized Stream)做为Spark Streaming的基础抽象,它表明持续性的数据流。这些数据流既能够经过外部输入源赖获取,也能够经过现有的Dstream的transformation操做来得到。在内部实现上,DStream由一组时间序列上连续的RDD来表示。每一个RDD都包含了本身特定时间间隔内的数据流。如图7-3所示。

 

 

       图7-4

对DStream中数据的各类操做也是映射到内部的RDD上来进行的,如图7-4所示,对Dtream的操做能够经过RDD的transformation生成新的DStream。这里的执行引擎是Spark。

2.2.1 如何使用Spark Streaming

做为构建于Spark之上的应用框架,Spark Streaming承袭了Spark的编程风格,对于已经了解Spark的用户来讲可以快速地上手。接下来以Spark Streaming官方提供的WordCount代码为例来介绍Spark Streaming的使用方式。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()              // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

1.建立StreamingContext对象 同Spark初始化须要建立SparkContext对象同样,使用Spark Streaming就须要建立StreamingContext对象。建立StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称(如NetworkWordCount)。须要注意的是参数Seconds(1),Spark Streaming须要指定处理数据的时间间隔,如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数须要根据用户的需求和集群的处理能力进行适当的设置;

2.建立InputDStream如同Storm的Spout,Spark Streaming须要指明数据源。如上例所示的socketTextStream,Spark Streaming以socket链接做为数据源读取数据。固然Spark Streaming支持多种不一样的数据源,包括Kafka、Flume、HDFS/S三、Kinesis和Twitter等数据源;

3.操做DStream对于从数据源获得的DStream,用户能够在其基础上进行各类操做,如上例所示的操做就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源获得的数据首先进行分割,而后利用Map和ReduceByKey方法进行计算,固然最后还有使用print()方法输出结果;

4.启动Spark Streaming以前所做的全部步骤只是建立了执行流程,程序没有真正链接上数据源,也没有对数据进行任何操做,只是设定好了全部的执行计划,当ssc.start()启动后程序才真正进行全部预期的操做。

至此对于Spark Streaming的如何使用有了一个大概的印象,在后面的章节咱们会经过源代码深刻探究一下Spark Streaming的执行流程。

2.2.2 DStream的输入源

在Spark Streaming中全部的操做都是基于流的,而输入源是这一系列操做的起点。输入 DStreams 和 DStreams 接收的流都表明输入数据流的来源,在Spark Streaming 提供两种内置数据流来源:

l  基础来源 在 StreamingContext API 中直接可用的来源。例如:文件系统、Socket(套接字)链接和 Akka actors;

l  高级来源 如 Kafka、Flume、Kinesis、Twitter 等,能够经过额外的实用工具类建立。

2.2.2.1 基础来源

在前面分析怎样使用Spark Streaming的例子中咱们已看到ssc.socketTextStream()方法,能够经过 TCP 套接字链接,从从文本数据中建立了一个 DStream。除了套接字,StreamingContext 的API还提供了方法从文件和 Akka actors 中建立DStreams做为输入源。

Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法能够从任何文件系统(如:HDFS、S三、NFS等)的文件中读取数据,而后建立一个DStream。Spark Streaming 监控 dataDirectory 目录和在该目录下任何文件被建立处理(不支持在嵌套目录下写文件)。须要注意的是:读取的必须是具备相同的数据格式的文件;建立的文件必须在 dataDirectory目录下,并经过自动移动或重命名成数据目录;文件一旦移动就不能被改变,若是文件被不断追加,新的数据将不会被阅读。对于简单的文本文,可使用一个简单的方法streamingContext.textFileStream(dataDirectory)来读取数据。

Spark Streaming也能够基于自定义 Actors 的流建立DStream ,经过 Akka actors 接受数据流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用streamingContext.queueStream(queueOfRDDs)方法能够建立基于 RDD 队列的DStream,每一个RDD 队列将被视为DStream 中一块数据流进行加工处理。

2.2.2.2 高级来源

这一类的来源须要外部 non-Spark 库的接口,其中一些有复杂的依赖关系(如 Kafka、Flume)。所以经过这些来源建立DStreams 须要明确其依赖。例如,若是想建立一个使用 Twitter tweets 的数据的DStream 流,必须按如下步骤来作:

1)在 SBT 或 Maven工程里添加 spark-streaming-twitter_2.10 依赖。

2)开发:导入 TwitterUtils 包,经过 TwitterUtils.createStream 方法建立一个DStream。

3)部署:添加全部依赖的 jar 包(包括依赖的spark-streaming-twitter_2.10 及其依赖),而后部署应用程序。

须要注意的是,这些高级的来源通常在Spark Shell中不可用,所以基于这些高级来源的应用不能在Spark Shell中进行测试。若是你必须在Spark shell中使用它们,你须要下载相应的Maven工程的Jar依赖并添加到类路径中。

其中一些高级来源以下:

lTwitter Spark Streaming的TwitterUtils工具类使用Twitter4j,Twitter4J 库支持经过任何方法提供身份验证信息,你能够获得公众的流,或获得基于关键词过滤流。

lFlume Spark Streaming能够从Flume中接受数据。

lKafka Spark Streaming能够从Kafka中接受数据。

lKinesis Spark Streaming能够从Kinesis中接受数据。

须要重申的一点是在开始编写本身的 SparkStreaming 程序以前,必定要将高级来源依赖的Jar添加到SBT 或 Maven 项目相应的artifact中。常见的输入源和其对应的Jar包以下图所示。

另外,输入DStream也能够建立自定义的数据源,须要作的就是实现一个用户定义的接收器。

2.2.3 DStream的操做

与RDD相似,DStream也提供了本身的一系列操做方法,这些操做能够分红四类:

  • Transformations 普通的转换操做
  • Window Operations 窗口转换操做
  • Join Operations 合并操做
  • Output Operations 输出操做

2.2.3.1 普通的转换操做

普通的转换操做以下表所示:

转换

描述

map(func)

源 DStream的每一个元素经过函数func返回一个新的DStream。

flatMap(func)

相似与map操做,不一样的是每一个输入元素能够被映射出0或者更多的输出元素。

filter(func)

在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。

repartition(numPartitions)

经过输入的参数numPartitions的值来改变DStream的分区大小。

union(otherStream)

返回一个包含源DStream与其余 DStream的元素合并后的新DSTREAM。

count()

对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。

reduce(func)

使用函数func(有两个参数并返回一个结果)将源DStream 中每一个RDD的元素进行聚 合操做,返回一个内部所包含的RDD只有一个元素的新DStream。

countByValue()

计算DStream中每一个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。

reduceByKey(func, [numTasks])

当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每一个键的值V都是使用聚合函数func汇总。注意:默认状况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),能够经过配置numTasks设置不一样的并行任务数。

join(otherStream, [numTasks])

当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新DSTREAM。

cogroup(otherStream, [numTasks])

当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。

transform(func)

经过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这能够用来在DStream作任意RDD操做。

updateStateByKey(func)

返回一个新状态的DStream,其中每一个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法能够被用来维持每一个键的任何状态数据。

在上面列出的这些操做中,transform()方法和updateStateByKey()方法值得咱们深刻的探讨一下:

l  transform(func)操做

该transform操做(转换操做)连同其其相似的 transformWith操做容许DStream 上应用任意RDD-to-RDD函数。它能够被应用于未在 DStream API 中暴露任何的RDD操做。例如,在每批次的数据流与另外一数据集的链接功能不直接暴露在DStream API 中,但能够轻松地使用transform操做来作到这一点,这使得DStream的功能很是强大。例如,你能够经过链接预先计算的垃圾邮件信息的输入数据流(可能也有Spark生成的),而后基于此作实时数据清理的筛选,以下面官方提供的伪代码所示。事实上,也能够在transform方法中使用机器学习和图形计算的算法。

l  updateStateByKey操做

该 updateStateByKey 操做可让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :

(1)  定义状态 - 状态能够是任意的数据类型。

(2)  定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态。

让咱们用一个例子来讲明,假设你要进行文本数据流中单词计数。在这里,正在运行的计数是状态并且它是一个整数。咱们定义了更新功能以下:

 

此函数应用于含有键值对的DStream中(如前面的示例中,在DStream中含有(word,1)键值对)。它会针对里面的每一个元素(如wordCount中的word)调用一下更新函数,newValues是最新的值,runningCount是以前的值。

 

详细见:Spark Streaming之六:Transformations 普通的转换操做

2.2.3.2 窗口转换操做

Spark Streaming 还提供了窗口的计算,它容许你经过滑动窗口对数据进行转换,窗口转换操做以下:

转换

描述

window(windowLengthslideInterval)

返回一个基于源DStream的窗口批次计算后获得新的DStream。

countByWindow(windowLength,slideInterval)

返回基于滑动窗口的DStream中的元素的数量。

reduceByWindow(funcwindowLength,slideInterval)

基于滑动窗口对源DStream中的元素进行聚合操做,获得一个新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操做,获得一个新的DStream。

reduceByKeyAndWindow(funcinvFunc,windowLength,slideInterval, [numTasks])

一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最先的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么咱们能够将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法能够复用中间三秒的统计量,提升统计的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基于滑动窗口计算源DStream中每一个RDD内每一个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue同样,reduce任务的数量能够经过一个可选参数进行配置。

 

 

批处理间隔示意图

在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的,所以在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操做而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操做中,只有窗口的长度知足了才会触发批数据的处理。除了窗口的长度,窗口操做还有另外一个重要的参数就是滑动间隔(slide duration),它指的是通过多长时间窗口滑动一次造成新的窗口,滑动窗口默认状况下和批次间隔的相同,而窗口间隔通常设置的要比它们两个大。在这里必须注意的一点是滑动间隔和窗口间隔的大小必定得设置为批处理间隔的整数倍。

如批处理间隔示意图所示,批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time 1-time 3,只有窗口间隔知足了才触发数据的处理。这里须要注意的一点是,初始的窗口有可能流入的数据没有撑满,可是随着时间的推动,窗口最终会被撑满。当每一个2个时间单位,窗口滑动一次后,会有新的数据流入窗口,这时窗口会移去最先的两个时间单位的数据,而与最新的两个时间单位的数据进行汇总造成新的窗口(time3-time5)。

对于窗口操做,批处理间隔、窗口间隔和滑动间隔是很是重要的三个时间概念,是理解窗口操做的关键所在。

Spark Streaming之五:Window窗体相关操做

2.2.3.3 Join Operations

Join主要可分为两种,

一、DStream对象之间的Join
  这种join通常应用于窗口函数造成的DStream对象之间,具体能够参考第一部分中的join操做,除了简单的join以外,还有leftOuterJoin, rightOuterJoin和fullOuterJoin。

二、DStream和dataset之间的join
  这一种join,能够参考前面transform操做中的示例。

2.2.3.4 输出操做

Spark Streaming容许DStream的数据被输出到外部系统,如数据库或文件系统。因为输出操做实际上使transformation操做后的数据能够经过外部系统被使用,同时输出操做触发全部DStream的transformation操做的实际执行(相似于RDD操做)。如下表列出了目前主要的输出操做:

转换

描述

print()

在Driver中打印出DStream中数据的前10个元素。

saveAsTextFiles(prefix, [suffix])

将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsObjectFiles(prefix, [suffix])

将DStream中的内容按对象序列化而且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsHadoopFiles(prefix, [suffix])

将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

foreachRDD(func)

最基本的输出操做,将func函数应用于DStream中的RDD上,这个操做会输出数据到外部系统,好比保存RDD到文件或者网络数据库等。须要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

一、print()
  print操做会将DStream每个batch中的前10个元素在driver节点打印出来。
  看下面这个示例,一行输入超过10个单词,而后将这行语句分割成单个单词的DStream。
val words = lines.flatMap(_.split(" "))
words.print()

 
二、saveAsTextFiles(prefix, [suffix])
  这个操做能够将DStream中的内容保存为text文件,每一个batch的数据单独保存为一个文夹,文件夹名前缀参数必须传入,文件夹名后缀参数可选,最终文件夹名称的完整形式为 prefix-TIME_IN_MS[.suffix]

  好比下面这一行代码
lines.saveAsTextFiles("satf", ".txt")

看一下执行结果,在当前项目路径下,每秒钟生成一个文件夹,打开的两个窗口中的内容分别是nc窗口中的输入。

另外,若是前缀中包含文件完整路径,则该text文件夹会建在指定路径下,以下图所示

三、saveAsObjectFiles(prefix, [suffix])
  这个操做和前面一个相似,只不过这里将DStream中的内容保存为SequenceFile文件类型,这个文件中保存的数据都是通过序列化后的Java对象。 
  实验略过,可参考前面一个操做。 
  
四、saveAsHadoopFiles(prefix, [suffix])
  这个操做和前两个相似,将DStream每一batch中的内容保存到HDFS上,一样能够指定文件的前缀和后缀。 
  
五、foreachRDD(func)

dstream.foreachRDD是一个很是强大的输出操做,它允将许数据输出到外部系统。可是 ,如何正确高效地使用这个操做是很重要的,下面展现了如何去避免一些常见的错误。

一般将数据写入到外部系统须要建立一个链接对象(如 TCP链接到远程服务器),并用它来发送数据到远程系统。出于这个目的,开发者可能在不经意间在Spark driver端建立了链接对象,并尝试使用它保存RDD中的记录到Spark worker上,以下面代码:

 

这是不正确的,这须要链接对象进行序列化并从Driver端发送到Worker上。链接对象不多在不一样机器间进行这种操做,此错误可能表现为序列化错误(链接对不可序列化),初始化错误(链接对象在须要在Worker 上进行须要初始化) 等等,正确的解决办法是在 worker上建立的链接对象。

一般状况下,建立一个链接对象有时间和资源开销。所以,建立和销毁的每条记录的链接对象可能招致没必要要的资源开销,并显著下降系统总体的吞吐量 。一个更好的解决方案是使用rdd.foreachPartition方法建立一个单独的链接对象,而后使用该链接对象输出的全部RDD分区中的数据到外部系统。

 这缓解了建立多条记录链接的开销。最后,还能够进一步经过在多个RDDs/ batches上重用链接对象进行优化。一个保持链接对象的静态池能够重用在多个批处理的RDD上将其输出到外部系统,从而进一步下降了开销。

 须要注意的是,在静态池中的链接应该按需延迟建立,这样能够更有效地把数据发送到外部系统。另外须要要注意的是:DStreams延迟执行的,就像RDD的操做是由actions触发同样。默认状况下,输出操做会按照它们在Streaming应用程序中定义的顺序一个个执行。

 

2.3  容错、持久化和性能调优

2.3.1 容错

DStream基于RDD组成,RDD的容错性依旧有效,咱们首先回忆一下SparkRDD的基本特性。

lRDD是一个不可变的、肯定性的可重复计算的分布式数据集。RDD的某些partition丢失了,能够经过血统(lineage)信息从新计算恢复;

l若是RDD任何分区因worker节点故障而丢失,那么这个分区能够从原来依赖的容错数据集中恢复;

l因为Spark中全部的数据的转换操做都是基于RDD的,即便集群出现故障,只要输入数据集存在,全部的中间结果都是能够被计算的。

Spark Streaming是能够从HDFS和S3这样的文件系统读取数据的,这种状况下全部的数据均可以被从新计算,不用担忧数据的丢失。可是在大多数状况下,Spark Streaming是基于网络来接受数据的,此时为了实现相同的容错处理,在接受网络的数据时会在集群的多个Worker节点间进行数据的复制(默认的复制数是2),这致使产生在出现故障时被处理的两种类型的数据:

1)Data received and replicated :一旦一个Worker节点失效,系统会从另外一份还存在的数据中从新计算。

2)Data received but buffered for replication :一旦数据丢失,能够经过RDD之间的依赖关系,从HDFS这样的外部文件系统读取数据。

此外,有两种故障,咱们应该关心:

(1)Worker节点失效:经过上面的讲解咱们知道,这时系统会根据出现故障的数据的类型,选择是从另外一个有复制过数据的工做节点上从新计算,仍是直接从从外部文件系统读取数据。

(2)Driver(驱动节点)失效 :若是运行 Spark Streaming应用时驱动节点出现故障,那么很明显的StreamingContext已经丢失,同时在内存中的数据所有丢失。对于这种状况,Spark Streaming应用程序在计算上有一个内在的结构——在每段micro-batch数据周期性地执行一样的Spark计算。这种结构容许把应用的状态(亦称checkpoint)周期性地保存到可靠的存储空间中,并在driver从新启动时恢复该状态。具体作法是在ssc.checkpoint(<checkpoint directory>)函数中进行设置,Spark Streaming就会按期把DStream的元信息写入到HDFS中,一旦驱动节点失效,丢失的StreamingContext会经过已经保存的检查点信息进行恢复。

最后咱们谈一下Spark Stream的容错在Spark 1.2版本的一些改进:

实时流处理系统必需要能在24/7时间内工做,所以它须要具有从各类系统故障中恢复过来的能力。最开始,SparkStreaming就支持从driver和worker故障恢复的能力。然而有些数据源的输入可能在故障恢复之后丢失数据。在Spark1.2版本中,Spark已经在SparkStreaming中对预写日志(也被称为journaling)做了初步支持,改进了恢复机制,并使更多数据源的零数据丢失有了可靠。

对于文件这样的源数据,driver恢复机制足以作到零数据丢失,由于全部的数据都保存在了像HDFS或S3这样的容错文件系统中了。但对于像Kafka和Flume等其它数据源,有些接收到的数据还只缓存在内存中,还没有被处理,它们就有可能会丢失。这是因为Spark应用的分布操做方式引发的。当driver进程失败时,全部在standalone/yarn/mesos集群运行的executor,连同它们在内存中的全部数据,也同时被终止。对于Spark Streaming来讲,从诸如Kafka和Flume的数据源接收到的全部数据,在它们处理完成以前,一直都缓存在executor的内存中。纵然driver从新启动,这些缓存的数据也不能被恢复。为了不这种数据损失,在Spark1.2发布版本中引进了预写日志(WriteAheadLogs)功能。

预写日志功能的流程是:1)一个SparkStreaming应用开始时(也就是driver开始时),相关的StreamingContext使用SparkContext启动接收器成为长驻运行任务。这些接收器接收并保存流数据到Spark内存中以供处理。2)接收器通知driver。3)接收块中的元数据(metadata)被发送到driver的StreamingContext。这个元数据包括:(a)定位其在executor内存中数据的块referenceid,(b)块数据在日志中的偏移信息(若是启用了)。

用户传送数据的生命周期以下图所示。

 

相似Kafka这样的系统能够经过复制数据保持可靠性。容许预写日志两次高效地复制一样的数据:一次由Kafka,而另外一次由SparkStreaming。Spark将来版本将包含Kafka容错机制的原生支持,从而避免第二个日志。

2.3.2 持久化

与RDD同样,DStream一样也能经过persist()方法将数据流存放在内存中,默认的持久化方式是MEMORY_ONLY_SER,也就是在内存中存放数据同时序列化的方式,这样作的好处是遇到须要屡次迭代计算的程序时,速度优点十分的明显。而对于一些基于窗口的操做,如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操做,如updateStateBykey,其默认的持久化策略就是保存在内存中。

对于来自网络的数据源(Kafka、Flume、sockets等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。

另外,对于窗口和有状态的操做必须checkpoint,经过StreamingContext的checkpoint来指定目录,经过 Dtream的checkpoint指定间隔时间,间隔必须是滑动间隔(slide interval)的倍数。

2.3.3 性能调优

1.  优化运行时间

增长并行度 确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操做,增长其并行度以确保更为充分地使用集群资源;

减小数据序列化,反序列化的负担 Spark Streaming默认将接受到的数据序列化后存储,以减小内存的使用。可是序列化和反序列话须要更多的CPU时间,所以更加高效的序列化方式(Kryo)和自定义的系列化接口能够更高效地使用CPU;

设置合理的batch duration(批处理时间间) 在Spark Streaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的做业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就没法按时提交,这样就会进一步拖延接下来的Job,形成后续Job的阻塞。所以设置一个合理的批处理间隔以确保做业可以在这个批处理间隔内结束时必须的;

l  减小因任务提交和分发所带来的负担 一般状况下,Akka框架可以高效地确保任务及时分发,可是当批处理间隔很是小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone和Coarse-grained Mesos模式一般会比使用Fine-grained Mesos模式有更小的延迟。

2.  优化内存使用

l控制batch size(批处理间隔内的数据量) Spark Streaming会把批处理间隔内接收到的全部数据存放在Spark内部的可用内存区域中,所以必须确保当前节点Spark的可用内存中少能容纳这个批处理时间间隔内的全部数据,不然必须增长新的资源以提升集群的处理能力;

l及时清理再也不使用的数据 前面讲到Spark Streaming会将接受的数据所有存储到内部可用内存区域中,所以对于处理过的再也不须要的数据应及时清理,以确保Spark Streaming有富余的可用内存空间。经过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据,这个参数须要当心设置以避免后续操做中所须要的数据被超时错误处理;

l观察及适当调整GC策略 GC会影响Job的正常运行,可能延长Job的执行时间,引发一系列不可预料的问题。观察GC的运行状况,采用不一样的GC策略以进一步减少内存回收对Job运行的影响。

 

参考资料:

(1)《Spark Streaming》 http://blog.debugo.com/spark-streaming/

转自:http://www.cnblogs.com/shishanyuan/p/4747749.html

相关文章
相关标签/搜索