spark生态圈

 

http://www.cnblogs.com/-wangjiannan/p/3671247.htmlhtml

王健男前端

初步了解Spark生态系统及Spark Streamingjava

1、        场景python

Spark[4]git

Scope:  a MapReduce-like cluster computing framework designed for low-latency iterativejobs and interactive use from an interpreter(在大规模的特定数据集上的迭代运算或重复查询检索)github

正如其目标scopeSpark适用于须要屡次操做特定数据集的应用场合。须要反复操做的次数越多,所需读取的数据量越大,受益越大,数据量小可是计算密集度较大的场合,受益就相对较小。web

 

Spark Streaming(构建在Spark上处理Stream数据的框架)算法

可以知足除对实时性要求很是高(如高频实时交易)以外的全部流式准实时计算场景[2]sql

 

2、    Spark生态系统包括[1]shell

   Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive同样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了HiveAPI来实现query Parsing Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。经过配置Shark参数,Shark能够自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark 经过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一块儿,最大化RDD的重复使用。

Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分红小的时间片段(几秒),以相似batch批量处理的方式来处理这小部 分数据。Spark Streaming构建在Spark上,一方面是由于Spark的低延迟执行引擎(100ms+)能够用于实时计算,另外一方面相比基于Record的其它 处理框架(Storm)RDD数据集更容易作高效的容错处理。此外小批量处理的方式使得它能够同时兼容批量和实时数据处理的逻辑和算法。方便了一些需 要历史数据和实时数据联合分析的特定应用场合。

详见10、Spark Streaming ...

   Bagel: Pregel on Spark,能够用Spark进行图计算,这是个很是有用的小项目。Bagel自带了一个例子,实现了GooglePageRank算法。

 

3、    运行模式[1]

   本地模式

   Standalone模式

   Mesoes模式

   yarn模式

4、    SparkHadoop的对比[1]

   Spark的中间数据放到内存中,对于迭代运算效率更高。

  Spark更适合于迭代运算比较多的MLDM运算。由于在Spark里面,有RDD的抽象概念。

   SparkHadoop更通用。

  Spark提供的数据集操做类型有不少种,不像Hadoop只提供了MapReduce两种操做。好比map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操做类型,Spark把这些操做称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操做。

  这些多种多样的数据集操做类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通讯模型再也不像Hadoop那样就是惟一的Data Shuffle一种模式。用户能够命名,物化,控制中间结果的存储、分区等。能够说编程模型比Hadoop更灵活。

  不过因为RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

   容错性。

  在分布式数据集计算时经过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户能够控制采用哪一种方式来实现容错。

   可用性。

  Spark经过提供丰富的Scala, JavaPython API及交互式Shell来提升可用性。

  SparkHadoop的结合

   Spark能够直接对HDFS进行数据的读写,一样支持Spark on YARNSpark能够与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive彻底兼容。

 

5、    在业界的使用[1]

Spark项目在2009年启动,2010年开源, 如今使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Sparkpython克隆版Dpark

 

