1、实验目的
经过本实验对基于Spark Streaming流式计算框架有所全面了解,掌握对DStreams抽象的操做。html
2、实验内容
实验内容针对新浪股票数据接口,基于Spark Streaming实时接收并处理数据,并对特定的某一支股票走势进行简单预测。算法
3、实验要求
以小组为单元进行实验,每小组5人,小组自协商一位组长,由组长安排和分配实验任务,具体参加试验内容中实验过程。小组成员须要对流式计算有所了解,特别是对Spark Streaming流式计算框架有所了解和学习。 数据库
4、准备知识
4.1 Spark Streaming流式计算框架apache
在Spark Streaming上实现DStream,它是基于Spark处理引擎的一个修改版本。Spark Streaming由三部分组成,如图4.1所示编程
图4.1 Spark Streaming组件windows
图4.1显示了Spark Streaming在Spark Core上所作的修改:服务器
(1)master跟踪DStream lineage,并调度任务来计算新的RDD分区。网络
(2)工做节点接收数据,保存输入分区和已计算的RDD,并执行任务。数据结构
(3)客户端用于发送数据给系统。架构
如图4.1所示,如图中所示,Spark Streaming 重用了 Spark 的许多组件,但仍然须要修改和添加多个组件来支持流处理。
从架构角度来看,Spark Streaming 和传统的流系统之间区别在于,Spark Streaming 将计算过程分解为小的、无状态的、肯定的任务。每一个任务均可以在集群中的任何节点或同时在多个节点运行。在传统系统的固定拓扑结构中,将部分计算过程转移到另外一台机器是一个很大的动做。Spark Streaming 的作法,能够很是直接地在集群上进行负载均衡,应对故障或启动慢节点恢复。同理也能用于批处理系统——如 MapReduce。然而,因为 RDD 运行于内存中,Spark
Streaming 的任务执行时间会短得多,通常只有 50-200 毫秒。
不一样于之前系统将状态存储在长时间运行的处理过程当中,Spark Streaming 中的全部状态都以容错数据结构(RDD)来保存。因为 RDD 分区被肯定性地计算出来,它能够驻留在任何节点上,甚至能够在多个节点上进行计算。这套系统试图最大限度地提升数据局部性,同时这种底层的灵活性使得推测执行和并行恢复成为可能。
这些优点在批处理平台(Spark)上运行时能够很天然地得到。但依然须要进行显著的修改来支持流处理。
应用程序执行:
Spark Streaming 的应用从一个或多个输入流开始执行。系统加载数据流的方式,要么是经过直接从客户端接收记录数据,要么是经过周期性的从外部存储系统中加载数据,如 HDFS,外部的存储系统也能够被日志收集系统[9]所代替。在前一种方式下,因为 D-Streams 须要输入的数据被可靠地进行存储来从新计算结果,所以咱们须要确保新的数据在向客户端程序发送确认以前,在两个工做节点间被复制。若是一个工做节点发生故障,客户端程序向另外一个工做节点
发送未经确认的数据。
全部的数据在每个工做节点上被一个块存储进行管理,同时利用主服务器上的跟踪器来让各个节点找到数据块的位置。因为咱们的输入数据块和咱们从数据块计算获得的 RDD 的分区是不可变的,所以对块存储的跟踪是相对简单的:每个数据块只是简单的给定一个惟一 ID,并全部拥有这个 ID 的节点都可以对其进行操做(例如,若是多个节点同时计算它)。块存储将新的数据块存储在内存中,但会以 LRU 策略将这些数据块丢弃, 这在后面会进行描述。
为了肯定什么时候开始一个新的时间周期,咱们假设各个节点经过 NTP 进行了时钟同步,而且在每个周期结束时每个节点都会向主服务器报告它所接收到的数据块 IDs。主服务器以后会启动任务来计算这个周期内的输出 RDDs, 不须要其余任何同步。和其余的批处理调度器同样,一旦完成上个周期任务,它就简单地开始每一个后续任务。
Spark Streaming 依赖于每个时间间隔内 Spark 现有的批处理调度器,并加入了像DryadLINQ系统中的大量优化:(1)它对一个单独任务中的多个操做进行了管道式执行,如一个 map 操做后紧跟着另外一个 map操做。(2)它根据数据的本地性对各个任务进行调度。(3)它对 RDD 的各个划分进行了控制,以免在网络中数据的 shuffle。例如,在一个reduceByWindow 的操做中,每个周期内的任务须要从当前的周期内“增长” 新的部分结果(例如,每个页面的点击数),和“删除” 多个周期之前的结果。调度器使用相同的方式对不一样周期内的状态 RDD 进行切分,以使在同一个节点的每个 key 的数据(例如,一个页面) 在各时间分片间保持一致。
流处理优化:
尽管 Spark Streaming 创建在 Spark 之上,咱们仍然必须优化这个批处理引擎以使其支持流处理。这些优化包括如下几个方面:
(1)网络通讯:咱们重写了 Spark 的数据层,经过使用异步 I/O 使得带有远程输入的任务,好比说reduce 任务,可以更快地获取它们。
(2)时间间隔流水线化:由于每个时间间隔内的任务均可能没有充分地使用集群的资源(好比说,在每个时间间隔的末端, 可能只有不多的几个任务还在运行),因此,咱们修改了 Spark的调度器,使它容许在当前的时间间隔尚未结束的时候调用下一个时间间隔的任务。例如,考虑咱们在表 4.3 提到的 map + runningReduce 做业。咱们之因此可以在时间间隔 1 的 reduce 操做结束以前就能够执行时间间隔 2 的 map 操做,这是由于每一步的 map操做都是独立的。
(3)任务调度:咱们对 Spark 的任务调度器作了大量的优化,好比说手工调整控制消息的大小,使得每隔几百毫秒就能够启动上百个任务的并行做业。
(4)存储层:为了支持 RDDs 的异步检查点和性能提高,咱们重写了 Spark 的存储层。由于 RDDs 是不可变的,因此能够在不阻塞计算和减慢做业的状况下经过网络对 RDDs 设置检查点。在可能的状况下,新的数据层还会使用零拷贝。
(5)lineage 截断:由于在 D-Streams 中 RDDs 之间的 lineage 能够无限增加,咱们修改了调度器使之在一个 RDD 被设置检查点以后删除本身的 lineage,修改以后 RDDs 之间的 lineage 不能任意生长。相似地,对于 Spark 中的其余无限增加的数据结构来讲,将会按期调用一个清理进程来清理它们。
(6)Master 的恢复:由于流应用须要不间断运行 7 天 24 小时,咱们给 Spark 加入对 master 状态恢复的支持。
针对流处理所作的优化还提升了 Spark 在批处理标准测试上的性能,大概是以前的 2 倍. Spark 的引擎可以同时应对流处理和批处理,这是其强大之处。
内存管理:
在当前的 Spark Streaming 实现中,每一个结点的块存储管理 RDD 的分片是以 LRU(最近最少使用)的方式,若是内存不够会依 LRU 算法将数据调换到磁盘。另外,用户能够设置最大的超时时间,当达到这个时间以后系统会直接将旧的数据块丢弃而不进行磁盘 I/O 操做(这个超时时间必须大于检查点间隔的时间)。咱们发如今不少应用中,Spark Streaming 须要的内存并非不少,这是由于一个计算中的状态一般比输入数据少不少(不少应用是计算聚合统计),而且任何可靠的流式处理系统都须要像咱们这样经过网络来复制数据到多个结点。可是,Spark团队仍是会探索优化内存使用的方式。
并行恢复:
DStreams的肯定性使得可使用两种有效却不适合常规流式系统的恢复技术来恢复工做节点状态:并行恢复和推测执行。此外,它也简化了主节点的恢复。
当集群中一个Worker失败,该节点上的RDD分片状态、Running中的任务,DStreams都会在其余Worker上从新并行计算。经过异步的复制RDD状态到其它的工做节点,系统能够周期性地设置RDDs状态地检查点。例如,在运行时统计页面浏览数的程序中,系统可能对于该计算每分钟选择一个检查点。而后,若是一个节点失败了,系统会检查全部丢失的RDD分片,而后启动一个任务从上次的检查点开始从新计算。多个任务能够同时启动去计算不一样的分片,使得整个集群参与恢复。DStream在每一个时间片中并行的计算RDDs的分区以及并行处理每一个时间片中相互独立的操做(例如开始的map操做),由于能够从lineage中细粒度地得到依赖关系。
在上行流备份中,单个闲置机器执行了全部的恢复,而后开始处理新的纪录。在高负荷地系统中这须要很长时间才能跟上进度,这是由于在重建旧的状态过程当中新的纪录会持续到达。事实上,假设在失败以前地工做量是,而后在恢复的每分钟中备份节点只能作一分钟地工做,可是会同时收到 分钟的新任务。所以,要在的时间内从上次失败节点中彻底恢复 个单元的任务,则能够获得:。
(4.1)
在其余线路中,全部的机器参与恢复,同时也处理新的纪录。假定在任务失败以前分布式集群中有台机器,剩余的台机器,如今每一个机器须要恢复个工做,同时接收数据的速率是。它们追赶到来的数据流时间知足
。
(4.2)
所以,拥有更多的节点,并行恢复可以跟上到来的数据流,这比上行留备份要快得多。
除了节点故障,在大型集群中另外一个值得关注的问题是运行较慢的节点。幸运的是,DStreams一样也可让咱们像批处理系统那样减小较慢节点的影响,这是经过推测性(speculative)地运行较慢任务地备份副本实现的。这种推测执行在连续的处理系统中可能很难实现,由于它须要启动一个节点的新副本,填充新副本的状态,并追遇上较慢的副本。事实上,流式处理中的复制算法,好比Flux和DPC,主要在于研究两个副本之间的同步。
在Spark Streaming的实现中,使用了一个简单的阈值来检测较慢的节点:若是一个任务的运行时长比它所处的工做阶段中的平均值高1.4倍以上,那么就标记它为慢节点。
7*24运行Spark Streaming的一个最终要求是可以容忍Spark master的故障[51]。Spark经过两个步骤来作到这些,第一步是当开始每一个时序时可靠的记录计算的状态,第二步是当旧的master失败时,让计算节点链接到一个新的master而且报告它们的RDD分区。DStreams简化恢复的一个关键方面是,若是一个给定的RDD被计算两次是没有问题的。由于操做是肯定的,这一结果与从故障中进行恢复相似。由于任务能够从新计算,这意味着当master从新链接时丢掉一些运行中的任务也是能够的。
Spark Sreaming目前的实现方式是将DStreams元数据存储在HDFS,记录(1)用户的DStreams图以及代表用户代码的Scala的函数对象,(2)最后的检查点的时间,还有(3)自检查点开始的RDD的ID号,其中检查点经过在每一个时序进行重命名(原子操做)来更新HDFS文件。恢复后,新master会读取这个文件找到它断开的地方,并从新链接到计算节点,以便肯定哪些RDD分区是在内存中。而后再继续处理每个漏掉的时序。虽然Spark Streaming尚未优化恢复处理,但它是至关快了,100个节点的集群能够在12秒内恢复。
4.2 DStreams API
由于DStreams是主要的执行策略(描述如何将一个计算分解成多个步骤),所以它们被用在流式系统中实现了多个标准的操做,好比滑动窗口和增量式处理,以简单的对它们的执行批处理到各个小的时间间隔中。
在Spark Streaming中,用户使用函数API来注册一个或多个数据流。程序能够将输入数据流定义为从外部系统中读取数据,该系统经过从对节点端口监听或周期性地从一个存储系统(例如,HDFS)加载来获取数据。它能够适用于两种类型对这些数据流的操做:转换操做,从一个或多个父数据流建立一个新的DStreams。这些操做多是无状态的,在这个时间周期内对RDD分别进行处理,或它们可能跨越周期来建立状态。输出操做,使得程序将数据写入外部系统。例如,save操做将DStreams中的每个RDD输出到数据库。
DStreams支持在典型批处理框架中所拥有的无状态的转换操做,包括map,reduce,groupBy和join。咱们在Spark中提供了全部操做。例如,一个程序使用如下的代码能够在DStreams的每个时间周期内,运行一个规范的MapReduce WC程序实例。
pairs=words.map(w=>(w,1)) //构造键值对RDD,并统计数量
counts=pairs.reduceByKey((a,b)=>a+b) //聚合
此外,为了支持跨越多个周期的计算,DStreams提供了多个有状态的转换操做,这些操做包括:窗口、增量式聚合、状态跟踪等。这些操做是在基于标准的数据流处理技术的基础上例如滑动窗口。图4.2 用于单一关联和关联+可逆版本的操做执行的reduceByWindow。这两个版本为每一个时间间隔只进行一次计数的计算,可是第二个版本的操做避免了对每个窗口进行从新求和。方框表示RDDs,箭头表示用来计算窗口的操做[t,t+5)。
图4.2 用于单一关联和关联+可逆版本的操做执行的reduceByWindow
(1)窗口操做:window操做将每个过去的时间周期的滑动窗口里的全部记录组合到一个RDD。例如,代码words.window(“5s”),会产生一个包含周期内单词的RDDs的DStream[0,5),[1,6),[2,7)等。
(2)增量式聚合操做:对于经常使用的聚合计算的用例,就像在一个滑动窗口上进行count或max操做,DStreams有增量reduceByWindow操做的几个变种操做。最简单的一个是仅仅用一个关联的合并函数来对值进行合并。例如,在上述代码中,用户能够写:pairs.ReduceByWindow(“5s”,(a,b)=>a+b)。对于每个时间周期只对该周期的计数进行一次计算,但不得不反复地对过去的5秒去添加计数,如图4.2(a)所示。若是聚合函数也是可逆的,一个更加高效的版本还须要“减值和增量式维护状态的一个函数(图4.2(b)):paris.reduceByWindow(“5s”,
(a,b)=>a+b,(a,b)=>a-b))。
(3)状态跟踪:一般,应用程序为了对表示状态变化的事件流进行响应,须要对各种对象进行状态跟踪。
在Spark中可使用批处理的操做来实现这些操做,经过将批处理操做应用到来自父数据流中不一样时间的RDDs,例如,能够由updateStateByKey构建的RDDs,经过对旧的状态和每一个周期的新事件进行分组来实现。最后,用户调用输出操做符将Spark Streaming的结果发送到外部系统(例如,展现在dashboard上)。咱们提供了两个这样的操做:save操做,将Dstream中的每个RDD写入到一个存储系统(例如,HDFS或HBase),和foreachRDD,在每个RDD上执行一段用户代码段(任意的Spark代码)。例如,用户能够用counts.foreachRDD(rdd=>print(rdd.top(K)))来打印top K的计数。
4.3 Window Operations
Spark Streaming还提供了窗口计算,容许您在数据的滑动窗口上应用转换。下图说明了这个滑动窗口。
图4.3 Window Operations
如图4.3所示,每当窗口滑过源DStream时,落在窗口内的源RDD被组合并进行操做以产生窗口DStream的RDD。在这种具体状况下,操做应用于最近3个时间单位的数据,并以2个时间单位滑动。这代表任何窗口操做都须要指定两个参数。
- windowLength窗口长度 - 窗口的持续时间(4.3图中的3)
- slideInterval滑动间隔 - 执行窗口操做的间隔(4.3图中的2)
这两个参数必须是源DStream的批间隔的倍数(图中的1)。
咱们以一个例子来讲明窗口操做。假设但愿经过在过去30秒的数据中每10秒产生一个字数来扩展早期的示例。为此,必须在最近30秒的数据中对(word,1)对的对DStream应用reduceByKey操做。这是使用reduceByKeyAndWindow操做完成的。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常见的窗口操做以下。全部这些操做都采用上述两个参数 - windowLength和slideInterval。
5、实验步骤
5.1 数据源
实验采用Sina股票数据接口。以字符串数据的形式范围,简单易用且直观。
5.2 测试数据源
针对股票的数据接口,一下代码提供简单的测试,以解析返回的数据。
解析数据
伴生对象:
5.3 Spark Streaming编程
数据接口调试完毕,股票数据也解析好了,下面就开始Streaming。Spark Streaming必定会涉及数据源,且该数据源是一个主动推送的过程,即spark被动接受该数据源的数据进行分析。但Sina的接口是一个很简单的HttpResponse,没法主动推送数据,因此咱们须要实现一个Custom Receiver,可参考 http://spark.apache.org/docs/latest/streaming-custom-receivers.html
下面是具体的代码,其实定制化一个Receiver简单来讲就是实现onStart/onStop。onStart用来初始化资源,给获取数据作准备,获取到的数据用store发送给SparkStreaming便可;onStop用来释放资源
Receiver搞定以后就能够开始编写股票预测的main函数了,股票预测的方法之一,就是统计一段时间内股票上涨的次数,并展现上涨次数TopN的股票信息,但本文一切从简,并无实现所有的功能,只是统计了股票上涨的次数,也就是对上涨与否进行WordCount。
5.4 运行结果分析
因为ssc的时间间隔为1,因此每秒都会查询大同煤业的股票数据,这就是下面每一个Time打印的第一行数据(由于stockState先进行print,因此每次查询的股票数据是第一行);又由于slide设置为3,因此每隔3秒会进行reduceFunc计算,该函数处理windowsize个RDD(此处设置为6),对这6个RDD按照时间前后顺序进行reduce。
须要特别说明的是spark的reduce默认从左到右进行fold(折叠),从最左边取两个数进行reduce计算产生临时结果,再与后面的数据进行reduce,以此类推动行计算,其实就是foldLeft。
6、总结
实验针对新浪股票数据接口,基于Spark Streaming实时接收并处理数据,并对特定的某一支股票走势进行简单预测。使学生可以对Spark Streaming流式计算框架有所全面的了解,并掌握DStreams抽象的操做。