1、Spark Streaming的介绍
1. 流处理
流式处理(Stream Processing)。流式处理就是指源源不断的数据流过系统时,系统可以不停地连续计算。因此流式处理没有什么严格的时间限制,数据从进入系统到出来结果多是须要一段时间。然而流式处理惟一的限制是系统长期来看的输出速率应当快于或至少等于输入速率。不然的话,数据岂不是会在系统中越积越多(否则数据哪去了)?如此,无论处理时是在内存、闪存仍是硬盘,迟早都会空间耗尽的。就像雪崩效应,系统愈来愈慢,数据越积越多。算法
二、spark架构

三、Spark Streaming特色
Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。apache
Spark Streaming的优点在于:架构
能运行在100+的结点上,并达到秒级延迟。app
使用基于内存的Spark做为执行引擎,具备高效和容错的特性。框架
能集成Spark的批处理和交互查询。socket
为实现复杂的算法提供和批处理相似的简单接口。函数
Spark Streaming在内部的处理机制是,接收实时流的数据,并根据必定的时间间隔拆分红一批批的数据,而后经过Spark Engine处理这些批数据,最终获得处理后的一批批结果数据。spa
对应的批数据,在Spark内核对应一个RDD实例,所以,对应流数据的DStream能够当作是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分红一批一批后,经过一个先进先出的队列,而后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,而后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产速率和消费速率。对象
四、程序流程blog
引入头文件
import org.apache.spark._ import org.apache.spark.streaming._ |
1. 建立StreamingContext对象 同Spark初始化须要建立SparkContext对象同样,使用Spark Streaming就须要建立StreamingContext对象。建立StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称(如NetworkWordCount)。须要注意的是参数Seconds(1),Spark Streaming须要指定处理数据的时间间隔,如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数须要根据用户的需求和集群的处理能力进行适当的设置;
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) |
- 建立InputDStream Spark Streaming须要指明数据源。如上例所示的socketTextStream,Spark Streaming以socket链接做为数据源读取数据。固然Spark Streaming支持多种不一样的数据源,包括Kafka、 Flume、HDFS/S三、Kinesis和Twitter等数据源;
val lines = ssc.socketTextStream("10.2.5.3", 9999 |
- 操做DStream对于从数据源获得的DStream,用户能够在其基础上进行各类操做,如上例所示的操做就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源获得的数据首先进行分割,而后利用Map和ReduceByKey方法进行计算,固然最后还有使用print()方法输出结果;
val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() |
- 启动Spark Streaming以前所做的全部步骤只是建立了执行流程,程序没有真正链接上数据源,也没有对数据进行任何操做,只是设定好了全部的执行计划,当ssc.start()启动后程序才真正进行全部预期的操做。
ssc.start() ssc.awaitTermination() |
五、单词统计例子
object Count { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("10.2.5.3", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } } |
2、SparkStreaming累加器及广播变量的使用
Spark Streaming的累加器和广播变量没法从checkpoint恢复。若是在应用中既使用到checkpoint又使用了累加器和广播变量的话,最好对累加器和广播变量作懒实例化操做,这样才可使累加器和广播变量在driver失败重启时可以从新实例化。
定义累加器
Object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } |
定义广播变量
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized{ if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } |
删除重复的单词
object RecoverableNetworkWordCount { def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) : StreamingContext = { println("Creating new context") val outputFile = new File(outputPath) if (outputFile.exists()) outputFile.delete() val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[2]") val ssc = new StreamingContext( sparkConf, Seconds(120) ) ssc.checkpoint( checkpointDirectory ) val lines = ssc.socketTextStream(ip, port) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map((_, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => val blacklist = WordBlacklist.getInstance(rdd.sparkContext) val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts println(output) println("Dropped " + droppedWordsCounter.value + " word(s) totally") println("Appending to " + outputFile.getAbsolutePath) Files.append(droppedWordsCounter.value + "\n", outputFile, Charset.defaultCharset()) } ssc } |
主函数
def main(args: Array[String]) { val Array(ip, port, checkpointDirectory, outputPath) = Array("10.2.5.3","9999","E:\\point","E:\\out\\test.txt") val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port.toInt, outputPath, checkpointDirectory)) ssc.start() ssc.awaitTermination() } |