6、    Spark核心概念[1]

  Resilient Distributed Dataset (RDD)弹性分布数据集

   RDDSpark的最基本抽象,是对分布式内存的抽象使用,实现了以操做本地集合的方式来操做分布式数据集的抽象实现。RDDSpark最核心的东西,它表示已被分区,不可变的并可以被并行操做的数据集合,不一样的数据集格式对应不一样的RDD实现。RDD必须是可序列化的。RDD能够cache到内存 中,每次对RDD数据集的操做以后的结果,均可以存放到内存中,下一个操做能够直接从内存中输入,省去了MapReduce大量的磁盘IO操做。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来讲,效率提高比较大。

   RDD的特色:

  它是在集群节点上的不可变的、已分区的集合对象。

  经过并行转换的方式来建立如(map, filter, join, etc)

  失败自动重建。

  能够控制存储级别(内存、磁盘等)来进行重用。

  必须是可序列化的。

  是静态类型的。

   RDD的好处

  RDD只能从持久存储或经过Transformations操做产生,相比于分布式共享内存(DSM)能够更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可从新计算出来,而不须要作特定的Checkpoint

  RDD的不变性,能够实现类Hadoop MapReduce的推测式执行。

  RDD的数据分区特性,能够经过数据的本地性来提升性能,这与Hadoop MapReduce是同样的。

  RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的降低但不会差于如今的MapReduce

   RDD的存储与分区

  用户能够选择不一样的存储级别存储RDD以便重用。

  当前RDD默认是存储于内存,但当内存不足时,RDDspilldisk

  RDD在须要进行分区把数据分布于集群中时会根据每条记录Key进行分区(Hash 分区),以此保证两个数据集在Join时能高效。

   RDD的内部表示

  在RDD的内部实现中每一个RDD均可以使用5个方面的特性来表示:

  分区列表(数据块列表)

  计算每一个分片的函数(根据父RDD计算出此RDD)

  对父RDD的依赖列表

  对key-value RDDPartitioner【可选】

  每一个数据分片的预约义地址列表(HDFS上的数据块的地址)【可选】

   RDD的存储级别

  RDD根据useDiskuseMemorydeserializedreplication四个参数的组合提供了11种存储级别:

  val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new  StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

   RDD定义了各类操做,不一样类型的数据由不一样的RDD类抽象表示,不一样的操做也由RDD进行抽实现。

  RDD的生成

   RDD有两种建立方式:

  1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)建立。

  2、从父RDD转换获得新RDD

   下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://...")file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码以下:

  // SparkContext根据文件/目录及可选的分片数建立RDD, 这里咱们能够看到SparkHadoop MapReduce很像 // 须要InputFormat, KeyValue的类型,其实Spark使用的HadoopInputFormat, Writable类型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根据Hadoop配置,及InputFormat等建立HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

   RDD进行计算时,RDDHDFS读取数据时与Hadoop MapReduce几乎同样的:

  RDD的转换与操做

   对于RDD能够有两种计算方式:转换(返回值仍是一个RDD)与操做(返回值不是一个RDD)

   转换(Transformations) (如:map, filter, groupBy, join)Transformations操做是Lazy的,也就是说从一个RDD转换生成另外一个RDD的操做不是立刻执行,Spark在遇到 Transformations操做时只会记录须要这样的操做,并不会去执行,须要等到有Actions操做的时候才会真正启动计算过程进行计算。

   操做(Actions) (如:count, collect, save)Actions操做会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

   下面使用一个例子来示例说明TransformationsActionsSpark的使用。

  val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)

 

  Lineage(血统)

   利用内存加快数据加载,在众多的其它的In-Memory数据库Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案 为了保证RDD中数据的鲁棒性,RDD数据集经过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内 存数据更新级别的备份或者LOG机制,RDDLineage记录的是粗颗粒度的特定数据转换(Transformation)操做(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它能够经过Lineage获取足够的信息来从新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限 制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提高。

   RDDLineage依赖方面分为两种Narrow DependenciesWide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每个分区最多被一个子RDD的分区所用 表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子 RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或全部分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不一样的节点上,lineage方法对与输入节点无缺,而输出节点宕机时,经过从新计算,这种状况下,这 种方法容错是有效的,不然无效,由于没法重试,须要向上其祖先追溯看是否能够重试(这就是lineage,血统的意思)Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

 

7、    容错[1]

   RDD计算,经过checkpint进行容错,作checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户能够控制采用哪一种方式来实现容错,默认是logging the updates方式,经过记录跟踪全部生成RDD的转换(transformations)也就是记录每一个RDDlineage(血统)来从新计算生成 丢失的分区数据。

 

8、    资源管理与做业调度[1]

   Spark对于资源管理与做业调度可使用Standalone(独立模式)Apache MesosHadoop YARN来实现。 Spark on YarnSpark0.6时引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种SchedulerExecutor的良好设计,对YARN的支持也就很是容 易,Spark on Yarn的大体框架图。

 

Spark运行于YARN上与Hadoop共用集群资源能够提升资源利用率。

 

9、    编程接口[1]

   Spark经过与编程语言集成的方式暴露RDD的操做,相似于DryadLINQFlumeJava,每一个数据集都表示为RDD对象,对数据集的操做就 表示成对RDD对象的操做。Spark主要的编程语言是Scala,选择Scala是由于它的简洁性(Scala能够很方便在交互式下使用)和性能 (JVM上的静态强类型语言)

   SparkHadoop MapReduce相似,由Master(相似于MapReduceJobtracker)Workers(SparkSlave工做节点)组成。 用户编写的Spark程序被称为Driver程序,Dirver程序会链接master并定义了对各RDD的转换与操做,而对RDD的转换与操做经过 Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操做发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工做节点上的守护进程,当它收到对RDD的操做时,根据数据分片信息进行本地化数据操做,生成新的 数据分片、返回结果或把RDD写入存储系统。

    Scala

  Spark使用Scala开发,默认使用Scala做为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,能够在Spark-Shell测试程序。写SparK程序的通常步骤就是创 建或使用(SparkContext)实例,使用SparkContext建立RDD,而后就是对RDD进行操做。如:

  val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....

  Java

   Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是同样的,由于都是JVM上的语言,ScalaJava能够互操做,Java编程接口其实就是对Scala封装。如:

  JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );

  Python

   如今Spark也提供了Python编程接口,Spark使用py4j来实现pythonjava的互操做,从而实现使用python编写Spark 序。Spark也一样提供了pyspark,一个Sparkpython shell,能够以交互式的方式使用Python编写Spark程序。 如:

