Official documentation: http://spark.apache.org/docs/latest/streaming-programming-guide.html
1. Install and start the producer
First, install the nc tool with YUM on a Linux machine (IP: 192.168.10.101):
yum install -y nc
Start a server listening on port 9999:
nc -lk 9999
2. Write the Spark Streaming program
package org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object TCPWordCount {
  def main(args: Array[String]) {
    // setMaster("local[2]"): run two local threads, one to receive data and one to compute
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext with the batch interval; about 5 seconds,
    // otherwise the printed console output gets flushed away too quickly
    val scc = new StreamingContext(conf, Seconds(5))
    // Read data from the given IP and port
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    // Word count on each batch
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print the results to the console
    results.print()
    // Start Spark Streaming
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
3. Start the Spark Streaming program: since it uses local mode "local[2]", it can be run directly on the local machine.
Note: the parallelism must be specified. Running locally with setMaster("local[2]") starts two threads, one for the receiver and one for computation. When running on a cluster, the cluster must have more than one available core.
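If the job is submitted to a cluster instead, a spark-submit call along the following lines applies; the master URL and jar path are placeholders here, and the point is simply to give the application more than one core:

bin/spark-submit \
  --class org.apache.spark.TCPWordCount \
  --master spark://your-master:7077 \
  --total-executor-cores 2 \
  tcp-wordcount.jar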
4. Type words at the command line on the Linux machine
5. Check the results in the IDEA console
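For reference, each 5-second batch prints a block of roughly the following shape (the timestamp and counts will of course differ, depending on what was typed into nc):

-------------------------------------------
Time: 1496731435000 ms
-------------------------------------------
(hello,2)
(spark,1)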
Problem: the words typed on the Linux side are counted correctly in each batch, but the results are not accumulated across batches! To accumulate, use updateStateByKey(func) to maintain state; an example follows:
package org.apache.spark

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object TCPWordCountUpdate {

  /**
   * String: a word
   * Seq: [1,1,1,1,1,1], the sequence of counts for the current batch
   * Option: the accumulated (historical) sum
   */
  val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    //iter.map{ case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    Some(newValues.sum + runningCount.getOrElse(0))
  }

  def main(args: Array[String]) {
    // setMaster("local[2]"): run two local threads, one to receive data and one to compute
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext with the batch interval; about 5 seconds,
    // otherwise the printed console output gets flushed away too quickly
    val scc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory
    scc.checkpoint("./")
    // Read data from the given IP and port
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    /**
     * updateStateByKey() arguments:
     * 1. the update function
     * 2. the partitioner
     * 3. a boolean: whether to remember the partitioner for the state RDDs
     */
    //val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction2 _)
    val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(
      updateFunction, new HashPartitioner(scc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    results.print()
    // Start Spark Streaming
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
The variant below uses reduceByKeyAndWindow; with both the window length and the slide interval set to 5 seconds, it behaves like the plain per-batch count above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkSqlTest {
  def main(args: Array[String]) {
    // LoggerLevels is a project-local utility that lowers the streaming log level
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("./")
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.74.100", 9999)
    // Count words over a 5-second window that slides every 5 seconds
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
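When the window length and the slide interval differ, each output covers the most recent window rather than a single batch. As a sketch, the hypothetical settings below, replacing the reduceByKeyAndWindow line in the program above, recount the last 30 seconds of data every 10 seconds; both values must be multiples of the 5-second batch interval:

// Sketch only: 30-second window, recomputed every 10 seconds
val windowed: DStream[(String, Int)] = textStream.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowed.print()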
1. Install and configure ZooKeeper
2. Install and configure Kafka
3. Start ZooKeeper
4. Start Kafka
5. Create a topic
bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \
--replication-factor 3 --partitions 3 --topic urlcount
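To confirm the topic exists (assuming the same ZooKeeper addresses as above), it can be listed or described:

bin/kafka-topics.sh --list --zookeeper node1.itcast.cn:2181
bin/kafka-topics.sh --describe --zookeeper node1.itcast.cn:2181 --topic urlcount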
6. Write the Spark Streaming application
package cn.itcast.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {

  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
  }

  def main(args: Array[String]) {
    // Take the parameters from the command line ...
    // val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    // ... or hard-code them for local testing
    val Array(zkQuorum, groupId, topics, numThreads) =
      Array[String]("master1ha:2181,master2:2181,master2ha:2181", "g1", "wangsf-test", "2")
    // Create SparkConf and set the app name
    val conf = new SparkConf().setAppName("UrlCount")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    // Set the checkpoint directory (use an HDFS path, passed in via args, when running on a cluster)
    ssc.checkpoint("./")
    // Build the topic map: topic -> number of receiver threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Pull data from Kafka and create a DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split each line and take the URL the user clicked (the 7th field)
    val urls = lines.map(x => (x.split(" ")(6), 1))
    // Count clicks per URL, keeping state across batches
    val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
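KafkaUtils.createStream lives in the separate spark-streaming-kafka module, so that artifact has to be on the classpath in addition to Spark core and Spark Streaming. A minimal sbt line, assuming a Spark 1.x build (adjust the version to match your cluster), might be:

// build.sbt (version is an assumption; match it to your Spark release)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"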
Produce test data:
kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test
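UrlCount splits each message on spaces and takes the 7th field (index 6) as the clicked URL, so every message typed into the producer needs at least seven space-separated fields. A made-up line in that shape, purely for testing, could be:

192.168.10.55 - user1 [10/Jun/2017:10:00:00] GET 200 http://www.itcast.cn/index.html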