package com.aimuti.SparkStreaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming integration with Kafka (receiver-based API).
 */
object KafkaWordCount {

  // For each word, add the counts from the current batch (Seq[Int])
  // to the running total carried over from previous batches (Option[Int]).
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    // Equivalent tuple-accessor form:
    // iter.flatMap(x => Some(x._2.sum + x._3.getOrElse(0)).map(y => (x._1, y)))
    iter.flatMap { case (word, counts, total) =>
      Some(counts.sum + total.getOrElse(0)).map(sum => (word, sum))
    }
  }

  def main(args: Array[String]): Unit = {
    // Expected arguments: ZooKeeper quorum, consumer group, comma-separated topic list,
    // and the number of receiver threads per topic.
    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val smc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory to persist the running state
    smc.checkpoint("d://ck2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(smc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    // Kafka records arrive as (key, message) pairs; split each message into words
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(
      updateFunc, new HashPartitioner(smc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    smc.start()
    smc.awaitTermination()
  }
}
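To see what updateFunc computes for a single batch, here is a minimal sketch with hypothetical sample data (the words and counts below are made up for illustration):

// Hypothetical per-batch state: for each word, the new occurrences seen in
// this batch and the running total from earlier batches (None if the word is new).
val sample = Iterator(
  ("spark", Seq(1, 1), Some(3)), // 2 new occurrences + 3 so far -> 5
  ("kafka", Seq(1), None)        // first occurrence -> 1
)
KafkaWordCount.updateFunc(sample).foreach(println) // prints (spark,5) and (kafka,1)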
Start ZooKeeper first, then the Kafka broker, and finally the console producer, before submitting the job.
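A minimal sketch of those startup steps, assuming a Kafka 0.8.x-style installation run from the Kafka home directory with default configs and ports, and a hypothetical topic named test:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

With the producer running, launch KafkaWordCount with matching arguments, for example (hypothetical values): localhost:2181 group1 test 2. Words typed into the producer console should then appear in the streaming job's cumulative counts every 5 seconds.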