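Streaming is a data-transfer technique that turns the data received by the client into a steady, continuous stream and outputs it without interruption, so that audio and video play back smoothly and the user can start viewing a file before the whole transfer has finished.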
Common stream-processing frameworks:
- Apache Storm
- Spark Streaming
- Apache Samza
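All three of these real-time computing systems are open-source distributed systems that offer low latency, scalability, and fault tolerance. What they have in common is that they let you run stream-processing code by distributing tasks across a set of fault-tolerant machines and executing them in parallel. In addition, each provides a simple API that hides the complexity of the underlying implementation.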
For a comparison of real-time computing frameworks, see: http://www.csdn.net/article/2015-03-09/2824135
Spark Streaming is an extension of the Spark core API. It is a distributed, high-throughput, fault-tolerant system for processing live data.
Spark Streaming processes data one batch at a time, so it is only a near-real-time system; under the hood it is still a batch application built on Spark core.
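The micro-batch idea can be illustrated without Spark at all. The following is a minimal sketch in plain Scala, not Spark's implementation: the `Event` case class, the `MicroBatchSketch` object, its helpers, and the sample timestamps are hypothetical names introduced only for illustration. It groups timestamped lines into fixed intervals and runs an ordinary batch word count on each group, which is roughly what happens once per batch interval.

```scala
// Hypothetical model of an incoming record: arrival time in ms plus the text line.
final case class Event(timeMs: Long, line: String)

object MicroBatchSketch {
  // Assign each event to the batch interval it arrived in,
  // keyed by the start time of that interval, in time order.
  def microBatches(events: Seq[Event], intervalMs: Long): Seq[(Long, Seq[String])] =
    events
      .groupBy(e => (e.timeMs / intervalMs) * intervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (start, es) => (start, es.sortBy(_.timeMs).map(_.line)) }

  // An ordinary batch word count, applied to one batch at a time.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event(100L, "spark storm"),
      Event(900L, "spark"),
      Event(1200L, "samza spark")
    )
    // With a 1000 ms interval, the first two events share a batch.
    for ((start, lines) <- microBatches(events, 1000L))
      println(s"batch starting at $start ms: ${wordCount(lines)}")
  }
}
```

A shorter interval lowers latency but means more, smaller batch jobs; the `Seconds(1)` argument in the examples that follow plays the role of `intervalMs` here.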
Reference: http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html
1. Run the following command in a shell:
$ nc -lk 9999
2. Open a second shell and run the following command:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
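3. Type some space-separated words in the first terminal; the second shell shows the word counts for that input in real time.
4. From the example above we can extract the Spark Streaming programming model: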
// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Initialize the StreamingContext with a 1-second batch interval
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Define where the data is read from
val lines = ssc.socketTextStream("localhost", 9999)
// The actual processing logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// Start Spark Streaming and wait for it to terminate
ssc.start()
ssc.awaitTermination()
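5. There are two ways to initialize a StreamingContext:
1) From a SparkConf; typically used when developing in an IDE such as IntelliJ IDEA.
2) From an existing SparkContext object; typically used for testing in spark-shell.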
6. Reading HDFS data with Spark Streaming
6.1) Code:
// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Initialize the StreamingContext from the existing SparkContext (sc)
val ssc = new StreamingContext(sc, Seconds(1))
// Define where the data is read from
val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")
// The actual processing logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// Start Spark Streaming and wait for it to terminate
ssc.start()
ssc.awaitTermination()
6.2) Run the code above in spark-shell:
Create the HDFS directory that Spark Streaming will read from:
$ bin/hdfs dfs -mkdir hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/
Prepare the data:
$ cat /opt/datas/wc.input
hadoop
hdfs yarn mapreduce zookeeper
hive
sqoop flume oozie hue
hbase
storm scala kafka spark
Start spark-shell and enter the code above by hand:
$ bin/spark-shell --master local[2]
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._
scala> val ssc = new StreamingContext(sc, Seconds(1))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@714e203a
scala> val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")
17/07/12 16:56:40 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@3d18ac9
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@74462773
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@55322d12
scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@4d0fc96d
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@34e46a44
scala> wordCounts.print()
// Run the following to start the Spark Streaming job
scala> ssc.start()
scala> ssc.awaitTermination()
Open another shell terminal and upload the test data to the HDFS directory hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/:
$ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1
The Spark Streaming output now appears in the spark-shell terminal, as follows:
-------------------------------------------
Time: 1499850053000 ms
-------------------------------------------
(scala,1)
(hive,1)
(oozie,1)
(mapreduce,1)
(zookeeper,1)
(hue,1)
(yarn,1)
(kafka,1)
(sqoop,1)
(spark,1)
...
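The trailing `...` appears because `print()` shows only the first ten elements of each batch, while the sample file contains 15 distinct words. As a cross-check, here is a small sketch in plain Scala (no Spark involved; the `SampleWordCount` object and its member names are made up for illustration) that applies the same split, pair, and sum-per-key steps to the contents of wc.input and returns the full result.

```scala
object SampleWordCount {
  // Contents of /opt/datas/wc.input as shown earlier.
  val sampleLines: Seq[String] = Seq(
    "hadoop",
    "hdfs yarn mapreduce zookeeper",
    "hive",
    "sqoop flume oozie hue",
    "hbase",
    "storm scala kafka spark"
  )

  // Same three steps as the streaming job, on a local collection:
  // split into words, pair each word with 1, then sum the 1s per word.
  def count(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    pairs.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    val counts = count(sampleLines)
    // Print in alphabetical order; the order of Spark's own output is not sorted.
    counts.toSeq.sortBy(_._1).foreach(println)
    println(s"distinct words: ${counts.size}")
  }
}
```

Every word in the sample occurs exactly once, so each count is 1, matching the pairs visible in the streaming output.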
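6.3) A simpler way to test
Developing this way means entering the code one line at a time, which is tedious. Is there a way to load all of the code at once? There is: below we test loading a Scala file from within spark-shell.
First, create a file containing the code above: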
$ cat /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala
// Import dependencies
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Initialize the StreamingContext from the existing SparkContext (sc)
val ssc = new StreamingContext(sc, Seconds(1))
// Define where the data is read from
val lines = ssc.textFileStream("hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/")
// The actual processing logic
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// Start Spark Streaming and wait for it to terminate
ssc.start()
ssc.awaitTermination()
Delete all files under the hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/ directory:
$ bin/hdfs dfs -rm hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/*
Start a spark-shell:
$ bin/spark-shell --master local[2]
Load and run the Scala file from within spark-shell:
scala> :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/HDFSSparkStreaming.scala
From another terminal, upload a file to the target directory:
$ bin/hdfs dfs -put /opt/datas/wc.input hdfs://chavin.king:9000/user/hadoop/mapreduce/wordcount/stream/1