【转】Spark:一个高效的分布式计算系统

原文地址:http://tech.uc.cn/?p=2116

概述

什么是Spark

  • Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具备的优势;但不一样于MapReduce的是Job中间输出和结果能够保存在内存中,从而再也不须要读写HDFS,所以Spark能更好地适用于数据挖掘与机器学习等须要迭代的map reduce的算法。其架构以下图所示:

spark-framwork

Spark与Hadoop的对比

  • Spark的中间数据放到内存中,对于迭代运算效率更高。
    • Spark更适合于迭代运算比较多的ML和DM运算。由于在Spark里面,有RDD的抽象概念。
  • Spark比Hadoop更通用。
    • Spark提供的数据集操做类型有不少种,不像Hadoop只提供了Map和Reduce两种操做。好比mapfilterflatMapsamplegroupByKeyreduceByKeyunionjoincogroup,mapValuessort,partionBy等多种操做类型,Spark把这些操做称为Transformations。同时还提供Countcollectreducelookupsave等多种actions操做。
    • 这些多种多样的数据集操做类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通讯模型再也不像Hadoop那样就是惟一的Data Shuffle一种模式。用户能够命名,物化,控制中间结果的存储、分区等。能够说编程模型比Hadoop更灵活。
    • 不过因为RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
  • 容错性。
    • 在分布式数据集计算时经过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户能够控制采用哪一种方式来实现容错。
  • 可用性。
    • Spark经过提供丰富的Scala, Java,Python API及交互式Shell来提升可用性。

 

Spark与Hadoop的结合

  • Spark能够直接对HDFS进行数据的读写,一样支持Spark on YARN。Spark能够与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive彻底兼容。

Spark的适用场景

  • Spark是基于内存的迭代计算框架,适用于须要屡次操做特定数据集的应用场合。须要反复操做的次数越多,所需读取的数据量越大,受益越大,数据量小可是计算密集度较大的场合,受益就相对较小
  • 因为RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
  • 总的来讲Spark的适用面比较普遍且比较通用。

运行模式

  • 本地模式
  • Standalone模式
  • Mesoes模式
  • yarn模式

Spark生态系统

  • Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive同样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。经过配置Shark参数,Shark能够自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark经过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一块儿,最大化RDD的重复使用。
  • Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分红小的时间片段(几秒),以相似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是由于Spark的低延迟执行引擎(100ms+)能够用于实时计算,另外一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易作高效的容错处理。此外小批量处理的方式使得它能够同时兼容批量和实时数据处理的逻辑和算法。方便了一些须要历史数据和实时数据联合分析的特定应用场合。
  • Bagel: Pregel on Spark,能够用Spark进行图计算,这是个很是有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

在业界的使用

  • Spark项目在2009年启动,2010年开源, 如今使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。

Spark核心概念

Resilient Distributed Dataset (RDD)弹性分布数据集

  • RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操做本地集合的方式来操做分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并可以被并行操做的数据集合,不一样的数据集格式对应不一样的RDD实现。RDD必须是可序列化的。RDD能够cache到内存中,每次对RDD数据集的操做以后的结果,均可以存放到内存中,下一个操做能够直接从内存中输入,省去了MapReduce大量的磁盘IO操做。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来讲,效率提高比较大。
  • RDD的特色:java

    1. 它是在集群节点上的不可变的、已分区的集合对象。
    2. 经过并行转换的方式来建立如(map, filter, join, etc)。
    3. 失败自动重建。
    4. 能够控制存储级别(内存、磁盘等)来进行重用。
    5. 必须是可序列化的。
    6. 是静态类型的。
  • RDD的好处python

    1. RDD只能从持久存储或经过Transformations操做产生,相比于分布式共享内存(DSM)能够更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可从新计算出来,而不须要作特定的Checkpoint。
    2. RDD的不变性,能够实现类Hadoop MapReduce的推测式执行。
    3. RDD的数据分区特性,能够经过数据的本地性来提升性能,这与Hadoop MapReduce是同样的。
    4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的降低但不会差于如今的MapReduce。
  • RDD的存储与分区git

    1. 用户能够选择不一样的存储级别存储RDD以便重用。
    2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
    3. RDD在须要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
  • RDD的内部表示
    在RDD的内部实现中每一个RDD均可以使用5个方面的特性来表示:github

    1. 分区列表(数据块列表)
    2. 计算每一个分片的函数(根据父RDD计算出此RDD)
    3. 对父RDD的依赖列表
    4. 对key-value RDD的Partitioner【可选】
    5. 每一个数据分片的预约义地址列表(如HDFS上的数据块的地址)【可选】
  • RDD的存储级别
    RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:web

  •       val NONE=newStorageLevel(false,false,false)
          val DISK_ONLY=newStorageLevel(true,false,false)
          val DISK_ONLY_2=newStorageLevel(true,false,false,2)
          val MEMORY_ONLY=newStorageLevel(false,true,true)
          val MEMORY_ONLY_2=newStorageLevel(false,true,true,2)
          val MEMORY_ONLY_SER=newStorageLevel(false,true,false)
          val MEMORY_ONLY_SER_2=newStorageLevel(false,true,false,2)
          val MEMORY_AND_DISK=newStorageLevel(true,true,true)
          val MEMORY_AND_DISK_2=newStorageLevel(true,true,true,2)
          val MEMORY_AND_DISK_SER=newStorageLevel(true,true,false)
          val MEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,2) 

     

  • RDD定义了各类操做,不一样类型的数据由不一样的RDD类抽象表示,不一样的操做也由RDD进行抽实现。算法

