Spark Streaming integration with Kafka

package com.aimuti.SparkStreaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming integration with Kafka: a stateful word count over
  * messages consumed from Kafka via the receiver-based API.
  */
object KafkaWordCount {
  // For each key, add this batch's counts (Seq[Int]) to the previously
  // saved total (Option[Int]) and emit the updated (word, total) pair.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (word, counts, state) =>
      Some(counts.sum + state.getOrElse(0)).map(total => (word, total))
    }
  }

  def main(args: Array[String]): Unit = {
    // Expected arguments: ZooKeeper quorum, consumer group, comma-separated topic list, threads per topic
    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("D:/ck2")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based stream: each record is a (key, message) pair
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    // Keep a running count per word across all batches
    val wordCounts = words.map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
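
The createStream call above is the receiver-based API, which tracks consumer offsets in ZooKeeper and buffers incoming data inside receivers. The same spark-streaming-kafka (0.8) connector also offers a direct, receiver-less stream that reads from the brokers and computes offsets itself. A minimal sketch follows; the broker address localhost:9092 and the topic name test are placeholder assumptions, not part of the original example:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker list and topic set for this sketch
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topicsSet = Set("test")

    // Direct stream: one RDD partition per Kafka partition, no receivers
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Plain per-batch word count (stateless, for brevity)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Because the direct stream pulls data on demand and determines offsets per batch, it avoids the write-ahead log that receivers need for reliability, which is why it became the recommended approach for this connector generation.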


Start ZooKeeper, start Kafka, then start a producer and send some messages to the topic.
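
For a local test, the standard Kafka scripts can be used; the installation paths and the topic name test below are assumptions for illustration:

# from the Kafka installation directory
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# create the topic if it does not exist yet (ZooKeeper-era syntax)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# type messages here; they will be consumed by the streaming job
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Words typed into the console producer should then show up as running counts in the streaming job's output every 5 seconds.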