Spark Streaming的核心DStream之转换操做实例

DStream的转化操做

DStream API提供的与转化操做相关的方法以下:
Spark Streaming的核心DStream之转换操做实例apache

以下举例详解transform(func) 方法和 updateStateByKey(func) 方法:服务器

(1)、transform(func) 方法

transform 方法及相似的 transformWith(func) 方法容许在 DStream 上应用任意 RDD-to-RDD 函数,它们能够被应用于未在 DStream API 中暴露的任何 RDD 操做中。
下面举例演示如何使用transform(func) 方法将一行语句分隔成多个单词,具体步骤以下:
A、在Liunx中执行命令nc –lk 9999 启动服务器且监听socket服务,而且输入数据I like spark streaming and Hadoop,具体命令以下:
Spark Streaming的核心DStream之转换操做实例
B、打开IDEA开发工具,建立一个Maven项目,而且配置pom.xml文件,引入Spark Streaming相关的依赖包,pom.xml文件配置具体以下:
<!--设置依赖版本号-->socket

<properties>
    <spark.version>2.1.1</spark.version>
    <scala.version>2.11</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

注意:配置好pom.xml文件后,须要在项目的/src/main和/src/test目录下分别建立scala目录。
C、在项目的/src/main/scala目录下建立包,接着建立一个名为TransformTest的scala类,主要用于编写SparkStreaming应用程序,实现一行语句分隔成多个单词的功能,具体代码以下(带注释):ide

package SparkStreaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object TransformTest {
  def main(args: Array[String]): Unit = {
    //建立SparkConf对象
    val sparkconf = new SparkConf().setAppName("TransformTest").setMaster("local[2]")
    //建立SparkContext对象
    val sc = new SparkContext(sparkconf)
    //设置日志级别
    sc.setLogLevel("WARN")
    //建立StreamingContext,须要建立两个参数,分别为SparkContext和批处理时间间隔
    val ssc = new StreamingContext(sc,Seconds(5))
    //链接socket服务,须要socket服务地址、端口号以及存储级别(默认的)
    val dstream:ReceiverInputDStream[String] = ssc.socketTextStream("192.168.169.200",9999)
    //经过空格分隔
    val words:DStream[String] = dstream.transform(line => line.flatMap(_.split(" ")))
    //打印输出结果
    words.print()
    //开启流式计算
    ssc.start()
    //用于保护程序正常运行
    ssc.awaitTermination()
  }
}

D、运行程序能够看出,语句I like spark streaming and Hadoop在5s内被分割成6个单词,结果以下图:
Spark Streaming的核心DStream之转换操做实例函数

(2)、 updateStateByKey(func) 方法

updateStateByKey(func) 方法能够保持任意状态,同时容许不断有新的信息进行更新。
下面举例演示如何使用updateStateByKey(func) 方法进行词频统计:
在项目的/src/main/scala目录下建立包,接着建立一个名为UpdateStateByKeyTest的scala类,主要用于编写SparkStreaming应用程序,实现词频统计,具体代码以下:工具

package SparkStreaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object UpdateStateByKeyTest {

  def updateFunction(newValues:Seq[Int],runningCount:Option[Int]) : Option[Int] = {
    val newCount = runningCount.getOrElse(0)+newValues.sum
    Some(newCount)
  }
  def main(args: Array[String]): Unit = {
    //建立SparkConf对象
    val sparkconf = new SparkConf().setAppName("UpdateStateByKeyTest").setMaster("local[2]")
    //建立SparkContext对象
    val sc = new SparkContext(sparkconf)
    //设置日志级别
    sc.setLogLevel("WARN")
    //建立StreamingContext,须要建立两个参数,分别为SparkContext和批处理时间间隔
    val ssc = new StreamingContext(sc,Seconds(5))
    //链接socket服务,须要socket服务地址、端口号以及存储级别(默认的)
    val dstream:ReceiverInputDStream[String] = ssc.socketTextStream("192.168.169.200",9999)
    //经过逗号分隔第一个字段和第二个字段
    val words:DStream[(String,Int)] = dstream.flatMap(_.split(" ")).map(word => (word,1))
    //调用updateStateByKey操做
    var result:DStream[(String,Int)] = words.updateStateByKey(updateFunction)
    //若是用到updateStateByKey,此处要加上ssc.checkpoint("目录")这一句,不然会报错:
    // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    //为何要用到checkpoint?由于以前的案例是没有状态的,用完以后就丢掉,不须要了,
    // 可是如今要用到以前的那些数据,要把以前的状态保留下来
    //“.”的意思是当前目录
    ssc.checkpoint(".")
    //打印输出结果
    result.print()
    //开启流式计算
    ssc.start()
    //用于保护程序正常运行
    ssc.awaitTermination()
  }
}

而后在9999端口不断输入单词,具体内容以下:oop

Spark Streaming的核心DStream之转换操做实例
运行程序从控制台输出的结果看出每隔5s接受一次数据,一共接受了两次数据,而且每接受一次数据就会进行词频统计并输出结果。
Spark Streaming的核心DStream之转换操做实例开发工具

相关文章
相关标签/搜索