Spark Streaming概述

             Spark Streaming概述html

                                     做者:尹正杰算法

版权声明:原创做品,谢绝转载!不然将追究法律责任。数据库

 

 

 

一.Spark Streaming概览apache

1>.什么是Spark Streaming架构

  Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。

  以下图所示,Spark Streaming支持的数据输入源不少,例如:Kafka、Flume、Twitter、ZeroMQ,Kinesis,HDFS,简单的TCP套接字,甚至你还能够自定义数据源等等。

  数据输入后能够用Spark的高度抽象原语如:map、reduce、join、window等进行运算。

  最后,能够将处理后的数据推送到文件系统,数据库和实时仪表板。

  实际上,您能够在数据流上应用Spark的机器学习和图形处理算法。

  在内部,它的工做方式以下。Spark Streaming接收实时输入数据流,并将数据分为批次,而后由Spark引擎进行处理,以生成批次的最终结果流。

  和Spark基于RDD的概念很类似,Spark Streaming使用离散化流(discretized stream)做为抽象表示,叫做DStream。DStream是随时间推移而收到的数据的序列。在内部,每一个时间区间收到的数据都做为 RDD 存在,而DStream是由这些RDD所组成的序列(所以得名“离散化”)。

  Spark Streaming提供了称为离散流或DStream的高级抽象,它表示连续的数据流。DStreams能够根据来自Kafka和Kinesis等来源的输入数据流来建立,也能够经过对其余DStreams应用高级操做来建立。在内部,DStream表示为RDD序列。

  博主推荐阅读:
    http://spark.apache.org/docs/latest/streaming-programming-guide.html

2>.Spark Streaming的特色机器学习

  易用:
    Spark Streaming将Apache Spark的 语言集成API 引入流处理,使您能够像编写批处理做业同样编写流做业。它支持Java,Scala和Python。

  容错:
    Spark Streaming能够当即恢复丢失的工做和操做员状态(例如,滑动窗口),而无需任何额外的代码。

  易整合到Spark体系:
    经过在Spark上运行,Spark Streaming可以让您将相同的代码重用于批处理,针对历史数据加入流或对流状态运行临时查询。构建功能强大的交互式应用程序,而不单单是分析。

3>.Spark Streaming架构socket

 

 

二.DStream入门案例(wordcount)ide

  需求说明:
    使用netcat工具向8888端口不断的发送数据,经过SparkStreaming读取端口数据并统计不一样单词出现的次数。

1>.添加依赖关系工具

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

2>.安装netcat工具并监听相应端口oop

[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install nc
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirror.bit.edu.cn
Resolving Dependencies
--> Running transaction check
---> Package nmap-ncat.x86_64 2:6.40-19.el7 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

==============================================================================================================================================================================================================================================================================
 Package                                                           Arch                                                           Version                                                                  Repository                                                    Size
==============================================================================================================================================================================================================================================================================
Installing:
 nmap-ncat                                                         x86_64                                                         2:6.40-19.el7                                                            base                                                         206 k

Transaction Summary
==============================================================================================================================================================================================================================================================================
Install  1 Package

Total download size: 206 k
Installed size: 423 k
Downloading packages:
nmap-ncat-6.40-19.el7.x86_64.rpm                                                                                                                                                                                                                       | 206 kB  00:00:00     
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : 2:nmap-ncat-6.40-19.el7.x86_64                                                                                                                                                                                                                             1/1 
  Verifying  : 2:nmap-ncat-6.40-19.el7.x86_64                                                                                                                                                                                                                             1/1 

Installed:
  nmap-ncat.x86_64 2:6.40-19.el7                                                                                                                                                                                                                                              

Complete!
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install nc  
[root@hadoop101.yinzhengjie.org.cn ~]# nc -lk 8888          #监听端口

3>.编写wordcount代码

package com.yinzhengjie.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {

    /**
      *   1>.初始化Spark配置信息
      */
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount")

    /**
      *   2>.初始化SparkStreamingContext(实时数据分析环境对象)
      *
      *   自定义采集周期:
      *     以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中咱们能够将这个值改小,好比每秒采集一次.
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /**
      *   3>.经过监控端口建立DStream,读进来的数据为一行行(即从指定端口中采集数据)
      */
    val socketLineDStream:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101.yinzhengjie.org.cn", 8888)

    /**
      *   4>.将采集的数据进行扁平化操做(即将每一行数据作切分,造成一个个单词)
      */
    val wordDStreams:DStream[String] = socketLineDStream.flatMap(_.split(" "))

    /**
      *   5>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1))
      */
    val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1))

    /**
      *   6>.将相同的单词次数作统计
      */
    val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_)

    /**
      *   7>.将结果打印出来
      */
    wordToCountDStream.print()

    /**
      *   8>.启动(SparkStreamingContext)采集器
      */
    ssc.start()

    /**
      *   9>.Driver等待采集器的执行(即禁止main线程主动退出)
      */
    ssc.awaitTermination()

    /**
      *   舒适提示:
      *       我们的程序是实时处理数据的,所以生产环境中不能中止采集程序,所以不建议使用哟~
      */
    //    ssc.stop()

  }
}

 

三.博主推荐阅读

  Spark Streaming-DStream实战案例:
    https://www.cnblogs.com/yinzhengjie2020/p/13233192.html
相关文章
相关标签/搜索