Spark Streaming初步使用以及工做原理详解

在大数据的各类框架中,hadoop无疑是大数据的主流,可是随着电商企业的发展,hadoop只适用于一些离线数据的处理,没法应对一些实时数据的处理分析,咱们须要一些实时计算框架来分析数据。所以出现了不少流式实时计算框架,好比Storm,Spark Streaming,Samaz等框架,本文主要讲解Spark Streaming的工做原理以及如何使用。html

1、流式计算

1.什么是流?前端

Streaming:是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的
流,源源不断地送出,使用户听到的声音或看到的图象十分平稳,并且用户在
整个文件送完以前就能够开始在屏幕上浏览文件。node

2.常见的流式计算框架算法

  • Apache Storm
  • Spark Streaming
  • Apache Samza

上述三种实时计算系统都是开源的分布式系统,具备低延迟、可扩展和容错性
诸多优势,它们的共同特点在于:容许你在运行数据流代码时,将任务分配到
一系列具备容错能力的计算机上并行运行。此外,它们都提供了简单的API来
简化底层实现的复杂程度。shell

对于上面的三种流使计算框架的比较能够参考这篇文章流式大数据处理的三种框架:Storm,Spark和Samza数据库

2、Spark Streaming

1.Spark Streaming介绍express

Spark Streaming是Spark生态系统当中一个重要的框架,它创建在Spark Core之上,下面这幅图也能够看出Sparking Streaming在Spark生态系统中地位。
这里写图片描述apache

官方对于Spark Streaming的解释以下:编程

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis or TCP sockets can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.markdown

这里写图片描述

Spark Streaming是Spark Core的扩展应用,它具备可扩展,高吞吐量,对于流数据的可容错性等特色。能够监控来自Kafka,Flumn,HDFS。Kinesis,Twitter,ZeroMQ或者Scoket套接字的数据经过复杂的算法以及一系列的计算分析数据,而且能够将分析结果存入到HDFS文件系统,数据库以及前端页面中。

这里写图片描述
Spark Streaming有如下特色

  • 高可扩展性,能够运行在上百台机器上(Scales to hundreds of nodes)
  • 低延迟,能够在秒级别上对数据进行处理(Achieves low latency)
  • 高可容错性(Efficiently recover from failures)
  • 可以集成并行计算程序,好比Spark Core(Integrates with batch and interactive processing)

2.Spark Streaming工做原理
对于Spark Core它的核心就是RDD,对于Spark Streaming来讲,它的核心是DStream,DStream相似于RDD,它实质上一系列的RDD的集合,DStream能够按照秒数将数据流进行批量的划分。首先从接收到流数据以后,将其划分为多个batch,而后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展现等等。能够参考下面这幅图来帮助理解:
这里写图片描述

对于DStream如何理解呢?它是一系列连续的RDD,它是创建在Spark之上的不可变的,分布式数据集,在DStream中的每个RDD包含着必定时间间隔的数据,以下图所示:
这里写图片描述
这里写图片描述

那么,Spark Streaming的工做原理是什么呢?它是怎么运行在集群上的呢?其原理架构图以下所示:
这里写图片描述

咱们都知道Spark Core在初始化时会生成一个SparkContext对象来对数据进行后续的处理,相对应的Spark Streaming会建立一个Streaming Context,它的底层是SparkContext,也就是说它会将任务提交给SparkContext来执行,这也很好的解释了DStream是一系列的RDD。当启动Spark Streaming应用的时候,首先会在一个节点的Executor上启动一个Receiver接受者,而后当从数据源写入数据的时候会被Receiver接收,接收到数据以后Receiver会将数据Split成不少个block,而后备份到各个节点(Replicate Blocks 容灾恢复),而后Receiver向StreamingContext进行块报告,说明数据在那几个节点的Executor上,接着在必定间隔时间内StreamingContext会将数据处理为RDD而且交给SparkContext划分到各个节点进行并行计算。

3.Spark Streaming Demo

介绍完Spark Streaming的基本原理以后,下面来看看如何运行Spark Streaming,官方给出了一个例子,从Socket源端监控收集数据运行wordcount的案例,案例很简单,这里再也不说明,读者可参考官方文档【http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html】

对于Spark Streaming的编程模型有两种方式

第一种:经过SparkConf来建立SparkStreaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 
val conf=new SparkConf().setAppName("SparkStreamingDemo").setMaster("master")
val scc=new StreamingContext(conf,Seconds(1)) //每一个1秒钟检测一次数据
  • 1
  • 2
  • 3
  • 4
  • 5

第二种:经过SparkContext来建立,也就是在Spark-Shell命令行运行:

import org.apache.spark.streaming._
val scc=new StreamingContext(sc,Seconds(1))
  • 1
  • 2

固然,咱们也能够收集来自HDFS文件系统中数据,查阅Spark的源码,能够发现以下方法:
这里写图片描述
这个方法会监控指定HDFS文件目录下的数据,不过忽略以“.”开头的文件,也就是不会收集以“.”开头的文件进行数据的处理。

下面介绍一下如何从HDFS文件系统上监控数据运行wordcount案例统计单词数而且将结果打印出来的案例:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 

val ssc = new StreamingContext(sc, Seconds(5))

// read data
val lines = ssc.textFileStream("hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/spark/streaming/")

// process
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
 
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

上面程序会每一个5秒钟检测一下HDFS文件系统下的hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/spark/streaming/目录是否有新的数据,若是有就进行统计,而后将结果打印在控制台。运行上面代码有两种方式,能够运行Spark-shell客户端后将上述命令一条条粘贴到命令行执行,显然这样很麻烦;第二种就是将上面的程序写入到一个脚本文件中加载到Spark-shell命令行中执行,这里采用第二种方式:
在一个目录下建立SparkStreamingDemo.scala文件,内容如上面的代码所示。而后启动Spark-shell客户端。

$ bin/spark-shell --master local[2]
  • 1

而后加载Spark Streaming应用:

scala>:load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/SparkStreamingDemo.scala
  • 1

而后上传数据到上述HDFS文件目录下:

$ bin/hdfs dfs -put /opt/datas/wc.input /user/shinelon/spark/streaming/input7
  • 1

该文件内容以下所示:

hadoop hive
hadoop hbase
hadoop yarn
hadoop hdfs
hdfs spark
  • 1
  • 2
  • 3
  • 4
  • 5

运行结果以下所示:
这里写图片描述

一般对于一个Spark Streaming的应用程序的编写分下面几步:

  1. 定义一个输入流源,好比获取socket端的数据,HDFS,kafka中数据等等
  2. 定义一系列的处理转换操做,好比上面的map,reduce操做等等,Spark Streaming也有相似于SparkCore的transformation操做
  3. 启动程序收集数据(start())
  4. 等待程序中止(遇到错误终止或者手动中止awaitTermination())
  5. 手动终止应用程序(stop())

可使用saveAsTextFiles()方法将结果输出到HDFS文件系统上,读者能够自行试验将结果存入HDFS文件系统中。

最后,介绍一下Spark Streaming应用程序开发的几种常见方式:

  1. Spark Shell Code:开发、测试(上面提到过,将代码一条条粘贴到命令行执行,这种方式只适用于测试)
  2. Spark Shell Load Scripts:开发、测试(编写scala脚本到spark-shell中执行)
  3. IDE Develop App:开发、测试、打包JAR(生产环境),spark-submit提交应用程序
相关文章
相关标签/搜索