from pyspark import SparkContext sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words") words.filter(lambda w: w.startswith("spar")).take(5)

 

10、    使用示例[1]

  Standalone模式

   为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是很是好的设计,可是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark所以提供了 Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很类似,就连集群启动方式都几乎是同样。

   Standalone模式运行Spark集群

  下载Scala2.9.3,并配置SCALA_HOME

  下载Spark代码(可使用源码编译也能够下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)

  解压spark-0.7.3-prebuilt-cdh4.tgz安装包

  修改配置(conf/*) slaves: 配置工做节点的主机名 spark-env.sh:配置环境变量。

  SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1

   Hadoop配置copyconf目录下

   master主机上对其它机器作ssh无密码登陆

   把配置好的Spark程序使用scp copy到其它机器

   master启动集群

  $SPARK_HOME/start-all.sh

  yarn模式

   Spark-shell如今还不支持Yarn模式,使用Yarn模式运行,须要把Spark程序所有打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn

   Yarn模式运行Spark

  下载Spark代码.

  git clone git://github.com/mesos/spark

   切换到branch-0.8

  cd spark git checkout -b yarn --track origin/yarn

   使用sbt编译Spark

  $SPARK_HOME/sbt/sbt > package > assembly

   Hadoop yarn配置copyconf目录下

   运行测试

  SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0- SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone

  使用Spark-shell

   Spark-shell使用很简单,当SparkStandalon模式运行后,使用$SPARK_HOME/spark-shell进入shell 可,在Spark-shellSparkContext已经建立好了,实例名为sc能够直接使用,还有一个须要注意的是,在Standalone模式 下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell做为一个Spark程序一直运行在Spark上,其它的 Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。

   Spark-shell上写程序很是简单,就像在Scala Shell上写程序同样。

  scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() // Number of items in this RDD res0: Long = 21374 scala> textFile.first() // First item in this RDD res1: String = # Spark

  编写Driver程序

   SparkSpark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是同样的,不一样的地方就是SparkContext须要本身建立。如WorkCount程序以下:

import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext args(0)yarn传入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }

 

 

11、    Spark Streaming:大规模流式数据处理的新贵[2]

1. 前言

提到Spark Streaming,咱们不得不说一下BDASBerkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈。从它的视角来看,目前的大数据处理能够分为如如下三个类型。

·         复杂的批量数据处理(batch data processing),一般的时间跨度在数十分钟到数小时之间。

·         基于历史数据的交互式查询(interactive query),一般的时间跨度在数十秒到数分钟之间。

·         基于实时数据流的数据处理(streaming data processing),一般的时间跨度在数百毫秒到数秒之间。

目前已有不少相对成熟的开源软件来处理以上三种情景,咱们能够利用MapReduce来进行批量数据处理,能够用Impala来进行交互式查询,对于流式数据处理,咱们能够采用Storm。对于大多数互联网公司来讲,通常都会同时遇到以上三种情景,那么在使用的过程当中这些公司可能会遇到以下的不便。

·         三种情景的输入输出数据没法无缝共享,须要进行格式相互转换。

·         每个开源软件都须要一个开发和维护团队,提升了成本。

·         在同一个集群中对各个系统协调资源分配比较困难。

BDAS就是以Spark为基础的一套软件栈,利用基于内存的通用计算模型将以上三种情景一网打尽,同时支持Batch InteractiveStreaming的处理,且兼容支持HDFSS3等分布式文件系统,能够部署在YARNMesos等流行的集群资源管理器 之上。BDAS的构架如图1所示,其中Spark能够替代MapReduce进行批处理,利用其基于内存的特色,特别擅长迭代式和交互式数据处理;Shark处理大规模数据的SQL查询,兼容HiveHQL。本文要重点介绍的Spark Streaming,在整个BDAS中进行大规模流式处理。



1 BDAS软件栈

2. Spark Streaming构架

1)  计算流程

Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDDResilient Distributed Dataset),而后将Spark Streaming中对DStreamTransformation操做变为针对Spark中对RDDTransformation操做,将RDD 过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加,或者存储到外部设备。图2显示了Spark Streaming的整个流程。

 

2 Spark Streaming构架图

 

2)  容错性

对于流式计算来讲,容错性相当重要。首先咱们要明确一下SparkRDD的容错机制。每个RDD都是一个不可 变的分布式可重算的数据集,其记录着肯定性的操做继承关系(lineage),因此只要输入数据是可容错的,那么任意一个RDD的分区 Partition)出错或不可用,都是能够利用原始输入数据经过转换操做而从新算出的。

3 Spark StreamingRDDlineage关系图

对于Spark Streaming来讲,其RDD的传承关系如图3所示,图中的每个椭圆形表示一个RDD,椭圆形中的每一个圆形表明一个RDD中的一个 Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每个Batch Size所产生的中间结果RDD。咱们能够看到图中的每个RDD都是经过lineage相链接的,因为Spark Streaming输入数据能够来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每个数据流拷贝两份到其余的机器)都能保证容错性。因此RDD中任意的Partition出错,均可以并行地在其 他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。

3)  实时性

对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会通过Spark DAG图分解,以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),因此Spark Streaming可以知足除对实时性要求很是高(如高频实时交易)以外的全部流式准实时计算场景。

 

4)  扩展性与吞吐量

Spark目前在EC2上已可以线性扩展到100个节点(每一个节点4Core),能够以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm25倍,图4Berkeley利用WordCountGrep两个用例所作的测试,在Grep这个测试中,Spark Streaming中的每一个节点的吞吐量是670k records/s,而Storm115k records/s

4 Spark StreamingStorm吞吐量比较图

3. Spark Streaming的编程模型

Spark Streaming的编程和Spark的编程一模一样,对于编程的理解也很是相似。对于Spark来讲,编程就是对于RDD的操做;而对于Spark Streaming来讲,就是对DStream的操做。下面将经过一个你们熟悉的WordCount的例子来讲明Spark Streaming中的输入操做、转换操做和输出操做。

·         Spark Streaming初始化:在开始进行DStream操做以前,须要对Spark Streaming进行初始化生成StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定Spark Streaming运行的集群地址,而第三个参数是指定Spark Streaming运行时的batch窗口大小。在这个例子中就是将1秒钟的输入数据进行一次Spark Job处理。

val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars])

·          Spark Streaming的输入操做:目前Spark Streaming已支持了丰富的输入接口,大体分为两类:一类是磁盘输入,如以batch size做为时间间隔监控HDFS文件系统的某个目录,将目录中内容的变化做为Spark Streaming的输入;另外一类就是网络流的方式,目前支持KafkaFlumeTwitterTCP socket。在WordCount例子中,假定经过网络socket做为输入流,监听某个特定的端口,最后得出输入DStreamlines)。

val lines = ssc.socketTextStream(“localhost”,8888)

·         Spark Streaming的转换操做:与Spark RDD的操做极为相似,Spark Streaming也就是经过转换操做将一个或多个DStream转换成新的DStream。经常使用的操做包括mapfilterflatmap join,以及须要进行shuffle操做的groupByKey/reduceByKey等。在WordCount例子中,咱们首先须要将 DStream(lines)切分红单词,而后将相同单词的数量进行叠加, 最终获得的wordCounts就是每个batch size的(单词,数量)中间结果。 

val words = lines.flatMap(_.split(“ ”))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

另外,Spark Streaming有特定的窗口操做,窗口操做涉及两个参数:一个是滑动窗口的宽度(Window Duration);另外一个是窗口滑动的频率(Slide Duration),这两个参数必须是batch size的倍数。例如以过去5秒钟为一个输入窗口,每1秒统计一下WordCount,那么咱们会将过去5秒钟的每一秒钟的WordCount都进行统计,而后进行叠加,得出这个窗口中的单词统计。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s)seconds(1))

 

但上面这种方式还不够高效。若是咱们以增量的方式来计算就更加高效,例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么 咱们能够将t+3时刻过去5秒的统计量加上[t+3t+4]的统计量,在减去[t-2t-1]的统计量(如图5所示),这种方法能够复用中间三秒的统 计量,提升统计的效率。

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s)seconds(1))



5 Spark Streaming中滑动窗口的叠加处理和增量处理

·         Spark Streaming的输入操做:对于输出操做,Spark提供了将数据打印到屏幕及输入到文件中。在WordCount中咱们将DStream wordCounts输入到HDFS文件中。

wordCounts = saveAsHadoopFiles(“WordCount”)

·         Spark Streaming启动:通过上述的操做,Spark Streaming尚未进行工做,咱们还须要调用Start操做,Spark Streaming才开始监听相应的端口,而后收取数据,并进行统计。

ssc.start()

4. Spark Streaming案例分析

在互联网应用中,网站流量统计做为一种经常使用的应用模式,须要在不一样粒度上对不一样数据进行统计,既有实时性的需求,又须要涉及到聚合、去重、链接等较为复杂的统计需求。传统上,如果使用Hadoop MapReduce框架,虽然能够容易地实现较为复杂的统计需求,但实时性却没法获得保证;反之如果采用Storm这样的流式框架,实时性虽能够获得保证,但需求的实现复杂度也大大提升了。Spark Streaming在二者之间找到了一个平衡点,可以以准实时的方式容易地实现较为复杂的统计需求。 下面介绍一下使用KafkaSpark Streaming搭建实时流量统计框架。

·         数据暂存:Kafka做为分布式消息队列,既有很是优秀的吞吐量,又有较高的可靠性和扩展性,在这里采用Kafka做为日志传递中间件来接收日志,抓取客户端发送的流量日志,同时接受Spark Streaming的请求,将流量日志按序发送给Spark Streaming集群。

·         数据处理:将Spark Streaming集群与Kafka集群对接,Spark StreamingKafka集群中获取流量日志并进行处理。Spark Streaming会实时地从Kafka集群中获取数据并将其存储在内部的可用内存空间中。当每个batch窗口到来时,便对这些数据进行处理。 

·         结果存储:为了便于前端展现和页面请求,处理获得的结果将写入到数据库中。

相比于传统的处理框架,Kafka+Spark Streaming的架构有如下几个优势。

·         Spark框架的高效和低延迟保证了Spark Streaming操做的准实时性。

·         利用Spark框架提供的丰富API和高灵活性,能够精简地写出较为复杂的算法。 

·         编程模型的高度一导致得上手Spark Streaming至关容易,同时也能够保证业务逻辑在实时处理和批处理上的复用。

在基于Kafka+Spark Streaming的流量统计应用运行过程当中,有时会遇到内存不足、GC阻塞等各类问题。下面介绍一下如何对Spark Streaming应用程序进行调优来减小甚至避免这些问题的影响。

5. 性能调优

1)  优化运行时间

·         增长并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操做,增长其并行度以确保更为充分地使用集群资源。

·         减小数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减小内存的使用。但序列化和反序列化须要更多的CPU时间,所以更加高效的序列化方式(Kryo)和自定义的序列化接口能够更高效地使用CPU 

·         设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的 batch窗口,那么后面的Job就没法按时提交,这样就会进一步拖延接下来的Job,形成后续Job的阻塞。所以,设置一个合理的batch窗口确保 Job可以在这个batch窗口中结束是必须的。 

·         减小任务提交和分发所带来的负担。一般状况下Akka框架可以高效地确保任务及时分发,但当batch窗口很是小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式一般会比使用Fine-Grained Mesos模式有更小的延迟。

2)  优化内存使用

·         控制batch sizeSpark Streaming会把batch窗口内接收到的全部数据存放在Spark内部的可用内存区域中,所以必须确保当前节点Spark的可用内存至少可以容纳这个batch窗口内全部的数据,不然必须增长新的资源以提升集群的处理能力。

·         及时清理再也不使用的数据。上面说到Spark Streaming会将接收到的数据所有存储于内部的可用内存区域中,所以对于处理过的再也不须要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。经过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。 

·         观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引发一系列不可预料的问题。观察GC的运行状况,采起不一样的GC策略以进一步减少内存回收对Job运行的影响。

6. 总结

Spark Streaming提供了一套高效、可容错的准实时大规模流式处理框架,它能和批处理及即时查询放在同一个软件栈中,下降学习成本。若是你学会了Spark编程,那么也就学会了Spark Streaming编程,若是理解了Spark的调度和存储,那么Spark Streaming也相似。对开源软件感兴趣的读者,咱们能够一块儿贡献社区。目前Spark已在Apache孵化器中。按照目前的发展趋势,Spark Streaming必定将会获得更大范围的使用。 

 

准实时任务(Firm Real-Time Task): 一般是指计算机系统容许任务超时,但若任务超时,该任务的计算结果没有任何意义)

 

12、    Spark on Yarn的实施过程当中遇到的问题[3]

第一个是多生态做业竞争问题,一样是好处也是一个坏处,内存消耗很是多的做业,有可能做业等待好久才能递交上去,比普通的Hadoop更加麻烦,面临着CPU、内存都申请到才能够运行

   第二机器内存性能,若是搭建Spark on Yarn集群,千万不要找内存比较小的搭集群,最好96G或者120G,机器能够相对少一点,好比10020G搭建小集群,这样花在私人申请、任务调 度、计算时间很是多,最后让Spark性能很是差,既然要跑Spark,就不要珍惜内存了,并且内存价格愈来愈白菜价,用好机器,真正体验Spark给数 据挖掘带来质的飞跃;

  第三个粗粒度的资源预申请,Spark提交的时候,运行算法以前须要把全部算法资源申请到,可能你提交一个算法运 行十个Stage,中间一个须要1T内存,那么做业运行过程仲,要一直占1T内存不放才能成功运行。这个做者会在后面框架作一些改进,可是动做比较大,暂时问题没有办法解决,须要把算法跑须要按照最大资源申请须要的内存以及Core才能够;

  最后一个问题开发人员的内存把控能力,由于 Spark做业目前写起来有两个问题,一个是语法问题,一个对内存把控能力。做为普通的机器学习算法开发人员这方面会比较痛苦一点,由于他们更侧重机器学 习算法,而对内存把控不太好,须要学习一些东西,才能在上边娴熟开发和运行算法。

 

 

 

 

 

 

 

 

 

 

http://www.cnblogs.com/shishanyuan/p/4700615.html

 

Spark生态圈也称为BDAS(伯克利数据分析栈),是伯克利APMLab实验室打造的,力图在算法(Algorithms)、机器(Machines)、人(People)之间经过大规模集成来展示大数据应用的一个平台。伯克利AMPLab运用大数据、云计算、通讯等各类资源以及各类灵活的技术方案,对海量不透明的数据进行甄别并转化为有用的信息,以供人们更好的理解世界。该生态圈已经涉及到机器学习、数据挖掘、数据库、信息检索、天然语言处理和语音识别等多个领域。

Spark生态圈以Spark Core为核心,从HDFS、Amazon S3和HBase等持久层读取数据,以MESS、YARN和自身携带的Standalone为资源管理器调度Job完成Spark应用程序的计算。 这些应用程序能够来自于不一样的组件,如Spark Shell/Spark Submit的批处理、Spark Streaming的实时处理应用、Spark SQL的即席查询、BlinkDB的权衡查询、MLlib/MLbase的机器学习、GraphX的图处理和SparkR的数学计算等等。

2.1 Spark Core

前面介绍了Spark Core的基本状况,如下总结一下Spark内核架构:

l  提供了有向无环图(DAG)的分布式并行计算框架,并提供Cache机制来支持屡次迭代计算或者数据共享,大大减小迭代计算之间读取数据局的开销,这对于须要进行屡次迭代的数据挖掘和分析性能有很大提高

l  在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,若是数据集一部分丢失,则能够根据“血统”对它们进行重建,保证了数据的高容错性;

l  移动计算而非移动数据,RDD Partition能够就近读取分布式文件系统中的数据块到各个节点内存中进行计算

l  使用多线程池模型来减小task启动开稍

l  采用容错的、高可伸缩性的akka做为通信框架

2.2 SparkStreaming

SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,能够对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行相似Map、Reduce和Join等复杂操做,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。

Spark Streaming构架

l计算流程:Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),而后将Spark Streaming中对DStream的Transformation操做变为针对Spark中对RDD的Transformation操做,将RDD通过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。

图Spark Streaming构架

l容错性:对于流式计算来讲,容错性相当重要。首先咱们要明确一下Spark中RDD的容错机制。每个RDD都是一个不可变的分布式可重算的数据集,其记录着肯定性的操做继承关系(lineage),因此只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是能够利用原始输入数据经过转换操做而从新算出的。  

对于Spark Streaming来讲,其RDD的传承关系以下图所示,图中的每个椭圆形表示一个RDD,椭圆形中的每一个圆形表明一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每个Batch Size所产生的中间结果RDD。咱们能够看到图中的每个RDD都是经过lineage相链接的,因为Spark Streaming输入数据能够来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每个数据流拷贝两份到其余的机器)都能保证容错性,因此RDD中任意的Partition出错,均可以并行地在其余机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。

Spark Streaming中RDD的lineage关系图

l实时性:对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会通过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),因此Spark Streaming可以知足除对实时性要求很是高(如高频实时交易)以外的全部流式准实时计算场景。

l扩展性与吞吐量:Spark目前在EC2上已可以线性扩展到100个节点(每一个节点4Core),能够以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图4是Berkeley利用WordCount和Grep两个用例所作的测试,在Grep这个测试中,Spark Streaming中的每一个节点的吞吐量是670k records/s,而Storm是115k records/s。

Spark Streaming与Storm吞吐量比较图

2.3 Spark SQL

Shark是SparkSQL的前身,它发布于3年前,那个时候Hive能够说是SQL on Hadoop的惟一选择,负责将SQL编译成可扩展的MapReduce做业,鉴于Hive的性能以及与Spark的兼容,Shark项目由此而生。

Shark即Hive on Spark,本质上是经过Hive的HQL解析,把HQL翻译成Spark上的RDD操做,而后经过Hive的metadata获取数据库里的表信息,实际HDFS上的数据和文件,会由Shark获取并放到Spark上运算。Shark的最大特性就是快和与Hive的彻底兼容,且能够在shell模式下使用rdd2sql()这样的API,把HQL获得的结果集,继续在scala环境下运算,支持本身编写简单的机器学习或简单分析处理函数,对HQL结果进一步分析计算。

在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Databricks表示,Spark SQL将涵盖Shark的全部特性,用户能够从Shark 0.9进行无缝的升级。在会议上,Databricks表示,Shark更可能是对Hive的改造,替换了Hive的物理执行引擎,所以会有一个很快的速度。然而,不容忽视的是,Shark继承了大量的Hive代码,所以给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设计的部分无疑成为了整个项目的瓶颈。所以,为了更好的发展,给用户提供一个更好的体验,Databricks宣布终止Shark项目,从而将更多的精力放到Spark SQL上。

Spark SQL容许开发人员直接处理RDD,同时也可查询例如在 Apache Hive上存在的外部数据。Spark SQL的一个重要特色是其可以统一处理关系表和RDD,使得开发人员能够轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。除了Spark SQL外,Michael还谈到Catalyst优化框架,它容许Spark SQL自动修改查询方案,使SQL更有效地执行。

还有Shark的做者是来自中国的博士生辛湜(Reynold Xin),也是Spark的核心成员,具体信息能够看他的专访http://www.csdn.net/article/2013-04-26/2815057-Spark-Reynold

Spark SQL的特色:

l引入了新的RDD类型SchemaRDD,能够象传统数据库定义表同样来定义SchemaRDD,SchemaRDD由定义了列数据类型的行对象构成。SchemaRDD能够从RDD转换过来,也能够从Parquet文件读入,也可使用HiveQL从Hive中获取。

l内嵌了Catalyst查询优化框架,在把SQL解析成逻辑执行计划以后,利用Catalyst包里的一些类和接口,执行了一些简单的执行计划优化,最后变成RDD的计算

l在应用程序中能够混合使用不一样来源的数据,如能够未来自HiveQL的数据和来自SQL的数据进行Join操做。

Shark的出现使得SQL-on-Hadoop的性能比Hive有了10-100倍的提升,  那么,摆脱了Hive的限制,SparkSQL的性能又有怎么样的表现呢?虽然没有Shark相对于Hive那样瞩目地性能提高,但也表现得很是优异,以下图所示:

为何sparkSQL的性能会获得怎么大的提高呢?主要sparkSQL在下面几点作了优化:

1. 内存列存储(In-Memory Columnar Storage) sparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储;

2. 字节码生成技术(Bytecode Generation) Spark1.1.0在Catalyst模块的expressions增长了codegen模块,使用动态字节码生成技术,对匹配的表达式采用特定的代码动态编译。另外对SQL表达式都做了CG优化, CG优化的实现主要仍是依靠Scala2.10的运行时放射机制(runtime reflection);

3. Scala代码优化 SparkSQL在使用Scala编写代码的时候,尽可能避免低效的、容易GC的代码;尽管增长了编写代码的难度,但对于用户来讲接口统一。

2.4 BlinkDB

BlinkDB 是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它容许用户经过权衡数据精度来提高查询响应时间,其数据的精度被控制在容许的偏差范围内。为了达到这个目标,BlinkDB 使用两个核心思想:

l一个自适应优化框架,从原始数据随着时间的推移创建并维护一组多维样本;

l一个动态样本选择策略,选择一个适当大小的示例基于查询的准确性和(或)响应时间需求。

和传统关系型数据库不一样,BlinkDB是一个颇有意思的交互式查询系统,就像一个跷跷板,用户须要在查询精度和查询时间上作一权衡;若是用户想更快地获取查询结果,那么将牺牲查询结果的精度;一样的,用户若是想获取更高精度的查询结果,就须要牺牲查询响应时间。用户能够在查询的时候定义一个失误边界。

2.5  MLBase/MLlib

MLBase是Spark生态圈的一部分专一于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。

l  ML Optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果;

l  MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台;

l MLlib是Spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法能够进行可扩充; MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。

总的来讲,MLBase的核心是他的优化器,把声明式的Task转化成复杂的学习计划,产出最优的模型和计算结果。与其余机器学习Weka和Mahout不一样的是:

l  MLBase是分布式的,Weka是一个单机的系统;

l  MLBase是自动化的,Weka和Mahout都须要使用者具有机器学习技能,来选择本身想要的算法和参数来作处理;

l  MLBase提供了不一样抽象程度的接口,让算法能够扩充

l  MLBase基于Spark这个平台

2.6 GraphX

GraphX是Spark中用于图(e.g., Web-Graphs and Social Networks)和图并行计算(e.g., PageRank and Collaborative Filtering)的API,能够认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其余分布式图计算框架相比,GraphX最大的贡献是,在Spark之上提供一栈式数据解决方案,能够方便且高效地完成图计算的一整套流水做业。GraphX最早是伯克利AMPLAB的一个分布式图计算框架项目,后来整合到Spark中成为一个核心组件。

GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只须要一份物理存储。两种视图都有本身独有的操做符,从而得到了灵活操做和执行效率。如同Spark,GraphX的代码很是简洁。GraphX的核心代码只有3千多行,而在此之上实现的Pregel模型,只要短短的20多行。GraphX的代码结构总体下图所示,其中大部分的实现,都是围绕Partition的优化进行的。这在某种程度上说明了点分割的存储和相应的计算优化的确是图计算框架的重点和难点。

GraphX的底层设计有如下几个关键点。

1.对Graph视图的全部操做,最终都会转换成其关联的Table视图的RDD操做来完成。这样对一个图的计算,最终在逻辑上,等价于一系列RDD的转换过程。所以,Graph最终具有了RDD的3个关键特性:Immutable、Distributed和Fault-Tolerant。其中最关键的是Immutable(不变性)。逻辑上,全部图的转换和操做都产生了一个新图;物理上,GraphX会有必定程度的不变顶点和边的复用优化,对用户透明。

2.两种视图底层共用的物理数据,由RDD[Vertex-Partition]和RDD[EdgePartition]这两个RDD组成。点和边实际都不是以表Collection[tuple]的形式存储的,而是由VertexPartition/EdgePartition在内部存储一个带索引结构的分片数据块,以加速不一样视图下的遍历速度。不变的索引结构在RDD转换过程当中是共用的,下降了计算和存储开销。

3.图的分布式存储采用点分割模式,并且使用partitionBy方法,由用户指定不一样的划分策略(PartitionStrategy)。划分策略会将边分配到各个EdgePartition,顶点Master分配到各个VertexPartition,EdgePartition也会缓存本地边关联点的Ghost副本。划分策略的不一样会影响到所须要缓存的Ghost副本数量,以及每一个EdgePartition分配的边的均衡程度,须要根据图的结构特征选取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut这四种策略。在淘宝大部分场景下,EdgePartition2d效果最好。

2.7 SparkR

SparkR是AMPLab发布的一个R开发包,使得R摆脱单机运行的命运,能够做为Spark的job运行在集群上,极大得扩展了R的数据处理能力。

SparkR的几个特性:

l  提供了Spark中弹性分布式数据集(RDD)的API,用户能够在集群上经过R shell交互性的运行Spark job。

l  支持序化闭包功能,能够将用户定义函数中所引用到的变量自动序化发送到集群中其余的机器上。

l  SparkR还能够很容易地调用R开发包,只须要在集群上执行操做前用includePackage读取R开发包就能够了,固然集群上要安装R开发包。

2.8  Tachyon

Tachyon是一个高容错的分布式文件系统,容许文件之内存的速度在集群框架中进行可靠的共享,就像Spark和MapReduce那样。经过利用信息继承,内存侵入,Tachyon得到了高性能。Tachyon工做集文件缓存在内存中,而且让不一样的 Jobs/Queries以及框架都能内存的速度来访问缓存文件”。所以,Tachyon能够减小那些须要常用的数据集经过访问磁盘来得到的次数。Tachyon兼容Hadoop,现有的Spark和MR程序不须要任何修改而运行。

在2013年4月,AMPLab共享了其Tachyon 0.2.0 Alpha版本的Tachyon,其宣称性能为HDFS的300倍,继而受到了极大的关注。Tachyon的几个特性以下:

lJAVA-Like File API

Tachyon提供相似JAVA File类的API,

l兼容性

Tachyon实现了HDFS接口,因此Spark和MR程序不须要任何修改便可运行。

l可插拔的底层文件系统

Tachyon是一个可插拔的底层文件系统,提供容错功能。tachyon将内存数据记录在底层文件系统。它有一个通用的接口,使得能够很容易的插入到不一样的底层文件系统。目前支持HDFS,S3,GlusterFS和单节点的本地文件系统,之后将支持更多的文件系统。

相关文章
相关标签/搜索