原本这篇是准备5.15更的,可是上周一直在忙签证和工做的事,没时间就推迟了,如今终于有时间来写写Learning Spark最后一部份内容了。html
第10-11 章主要讲的是Spark Streaming 和MLlib方面的内容。咱们知道Spark在离线处理数据上的性能很好,那么它在实时数据上的表现怎么样呢?在实际生产中,咱们常常须要即便处理收到的数据,好比实时机器学习模型的应用,自动异常的检测,实时追踪页面访问统计的应用等。Spark Streaming能够很好的解决上述相似的问题。算法
了解Spark Streaming ,只须要掌握如下几点便可:apache
- DStream
- 概念:离散化流(discretized stream),是随时间推移的数据。由每一个时间区间的RDD组成的序列。DStream能够从Flume、Kafka或者HDFS等多个输入源建立。
- 操做:转换和输出,支持RDD相关的操做,增长了“滑动窗口”等于时间相关的操做。
下面以一张图来讲明Spark Streaming的工做流程:缓存
从上图中也能够看到,Spark Streaming把流式计算当作一系列连续的小规模批处理来对待。它从各类输入源读取数据,并把数据分组为小的批次,新的批次按均匀的时间间隔建立出来。在每一个时间区间开始的时候,一个新的批次就建立出来,在该区间内收到的数据都会被添加到这个批次中去。在时间区间结束时,批次中止增加。机器学习
转化操做分布式
- 无状态转化操做:把简单的RDDtransformation分别应用到每一个批次上,每一个批次的处理不依赖于以前的批次的数据。包括map()、filter()、reduceBykey()等。
- 有状态转化操做:须要使用以前批次的数据或者中间结果来计算当前批次的数据。包括基于滑动窗口的转化操做,和追踪状态变化的转化操做(updateStateByKey())
无状态转化操做ide
有状态转化操做函数
Windows机制(一图盛千言)oop
上图应该很容易看懂,下面举个实例(JAVA写的):性能
UpdateStateByKey()转化操做
主要用于访问状态变量,用于键值对形式的DStream。首先会给定一个由(键,事件)对构成的DStream,并传递一个指定如何我的剧新的事件更新每一个键对应状态的函数,它能够构建出一个新的DStream,为(键,状态)。通俗点说,加入咱们想知道一个用户最近访问的10个页面是什么,能够把键设置为用户ID,而后UpdateStateByKey()就能够跟踪每一个用户最近访问的10个页面,这个列表就是“状态”对象。具体的要怎么操做呢,UpdateStateByKey()提供了一个update(events,oldState)函数,用于接收与某键相关的时间以及该键以前对应的状态,而后返回这个键对应的新状态。
- events:是在当前批次中收到的时间列表()可能为空。
- oldState:是一个可选的状态对象,存放在Option内;若是一个键没有以前的状态,能够为空。
- newState:由函数返回,也以Option形式存在。若是返回一个空的Option,表示想要删除该状态。
UpdateStateByKey()的结果是一个新的DStream,内部的RDD序列由每一个时间区间对应的(键,状态)对组成。
接下来说一下输入源
- 核心数据源:文件流,包括文本格式和任意hadoop的输入格式
- 附加数据源:kafka和flume比较经常使用,下面会讲一下kafka的输入
- 多数据源与集群规模
Kafka的具体操做以下:
基于MLlib的机器学习
通常咱们经常使用的算法都是单机跑的,可是想要在集群上运行,不能把这些算法直接拿过来用。一是数据格式不一样,单机上咱们通常是离散型或者连续型的数据,数据类型通常为array、list、dataframe比较多,以txt、csv等格式存储,可是在spark上,数据是以RDD的形式存在的,如何把ndarray等转化为RDD是一个问题;此外,就算咱们把数据转化成RDD格式,算法也会不同。举个例子,你如今有一堆数据,存储为RDD格式,而后设置了分区,每一个分区存储一些数据准备来跑算法,能够把每一个分区看作是一个单机跑的程序,可是全部分区跑完之后呢?怎么把结果综合起来?直接求平均值?仍是别的方式?因此说,在集群上跑的算法必须是专门写的分布式算法。并且有些算法是不能分布式的跑。Mllib中也只包含可以在集群上运行良好的并行算法。
MLlib的数据类型
- Vector:向量(mllib.linalg.Vectors)支持dense和sparse(稠密向量和稀疏向量)。区别在与前者的没一个数值都会存储下来,后者只存储非零数值以节约空间。
- LabeledPoint:(mllib.regression)表示带标签的数据点,包含一个特征向量与一个标签,注意,标签要转化成浮点型的,经过StringIndexer转化。
- Rating:(mllib.recommendation),用户对一个产品的评分,用于产品推荐
- 各类Model类:每一个Model都是训练算法的结果,通常都有一个predict()方法能够用来对新的数据点或者数据点组成的RDD应用该模型进行预测
通常来讲,大多数算法直接操做由Vector、LabledPoint或Rating组成的RDD,一般咱们从外部数据读取数据后须要进行转化操做构建RDD。具体的聚类和分类算法原理很少讲了,能够本身去看MLlib的在线文档里去看。下面举个实例----垃圾邮件分类的运行过程:
步骤:
1.将数据转化为字符串RDD
2.特征提取,把文本数据转化为数值特征,返回一个向量RDD
3.在训练集上跑模型,用分类算法
4.在测试系上评估效果
具体代码:
1 from pyspark.mllib.regression import LabeledPoint 2 from pyspark.mllib.feature import HashingTF 3 from pyspark.mllib.calssification import LogisticRegressionWithSGD 4 5 spam = sc.textFile("spam.txt") 6 normal = sc.textFile("normal.txt") 7 8 #建立一个HashingTF实例来把邮件文本映射为包含10000个特征的向量 9 tf = HashingTF(numFeatures = 10000) 10 #各邮件都被切分为单词,每一个单词背映射为一个特征 11 spamFeatures = spam.map(lambda email: tf.transform(email.split(" "))) 12 normalFeatures = normal.map(lambda email: tf.transform(email.split(" "))) 13 14 #建立LabeledPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子 15 positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1,features)) 16 negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0,features)) 17 trainingData = positiveExamples.union(negativeExamples) 18 trainingData.cache#由于逻辑回归是迭代算法,因此缓存数据RDD 19 20 #使用SGD算法运行逻辑回归 21 model = LogisticRegressionWithSGD.train(trainingData) 22 23 #以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试 24 posTest = tf.transform("O M G GET cheap stuff by sending money to...".split(" ")) 25 negTest = tf.transform("Hi Dad, I stared studying Spark the other ...".split(" ")) 26 print "Prediction for positive test examples: %g" %model.predict(posTest) 27 print "Prediction for negative test examples: %g" %model.predict(negTest)
这个例子很简单,讲的也颇有限,建议你们根据本身的需求,直接看MLlib的官方文档,关于聚类,分类讲的都很详细。
注:图片参考同事的PPT讲义^_^,已受权哈哈