Ref: 一文读懂 Spark 和 Spark Streaming【简明扼要的概览】html
在讲解 "流计算" 以前,先作一个简单的回顾,亲!python
MapReduce 模型的诞生是大数据处理从无到有的飞跃。但随着技术的进步,对大数据处理的需求也变得愈来愈复杂,MapReduce 的问题也日渐凸显。mysql
一般,咱们将 MapReduce 的输入和输出数据保留在 HDFS 上,不少时候,复杂的 ETL、数据清洗等工做没法用一次 MapReduce 完成,因此须要将多个 MapReduce 过程链接起来:sql
Figure,上图中只有两个 MapReduce 串联,实际上可能有几十个甚至更多,依赖关系也更复杂。数据库
这种方式下,每次中间结果都要写入 HDFS 落盘保存,代价很大(别忘了,HDFS 的每份数据都须要冗余若干份拷贝)。编程
另外,因为本质上是屡次 MapReduce 任务,调度也比较麻烦,实时性无从谈起。并发
通常来讲,想作到 fault-tolerance 只有两个方案:app
可是之因此能作到这一点,是依赖于一个额外的假设:全部计算过程都是肯定性的(deterministic)。框架
Spark 借鉴了函数式编程思想,提出了 RDD(Resilient Distributed Datasets),译做“弹性分布式数据集”。less
Figure,上图演示了 RDD 分区的恢复。为了简洁并无画出分区,实际上恢复是以分区为单位的。
RDD 的数据由多个分区(partition)构成,这些分区能够分布在集群的各个机器上,这也就是 RDD 中 “distributed” 的含义。
熟悉 DBMS(数据库管理系统)的同窗能够把 RDD 理解为逻辑执行计划,partition 理解为物理执行计划。
以前有涉及到一部份内容:[Spark] 01 - What is Spark
简单的说,宽依赖就是 "一对多“。
在执行时,窄依赖能够很容易的按流水线(pipeline)的方式计算:对于每一个分区从前到后依次代入各个算子便可。
然而,宽依赖须要等待前继 RDD 中全部分区计算完成;
换句话说,宽依赖就像一个栅栏(barrier)会阻塞到以前的全部计算完成。整个计算过程被宽依赖分割成多个阶段(stage),如上右图所示。
命令式 & 声明式 编程:声明式的要简洁的多!但声明式编程依赖于执行者产生真正的程序代码,因此除了上面这段程序,还须要把数据模型(即 schema)一并告知执行者。
声明式编程最广为人知的形式就是 SQL。Spark SQL 就是这样一个基于 SQL 的声明式编程接口。
你能够将它看做在 Spark 之上的一层封装,在 RDD 计算模型的基础上,提供了 DataFrame API 以及一个内置的 SQL 执行计划优化器 Catalyst。
代码生成(codegen)转化成直接对 RDD 的操做
DataFrame 就像数据库中的表,除了数据以外它还保存了数据的 schema 信息。
Catalyst 是一个内置的 SQL 优化器,负责把用户输入的 SQL 转化成执行计划。
Catelyst 强大之处是它利用了 Scala 提供的代码生成(codegen)机制,物理执行计划通过编译,产出的执行代码效率很高,和直接操做 RDD 的命令式代码几乎没有分别。
上图是 Catalyst 的工做流程,与大多数 SQL 优化器同样是一个 Cost-Based Optimizer (CBO),但最后使用代码生成(codegen)转化成直接对 RDD 的操做。
"批处理"和"流计算"被看做大数据系统的两个方面。
在 ETL 等场合,这样的设计经常致使一样的计算逻辑被实现两次,耗费人力不说,保证一致性也是个问题。
Spark Streaming 基于 Spark,另辟蹊径提出了 D-Stream(Discretized Streams)方案:将流数据切成很小的批(micro-batch),用一系列的短暂、无状态、肯定性的批处理实现流处理。
开发者只须要维护一套 ETL 逻辑便可同时用于批处理和流计算。
The figure,
实际上,新的状态 RDD 老是不断生成,而旧的 RDD 并不会被“替代”,而是做为新 RDD 的前继依赖。
对于底层的 Spark 框架来讲,并无时间步的概念,有的只是不断扩张的 DAG 图和新的 RDD 节点。
Spark 经过 Spark Streaming 拥有了流计算能力,那 Spark SQL 是否也能具备相似的流处理能力呢?答案是确定的。
只要将数据流建模成一张不断增加、没有边界的表,在这样的语义之下,不少 SQL 操做等就能直接应用在流数据上。
出人意料的是,Spark Structured Streaming 的流式计算引擎并无复用 Spark Streaming,而是在 Spark SQL 上设计了新的一套引擎。
所以,从 Spark SQL 迁移到 Spark Structured Streaming 十分容易,但从 Spark Streaming 迁移过来就要困可贵多。
基于这样的模型,Spark SQL 中的大部分接口、实现都得以在 Spark Structured Streaming 中直接复用。
将用户的 SQL 执行计划转化成流计算执行计划的过程被称为 增量化(incrementalize),这一步是由 Spark 框架自动完成的。
对于用户来讲只要知道:每次计算的输入是某一小段时间的流数据,而输出是对应数据产生的计算结果。
窗口(window)是对过去某段时间的定义。
批处理中,查询一般是全量的(例如:总用户量是多少);而流计算中,咱们一般关心近期一段时间的数据(例如:最近24小时新增的用户量是多少)。
用户经过选用合适的窗口来得到本身所需的计算结果,常见的窗口有滑动窗口(Sliding Window)、滚动窗口(Tumbling Window)等。
水位(watermark)用来丢弃过早的数据。
在流计算中,上游的输入事件可能存在不肯定的延迟,而流计算系统的内存是有限的、只能保存有限的状态,必定时间以后必须丢弃历史数据。
以双流 A JOIN B 为例,假设窗口为 1 小时,那么 A 中比当前时间减 1 小时更早的数据(行)会被丢弃;若是 B 中出现 1 小时前的事件,由于没法处理只能忽略。
Spark 中有三个角色:Driver, Worker 和 Cluster Manager。
Spark 是一个同时支持批处理和流计算的分布式计算系统。Spark 的全部计算均构建于 RDD 之上,RDD 经过算子链接造成 DAG 的执行计划,RDD 的肯定性及不可变性是 Spark 实现故障恢复的基础。Spark Streaming 的 D-Stream 本质上也是将输入数据分红一个个 micro-batch 的 RDD。
Spark SQL 是在 RDD 之上的一层封装,相比原始 RDD,DataFrame API 支持数据表的 schema 信息,从而能够执行 SQL 关系型查询,大幅下降了开发成本。
Spark Structured Streaming 是 Spark SQL 的流计算版本,它将输入的数据流看做不断追加的数据行。
至此,经过 一文读懂 Spark 和 Spark Streaming 了解了大概框架和概念,下面继续”厦大“课程的学习,goto: 流计算概述
对时间比较敏感,过去过久的数据,其价值迅速下降。好比:用户点击流。
根据大数据,进行秒级响应进行商品推荐。
既然是实时结果,那天然是主动推送给用户,也就是实时推荐。
本质是“tiny batch processing",切分到秒级响应。
DStream就是一堆Tiny RDD的集合,使用”微小“批处理模拟流计算。
每一个receiver 单独负责一个 Input DStream。
Receiver监控这几种流,而后交给流计算组件去处理。
编写Streaming程序的套路,指挥官:streamingContext。
(1) 建立DStream,也就定义了输入源。
(2) 对DStream进行一些 “转换操做” 和 "输出操做"。
(3) 启动流计算,接收数据:streamingContext.start()
(4) 结束流计算,streamingContext.awaitTermination()
(5) 手动结束流计算进程:streamingContext.stop()
交互环境
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
独立程序
from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf = SparkConf() conf.setAppName('TestDStream') conf.setMaster('local[2]') sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
* 文件流
实时自动监控文件内容、目录内容。文件夹中新的文件添加进来,就会造成流,读入。
from pyspark import SparkContext from pyspark.streaming import StreamingContext
# 定义输入源 ssc = StreamingContext(sc, 10) lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile') # <---- 这是文件夹!
# 转换和输出操做 words = lines.flatMap(lambda line: line.split(' ')) # 拍扁了一个个可乐罐,获得单词集合 wordCounts = words.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b) # 词频汇总
wordCounts.pprint() ssc.start() ssc.awaitTermination()
运行代码:
/usr/local/spark/bin/spark-submit FileStreaming.py
* 套接字流
客户端,接收
客户端进行刺频统计,并显示结果。
#!/usr/bin/env python3 # NetworkWordCount.py from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr) exit(-1) sc = SparkContext(appName = "PythonStreamingNetworkWordCount") ssc = StreamingContext(sc, 1) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a,b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()
客户端从服务端接收流数据:
# 用客户端向服务端发送流数据 $ /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost <端口>
服务端,发送
(a) 系统自带服务端 nc。
# 打开服务端 $nc -lk <端口号>
(b) 自定义服务端:有链接则发送一段字符串过去。
#!/usr/bin/env python3
# DataSourceSocket.py import socket server =socket.socket() server.bind('localhost', 9999) server.listen(1) while 1:
# step 1 print("I'm waiting for the connect") conn, addr = server.accept() print("Connect success! Connection is from %s " % addr[0])
# step 2,发送的内容做为测试例子 print("Sending data ...") conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())
# step 3 conn.close() print('Connection is broken.')
启动服务端发送流数据:
# 用客户端向服务端发送流数据 $ /usr/local/spark/bin/spark-submit DataSourceSocket.py
* RDD队列流
#!/usr/bin/env python3 import time from pyspark import SparkContext from pyspark.streaming import StreamingContext if __name__ == "__name__": sc = SparkContext(appName="PythonStreamingQueueStream") ssc = StreamingContext(sc, 2) rddQueue = [] for i in range(5): # 每一个rdd包含这1000个数字。 rddQueue += [ssc.sparkContext.parallelize( [j for j in range(1, 1001) ], 10)] time.sleep(1)
# 建立了一个RDD队列流 inputStream = ssc.queueStream(rddQueue) # 转换和输出操做:统计每一个余数出现的频率 mappedStream = inputStream.map(lambda x: (x % 10, 1)) reducedStream = mappedStream.reduceByKey(lambda a, b: a+b) reducedStream.pprint() ssc.start() ssc.stop(stopSparkContext = True, stopGraceFully = True) # 等待全部的数据处理完后优雅地退出
Ref: Spark入门:DStream转换操做
何为“无状态”?
咱们以前 “套接字流” 部分介绍的词频统计,就是采用无状态转换;
每次统计,都是只统计当前批次到达的单词的词频,和以前批次无关,不会进行累计。
下面给出一些无状态转换操做的含义:
* map(func) :对源DStream的每一个元素,采用func函数进行转换,获得一个新的DStream;
* flatMap(func): 与map类似,可是每一个输入项可用被映射为0个或者多个输出项;
* filter(func): 返回一个新的DStream,仅包含源DStream中知足函数func的项;
* repartition(numPartitions): 经过建立更多或者更少的分区改变DStream的并行程度;
* union(otherStream): 返回一个新的DStream,包含源DStream和其余DStream的元素;
* count():统计源DStream中每一个RDD的元素数量;
* reduce(func):利用函数func汇集源DStream中每一个RDD的元素,返回一个包含单元素RDDs的新DStream;
* countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每一个键的值是在原DStream的每一个RDD中的出现次数;
* reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操做时,返回一个新的由(K,V)键值对组成的DStream,每个key的值均由给定的recuce函数(func)汇集起来;
* join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
* cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
* transform(func):经过对源DStream的每一个RDD应用RDD-to-RDD函数,建立一个新的DStream。支持在新的DStream中作任何RDD操做。
下面给给出一些窗口转换操做的含义:
* window(windowLength, slideInterval):基于源DStream产生的窗口化的批数据,计算获得一个新的DStream;
* countByWindow(windowLength, slideInterval):返回流中元素的一个滑动窗口数;
* reduceByWindow(func, windowLength, slideInterval):返回一个单元素流。利用函数func汇集滑动时间间隔的流的元素建立这个单元素流。函数func必须知足结合律,从而能够支持并行计算;
* reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认状况下,这个算子利用了Spark默认的并发任务数去分组。能够经过numTasks参数的设置来指定不一样的任务数;
* reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):更加高效的reduceByKeyAndWindow,每一个窗口的reduce值,是基于先前窗口的reduce值进行增量计算获得的;它会对进入滑动窗口的新数据进行reduce操做,并对离开窗口的老数据进行“逆向reduce”操做。可是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
* countByValueAndWindow(windowLength, slideInterval, [numTasks]):当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每一个key的值都是它们在滑动窗口中出现的频率。
逆操做,如何减小计算量?
参考:http://www.javashuo.com/article/p-ghrpdumz-mv.html
问:reduceByWindow() 和 reduceByKeyAndWindow() 让咱们能够对每一个窗口更高效地进行归约操做。
它们接收一个归约函数,在整个窗口上执行,好比 +。除此之外,它们还有一种特殊形式,经过只考虑新进入窗口的数据和离开窗 口的数据,让 Spark 增量计算归约结果。
这种特殊形式须要提供归约函数的一个逆函数,比 如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数能够大大提升执行效率。
[左图] 本来的意图是”从新计算这个窗口”,但没有必要。
[右图] 只计算“增量”。
代码展现:
归约操做在以下代码中的体现:30表明窗口大小,10是步长;
两个lambda中的y的理解,第一个表明“新增”,第二个表明“淘汰出窗口“的内容。
“数据源”终端:
nc -lk 9999
“流计算”终端:
/usr/local/spark/bin/spark-submit \
> WindowedNetworkWordCount.py localhost 9999
经过 updateStateByKey() 操做实现历史状态不断地累加,第二个参数last_sum至关于static变量。
经过 ssc.checkpoint() 声明一个检查点,防止数据丢失
代码展现:
from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr) exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount") ssc = StreamingContext(sc, 1)
ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0) # 第一次时last_sum不存在的,就使用0
# 建立“套接字流” lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\ .updateStateByKey(updateFunc, initialRDD=initialStateRDD) # <--- 跨批次的状态累加
# map以后,按正常来说用reduceByKey(),是“无状态”模式;
# (hadoop,1), (hadoop,1), (spark,1), (spark,1)
# (hadoop,2), (spark,2)
# 将结果保存在文本或打印出
running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")
running_counts.pprint()
ssc.start()
ssc.awaitTermination()
"数据源" 终端:
nc -lk 9999
测试“跨批次”状态维护,因此前后输入若干句子,词频结果可以“跨批次”累计。
“流计算” 终端:
/usr/local/spark/bin/spark-submit \
NetworkWordCountStateful.py localhost 9999
流计算的结果保存到文本文件中去,放在updateStateByKey以后。
running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")
数据库方面的操做。
service mysql start mysql -u root -p # 提示输入密码
mysql> use spark mysql> create table wordcount(word char(20), count int(4));
为了让MySQL支持Python,须要安装 PyMySQL。
流计算的结果保存到数据库中去,放在running_counts.pprint() 以后。
# 写入操做: each partition
def dbfunc(records): db = pymysql.connect("localhost", "root", "123456", "spark") cursor = db.cursor()
def doinsert(p): sql = "insert into wordcount(word,count) values ('%s','%s')" % (str(p)[0]),str(p[1])) try: cursor.execute(sql) db.commit() except: db.rollback()
# 每个分区中的每一条记录,进行sql操做 for item in records: doinsert(item)
def func(rdd):
# 分区太多,可能会并发链接数据库;故,这里改成3个分区 repartitionedRDD = rdd.repartition(3)
# 处理每个分区 repartitionedRDD.foreachPartition(dbfunc)
# DStream 类型:running_counts,本质上就是一堆rdd的集合,这里天然就可以挨个遍历rdd running_counts.foreachRDD(func) ssc.start() ssc.awaitTermination()
End.