初步了解Spark生态系统及Spark Streaming

1、        场景

◆ Spark[4]html

Scope:  a MapReduce-like cluster computing framework designed for low-latency iterativejobs and interactive use from an interpreter(在大规模的特定数据集上的迭代运算或重复查询检索)前端

正如其目标scope,Spark适用于须要屡次操做特定数据集的应用场合。须要反复操做的次数越多,所需读取的数据量越大,受益越大,数据量小可是计算密集度较大的场合,受益就相对较小。java

 

◆ Spark Streaming(构建在Spark上处理Stream数据的框架)python

可以知足除对实时性要求很是高(如高频实时交易)以外的全部流式准实时计算场景[2]git

 

2、    Spark生态系统包括[1]

  ◆ Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive同样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。经过配置Shark参数,Shark能够自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark 经过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一块儿,最大化RDD的重复使用。github

◆ Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分红小的时间片段(几秒),以相似batch批量处理的方式来处理这小部 分数据。Spark Streaming构建在Spark上,一方面是由于Spark的低延迟执行引擎(100ms+)能够用于实时计算,另外一方面相比基于Record的其它 处理框架(如Storm),RDD数据集更容易作高效的容错处理。此外小批量处理的方式使得它能够同时兼容批量和实时数据处理的逻辑和算法。方便了一些需 要历史数据和实时数据联合分析的特定应用场合。web

详见10、Spark Streaming ...算法

  ◆ Bagel: Pregel on Spark,能够用Spark进行图计算,这是个很是有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。shell

 

3、    运行模式[1]

  ◆ 本地模式数据库

  ◆ Standalone模式

  ◆ Mesoes模式

  ◆ yarn模式

4、    Spark与Hadoop的对比[1]

  ◆ Spark的中间数据放到内存中,对于迭代运算效率更高。

  Spark更适合于迭代运算比较多的ML和DM运算。由于在Spark里面,有RDD的抽象概念。

  ◆ Spark比Hadoop更通用。

  Spark提供的数据集操做类型有不少种,不像Hadoop只提供了Map和Reduce两种操做。好比map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操做类型,Spark把这些操做称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操做。

  这些多种多样的数据集操做类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通讯模型再也不像Hadoop那样就是惟一的Data Shuffle一种模式。用户能够命名,物化,控制中间结果的存储、分区等。能够说编程模型比Hadoop更灵活。

  不过因为RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

  ◆ 容错性。

  在分布式数据集计算时经过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户能够控制采用哪一种方式来实现容错。

  ◆ 可用性。

  Spark经过提供丰富的Scala, Java,Python API及交互式Shell来提升可用性。

  Spark与Hadoop的结合

  ◆ Spark能够直接对HDFS进行数据的读写,一样支持Spark on YARN。Spark能够与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive彻底兼容。

 

5、    在业界的使用[1]

◆ Spark项目在2009年启动,2010年开源, 如今使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。

 

