SparkStreaming基本架构及使用

一、简介

Spark Streaming处理的数据流图:html

 

 

Spark Streaming在内部的处理机制是,接收实时流的数据,并根据必定的时间间隔拆分红一批批的数据,而后经过Spark Engine处理这些批数据,最终获得处理后的一批批结果数据。python

对应的批数据,在Spark内核对应一个RDD实例,所以,对应流数据的DStream能够当作是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分红一批一批后,经过一个先进先出的队列,而后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,而后进行处理,这是一个典型的生产者消费者模型。git

1.2 术语定义

  l离散流(discretized stream)或DStream:Spark Streaming对内部持续的实时数据流的抽象描述,即咱们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。github

  l批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就造成了对应的结果数据流了。数据库

  l时间片或批处理时间间隔( batch interval):人为地对流数据进行定量的标准,以时间片做为咱们拆分流数据的依据。一个时间片的数据对应一个RDD实例apache

  l窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,编程

  l滑动时间间隔:前一个窗口到后一个窗口所通过的时间长度。必须是批处理时间间隔的倍数网络

  lInput DStream :一个input DStream是一个特殊的DStream,将Spark Streaming链接到一个外部数据源来读取数据。数据结构

 

二、运行原理

2.1 Streaming架构

  SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,能够对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行相似Map、Reduce和Join等复杂操做,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。架构

  l计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset)而后将Spark Streaming中对DStream的Transformation操做变为针对Spark中对RDD的Transformation操做,将RDD通过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。

 

 

图Spark Streaming构架

 

2.2 编程模型

DStream(Discretized Stream)做为Spark Streaming的基础抽象,它表明持续性的数据流。这些数据流既能够经过外部输入源赖获取,也能够经过现有的Dstream的transformation操做来得到。在内部实现上,DStream由一组时间序列上连续的RDD来表示。每一个RDD都包含了本身特定时间间隔内的数据流。如图7-3所示。

 

 

图7-3   DStream中在时间轴下生成离散的RDD序列

 

 

对DStream中数据的各类操做也是映射到内部的RDD上来进行的,如图7-4所示,对Dtream的操做能够经过RDD的transformation生成新的DStream。这里的执行引擎是Spark。

 

2.2.1 如何使用Spark Streaming

"""  Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds. Usage: direct_kafka_wordcount.py <broker_list> <topic> To run this on your local machine, you need to setup Kafka and create a producer first, see http://kafka.apache.org/documentation.html#quickstart
 and then run the example `$ bin/spark-submit --jars \ external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \ examples/src/main/python/streaming/direct_kafka_wordcount.py \ localhost:9092 test` """ from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr) exit(-1) sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") ssc = StreamingContext(sc, 2) brokers, topic = sys.argv[1:] kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
  #这里kafka产生的是一个map, key是null, value是实际发送的数据,因此取x[1] lines
= kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()

1.建立StreamingContext对象 同Spark初始化须要建立SparkContext对象同样,使用Spark Streaming就须要建立StreamingContext对象。建立StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称。Spark Streaming须要指定处理数据的时间间隔,如上例所示的2s,那么Spark Streaming会以2s为时间窗口进行数据处理。此参数须要根据用户的需求和集群的处理能力进行适当的设置;

2.建立InputDStream Spark Streaming须要指明数据源。如socketTextStream,Spark Streaming以socket链接做为数据源读取数据。固然Spark Streaming支持多种不一样的数据源,包括Kafka、 Flume、HDFS/S三、Kinesis和Twitter等数据源;

3.操做DStream 对于从数据源获得的DStream,用户能够在其基础上进行各类操做,如上例所示的操做就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源获得的数据首先进行分割,而后利用Map和ReduceByKey方法进行计算,固然最后还有使用print()方法输出结果;

4.启动Spark Streaming 以前所做的全部步骤只是建立了执行流程,程序没有真正链接上数据源,也没有对数据进行任何操做,只是设定好了全部的执行计划,当ssc.start()启动后程序才真正进行全部预期的操做。

至此对于Spark Streaming的如何使用有了一个大概的印象,在后面的章节咱们会经过源代码深刻探究一下Spark Streaming的执行流程

 

2.2.3 DStream的操做

与RDD相似,DStream也提供了本身的一系列操做方法,这些操做能够分红三类:普通的转换操做、窗口转换操做和输出操做。

2.2.3.1 普通的转换操做

普通的转换操做以下表所示:

转换

描述

map(func)

源 DStream的每一个元素经过函数func返回一个新的DStream。

flatMap(func)

相似与map操做,不一样的是每一个输入元素能够被映射出0或者更多的输出元素。

filter(func)

在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。

repartition(numPartitions)

经过输入的参数numPartitions的值来改变DStream的分区大小。

union(otherStream)

返回一个包含源DStream与其余 DStream的元素合并后的新DSTREAM。

count()

对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。

reduce(func)

使用函数func(有两个参数并返回一个结果)将源DStream 中每一个RDD的元素进行聚 合操做,返回一个内部所包含的RDD只有一个元素的新DStream。

countByValue()

计算DStream中每一个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。

reduceByKey(func, [numTasks])

当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每一个键的值V都是使用聚合函数func汇总。注意:默认状况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),能够经过配置numTasks设置不一样的并行任务数。

join(otherStream, [numTasks])

当被调用类型分别为(K,V)和(K,W)键值对的2个DStream时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。

cogroup(otherStream, [numTasks])

当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。

transform(func)

经过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这能够用来在DStream作任意RDD操做。

updateStateByKey(func)

返回一个新状态的DStream,其中每一个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法能够被用来维持每一个键的任何状态数据。

在上面列出的这些操做中,transform()方法和updateStateByKey()方法值得咱们深刻的探讨一下:

l  transform(func)操做

