package com.xp.cn.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf}

/**
 * Stateful word count over a Kafka topic using `updateStateByKey`.
 *
 * Reads messages from Kafka via the (receiver-based) `KafkaUtils.createStream`,
 * splits each message into whitespace-separated words, and maintains a running
 * count per word across batches. State survives batch boundaries via the
 * checkpoint directory, but NOT a full application restart (see trailing note
 * in this file).
 *
 * Created by xupan on 2017/12/16.
 */
object UpdateStateByKeyWordCount {

  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose INFO logging so batch output is readable.
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Spark Streaming needs at least two local threads:
    // one to run the receiver, one to process the received data.
    val sparkConf = new SparkConf().setAppName("FirstStreaming").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)

    // One micro-batch every 10 seconds.
    val streamingContext = new StreamingContext(sparkContext, Seconds(10))

    // updateStateByKey requires checkpointing (HDFS or local filesystem path).
    streamingContext.checkpoint("data/checkpoint")

    val zkQuorum = "xupan001:2181,xupan002:2181,xupan003:2181"
    val groupId  = "g1"
    // topic name -> number of receiver threads for that topic
    val topics   = Map[String, Int]("test001" -> 1)

    // Receiver-based stream. NOTE(author): createDirectStream is the
    // lower-level API preferred in production; createStream is less
    // efficient and can lose data.
    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(streamingContext, zkQuorum, groupId, topics)

    // Each Kafka record is (key, value); we only care about the value payload.
    val messages = kafkaStream.map(_._2)

    // Merge function for updateStateByKey:
    //   String      - the word (key)
    //   Seq[Int]    - this batch's occurrence counts for that word
    //   Option[Int] - the previously accumulated total, if any
    // Produces (word, newTotal) for every key in the iterator.
    val mergeCounts = (entries: Iterator[(String, Seq[Int], Option[Int])]) =>
      entries.map { entry =>
        val (word, batchCounts, previousTotal) = entry
        (word, batchCounts.sum + previousTotal.getOrElse(0))
      }

    val runningCounts =
      messages
        .flatMap(_.split(" "))
        .map((_, 1))
        .updateStateByKey(
          mergeCounts,
          new HashPartitioner(streamingContext.sparkContext.defaultMinPartitions),
          true)

    // print() is the output action that triggers each batch's computation;
    // the printed counts include state accumulated from earlier batches.
    runningCounts.print()

    // Start receiving/processing, then block until the job is stopped.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
-------------------------------------------
Time: 1513416280000 ms
-------------------------------------------
(d,1)
(s,1)
(a,1)
-------------------------------------------
Time: 1513416290000 ms
-------------------------------------------
(d,3)
(s,3)
(a,3)
-------------------------------------------
Time: 1513416300000 ms
-------------------------------------------
(d,3)
(w,1)
(s,3)
(e,1)
(a,3)
(q,1)
-------------------------------------------
Time: 1513416310000 ms
-------------------------------------------
(d,3)
(w,2)
(s,3)
(e,2)
(a,3)
(q,2)
-------------------------------------------
Time: 1513416320000 ms
-------------------------------------------
(d,4)
(w,2)
(s,4)
(e,2)
(a,4)
(q,2)
问题:
若是程序退出,重启后不会累加以前的数据(checkpoint 未从故障中恢复时,updateStateByKey 的历史状态会丢失)