6、    Spark核心概念[1]

  Resilient Distributed Dataset (RDD)弹性分布数据集

  ◆ RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操做本地集合的方式来操做分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并可以被并行操做的数据集合,不一样的数据集格式对应不一样的RDD实现。RDD必须是可序列化的。RDD能够cache到内存 中,每次对RDD数据集的操做以后的结果,均可以存放到内存中,下一个操做能够直接从内存中输入,省去了MapReduce大量的磁盘IO操做。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来讲,效率提高比较大。

  ◆ RDD的特色:

  它是在集群节点上的不可变的、已分区的集合对象。

  经过并行转换的方式来建立如(map, filter, join, etc)。

  失败自动重建。

  能够控制存储级别(内存、磁盘等)来进行重用。

  必须是可序列化的。

  是静态类型的。

  ◆ RDD的好处

  RDD只能从持久存储或经过Transformations操做产生,相比于分布式共享内存(DSM)能够更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可从新计算出来,而不须要作特定的Checkpoint。

  RDD的不变性,能够实现类Hadoop MapReduce的推测式执行。

  RDD的数据分区特性,能够经过数据的本地性来提升性能,这与Hadoop MapReduce是同样的。

  RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的降低但不会差于如今的MapReduce。

  ◆ RDD的存储与分区

  用户能够选择不一样的存储级别存储RDD以便重用。

  当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。

  RDD在须要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。

  ◆ RDD的内部表示

  在RDD的内部实现中每一个RDD均可以使用5个方面的特性来表示:

  分区列表(数据块列表)

  计算每一个分片的函数(根据父RDD计算出此RDD)

  对父RDD的依赖列表

  对key-value RDD的Partitioner【可选】

  每一个数据分片的预约义地址列表(如HDFS上的数据块的地址)【可选】

  ◆ RDD的存储级别

  RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

  val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new  StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

  ◆ RDD定义了各类操做,不一样类型的数据由不一样的RDD类抽象表示,不一样的操做也由RDD进行抽实现。

  RDD的生成

  ◆ RDD有两种建立方式:

  一、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)建立。

  二、从父RDD转换获得新RDD。

  ◆ 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码以下:

  // SparkContext根据文件/目录及可选的分片数建立RDD, 这里咱们能够看到Spark与Hadoop MapReduce很像 // 须要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根据Hadoop配置,及InputFormat等建立HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

  ◆ 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎同样的:

  RDD的转换与操做

  ◆ 对于RDD能够有两种计算方式:转换(返回值仍是一个RDD)与操做(返回值不是一个RDD)。

  ◆ 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操做是Lazy的,也就是说从一个RDD转换生成另外一个RDD的操做不是立刻执行,Spark在遇到 Transformations操做时只会记录须要这样的操做,并不会去执行,须要等到有Actions操做的时候才会真正启动计算过程进行计算。

  ◆ 操做(Actions) (如:count, collect, save等),Actions操做会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

  ◆ 下面使用一个例子来示例说明Transformations与Actions在Spark的使用。

  val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)

 

  Lineage(血统)

  ◆ 利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。 为了保证RDD中数据的鲁棒性,RDD数据集经过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内 存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操做(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它能够经过Lineage获取足够的信息来从新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限 制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提高。

  ◆ RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每个分区最多被一个子RDD的分区所用, 表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子 RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或全部分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不一样的节点上,lineage方法对与输入节点无缺,而输出节点宕机时,经过从新计算,这种状况下,这 种方法容错是有效的,不然无效,由于没法重试,须要向上其祖先追溯看是否能够重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

 

7、    容错[1]

  ◆ 在RDD计算,经过checkpint进行容错,作checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户能够控制采用哪一种方式来实现容错,默认是logging the updates方式,经过记录跟踪全部生成RDD的转换(transformations)也就是记录每一个RDD的lineage(血统)来从新计算生成 丢失的分区数据。

 

8、    资源管理与做业调度[1]

  ◆ Spark对于资源管理与做业调度可使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就很是容 易,Spark on Yarn的大体框架图。

 

◆ 让Spark运行于YARN上与Hadoop共用集群资源能够提升资源利用率。

 

9、    编程接口[1]

  ◆ Spark经过与编程语言集成的方式暴露RDD的操做,相似于DryadLINQ和FlumeJava,每一个数据集都表示为RDD对象,对数据集的操做就 表示成对RDD对象的操做。Spark主要的编程语言是Scala,选择Scala是由于它的简洁性(Scala能够很方便在交互式下使用)和性能 (JVM上的静态强类型语言)。

  ◆ Spark和Hadoop MapReduce相似,由Master(相似于MapReduce的Jobtracker)和Workers(Spark的Slave工做节点)组成。 用户编写的Spark程序被称为Driver程序,Dirver程序会链接master并定义了对各RDD的转换与操做,而对RDD的转换与操做经过 Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操做发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工做节点上的守护进程,当它收到对RDD的操做时,根据数据分片信息进行本地化数据操做,生成新的 数据分片、返回结果或把RDD写入存储系统。

    Scala

 ◆ Spark使用Scala开发,默认使用Scala做为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,能够在Spark-Shell测试程序。写SparK程序的通常步骤就是创 建或使用(SparkContext)实例,使用SparkContext建立RDD,而后就是对RDD进行操做。如:

  val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....

  Java

  ◆ Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是同样的,由于都是JVM上的语言,Scala与Java能够互操做,Java编程接口其实就是对Scala的封装。如:

  JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );

  Python

  ◆ 如今Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操做,从而实现使用python编写Spark程 序。Spark也一样提供了pyspark,一个Spark的python shell,能够以交互式的方式使用Python编写Spark程序。 如:

