大数据开发实战:Spark Streaming流计算开发

  一、背景介绍

      Storm以及离线数据平台的MapReduce和Hive构成了Hadoop生态对实时和离线数据处理的一套完整处理解决方案。除了此套解决方案以外,还有一种很是流行的并且完整的离线和java

    实时数据处理方案。这种方案就是Spark。Spark本质上是对Hadoop特别是MapReduce的补充、优化和完善,尤为是数据处理速度、易用性、迭代计算和复杂数据分析等方面。算法

      Spark Streaming 做为Spark总体解决方案中实时数据处理部分,本质上仍然是基于Spark的弹性分布式数据集(Resilient Distributed Datasets :RDD)概念。Spark Streaming将源头shell

    数据划分为很小的批,并以相似于离线批的方式来处理这部分微批数据。数据库

      相对于Storm这种原生的实时处理框架,Spark Streaming基于微批的的方案带来了吞吐量的提高,可是也致使了数据处理延迟的增长---基于Spark Streaming实时数据处理方案的数据apache

    延迟一般在秒级甚至分钟级。编程

  二、Spark生态和核心概念

    2.一、Spark概览

      Spark诞生于美国伯克利大学的AMPLab,它最初属于伯克利大学的研究性项目,与2010年正式开源,于2013年成为Apache基金项目,冰雨2014年成为Apache基金的顶级项目。api

      Spark用了不到5年的时间就成了Apache的顶级项目,目前已被国内外的众多互联网公司使用,包括Amazon、EBay、淘宝、腾讯等。数组

      Spark的流行和它解决了Hadoop的不少不足密不可分。缓存

      传统Hadoop基于MapReduce的方案适用于大多数的离线批处理场景,可是对于实时查询、迭代计算等场景很是不适合,这是有其内在局限决定的。服务器

      一、MapReduce只提供Map和Reduce两个操做,抽象程度低,可是复杂的计算一般须要不少操做,并且操做之间有复杂的依赖关系。

      二、MapReduce的中间处理结果是放在HDFS文件系统中的,每次的落地和读取都消耗大量的时间和资源。

      三、固然,MapReduce也不支持高级数据处理API、DAG(有向五环图)计算、迭代计算等。

      Spark则较好地解决了上述这些问题。

      一、Spark经过引入弹性分布式数据集(Resilient Distributed Datasets:RDD)以及RDD丰富的动做操做API,很是好地支持了DGA的计算和迭代计算。

      二、Spark经过内存计算和缓存数据很是好地支持了迭代计算和DAG计算的数据共享、减小了数据读取的IO开销、大大提升了数据处理速度。

      三、Spark为批处理(Spark Core)、流式处理(Spark Streaming)、交互分析(Spark SQL)、机器学习(MLLib)和图计算(GraphX)提供了一个同一的平台和API,很是便于使用。

      四、Spark很是容易使用、Spark支持java、Python和Scala的API,还支持超过80种高级算法,使得用户能够快速构建不一样的应用。Spark支持交互式的Python和Scala的shell,这意味着

          能够很是方便地在这些shell中使用Spark集群来验证解决问题的方法,而不是像之前同样,须要打包、上传集群、验证等。这对于原型开发尤为重要。

      五、Spark能够很是方便地与其余开源产品进行融合:好比,Spark可使用Hadoop的YARN和Apache Mesos做为它的资源管理和调度器,而且能够处理全部Hadoop支持的数据,包括HDFS、

        HBase和Cassandra等。Spark也能够不依赖于第三方的资源管理和调度器,它实现了Standalone做为其内置的资源管理和调度框架,这样进一步下降了Spark的使用门槛。

      六、External Data Source多数据源支持:Spark能够独立运行,除了能够运行在当下的Yarn集群管理以外,它还能够读取已有的Hadoop数据。它能够运行多种数据源,好比Parquet、Hive、

        HBase、HDFS等。

   2.二、Spark核心概念

        RDD是Spark中最为核心和重要的概念。RDD,全称为 Resilient Distributed Dataset,在Spark官方文档中被称为“一个可并行操做的有容错机制的数据集合”。实际上RDD就是

      一个数据集,并且是分布式的。同时Spark还对这个分布式数据集提供了丰富的数据操做和容错性。

      一、RDD建立

        Spark中建立RDD最直接的方法是调用SparkContext(SparkContext是Spark集群环境的访问入口,Spark Streaming也有本身对应的对象StreamContext)的parallelize方法。

        List<Integer> data =  Arrays.asList(1,2,3,4,5);

        HavaRDD<Integer> distData = sc.parallelize(data);

        上述代码会将数据集合 (data)转换为这个分布式数据集(distData),以后就能够对此RDD执行各类转换等。好比调用distData.reduce((a,b) => a+b),将这个数组中的元素项加,

      此外,还能够经过设置parallelize的第二个参数手动设置生成RDD的分区数:sc.parallelize(data,10),若是不设定的话,Spark会自动设定。

        但在实际的项目中,RDD通常是从源头数据建立的。Spark支持从任何一个Hadoop支持的存储数据建立RDD,包括本地文件系统、HDFS、Cassandna、HBase和Amazon S3等。

      另外,Spark也支持从文本文件,SequenceFiles和其它Hadoop InputFormat的格式文件中建立RDD。建立的方法也很简单,只须要指定源头文件并调用对应的方法便可:

        JavaRDD<String> distFile = sc.textFile("data.txt");

        Spark 中转换SequenceFile的SparkContext方法是sequenceFile,转换Hadoop InputFormats的SparkContext方法是HadoopRDD。

      二、RDD操做

        RDD操做分为转换(transformation)和行动(action),transformation是根据原有的RDD建立一个新的RDD,action则吧RDD操做后的结果返回给driver。例如map 是一个转换,

      它把数据集中的每一个元素通过一个方法处理的结果返回一个新的RDD,reduce是一个action,它收集RDD的全部数据通过一些方法的处理,最后把结果返回给driver。

        Spark对transformation的抽象能够大大提升性能,这是由于在Spark中,全部transformation操做都是lazy模式,即Spark不会当即计算结果,而只是简单地记住全部对数据集的

      转换操做逻。这些转换只有遇到action操做的时候才会开始计算。这样的设计使得Spark更加高效,例如能够经过map建立一个新数据集在reduce中使用,并仅仅返回reduce的

      结果给driver,而不是整个大大的map过的数据集。

      三、RDD持久化

        Spark最重要的一个功能是它能够经过各类操做持久化(或缓存)一个集合到内存中。当持久化一个RDD的时候,每个节点都将参与计算的全部分区数据存储到内存中,

      而且这些数据能够被这个集合(以及这个集合衍生的其余集合)的动做重复利用。这个能力使后续的动做速度更快(一般快10倍以上)。对应迭代算法和快速的交互应用来讲,

      缓存是一个关键的工具。

        能够经过 persist()或者cache()方法持久化一个RDD。先在action中计算获得RDD,而后将其保存在每一个节点的内存中。Spark的缓存是一个容错的技术,也就是说,若是RDD的

      任何一个分区丢失,它能够经过原有的转换操做自动重复计算而且建立出这个分区。

        此外,还能够利用不一样的存储级别存储每个被持久化的RDD,。例如,它容许持久化集合到磁盘上、将集合做为序列化的Java对象持久化到内存中、在节点间复制集合或存储集合

      到Tachyon中。能够经过传递一个StorageLevel对象给persist()方式设置这些存储级别。cache()使用了默认的存储级别-----StorageLevel.MEMORY_ONLY。

      四、Spark生态圈

        Spark创建在统一抽象的RDD之上,使得它能够以基本一致的方式应对不一样的大数据处理场景,包括批处理,流处理、SQL、Machine Learning以及GraphX等。这就是Spark设计的“

      通用的编程抽象”( Unified Programming Abstraction),也正是Spark独特的地方。

        Spark生态圈包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等组件,其中Spark Core提供内存计算框架、SparkStreaming提供实时处理应用、Spark SQL提供

      即席查询,再加上MLlib的机器学习和GraphX的图处理,它们能无缝集成并提供Spark一站式的大数据解决平台和生态圈。

      

      Spark Core:Spark Core实现了Spark的基本功能,包括任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core还包括了RDD的API定义,并提供了建立和操做RDD的

        丰富API。Spark Core是Spark其它组件的基础和根本。

      Spark Streaming:他是Spark提供的对实时数据进行流计算的组件,提供了用来操做数据流的API,而且与Spark Core中的RDD API高度对应。Spark Streaming支持与Spark Core

        同级别的容错性、吞吐量和伸缩性。

      Spark SQL:它是Spark用来操做结构化数据的程序包,经过Spark SQL,可使用SQL或类SQL语言来查询数据;同时Spark SQL支持多种数据源,好比Hive表、Parquet以及

        JSON等,除了为Spark提供一个SQL接口,Spark SQL还支持开发者将SQL和传统的RDD编程的数据操做方式向结合,不管是使用Python、Java仍是Scala,开发者均可以在

        单个应用中同时使用SQL和复杂的数据分析。

      MLLib:Spark提供了常见的机器学习功能的程序库,叫作MLlib,MLlib提供了多种机器学习算法,包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的

        支持功能。此外,MLLib还提供了一些更底层的机器学习原语,包括一个通用的梯度降低优化算法,全部这些方法都被设计为能够在集群上轻松伸缩的架构。

      GraphX:GraphX是用来操做图(如社交网络的朋友圈)的程序库,能够进行并行的图计算。与Spark Streaming和Spark SQL相似,GraphX也扩展了Spark的RDD API,

        能用来建立一个顶点和边都包含任意属性的有向图。GraphX还支持针对图的各类操做(如进行图分割的subgraph和操做全部顶点的mapVertices),以及一些经常使用的图算法

       (如PageRank和三角计算)。

   三、Spark生态的流计算技术:Spark Streaming

        Spark Streaming做为Spark的核心组件之一,通Storm同样,主要对数据进行实时的流处理,可是不一样于Apache Storm(这里指的是原生Storm,非Trident),在Spark Streaming

      中数据处理的单位是是一批而不是一条,Spark会等采集的源头数据累积到设置的间隔条件后,对数据进行统一的微批处理。这个间隔是Spark Streaming中的核心概念和关键参数,

      直接决定了Spark Streaming做业的数据处理延迟,固然也决定着数据处理的吞吐量和性能。

        相对于Storm的毫秒级延迟来讲,Spark Streaming的延迟最多只能到几百毫秒,通常是秒级甚至分钟级,所以对于实时数据处理延迟要很是高的场合,Spark Streaming并不合适。

        另外,Spark Streaming底层依赖于Spark Core 的RDD实现,即它和Spark框架总体是绑定在一块儿的,这是优势也是缺点。

        对于已经采用Spark 做为大数据处理框架,同时对数据延迟性要求不是很高的场合,Spark Streaming很是适合做为事实流处理的工具和方案,缘由以下:

        一、Spark Streaming内部的实现和调度方式高度依赖于Spark的DAG调度器和RDD,Spark Streaming的离散流(DStream)本质上是RDD在流式数据上的抽象,所以熟悉Spark和

        和RDD概念的用户很是容易理解Spark Streaming已经其DSream。

        二、Spark上各个组件编程模型都是相似的,因此若是熟悉Spark的API,那么对Spark Streaming的API也很是容易上手和掌握。

        可是,若是已经采用了其余诸如Hadoop和Storm的数据处理方案,那么若是使用Spark Streaming,则面临着Spark以及Spark Streaming的概念及原理的学习成本。

        整体来讲,Spark Streaming做为Spark核心API的一个扩展,它对实时流式数据的处理具备可扩展性、高吞吐量、和能够容错性等特色。

        同其余流处理框架同样,Spark Streaming从Kafka、Flume、Twitter、ZeroMQ、Kinesis等源头获取数据,并map、reduce、join、window等组成的复杂算法计算出指望的结果,处理

      后的结果数据可被推送到文件系统,数据库、实时仪表盘中,固然,也能够将处理后的数据应用到Spark的机器学习算法、图处理算法中。整个的数据处理流程以下:

      

 

    3.一、Spark Streaming基本原理

      Spark Streaming 中基本的抽象是离散流(即DStream).DStream表明一个连续的数据流。在Spark Streaming内部中,DStream其实是由一系列连续RDD组成。每一个RDD包含肯定

    时间间隔内的数据,这些离散的RDD连在一块儿,共同组成了对应的DStream。

    

      实际上任何,任何对DStream的操做都转换成了对DStream隐含的一系列对应RDD的操做,如上图中对lines DStream是的flatMap操做,实际上应用于lines对应每一个RDD的操做,并生成了

    对应的work DStream的RDD。

      也就是上面所说的,Spark Streaming底层依赖于Spark Core的RDD实现。从本质上来讲,Spark Streaming只不过是将流式的数据流根据设定的间隔分红了一系列的RDD,而后在每一个RDD上

    应用相应的各类操做和协做,因此Spark Streaming底层的运行引擎其实是Spark Core。

   3.二、Spark Streaming核心API

      SparkStreaming完整的API包括StreamingContext、DStream输入、DStream上的各类操做和动做、DStream输出等。

      一、StreamingContext

        为了初始化Spark Streaming程序,必须建立一个StreamingContext对象,该对象是Spark Streaming全部流操做的主要入口。一个StreamingContext对象能够用SparkConf对象建立:

        import org.apache.spark.*;

        import org.apache.spark.streaming.api.Java.*;

        SparkConf conf =  new SparkConf().setAppName(appName).setMaster(master);

        JavaStreamingContext ssc =  new JavaStreamingContext(conf, new Duration(1000));

      二、DStream输入

        DStream输入表示从数据源获取的原始数据流。每一个输入流DStream和一个接收器(receiver)对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。

        Spark Streaming有两类数据源:

        基本源(basic source):在StreamingContext API中直接可用的源头,例如文件系统、套接字链接、Akka的actor等。

        高级源(advanced source):包括 Kafka、Flume、Kinesis、Tiwtter等,他们须要经过额外的类来使用。

      三、DStream的转换

        和RDD相似,transformation用来对输入DStreams的数据进行转换、修改等各类操做,固然,DStream也支持不少在Spark RDD的transformation算子。

        

 

      四、DStream的输出

        和RDD相似,Spark Streaming容许将DStream转换后的结果发送到数据库、文件系统等外部系统中。目前,定义了Spark Streaming的输出操做:

        

 

  四、Spark Streaming实时开发实例

      下面用字符计数这个例子来讲明 Spark Streming

      首先,导入 Spark Streaming的相关类到环境中,这些类(如DStream)提供了流操做不少有用的方法,StreamingContext是Spark全部流操做的主要入口。

      其次,建立一个具备两个执行线程以及1秒批间隔时间(即以秒为单位分隔数据流)的本地StreamingContext.

      

