1、Spark Streaming工做原理
Spark Streaming 是 Spark Core API 的扩展,它支持弹性的,高吞吐的,容错的实时数据流的处理。数据能够经过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets数据源。也能够经过例如 map,reduce,join,window 等的高级函数组成的复杂算法处理。最终,处理后的数据能够输出到文件系统,数据库以及实时仪表盘中。
Spark Streaming支持的输入、输出源以下图:
Spark Streaming的工做原理图以下:
Spark Streaming 提供了一个名为 discretized stream 或 DStream 的高级抽象,它表明一个连续的数据流。DStream 能够从数据源的输入数据流建立,例如 Kafka,Flume 以及 Kinesis,或者在其余 DStream 上进行高层次的操做以建立。
算法
2、Spark Streaming的核心DStream
一、DStream的转化操做
DStream API提供的与转化操做相关的方法以下:。
以下举例详解transform(func) 方法和 updateStateByKey(fhnc) 方法:
(1)、transform(func) 方法
transform 方法及相似的 transformWith(func) 方法容许在 DStream 上应用任意 RDD-to-RDD 函数,它们能够被应用于未在 DStream API 中暴露的任何 RDD 操做中。
下面举例演示如何使用transform(func) 方法将一行语句分隔成多个单词,具体步骤以下:
A、在Liunx中执行命令启动9999服务器且监听socket服务,而且输入数据I like spark streaming and Hadoop,具体命令以下:
B、打开IDEA开发工具,建立一个Maven项目,而且配置pom.xml文件,引入Spark Streaming相关的依赖包,pom.xml文件配置具体以下:
数据库
<!--设置依赖版本号--> <properties> <spark.version>2.1.1</spark.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies>
注意:配置好pom.xml文件后,须要在项目的/src/main和/src/test目录下分别建立scala目录。
C、在项目的/src/main/scala目录下建立包,接着建立一个名为TransformTest的scala类,主要用于编写SparkStreaming应用程序,实现一行语句分隔成多个单词的功能,具体代码以下(带注释):
apache
package SparkStreaming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object TransformTest { def main(args: Array[String]): Unit = { //建立SparkConf对象 val sparkconf = new SparkConf().setAppName("TransformTest").setMaster("local[2]") //建立SparkContext对象 val sc = new SparkContext(sparkconf) //设置日志级别 sc.setLogLevel("WARN") //建立StreamingContext,须要建立两个参数,分别为SparkContext和批处理时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //链接socket服务,须要socket服务地址、端口号以及存储级别(默认的) val dstream:ReceiverInputDStream[String] = ssc.socketTextStream("192.168.169.200",9999) //经过空格分隔 val words:DStream[String] = dstream.transform(line => line.flatMap(_.split(" "))) //打印输出结果 words.print() //开启流式计算 ssc.start() //用于保护程序正常运行 ssc.awaitTermination() } }
D、运行程序能够看出,语句I like spark streaming and Hadoop在5s内被分割成6个单词,结果以下图:
数组
(2)、 updateStateByKey(func) 方法
updateStateByKey(func) 方法能够保持任意状态,同时容许不断有新的信息进行更新。
下面举例演示如何使用updateStateByKey(func) 方法进行词频统计:
在项目的/src/main/scala目录下建立包,接着建立一个名为UpdateStateByKeyTest的scala类,主要用于编写SparkStreaming应用程序,实现词频统计,具体代码以下:
服务器
package SparkStreaming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object UpdateStateByKeyTest { def updateFunction(newValues:Seq[Int],runningCount:Option[Int]) : Option[Int] = { val newCount = runningCount.getOrElse(0)+newValues.sum Some(newCount) } def main(args: Array[String]): Unit = { //建立SparkConf对象 val sparkconf = new SparkConf().setAppName("UpdateStateByKeyTest").setMaster("local[2]") //建立SparkContext对象 val sc = new SparkContext(sparkconf) //设置日志级别 sc.setLogLevel("WARN") //建立StreamingContext,须要建立两个参数,分别为SparkContext和批处理时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //链接socket服务,须要socket服务地址、端口号以及存储级别(默认的) val dstream:ReceiverInputDStream[String] = ssc.socketTextStream("192.168.169.200",9999) //经过逗号分隔第一个字段和第二个字段 val words:DStream[(String,Int)] = dstream.flatMap(_.split(" ")).map(word => (word,1)) //调用updateStateByKey操做 var result:DStream[(String,Int)] = words.updateStateByKey(updateFunction) //若是用到updateStateByKey,此处要加上ssc.checkpoint("目录")这一句,不然会报错: // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint(). //为何要用到checkpoint?由于以前的案例是没有状态的,用完以后就丢掉,不须要了, // 可是如今要用到以前的那些数据,要把以前的状态保留下来 //“.”的意思是当前目录 ssc.checkpoint(".") //打印输出结果 result.print() //开启流式计算 ssc.start() //用于保护程序正常运行 ssc.awaitTermination() } }
而后在9999端口不断输入单词,具体内容以下:
socket
运行程序从控制台输出的结果看出每隔5s接受一次数据,一共接受了两次数据,而且每接受一次数据就会进行词频统计并输出结果。
ide