六、SparkStream的滑动窗口函数

##Spark提供了一个滑动窗口给咱们使用,借助于它,咱们很方便就能够按照本身想要的时间间隔来处理数据。好比说:股票一分钟的展现,一分钟最新动态java

##前面的处理都同样,该从Kafka哪里读数据仍是同样。只是不同的是在reduceByKey变成了reduceByKeyWithWindow(updateFunc,Seconds(30),Seconds(20))apache

#若是是须要累加求和,那么须要定义这个updateFunc方法。它里面须要传递两个参数,为本身定义的case calss组装的信息。数组

#代码实现以下: package com.liufu.org.streaming import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils函数

/**
  * Created by liufu on 2016/11/20.
  */
object StreamAndWindow {
  //定义一个累计函数,将之前的数据和如今的数据加起来,而后继续保持在checkpoint
  val updateFunc = (old:MsgBean, n:MsgBean) => {
    MsgBean(old.downFloe + n.downFloe, old.upflow + n.upflow)
  }

  def main(args: Array[String]): Unit = {

    //定义一个数组来对接main方法传入的参数。
    val Array(zkList,cumsumorGroup,topics,numThreads) = args

    //将topics和numThreads组装成一个map。这是当去消费kafka数据的时候须要的。
    val topic_numThread: Map[String, Int] = topics.split(",").map((_,numThreads.toInt)).toMap

    val conf = new SparkConf().setAppName("streamTest").setMaster("local[2]")
    //建立Streamingcomtext对象,而后指定处理数据的时间间隔
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(10))

    //设置一个文件目录,用于保存之前数据。
    ssc.checkpoint("file:///E:/checkpoint")

    //读取Kafka的数据。
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkList,cumsumorGroup,topic_numThread,StorageLevel.MEMORY_AND_DISK)

    //将kafka的数据拿出来处理
    val msgBeanSet: DStream[((String), MsgBean)] = kafkaStream.map(line => {
      val fields = line._2.split(",")
      ((fields(0)), MsgBean(fields(1).toInt, fields(2).toInt))
    })

    //既然是reduceByKey那么数据必须是k-v类型。千方百计组装便可。
    val result: DStream[(String, MsgBean)] = msgBeanSet.reduceByKeyAndWindow(updateFunc,Seconds(40),Seconds(20))

    result.print()

//也能够将这个result写入到redies或者Hbase中。

    //必定要启动streamContext程序,而后一直等待,不然任务不会提交的。
    ssc.start()
    ssc.awaitTermination()
  }
}

//定义一个case class ,用来组装切分后的消息。
case class MsgBean(upflow:Int,downFloe:Int)

#总结:spa

  • 1:利用case class封装字段数据,等价于javaBean
  • 2:能够将result这个Dstream 写入到Redies或者Hbase中。而Dstream可能有多个分区,咱们能够以一个分区做为单位,而后建立一个链接一次性写入进去,最后在断开这个链接。
  • 3:在流式计算中,最好搞一个redies或者Hbase的链接池,由于程序一直在运行,因此频繁的建立和销毁链接开销很大。
  • 4:Redies3.0已经能够集群了。
相关文章
相关标签/搜索