该transform操做(转换操做)连同其其相似的 transformWith操做容许DStream 上应用任意RDD-to-RDD函数。它能够被应用于未在DStream API 中暴露任何的RDD操做。例如,在每批次的数据流与另外一数据集的链接功能不直接暴露在DStream API 中,但能够轻松地使用transform操做来作到这一点,这使得DStream的功能很是强大。

l  updateStateByKey操做

该 updateStateByKey 操做可让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :

(1)  定义状态 - 状态能够是任意的数据类型。

(2)  定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态。

让咱们用一个例子来讲明,假设你要进行文本数据流中单词计数。在这里,正在运行的计数是状态并且它是一个整数。咱们定义了更新功能以下:

 详细案例参考:

http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#transformations-on-dstreams

此函数应用于含有键值对的DStream中(如前面的示例中,在DStream中含有(word,1)键值对)。它会针对里面的每一个元素(如wordCount中的word)调用一下更新函数,newValues是最新的值,runningCount是以前的值。

 

2.2.3.2 窗口转换操做

Spark Streaming 还提供了窗口的计算,它容许你经过滑动窗口对数据进行转换,窗口转换操做以下:

转换

描述

window(windowLengthslideInterval)

返回一个基于源DStream的窗口批次计算后获得新的DStream。

countByWindow(windowLength,slideInterval)

返回基于滑动窗口的DStream中的元素的数量。

reduceByWindow(funcwindowLength,slideInterval)

基于滑动窗口对源DStream中的元素进行聚合操做,获得一个新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操做,获得一个新的DStream。

reduceByKeyAndWindow(funcinvFunc,windowLength,slideInterval, [numTasks])

一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最先的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么咱们能够将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法能够复用中间三秒的统计量,提升统计的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基于滑动窗口计算源DStream中每一个RDD内每一个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue同样,reduce任务的数量能够经过一个可选参数进行配置。

 

 

2.2.3.3 输出操做

Spark Streaming容许DStream的数据被输出到外部系统,如数据库或文件系统。因为输出操做实际上使transformation操做后的数据能够经过外部系统被使用,同时输出操做触发全部DStream的transformation操做的实际执行(相似于RDD操做)。如下表列出了目前主要的输出操做:

转换

描述

print()

在Driver中打印出DStream中数据的前10个元素。

saveAsTextFiles(prefix, [suffix])

将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsObjectFiles(prefix, [suffix])

将DStream中的内容按对象序列化而且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsHadoopFiles(prefix, [suffix])

将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

foreachRDD(func)

最基本的输出操做,将func函数应用于DStream中的RDD上,这个操做会输出数据到外部系统,好比保存RDD到文件或者网络数据库等。须要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

dstream.foreachRDD是一个很是强大的输出操做,它允将许数据输出到外部系统。详细案例请参考:

http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#output-operations-on-dstreams

 

三、spark整合kafka

用spark streaming流式处理kafka中的数据,第一步固然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

基于Receiver的方式

这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于全部的接收器,从kafka接收来的数据会存储在spark的executor中,以后spark streaming提交的job会处理这些数据。以下图:
Receiver图形解释

还有几个须要注意的点:

  • 在Receiver的方式中,ssc中的partition和kafka中的partition并非相关的,因此若是咱们加大每一个topic的partition数量,仅仅是增长线程来处理由单一Receiver消费的主题。可是这并无增长Spark在处理数据上的并行度。
  • 对于不一样的Group和topic咱们可使用多个Receiver建立不一样的Dstream来并行接收数据,以后能够利用union来统一成一个Dstream。
  • 若是咱们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level须要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 
对于全部的receivers接收到的数据将会保存在spark executors中,而后经过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上 

直接读取方式

在spark1.3以后,引入了Direct方式。不一样于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每一个topic的每一个partition中的最新offsets,以后根据设定的maxRatePerPartition偏移量范围来处理每一个batch。其形式以下图:

这种方法相较于Receiver方式的优点在于:

  • 简化的并行:在Receiver的方式中咱们提到建立多个Receiver以后利用union来合并成一个Dstream的方式提升数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,会建立和kafka分区同样的rdd个数。
  • 高效:在Receiver的方式中,为了达到0数据丢失须要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,第一次是被kafka复制,另外一次是写到wal中,浪费!而第二种方式不存在这个问题,只要咱们Kafka的数据保留时间足够长,咱们都可以从Kafka进行数据恢复。
  • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但因为Spark Streaming消费的数据和Zookeeper中记录的offset不一样步,这种方式偶尔会形成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了zk和ssc偏移量不一致的问题。缺点是没法使用基于zookeeper的kafka监控工具。

以上主要是对官方文档[1]的一个简单翻译,详细内容你们能够直接看下官方文档这里再也不赘述。

http://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html

不一样于Receiver的方式,是从Zookeeper中读取offset值,那么天然zookeeper就保存了当前消费的offset值,那么若是从新启动开始消费就会接着上一次offset值继续消费。

而在Direct的方式中,咱们是直接从kafka来读数据,那么offset须要本身记录,能够利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里咱们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类,我将其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一个通用类,而KafkaCluster是kafka源码中的一个类,因为包名权限的缘由我把它单独提出来,ComsumerMain简单展现了通用类的使用方法,在每次建立KafkaStream时,都会先从zooker中查看上次的消费记录offsets,而每一个batch处理完成后,会同步offsets到zookeeper中。 

refer:http://blog.csdn.net/zhong_han_jun/article/details/50814038

参考:

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍

Spark Streaming 源码解析系列

Spark Streaming 与 Kafka 集成分析

http://blog.selfup.cn/1665.html#comments

Spark踩坑记——Spark Streaming+Kafka

Spark踩坑记——从RDD看集群调度

相关文章
相关标签/搜索