Spark Streaming是Spark Core API的一种扩展,它能够用于进行大规模、高吞吐量、容错的实时数据流的处理。它支持从不少种数据源中读取数据,好比Kafka、Flume、Twitter、ZeroMQ、Kinesis或者是TCP Socket。而且可以使用相似高阶函数的复杂算法来进行数据处理,好比map、reduce、join和window。处理后的数据能够被保存到文件系统、数据库、Dashboard等存储中。html
在内部,它的工做原理以下。Spark Streaming接收实时输入数据流,并将数据分红批(batch),而后由Spark引擎处理数据,以分批生成最终的结果流。java
Spark-Streaming提供了一种高级的抽象,叫作DStream,英文全称为Discretized Stream,中文翻译为“离散流”,它表明了一个持续不断的数据流。DStream能够经过输入数据源来建立,好比Kafka、Flume和Kinesis;也能够经过对其余DStream应用高阶函数来建立,好比map、reduce、join、window。
DStream的内部,其实一系列持续不断产生的RDD。RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集。DStream中的每一个RDD都包含了一个时间段内的数据。git
本指南介绍如何开始使用DStreams编写Spark Streaming程序。您能够在Scala,Java或Python中编写Spark Streaming程序(在Spark 1.2中引入),全部这些都在本指南中介绍。您能够在本指南中找到标签,让您能够选择不一样语言的代码段。github
Spark Streaming发挥流式处理的优点,选择一次处理一批数据而不是一次处理一个事件。这统一了Spark对批处理和实时处理的编程模型。算法
micro batching定义为流数据切分为几组小批量的程序,有助于提升流处理的性能,同时低延迟处理每条信息。数据库
spark streaming中mirco batch基于时间建立而不是数据量,必定周期(一般是毫秒)的数据汇集为一批次。保证了数据处理的低延迟。apache
另外一个优点是保证处理的数据量在控制范围内。假设处理引擎须要在每条数据消费后发送回复(ack),批处理的状况下,回复将在批次处理完后发送而不是每条信息处理后都发送。编程
缺点是,有错误发生时,批次的数据都须要从新处理。vim
SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]"); JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1)); JavaReceiverInputDStream<String> StreamingLines = stream JavaDStream<String> words = StreamingLines.flatMap( str -> Arrays.asList(str.split(" ")).iterator() ); JavaPairDStream<String, Integer> wordCounts = words.mapT wordCounts.print(); streamingContext.start(); streamingContext.awaitTermination();
1.建立Streaming Context,Java中是JavaSteamingContext网络
2.StreamingContext接受两个参数,SparkConf,batchDuration决定小批次的数据间隔
3.调用start方法启动Spark Streaming Job
4.调用awaitTermination或stop 显示终止Job
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.1</version> </dependency>
1.基于Receiver
这种方式使用Receiver接收数据。Receiver使用Kafka的高阶API实现。与全部接收器同样,经过接收器从Kafka接收的数据存储在Spark Executor中,而后由Spark Streaming启动的做业处理数据。
然而,默认配置下,这种方法在故障时可能形成数据丢失。能够同步地将全部接收到的Kafka数据同时保存在分布式文件系统(例如HDFS)上的写入日志中,以便在故障时能够恢复全部数据。
2.Direct
在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强的端到端保证。该方法不是使用接收器来接收数据,而是按期查询Kafka中每一个主题分区中的最新偏移量,并相应地定义每一个批处理中要处理的偏移范围。当启动处理数据的做业时,Kafka的简单消费者API用于读取Kafka定义的偏移范围(相似于从文件系统读取文件)。请注意,这是Spark 1.3中为Scala和Java API引入的实验功能。Spark 1.4添加了一个Python API,但它尚未彻底同步。
场景:
航空公司须要处理航班传递的实时温度数据:
1.仅仅打印或保存实时温度,无状态的流处理(stateless)。
2.每隔十分钟,统计前一个小时的平均温度。每十分钟需肯定一小时的时间窗口,属于固定时间窗的有状态流处理(stateful within a window time frame )。
3.统计航班整个行程的平均温度,非特定时间周期,属于spark streaming job全程的有状态处理。
数据以JSON格式回传:
{"flightId":"tz302","timestamp":1494423926816,"temperature":21.12,"landed":false} where flightId = unique id of the flight timestamp = time at which event is generated temperature = temperature reading at the time of event generation landed = boolean flag which defines whether flight has been landed or still on the way Following POJO will be used to map messages to Java Object for further processing public class FlightDetails implements Serializable { private String flightId; private double temp; private boolean landed; private long temperature; }
1.map
JavaReceiverInputDStream<String> inStream= jssc.socketTextStream("localhost", 9999); JavaDStream<FlightDetails> flightDetailsStream = inStream.map(x -> { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(x, FlightDetails.class); });
flightDetailsStream.foreachRDD(rdd->rdd.saveAsTextFile());
2.flatMap
3.filter
4.join DStream由键值对RDD组成,(K,V)及(K,X),返回新的键值对DStream (K,(V,X))
5.union
6.count
7.reduce
8.reduceByKey
在必定周期内(大于批处理间隔或整个job session内),维护状态值。经过建立检查点实现,严格来说,windowing不须要检查点,但windowing期间的其余操做须要。
Spark Streaming还提供了窗口计算,容许您在数据的滑动窗口上应用转换。下图说明了这个滑动窗口。
如图所示,每当窗口滑过源DStream时,落在窗口内的源RDD被组合并运行,以产生窗口DStream的RDD。在这种具体状况下,操做应用于最近3个时间单位的数据,并以2个时间单位滑动。这代表任何窗口操做都须要指定两个参数。
window length - 窗口的持续时间(图中的3)。
sliding interval - 执行窗口操做的间隔(图中的2)。
这两个参数必须是源DStream的批间隔的倍数(图中的1)。
咱们以一个例子来讲明窗口操做。假设您但愿经过在过去30秒的数据中每10秒产生一个字数来扩展早期的示例。为此,咱们必须在最近30秒的数据中对(word,1)对的对DStream应用reduceByKey操做。这是使用reduceByKeyAndWindow操做完成的。
———————————————————————————————————————————————————————————————————————————————
与RDD相似,DStreams还容许开发人员将流的数据保留在内存中。也就是说,在DStream上使用persist()方法将自动保留该DStream的每一个RDD在内存中。若是DStream中的数据将被屡次计算(例如,相同数据上的多个操做),这是有用的。对于基于窗口的操做,如reduceByWindow和reduceByKeyAndWindow以及基于状态的操做,如updateStateByKey,这是隐含的。所以,基于窗口的操做生成的DStream会自动保存在内存中,即便开发人员不调用persist()。
对于经过网络接收数据(例如Kafka,Flume,套接字等)的输入流,默认持久性级别被设置为将数据复制到两个节点进行容错。
请注意,与RDD不一样,DStream的默认持久性级别将数据序列化在内存中。这在“性能调优”部分进一步讨论。有关不一样持久性级别的更多信息,请参见“Spark编程指南”。
每个Spark Streaming应用,正常来讲,都是要7*24小时运转的,这就是实时计算程序的特色,由于要持续不断地对数据进行计算,所以,对实时计算应用的要求,应该是必需要可以对应用程序逻辑无关的失败,进行容错,若是要实现这个目标,Spark-Streaming程序就必须将足够的信息checkpoint到容错的存储系统上(HDFS),从而让它可以在失败中进行恢复。检查点有两种类型的数据:
1.元数据checkpoint —-—将定义了流式计算逻辑的信息,报错到容错的存储系统上,好比HDFS。当运行Spark-Streaming应用程序的Driver进程所在的节点失败时,该信息能够用于进行恢复。
元数据信息包括了:
1.1:配置信息—建立Spark-Streaming应用程序的配置信息,好比SparkConf
1.2:DStream的操做信息—-定义了Spark-Stream应用程序的计算逻辑的DStream操做信息
1.3:未处理的batch信息—-哪些job正在排队,还没处理的batch信息。
2.数据checkpoint ———将实时计算过程当中产生的RDD的数据保存到可靠的存储系统中。对于一些将多个batch的数据进行聚合的,有状态的transformation操做,这是很是有用的。在这种tranformation操做中,生成的RDD是依赖与以前的batch的,这会致使随着时间的推移,Rdd的依赖链条愈来愈长,要避免因为依赖链条愈来愈长,致使一块儿变得愈来愈长的失败恢复时间,有状态的transformation 操做执行过程当中间产生的RDD,会按期的被checkpoint盗可靠的存储系统上,好比HDFS,从而削减RDD的依赖链条,进而缩短失败恢复时, RDD的恢复时间。
总而言之,元数据检查点主要用于从驱动程序故障恢复,而数据或RDD检查点是必要的,尤为是使用基本功能的有状态转换(stateful transformation)时。
1.使用了有状态的transformation操做 ———好比updateStateByKey,或者reduceByKeyAndWindow操做被使用了, 那么checkpoint目录要求是必须提供的,也就必须开启checkpoint机制,从而进行周期性的RDD checkpoint
2.要保证能够从Driver失败中进行恢复 ———元数据checkpoint须要启用,来进行这种状况的恢复。
要注意的是,并非说全部的Spark-Streaming应用程序,都要启用checkpoint机制,若是不强制要求从Driver 失败中自动进行恢复,有没有使用有状态的transformation操做,那么就不须要启用checkpoint,事实上这么作反而是用利于提高性能的。
要配置该机制,首先要调用StreamingContext的checkpoint()方法设置一个checkpoint目录,而后须要将spark.streaming.receiver.writeAheadLog.enable参数设置为true。这将容许您使用上述有状态转换。此外,若是要使应用程序从驱动程序故障中恢复,您应该重写流式应用程序以具备如下行为:
1.当程序第一次启动时,它将建立一个新的StreamingContext,设置全部流,而后调用start()。
2.当程序在失败后从新启动时,它将从checkpoint目录中的检查点数据从新建立一个StreamingContext。
// Create a factory object that can create a and setup a new JavaStreamingContext JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); // new context JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams ... jssc.checkpoint(checkpointDirectory); // set checkpoint directory return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start(); context.awaitTermination();
若是checkpointDirectory存在,则将从检查点数据从新建立上下文。若是目录不存在(即第一次运行),则将调用函数contextFactory来建立新的上下文并设置DStream。请参阅Java示例JavaRecoverableNetworkWordCount。此示例将网络数据的字数添加到文件中。
除了使用getOrCreate以外,还须要确保在失败时自动从新启动驱动程序进程。这只能由用于运行应用程序的部署基础结构完成。这在“部署”部分进一步讨论。
请注意,RDD的检查点会增长保存到可靠存储的成本。这可能会致使须要保存check point的RDD的批处理时间增长。所以,须要仔细设置检查点的间隔。在小批量(例如1秒)的状况下,每一个批次的检查点可能会显着下降操做吞吐量。相反,检查点太少会致使谱系和任务大小增加,这可能会产生不利影响。对于须要RDD检查点的状态转换,默认间隔是至少10秒的批间隔的倍数。它能够经过使用dstream.checkpoint(checkpointInterval)进行设置。一般,DStream的5到10个滑动间隔的检查点间隔是一个很好的设置。
启动预写日志机制
预写日志机制,简写为WAL,全称为Write Ahead Log,从Spark 1.2版本开始,就引入了基于容错的文件系统的WAL机制,若是启用该机制,Receiver接收到的全部数据都会被写入配置的checkpoint目录中的预写日志这种机制可让driver在恢复的时候,避免数据丢失,而且能够确保整个实时计算过程当中,零数据丢失
然而这种极强的可靠性机制,会致使Receiver的吞吐量大幅度降低,由于单位时间内有至关一部分时间须要将数据写入预写日志,若是又但愿开启预写日志机制,确保数据零损失,又不但愿影响系统的吞吐量,那么能够建立多个输入DStream启动多个Reciver,而后将这些receiver接收到的数据使用ssc.union()方法将这些dstream中的数据进行合并 此外在启用了预写日志机制以后,推荐将复制持久化机制禁用掉,由于全部数据已经保存在容错的文件系统中了,不须要再用复制机制进行持久化,保存一份副本了,只要将输入的DStream的持久化机制设置一下便可