Spark Streaming概述html
做者:尹正杰算法
版权声明:原创做品,谢绝转载!不然将追究法律责任。数据库
一.Spark Streaming概览apache
1>.什么是Spark Streaming架构
Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。
以下图所示,Spark Streaming支持的数据输入源不少,例如:Kafka、Flume、Twitter、ZeroMQ,Kinesis,HDFS,简单的TCP套接字,甚至你还能够自定义数据源等等。
数据输入后能够用Spark的高度抽象原语如:map、reduce、join、window等进行运算。
最后,能够将处理后的数据推送到文件系统,数据库和实时仪表板。
实际上,您能够在数据流上应用Spark的机器学习和图形处理算法。
在内部,它的工做方式以下。Spark Streaming接收实时输入数据流,并将数据分为批次,而后由Spark引擎进行处理,以生成批次的最终结果流。
和Spark基于RDD的概念很类似,Spark Streaming使用离散化流(discretized stream)做为抽象表示,叫做DStream。DStream是随时间推移而收到的数据的序列。在内部,每一个时间区间收到的数据都做为 RDD 存在,而DStream是由这些RDD所组成的序列(所以得名“离散化”)。 Spark Streaming提供了称为离散流或DStream的高级抽象,它表示连续的数据流。DStreams能够根据来自Kafka和Kinesis等来源的输入数据流来建立,也能够经过对其余DStreams应用高级操做来建立。在内部,DStream表示为RDD序列。 博主推荐阅读: http://spark.apache.org/docs/latest/streaming-programming-guide.html
2>.Spark Streaming的特色机器学习
易用:
Spark Streaming将Apache Spark的 语言集成API 引入流处理,使您能够像编写批处理做业同样编写流做业。它支持Java,Scala和Python。
容错:
Spark Streaming能够当即恢复丢失的工做和操做员状态(例如,滑动窗口),而无需任何额外的代码。
易整合到Spark体系:
经过在Spark上运行,Spark Streaming可以让您将相同的代码重用于批处理,针对历史数据加入流或对流状态运行临时查询。构建功能强大的交互式应用程序,而不单单是分析。
3>.Spark Streaming架构socket
二.DStream入门案例(wordcount)ide
需求说明:
使用netcat工具向8888端口不断的发送数据,经过SparkStreaming读取端口数据并统计不一样单词出现的次数。
1>.添加依赖关系工具
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.1</version> </dependency>
2>.安装netcat工具并监听相应端口oop
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install nc Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile * base: mirrors.aliyun.com * extras: mirrors.aliyun.com * updates: mirror.bit.edu.cn Resolving Dependencies --> Running transaction check ---> Package nmap-ncat.x86_64 2:6.40-19.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================================================================================================== Package Arch Version Repository Size ============================================================================================================================================================================================================================================================================== Installing: nmap-ncat x86_64 2:6.40-19.el7 base 206 k Transaction Summary ============================================================================================================================================================================================================================================================================== Install 1 Package Total download size: 206 k Installed size: 423 k Downloading packages: nmap-ncat-6.40-19.el7.x86_64.rpm | 206 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : 2:nmap-ncat-6.40-19.el7.x86_64 1/1 Verifying : 2:nmap-ncat-6.40-19.el7.x86_64 1/1 Installed: nmap-ncat.x86_64 2:6.40-19.el7 Complete! [root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# nc -lk 8888 #监听端口
3>.编写wordcount代码
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(实时数据分析环境对象) * * 自定义采集周期: * 以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中咱们能够将这个值改小,好比每秒采集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.经过监控端口建立DStream,读进来的数据为一行行(即从指定端口中采集数据) */ val socketLineDStream:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101.yinzhengjie.org.cn", 8888) /** * 4>.将采集的数据进行扁平化操做(即将每一行数据作切分,造成一个个单词) */ val wordDStreams:DStream[String] = socketLineDStream.flatMap(_.split(" ")) /** * 5>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.将相同的单词次数作统计 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.将结果打印出来 */ wordToCountDStream.print() /** * 8>.启动(SparkStreamingContext)采集器 */ ssc.start() /** * 9>.Driver等待采集器的执行(即禁止main线程主动退出) */ ssc.awaitTermination() /** * 舒适提示: * 我们的程序是实时处理数据的,所以生产环境中不能中止采集程序,所以不建议使用哟~ */ // ssc.stop() } }
三.博主推荐阅读
Spark Streaming-DStream实战案例: https://www.cnblogs.com/yinzhengjie2020/p/13233192.html