1、Streaming与Flume的联调
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Flume integration, approach #1 (push-based).
 * Flume pushes events to this app via an Avro sink; each 5-second
 * batch is word-counted and printed to the console.
 */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {
    // Host and port of the Flume Avro sink come from the command line.
    if (args.length != 2) {
      System.out.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Receiver-based input: one InputDStream of Flume events.
    val events = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // A Flume event has headers and a body; pull out the body text,
    // trim whitespace, then run the classic word count.
    events
      .map(e => new String(e.event.getBody.array()).trim)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Kafka integration, receiver-based approach.
 * Consumes via ZooKeeper-managed offsets and word-counts each
 * 5-second batch.
 */
object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // createStream expects a Map of topic -> consumer thread count.
    val topicThreadMap = topics.split(",").map(topic => (topic, numThreads.toInt)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicThreadMap)

    // Each record is a (key, value) pair; the payload is the second
    // element. In a real job, only this transformation line changes.
    messages
      .map(record => record._2)
      .flatMap(_.split(","))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Kafka integration, direct (receiver-less) approach.
 * Reads straight from the Kafka brokers and word-counts each 5-second batch.
 *
 * Fixes over the original:
 *  - arg check was `!= 4` but the program destructures exactly 2 args
 *    (brokers, topics), so every valid invocation exited immediately and
 *    a 4-arg invocation crashed on the pattern match; now checks `!= 2`.
 *  - usage message and appName said "KafkaReceiverWordCount" (copy-paste
 *    from the receiver example); corrected to "KafkaDirectWordCount".
 *  - `StringDecoder` was used as a type argument but never imported;
 *    added `import kafka.serializer.StringDecoder`.
 */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {
    // Exactly two arguments: broker list and comma-separated topics.
    if (args.length != 2) {
      System.out.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Direct stream needs the broker list (kafkaParams) and a topic set.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // Each record is (key, value); the payload is the second element.
    messages.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}