RDD的生成

  • RDD有两种建立方式:
    一、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)建立。
    二、从父RDD转换获得新RDD。
  • 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码以下:shell

  •      // SparkContext根据文件/目录及可选的分片数建立RDD, 这里咱们能够看到Spark与Hadoop MapReduce很像 
         // 须要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。
         def textFile(path:String,minSplits:Int=defaultMinSplits):RDD[String]={
             hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],
             classOf[Text],minSplits).map(pair=>pair._2.toString)}
      
         // 根据Hadoop配置,及InputFormat等建立HadoopRDD  
         newHadoopRDD(this,conf,inputFormatClass,keyClass,valueClass,minSplits)

     

  • 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎同样的:数据库

  • // 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。
        reader=fmt.getRecordReader(split.inputSplit.value,conf,Reporter.NULL)
        val key:K=reader.createKey()
        val value:V=reader.createValue()
        //使用Hadoop MapReduce的RecordReader读取数据,每一个Key、Value对以元组返回。
        override def getNext()={
        try{
          finished=!reader.next(key,value)
        }catch{
          caseeof:EOFException=>
            finished=true
        }
          (key,value)
        }

     

RDD的转换与操做

  • val sc=newSparkContext(master,"Example",System.getenv("SPARK_HOME"),
              Seq(System.getenv("SPARK_TEST_JAR")))
       
          val rdd_A=sc.textFile(hdfs://.....)
          val rdd_B=rdd_A.flatMap((line=>line.split("\\s+"))).map(word=>(word,1))
       
          val rdd_C=sc.textFile(hdfs://.....)
          val rdd_D=rdd_C.map(line=>(line.substring(10),1))
          val rdd_E=rdd_D.reduceByKey((a,b)=>a+b)
      
          val rdd_F=rdd_B.jion(rdd_E)
      
          rdd_F.saveAsSequenceFile(hdfs://....)

     

    对于RDD能够有两种计算方式:转换(返回值仍是一个RDD)与操做(返回值不是一个RDD)。
  • 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操做是Lazy的,也就是说从一个RDD转换生成另外一个RDD的操做不是立刻执行,Spark在遇到Transformations操做时只会记录须要这样的操做,并不会去执行,须要等到有Actions操做的时候才会真正启动计算过程进行计算。
  • 操做(Actions) (如:count, collect, save等),Actions操做会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
  • 下面使用一个例子来示例说明Transformations与Actions在Spark的使用。编程

  •  

     

SparkTA11

Lineage(血统)

  • 利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集经过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操做(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它能够经过Lineage获取足够的信息来从新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提高。
  • RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或全部分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不一样的节点上,lineage方法对与输入节点无缺,而输出节点宕机时,经过从新计算,这种状况下,这种方法容错是有效的,不然无效,由于没法重试,须要向上其祖先追溯看是否能够重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

容错

  • 在RDD计算,经过checkpint进行容错,作checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户能够控制采用哪一种方式来实现容错,默认是logging the updates方式,经过记录跟踪全部生成RDD的转换(transformations)也就是记录每一个RDD的lineage(血统)来从新计算生成丢失的分区数据。

资源管理与做业调度

  • Spark对于资源管理与做业调度可使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就很是容易,Spark on Yarn的大体框架图。缓存

 Spark架构图

  • 让Spark运行于YARN上与Hadoop共用集群资源能够提升资源利用率。


编程接口

  • Spark经过与编程语言集成的方式暴露RDD的操做,相似于DryadLINQ和FlumeJava,每一个数据集都表示为RDD对象,对数据集的操做就表示成对RDD对象的操做。Spark主要的编程语言是Scala,选择Scala是由于它的简洁性(Scala能够很方便在交互式下使用)和性能(JVM上的静态强类型语言)。
  • Spark和Hadoop MapReduce相似,由Master(相似于MapReduce的Jobtracker)和Workers(Spark的Slave工做节点)组成。用户编写的Spark程序被称为Driver程序,Dirver程序会链接master并定义了对各RDD的转换与操做,而对RDD的转换与操做经过Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操做发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工做节点上的守护进程,当它收到对RDD的操做时,根据数据分片信息进行本地化数据操做,生成新的数据分片、返回结果或把RDD写入存储系统。 

runtime

Scala

Spark使用Scala开发,默认使用Scala做为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,能够在Spark-Shell测试程序。写SparK程序的通常步骤就是建立或使用(SparkContext)实例,使用SparkContext建立RDD,而后就是对RDD进行操做。如:

1     val sc=newSparkContext(master,appName,[sparkHome],[jars])
2     val textFile=sc.textFile("hdfs://.....")
3     textFile.map(....).filter(.....).....

 

Java

  • Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是同样的,由于都是JVM上的语言,Scala与Java能够互操做,Java编程接口其实就是对Scala的封装。如:

  •    JavaSparkContext sc=newJavaSparkContext(...);  
          JavaRDD lines=ctx.textFile("hdfs://...");
          JavaRDD words=lines.flatMap(
            newFlatMapFunction<String,String>(){
               publicIterable call(Strings){
                  returnArrays.asList(s.split(" "));
               }
             }
          );

     

Python

  • 如今Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操做,从而实现使用python编写Spark程序。Spark也一样提供了pyspark,一个Spark的python shell,能够以交互式的方式使用Python编写Spark程序。 如:

1 from pyspark import SparkContext
2     sc=SparkContext("local","Job Name",pyFiles=['MyFile.py','lib.zip','app.egg'])
3     words=sc.textFile("/usr/share/dict/words")
4     words.filter(lambdaw:w.startswith("spar")).take(5)

 


使用示例

Standalone模式

  • 为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是很是好的设计,可是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark所以提供了Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很类似,就连集群启动方式都几乎是同样。
  • 以Standalone模式运行Spark集群

    • 下载Scala2.9.3,并配置SCALA_HOME
    • 下载Spark代码(可使用源码编译也能够下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
    • 解压spark-0.7.3-prebuilt-cdh4.tgz安装包
    • 修改配置(conf/*) slaves: 配置工做节点的主机名 spark-env.sh:配置环境变量。

       

      1 SCALA_HOME=/home/spark/scala-2.9.3
      2 JAVA_HOME=/home/spark/jdk1.6.0_45
      3 SPARK_MASTER_IP=spark1            
      4 SPARK_MASTER_PORT=30111
      5 SPARK_MASTER_WEBUI_PORT=30118
      6 SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=4g
      7 SPARK_WORKER_PORT=30333
      8 SPARK_WORKER_WEBUI_PORT=30119
      9 SPARK_WORKER_INSTANCES=1

       

    • 把Hadoop配置copy到conf目录下

    • 在master主机上对其它机器作ssh无密码登陆

    • 把配置好的Spark程序使用scp copy到其它机器

    • 在master启动集群:$SPARK_HOME/start-all.sh 

yarn模式

  • Spark-shell如今还不支持Yarn模式,使用Yarn模式运行,须要把Spark程序所有打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn。
  • 以Yarn模式运行Spark

    • 下载Spark代码:git clonegit://github.com/mesos/spark 

    • 切换到branch-0.8

      cd  spark
      git  checkout - b yarn -- track  origin / yarn
    • 使用sbt编译Spark并

      $SPARK_HOME/sbt/sbt
      >package
      >assembly
    • 把Hadoop yarn配置copy到conf目录下

    • 运行测试

SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar\
./run spark.deploy.yarn.Client--jar examples/target/scala-2.9.3/\
--classspark.examples.SparkPi--args yarn-standalone

 

使用Spark-shell

  • Spark-shell使用很简单,当Spark以Standalon模式运行后,使用$SPARK_HOME/spark-shell进入shell便可,在Spark-shell中SparkContext已经建立好了,实例名为sc能够直接使用,还有一个须要注意的是,在Standalone模式下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell做为一个Spark程序一直运行在Spark上,其它的Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。
  • 在Spark-shell上写程序很是简单,就像在Scala Shell上写程序同样。

    1     scala>val textFile=sc.textFile("hdfs://hadoop1:2323/user/data")
    2     textFile:spark.RDD[String]=spark.MappedRDD@2ee9b6e3
    3  
    4     scala>textFile.count()// Number of items in this RDD
    5     res0:Long=21374
    6  
    7     scala>textFile.first()// First item in this RDD
    8     res1:String=# Spark

编写Driver程序

    • 在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是同样的,不一样的地方就是SparkContext须要本身建立。如WorkCount程序以下:

       

       1 import spark.SparkContext
       2 import SparkContext._
       3  
       4 objectWordCount{
       5   def main(args:Array[String]){
       6     if(args.length==0){
       7       println("usage is org.test.WordCount <master>")
       8     }
       9     println("the args: ")
      10     args.foreach(println)
      11  
      12     val hdfsPath="hdfs://hadoop1:8020"
      13  
      14     // create the SparkContext, args(0)由yarn传入appMaster地址
      15     val sc=newSparkContext(args(0),"WrodCount",
      16     System.getenv("SPARK_HOME"),Seq(System.getenv("SPARK_TEST_JAR")))
      17  
      18     val textFile=sc.textFile(hdfsPath+args(1))
      19  
      20     val result=textFile.flatMap(line=>line.split("\\s+"))
      21         .map(word=>(word,1)).reduceByKey(_+_)
      22  
      23     result.saveAsTextFile(hdfsPath+args(2))
      24   }
      25 }
相关文章
相关标签/搜索