一.SparkStreaming在线另类实验
如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,经过调节放大BatchInterval的方式,来下降批处理次数,以方便看清楚各个环节。咱们从已写过的广告点击的在线黑名单过滤的SparkStreaming应用程序入手。一下是具体的实验源码:
package com.dt.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 背景描述:在广告点击计费系统中,咱们在线过滤掉黑名单的点击,进而保护广告商的利益,
* 只进行有效的广告点击计费。或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量。
* 实现技术:使用transform API直接基于RDD编程,进行join操做
*
* Created by Administrator on 2016/4/30.
*/
object OnlineBlackListFilter {
def main(args: Array[String]) {
/**
* 第一步:建立Spark的配置对象,设置Spark程序的运行时的配置信息
* 例如说经过setMaster来设置程序要链接的spark集群的master的url,若是设置为
* local, 则表明Spark程序在本地运行,特别适合于机器配置条件很是差
* (例如只有1g的内存)的初学者
*/
val conf = new SparkConf() //建立SparkConf对象
conf.setAppName("OnlineBlackListFilter") //设置Spark应用程序的名称,在程序运行的监控界面能够看到名称
// conf.setMaster("local") //此时,程序在本地运行,不须要安装Spark集群
conf.setMaster("spark://master:7077") //此时,程序在本地运行,不须要安装Spark集群
val ssc = new StreamingContext(conf, Seconds(300))
/**
* 黑名单数据准备,实际上黑名单通常都是动态的,例如在Redis中或者数据库中,黑名单的生成每每有复杂的业务逻辑,
* 具体状况算法不一样,可是在SparkStreaming进行处理的时候每次都可以访问完整的信息
*
*/
val blackList = Array(("hadoop", true), ("mahout", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("master", 9999)
/**
* 此处模拟的广告点击的每条数据的格式为:time、name
* 此处map操做的结果是name, (time, name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map(ads =>(ads.split(" ")(1), ads))
adsClickStreamFormatted.transform(userClickRDD =>{
//经过leftOuterJoin操做既保留了左侧用户广告点击内容的RDD的全部内容,又得到了相应点击内容是否在黑名单中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
val validClicked = joinedBlackListRDD.filter(joinedItem => {
/**
*进行filter过滤的时候,其输入元素是一个Tuple:(name,((time, name), boolean))
* 其中第一个元素是黑名单的名称,第二个元素的第二个元素是进行leftOuterJoin的时候是否存在该值
* 若是存在的话,代表当前广告点击是黑名单,须要过滤掉,不然的话则是有效点击内容;
*/
if(joinedItem._2._2.getOrElse(false)){
false
} else {
true
}
})
validClicked.map(validClicked =>{ validClicked._2._1 })
}).print()
/**
* 计算后的有效数据通常都会写入Kafka中,下游的计费系统会从Kafka中pull到有效数据进行计费
*/
ssc.start()
ssc.awaitTermination()
}
}
启动nc -lk 9999,将应用发布到Spark集群上运行,并在nc中发送以下数据:算法
执行shell代码 shell
sh内容
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit –class com.dt.spark.sparkstreaming.OnlineBlackListFilter –master spark://Master:7077 /root/Documents/SparkApps/WordCount.jar数据库
2016-05-01 mahout
2016-05-01 scala
2016-05-01 hadoo
2016-05-01 spark
在应用收到数据后会有以下输出apache
2016-05-01 scala
2016-05-01 spark
咱们运行完程序,看到过滤结果之后,中止程序,打开HistoryServer http://master:18080/编程
点击App ID进去,打开,会看到以下图所示的4个Job,从实际执行的Job是1个Job,可是图中显示有4个Job,从这里能够看出Spark Streaming运行的时候本身会启动一些Job。框架
先看看job id 为0 的详细信息socket
很明显是咱们定义的blackListRDD数据的生成。对应的代码为
val blackList = Array((“Hadoop”, true), (“Mathou”, true))
//把Array变成RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
而且它作了reduceBykey的操做(代码中并无此步操做,SparkStreaming框架自行生成的)。
这里有两个Stage,Stage 0和Stage 1 。 oop
Job 1的详细信息url
一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到batchInterval后,将全部数据变成一个RDD。而且它的耗时也是最长的,59s 。 spa
特别说明:此处能够看出,receiver也是一个独立的job。由此咱们能够得出一个结论:咱们在应用程序中,能够启动多个job,而且不用的job之间能够相互配合,这就为咱们编写复杂的应用程序打下了基础。
咱们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息
根据上图的信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此能够知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到磁盘中的。
即使在建立receiver时,指定的存储默认策略为MEMORY_AND_DISK_SER_2
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
job 2的详细信息
Job 2 将前两个job生成的RDD进行leftOuterJoin操做。
从Stage Id的编号就能够看出,它是依赖于上两个Job的。
Receiver接收数据时是在spark-master节点上,可是Job 2在处理数据时,数据已经到了spark-worker1上了(由于个人环境只有两个worker,数据并无分散到全部worker节点,worker节点若是多一点,状况可能不同,每一个节点都会处理数据)
点击上面的Stage Id 3查看详细信息:
Executor上运行,而且有5个Task 。
Job 3的详细信息
总结:咱们能够看出,一个batchInterval并非仅仅触发一个Job。
二.Spark Streaming本质的理解
根据上面的描述,咱们更细致的了解了DStream和RDD的关系了。DStream就是一个个batchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。
Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各类来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各类地方。
Spark Streaming接收这些实时输入数据流,会将它们按批次划分,而后交给Spark引擎处理,生成按照批次划分的结果流。
Spark Streaming使用数据源产生的数据流建立DStream,也能够在已有的DStream上使用一些操做来建立新的DStream。
在咱们前面的实验中,每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。
DStream是一个没有边界的集合,没有大小的限制。
DStream表明了时空的概念。随着时间的推移,里面不断产生RDD。
锁定到时间片后,就是空间的操做,也就是对本时间片的对应批次的数据的处理。
下面用实例来说解数据处理过程。
从Spark Streaming程序转换为Spark执行的做业的过程当中,使用了DStreamGraph。
Spark Streaming程序中通常会有若干个对DStream的操做。DStreamGraph就是由这些操做的依赖关系构成。
对DStream的操做会构建成DStream Graph
从每一个foreach开始,都会进行回溯。从后往前回溯这些操做之间的依赖关系,也就造成了DStreamGraph。
在每到batchInterval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph
空间维度肯定以后,随着时间不断推动,会不断实例化RDD Graph,而后触发Job去执行处理。