import org.apache.spark.{*, SparkConf}
import org.apache.spark.api.java.function.*
import org.apache.spark.streaming.{*, Duration, Durations}
import org.apache.spark.streaming.api.java.{*, JavaDStream, JavaStreamingContext}

import scala.Tuple2;

object streaming_test {
  def main(args: Array[String]): Unit = {
    //建立一个本地的StreamingContext上下文对象,该对象包含两个工做线程,批处理间隔为1秒
    val conf  = new SparkConf().setMaster("local[2]").setAppName("Network-WordCount");

    val jssc = new JavaStreamingContext(conf,Durations.seconds(1));
    //利用这个上下文,可以建立一个DStream,它表示从TCP源(主机为localhost,端口为9999)获取的流式数据
    //建立一个链接到hostname:port的DStream对象,相似localhost:9999
    val lines =jssc.socketTextStream("localhost",9999);
    //这个lines变量是一个DStream,表示即将从数据服务器或的数据流,这个DStream的每条记录都表明一行文本,
    // 接下来须要将DStream中的每行文本都切分为单词
    val words =lines.flatMap(x:String => util.Arrays.asList(x.split(" ")).iterator());
    val pairs =words.mapToPair<s=>new Tuple2<>(s,1));
    val wordCounts =pairs.reduceByKey((i1,i2)=> i1+i2);
    wordCounts.print();
  }
}

   四、Spark Streaming调优实践

    Spark Streaming做业的调优一般都涉及做业开发的优化、并行度的优化和批大小以及内存等资源的优化。

    一、做业开发优化

      RDD复用:对于实时做业,尤为是链路较长的做业,要尽可能重复使用RDD,而不是重复建立多个RDD。另外须要屡次使用的中间RDD,能够将其持久化,以下降每次都须要重复计算的开销。

      使用效率较高的shuffle算子:如同Hadoop中的做业同样,实时做业的shuffle操做会涉及数据从新分布,所以会耗费大量的内存、网络和计算等资源,须要尽可能下降须要shuffle的数据量,

      reduceByKey/aggregateByKey相比groupByKey,会在map端先进行预聚合,所以效率较高。

      相似于Hive的MapJoin:对于实时做业,join也会涉及数据的从新分布,所以若是是大数据量的RDD和小数据量的RDD进行join,能够经过broadcast与map操做实现相似于Hive的MapJoin,

      可是须要注意小数量的RDD不能过大,否则广播数据的开销也很大。

      其它高效的例子:如使用mapPartitions替代普通map,使用foreachPartitions替代foreach,使用repartitionAndSortWithinPartitions替代repartition与 sort类操做等。

    二、并行度和批大小

      对于Spark Streaming这种基于微批处理和实时处理框架来讲,其调优不外乎两点:

      一是尽可能缩短每一批次的处理时间

      二是设置合适的batch size(即每批处理的数据量),使得数据处理的速度可以适配数据流入的速度。

      第一点一般以设置源头、处理、输出的并发度来实现。

      源头并发:若是源头的输入任务是实时做业的瓶颈,那么能够经过加大源头的并发度提供性能,来保证数据可以流入后续的处理链路。在Spark Streaming,能够经过以下代码来实现(

      一Kafka源头为例):

      int numStreams = 5;

      List<JavaPairDStream<String,String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams );

      for(int i=0;i<numStreams ;i++){

        kafkaStreams.add(KafkaUtils.createStream(...));

      }

       JavaPairDStream<String,String> unifiedStream = streamingContext.union( kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

      处理并发:处理任务的并发决定了实际做业执行的物理视图。Spark Streaming做业的默认并发度能够经过spark.default.parllelism来设置,可是实际中不推荐,建议针对每一个任务单独设置

      并发度进行精细控制。

      输出并发:如图Hadoop做业同样,实时做业的shuffle操做会涉及数据从新分布,所以会耗费大量的内存、网络和计算等资源,所以须要尽可能减小shuffle操做。

      batch size:batch size主要影响系统的吞吐量和延迟。batch size 过小,通常处理延迟会下降,可是系统吞吐量会降低;batch size太大,吞吐量上去了,可是处理延迟会提升,同时要求的

      内存也会增长,所以实际中须要找到一个平衡点,既能知足吞吐量也能知足延迟的要求,那么实际中如何设置batch大小呢?

    参考资料:《离线和实时大数据开发实战》

相关文章
相关标签/搜索