##Spark提供了一个滑动窗口给咱们使用,借助于它,咱们很方便就能够按照本身想要的时间间隔来处理数据。好比说:股票一分钟的展现,一分钟最新动态java
##前面的处理都同样,该从Kafka哪里读数据仍是同样。只是不同的是在reduceByKey变成了reduceByKeyWithWindow(updateFunc,Seconds(30),Seconds(20))apache
#若是是须要累加求和,那么须要定义这个updateFunc方法。它里面须要传递两个参数,为本身定义的case calss组装的信息。数组
#代码实现以下: package com.liufu.org.streaming import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils函数
/** * Created by liufu on 2016/11/20. */ object StreamAndWindow { //定义一个累计函数,将之前的数据和如今的数据加起来,而后继续保持在checkpoint val updateFunc = (old:MsgBean, n:MsgBean) => { MsgBean(old.downFloe + n.downFloe, old.upflow + n.upflow) } def main(args: Array[String]): Unit = { //定义一个数组来对接main方法传入的参数。 val Array(zkList,cumsumorGroup,topics,numThreads) = args //将topics和numThreads组装成一个map。这是当去消费kafka数据的时候须要的。 val topic_numThread: Map[String, Int] = topics.split(",").map((_,numThreads.toInt)).toMap val conf = new SparkConf().setAppName("streamTest").setMaster("local[2]") //建立Streamingcomtext对象,而后指定处理数据的时间间隔 val ssc: StreamingContext = new StreamingContext(conf,Seconds(10)) //设置一个文件目录,用于保存之前数据。 ssc.checkpoint("file:///E:/checkpoint") //读取Kafka的数据。 val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkList,cumsumorGroup,topic_numThread,StorageLevel.MEMORY_AND_DISK) //将kafka的数据拿出来处理 val msgBeanSet: DStream[((String), MsgBean)] = kafkaStream.map(line => { val fields = line._2.split(",") ((fields(0)), MsgBean(fields(1).toInt, fields(2).toInt)) }) //既然是reduceByKey那么数据必须是k-v类型。千方百计组装便可。 val result: DStream[(String, MsgBean)] = msgBeanSet.reduceByKeyAndWindow(updateFunc,Seconds(40),Seconds(20)) result.print() //也能够将这个result写入到redies或者Hbase中。 //必定要启动streamContext程序,而后一直等待,不然任务不会提交的。 ssc.start() ssc.awaitTermination() } } //定义一个case class ,用来组装切分后的消息。 case class MsgBean(upflow:Int,downFloe:Int)
#总结:spa