from pyspark import SparkContext sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words") words.filter(lambda w: w.startswith("spar")).take(5)

 

10、    使用示例[1]

  Standalone模式

  ◆ 为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是很是好的设计,可是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark所以提供了 Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很类似,就连集群启动方式都几乎是同样。

  ◆ 以Standalone模式运行Spark集群

  下载Scala2.9.3,并配置SCALA_HOME

  下载Spark代码(可使用源码编译也能够下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)

  解压spark-0.7.3-prebuilt-cdh4.tgz安装包

  修改配置(conf/*) slaves: 配置工做节点的主机名 spark-env.sh:配置环境变量。

  SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1

  ◆ 把Hadoop配置copy到conf目录下

  ◆ 在master主机上对其它机器作ssh无密码登陆

  ◆ 把配置好的Spark程序使用scp copy到其它机器

  ◆ 在master启动集群

  $SPARK_HOME/start-all.sh

  yarn模式

  ◆ Spark-shell如今还不支持Yarn模式,使用Yarn模式运行,须要把Spark程序所有打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn。

  ◆ 以Yarn模式运行Spark

  下载Spark代码.

  git clone git://github.com/mesos/spark

  ◆ 切换到branch-0.8

  cd spark git checkout -b yarn --track origin/yarn

  ◆ 使用sbt编译Spark并

  $SPARK_HOME/sbt/sbt > package > assembly

  ◆ 把Hadoop yarn配置copy到conf目录下

  ◆ 运行测试

  SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0- SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone

  使用Spark-shell

  ◆ Spark-shell使用很简单,当Spark以Standalon模式运行后,使用$SPARK_HOME/spark-shell进入shell即 可,在Spark-shell中SparkContext已经建立好了,实例名为sc能够直接使用,还有一个须要注意的是,在Standalone模式 下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell做为一个Spark程序一直运行在Spark上,其它的 Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。

  ◆ 在Spark-shell上写程序很是简单,就像在Scala Shell上写程序同样。

  scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() // Number of items in this RDD res0: Long = 21374 scala> textFile.first() // First item in this RDD res1: String = # Spark

  编写Driver程序

  ◆ 在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是同样的,不一样的地方就是SparkContext须要本身建立。如WorkCount程序以下:

import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext, args(0)由yarn传入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }

 

 

11、    Spark Streaming:大规模流式数据处理的新贵[2]

1. 前言

提到Spark Streaming,咱们不得不说一下BDAS(Berkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈。从它的视角来看,目前的大数据处理能够分为如如下三个类型。

  • 复杂的批量数据处理(batch data processing),一般的时间跨度在数十分钟到数小时之间。
  • 基于历史数据的交互式查询(interactive query),一般的时间跨度在数十秒到数分钟之间。
  • 基于实时数据流的数据处理(streaming data processing),一般的时间跨度在数百毫秒到数秒之间。

目前已有不少相对成熟的开源软件来处理以上三种情景,咱们能够利用MapReduce来进行批量数据处理,能够用Impala来进行交互式查询,对于流式数据处理,咱们能够采用Storm。对于大多数互联网公司来讲,通常都会同时遇到以上三种情景,那么在使用的过程当中这些公司可能会遇到以下的不便。

  • 三种情景的输入输出数据没法无缝共享,须要进行格式相互转换。
  • 每个开源软件都须要一个开发和维护团队,提升了成本。
  • 在同一个集群中对各个系统协调资源分配比较困难。

BDAS就是以Spark为基础的一套软件栈,利用基于内存的通用计算模型将以上三种情景一网打尽,同时支持Batch、 Interactive、Streaming的处理,且兼容支持HDFS和S3等分布式文件系统,能够部署在YARN和Mesos等流行的集群资源管理器 之上。BDAS的构架如图1所示,其中Spark能够替代MapReduce进行批处理,利用其基于内存的特色,特别擅长迭代式和交互式数据处理;Shark处理大规模数据的SQL查询,兼容Hive的HQL。本文要重点介绍的Spark Streaming,在整个BDAS中进行大规模流式处理。



图1 BDAS软件栈

2. Spark Streaming构架

1)  计算流程

Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),而后将Spark Streaming中对DStream的Transformation操做变为针对Spark中对RDD的Transformation操做,将RDD经 过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加,或者存储到外部设备。图2显示了Spark Streaming的整个流程。

 

图2 Spark Streaming构架图

 

2)  容错性

对于流式计算来讲,容错性相当重要。首先咱们要明确一下Spark中RDD的容错机制。每个RDD都是一个不可 变的分布式可重算的数据集,其记录着肯定性的操做继承关系(lineage),因此只要输入数据是可容错的,那么任意一个RDD的分区 (Partition)出错或不可用,都是能够利用原始输入数据经过转换操做而从新算出的。

图3 Spark Streaming中RDD的lineage关系图

对于Spark Streaming来讲,其RDD的传承关系如图3所示,图中的每个椭圆形表示一个RDD,椭圆形中的每一个圆形表明一个RDD中的一个 Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每个Batch Size所产生的中间结果RDD。咱们能够看到图中的每个RDD都是经过lineage相链接的,因为Spark Streaming输入数据能够来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每个数据流拷贝两份到其余的机器)都能保证容错性。因此RDD中任意的Partition出错,均可以并行地在其 他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。

3)  实时性

对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会通过Spark DAG图分解,以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),因此Spark Streaming可以知足除对实时性要求很是高(如高频实时交易)以外的全部流式准实时计算场景。

 

4)  扩展性与吞吐量

Spark目前在EC2上已可以线性扩展到100个节点(每一个节点4Core),能够以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图4是Berkeley利用WordCount和Grep两个用例所作的测试,在Grep这个测试中,Spark Streaming中的每一个节点的吞吐量是670k records/s,而Storm是115k records/s。

图4 Spark Streaming与Storm吞吐量比较图

3. Spark Streaming的编程模型

Spark Streaming的编程和Spark的编程一模一样,对于编程的理解也很是相似。对于Spark来讲,编程就是对于RDD的操做;而对于Spark Streaming来讲,就是对DStream的操做。下面将经过一个你们熟悉的WordCount的例子来讲明Spark Streaming中的输入操做、转换操做和输出操做。

  • Spark Streaming初始化:在开始进行DStream操做以前,须要对Spark Streaming进行初始化生成StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定Spark Streaming运行的集群地址,而第三个参数是指定Spark Streaming运行时的batch窗口大小。在这个例子中就是将1秒钟的输入数据进行一次Spark Job处理。

val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars])

  •  Spark Streaming的输入操做:目前Spark Streaming已支持了丰富的输入接口,大体分为两类:一类是磁盘输入,如以batch size做为时间间隔监控HDFS文件系统的某个目录,将目录中内容的变化做为Spark Streaming的输入;另外一类就是网络流的方式,目前支持Kafka、Flume、Twitter和TCP socket。在WordCount例子中,假定经过网络socket做为输入流,监听某个特定的端口,最后得出输入DStream(lines)。

val lines = ssc.socketTextStream(“localhost”,8888)

  • Spark Streaming的转换操做:与Spark RDD的操做极为相似,Spark Streaming也就是经过转换操做将一个或多个DStream转换成新的DStream。经常使用的操做包括map、filter、flatmap和 join,以及须要进行shuffle操做的groupByKey/reduceByKey等。在WordCount例子中,咱们首先须要将 DStream(lines)切分红单词,而后将相同单词的数量进行叠加, 最终获得的wordCounts就是每个batch size的(单词,数量)中间结果。 

val words = lines.flatMap(_.split(“ ”))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

另外,Spark Streaming有特定的窗口操做,窗口操做涉及两个参数:一个是滑动窗口的宽度(Window Duration);另外一个是窗口滑动的频率(Slide Duration),这两个参数必须是batch size的倍数。例如以过去5秒钟为一个输入窗口,每1秒统计一下WordCount,那么咱们会将过去5秒钟的每一秒钟的WordCount都进行统计,而后进行叠加,得出这个窗口中的单词统计。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))

 

但上面这种方式还不够高效。若是咱们以增量的方式来计算就更加高效,例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么 咱们能够将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量(如图5所示),这种方法能够复用中间三秒的统 计量,提升统计的效率。

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))



图5 Spark Streaming中滑动窗口的叠加处理和增量处理

  • Spark Streaming的输入操做:对于输出操做,Spark提供了将数据打印到屏幕及输入到文件中。在WordCount中咱们将DStream wordCounts输入到HDFS文件中。

wordCounts = saveAsHadoopFiles(“WordCount”)

  • Spark Streaming启动:通过上述的操做,Spark Streaming尚未进行工做,咱们还须要调用Start操做,Spark Streaming才开始监听相应的端口,而后收取数据,并进行统计。

ssc.start()

4. Spark Streaming案例分析

在互联网应用中,网站流量统计做为一种经常使用的应用模式,须要在不一样粒度上对不一样数据进行统计,既有实时性的需求,又须要涉及到聚合、去重、链接等较为复杂的统计需求。传统上,如果使用Hadoop MapReduce框架,虽然能够容易地实现较为复杂的统计需求,但实时性却没法获得保证;反之如果采用Storm这样的流式框架,实时性虽能够获得保证,但需求的实现复杂度也大大提升了。Spark Streaming在二者之间找到了一个平衡点,可以以准实时的方式容易地实现较为复杂的统计需求。 下面介绍一下使用Kafka和Spark Streaming搭建实时流量统计框架。

  • 数据暂存:Kafka做为分布式消息队列,既有很是优秀的吞吐量,又有较高的可靠性和扩展性,在这里采用Kafka做为日志传递中间件来接收日志,抓取客户端发送的流量日志,同时接受Spark Streaming的请求,将流量日志按序发送给Spark Streaming集群。
  • 数据处理:将Spark Streaming集群与Kafka集群对接,Spark Streaming从Kafka集群中获取流量日志并进行处理。Spark Streaming会实时地从Kafka集群中获取数据并将其存储在内部的可用内存空间中。当每个batch窗口到来时,便对这些数据进行处理。 
  • 结果存储:为了便于前端展现和页面请求,处理获得的结果将写入到数据库中。

相比于传统的处理框架,Kafka+Spark Streaming的架构有如下几个优势。

  • Spark框架的高效和低延迟保证了Spark Streaming操做的准实时性。
  • 利用Spark框架提供的丰富API和高灵活性,能够精简地写出较为复杂的算法。 
  • 编程模型的高度一导致得上手Spark Streaming至关容易,同时也能够保证业务逻辑在实时处理和批处理上的复用。

在基于Kafka+Spark Streaming的流量统计应用运行过程当中,有时会遇到内存不足、GC阻塞等各类问题。下面介绍一下如何对Spark Streaming应用程序进行调优来减小甚至避免这些问题的影响。

5. 性能调优

1)  优化运行时间

  • 增长并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操做,增长其并行度以确保更为充分地使用集群资源。
  • 减小数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减小内存的使用。但序列化和反序列化须要更多的CPU时间,所以更加高效的序列化方式(Kryo)和自定义的序列化接口能够更高效地使用CPU。 
  • 设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的 batch窗口,那么后面的Job就没法按时提交,这样就会进一步拖延接下来的Job,形成后续Job的阻塞。所以,设置一个合理的batch窗口确保 Job可以在这个batch窗口中结束是必须的。 
  • 减小任务提交和分发所带来的负担。一般状况下Akka框架可以高效地确保任务及时分发,但当batch窗口很是小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式一般会比使用Fine-Grained Mesos模式有更小的延迟。

2)  优化内存使用

  • 控制batch size。Spark Streaming会把batch窗口内接收到的全部数据存放在Spark内部的可用内存区域中,所以必须确保当前节点Spark的可用内存至少可以容纳这个batch窗口内全部的数据,不然必须增长新的资源以提升集群的处理能力。
  • 及时清理再也不使用的数据。上面说到Spark Streaming会将接收到的数据所有存储于内部的可用内存区域中,所以对于处理过的再也不须要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。经过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。 
  • 观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引发一系列不可预料的问题。观察GC的运行状况,采起不一样的GC策略以进一步减少内存回收对Job运行的影响。

6. 总结

Spark Streaming提供了一套高效、可容错的准实时大规模流式处理框架,它能和批处理及即时查询放在同一个软件栈中,下降学习成本。若是你学会了Spark编程,那么也就学会了Spark Streaming编程,若是理解了Spark的调度和存储,那么Spark Streaming也相似。对开源软件感兴趣的读者,咱们能够一块儿贡献社区。目前Spark已在Apache孵化器中。按照目前的发展趋势,Spark Streaming必定将会获得更大范围的使用。 

 

准实时任务(Firm Real-Time Task): 一般是指计算机系统容许任务超时,但若任务超时,该任务的计算结果没有任何意义)

 

12、    Spark on Yarn的实施过程当中遇到的问题[3]

第一个是多生态做业竞争问题,一样是好处也是一个坏处,内存消耗很是多的做业,有可能做业等待好久才能递交上去,比普通的Hadoop更加麻烦,面临着CPU、内存都申请到才能够运行

   第二机器内存性能,若是搭建Spark on Yarn集群,千万不要找内存比较小的搭集群,最好96G或者120G,机器能够相对少一点,好比100台20G搭建小集群,这样花在私人申请、任务调 度、计算时间很是多,最后让Spark性能很是差,既然要跑Spark,就不要珍惜内存了,并且内存价格愈来愈白菜价,用好机器,真正体验Spark给数 据挖掘带来质的飞跃;

  第三个粗粒度的资源预申请,Spark提交的时候,运行算法以前须要把全部算法资源申请到,可能你提交一个算法运 行十个Stage,中间一个须要1T内存,那么做业运行过程仲,要一直占1T内存不放才能成功运行。这个做者会在后面框架作一些改进,可是动做比较大,暂时问题没有办法解决,须要把算法跑须要按照最大资源申请须要的内存以及Core才能够;

  最后一个问题开发人员的内存把控能力,由于 Spark做业目前写起来有两个问题,一个是语法问题,一个对内存把控能力。做为普通的机器学习算法开发人员这方面会比较痛苦一点,由于他们更侧重机器学 习算法,而对内存把控不太好,须要学习一些东西,才能在上边娴熟开发和运行算法。

 

 

 

 

参考:

[1]. Spark:一个高效的分布式计算系统. http://soft.chinabyte.com/database/431/12914931.shtml

[2]. Spark Streaming:大规模流式数据处理的新贵.

http://www.csdn.net/article/2014-01-27/2818282-spark-streaming-big-data

[3]. 淘宝:Spark on Yarn 为"大象"插上翅膀.

http://blog.csdn.net/it_man/article/details/18961067

[4]. Spark 快速理解.

http://blog.csdn.net/colorant/article/details/8255958

相关文章
相关标签/搜索