大数据系列之并行计算引擎Spark介绍 大数据系列之并行计算引擎Spark部署及应用

相关博文:大数据系列之并行计算引擎Spark部署及应用

Spark:

    Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。html

    Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具备的优势;但不一样于MapReduce的是Job中间输出结果能够保存在内存中,从而再也不须要读写HDFS,所以Spark能更好地适用于数据挖掘与机器学习等须要迭代的MapReduce的算法。算法

Spark 是一种与 Hadoop 类似的开源集群计算环境,可是二者之间还存在一些不一样之处,这些有用的不一样之处使 Spark 在某些工做负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了可以提供交互式查询外,它还能够优化迭代工做负载。apache

    Spark 是在 Scala 语言中实现的,它将 Scala 用做其应用程序框架。与 Hadoop 不一样,Spark 和 Scala 可以紧密集成,其中的 Scala 能够像操做本地集合对象同样轻松地操做分布式数据集。编程

尽管建立 Spark 是为了支持分布式数据集上的迭代做业,可是实际上它是对 Hadoop 的补充,能够在 Hadoop 文件系统中并行运行。经过名为 Mesos 的第三方集群框架能够支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。数组

Spark的性能特色:

1.更快的速度:内存计算下,Spark 比 Hadoop 快100倍。缓存

  1.内存计算引擎,提供Cache机制来支持须要反复迭代计算或者屡次数据共享,减小数据读取的I/O开销多线程

  2.DAG引擎,减小屡次计算之间中间结果写到HDFS的开销;并发

  3.使用多线程池模型来减小task启动开销,shuffle过程当中避免没必要要的sort操做已经减小磁盘I/O操做;框架

2.易用性:机器学习

  1.Spark 提供了80多个高级运算符。

  2.提供了丰富的API,支持JAVA,Scala,Python和R四种语言;

  3.代码量比MapReduce少2~5倍;

3.通用性:Spark 提供了大量的库,包括SQL、DataFrames、MLlib、GraphX、Spark Streaming。 开发者能够在同一个应用程序中无缝组合使用这些库。

4.支持多种资源管理器:Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器

Spark基本原理:

  Spark Streaming:构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分红小的时间片段(几秒),以相似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是由于Spark的低延迟执行引擎(100ms+),虽然比不上专门的流式数据处理软件,也能够用于实时计算,另外一方面相比基于Record的其它处理框架(如Storm),一部分窄依赖的RDD数据集能够从源数据从新计算达到容错处理目的。此外小批量处理的方式使得它能够同时兼容批量和实时数据处理的逻辑和算法。方便了一些须要历史数据和实时数据联合分析的特定应用场合。

Spark背景:

 

  1.MapReduce局限性:

  

    1.仅支持Map和Reduce两种操做;

    2.处理效率低效;不适合迭代计算(如机器学习、图计算等),交互式处理(数据挖掘)和流失处理(日志分析)

    3.Map中间结果须要写磁盘,Reduce写HDFS,多个MR之间经过HDFS交换数据;

    4.任务调度和启动开销大;

    5.没法充分利用内存;(与MR产生时代有关,MR出现时内存价格比较高,采用磁盘存储代价小)

    6.Map端和Reduce端均须要排序;

  2.MapReduce编程不够灵活。(比较Scala函数式编程而言)

  3.框架多样化[采用一种框架技术(Spark)同时实现批处理、流式计算、交互式计算]:

    1.批处理:MapReduce、Hive、Pig;

 

    2.流式计算:Storm

    3.交互式计算:Impala  

Spark核心概念:

  1.RDD:Resilient Distributed Datasets,弹性分布式数据集

 

  

    

    1.分布在集群中的只读对象集合(由多个Partition 构成);

    2.能够存储在磁盘或内存中(多种存储级别);

    3.经过并行“转换”操做构造;

    4.失效后自动重构;

    5.RDD基本操做(operator)

      

Transformation具体内容

    • map(func) :返回一个新的分布式数据集,由每一个原元素通过func函数转换后组成
    • filter(func) : 返回一个新的数据集,由通过func函数后返回值为true的原元素组成
      *flatMap(func) : 相似于map,可是每个输入元素,会被映射为0到多个输出元素(所以,func函数的返回值是一个Seq,而不是单一元素)
    • flatMap(func) : 相似于map,可是每个输入元素,会被映射为0到多个输出元素(所以,func函数的返回值是一个Seq,而不是单一元素)
    • sample(withReplacement, frac, seed) :
      根据给定的随机种子seed,随机抽样出数量为frac的数据
    • union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成
    • groupByKey([numTasks]) :
      在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认状况下,使用8个并行任务进行分组,你能够传入numTask可选参数,根据数据量设置不一样数目的Task
    • reduceByKey(func, [numTasks]) : 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一块儿。和groupbykey相似,任务的个数是能够经过第二个可选参数来配置的。
    • join(otherDataset, [numTasks]) :
      在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每一个key中的全部元素都在一块儿的数据集
    • groupWith(otherDataset, [numTasks]) : 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操做在其它框架,称为CoGroup
    • cartesian(otherDataset) : 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,全部元素交互进行笛卡尔积。
    • flatMap(func) :
      相似于map,可是每个输入元素,会被映射为0到多个输出元素(所以,func函数的返回值是一个Seq,而不是单一元素)

Actions具体内容

    • reduce(func) : 经过函数func汇集数据集中的全部元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保能够被正确的并发执行
    • collect() : 在Driver的程序中,以数组的形式,返回数据集的全部元素。这一般会在使用filter或者其它操做后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,极可能会让Driver程序OOM
    • count() : 返回数据集的元素个数
    • take(n) : 返回一个数组,由数据集的前n个元素组成。注意,这个操做目前并不是在多个节点上,并行执行,而是Driver程序所在机器,单机计算全部的元素(Gateway的内存压力会增大,须要谨慎使用)
    • first() : 返回数据集的第一个元素(相似于take(1))
    • saveAsTextFile(path) : 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每一个元素的toString方法,并将它转换为文件中的一行文本
    • saveAsSequenceFile(path) : 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式能够转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
    • foreach(func) : 在数据集的每个元素上,运行函数func。这一般用于更新一个累加器变量,或者和外部存储系统作交互

算子分类

大体能够分为三大类算子:

      1. Value数据类型的Transformation算子,这种变换并不触发提交做业,针对处理的数据项是Value型的数据。
      2. Key-Value数据类型的Transfromation算子,这种变换并不触发提交做业,针对处理的数据项是Key-Value型的数据对。
      3. Action算子,这类算子会触发SparkContext提交Job做业。

 

      3.示例:

     

    4.Spark RDD cache/persist

      1.Spark RDD cache

        1.容许将RDD缓存到内存中或磁盘上,以便于重用

        2.提供了多种缓存级别,以便于用户根据实际需求进行调整

          

        3.cache使用

        

       2.以前用MapReduce实现过WordCount,如今咱们用Scala实现下wordCount.是否是很简洁呢?!

        Scala学习连接:https://yq.aliyun.com/topic/69

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount{
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: SparkWordCount <inputfile> <outputfile>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)

    val file=sc.textFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/README.md")
    val counts=file.flatMap(line=>line.split(" "))
                   .map(word=>(word,1))
                   .reduceByKey(_+_)
    counts.saveAsTextFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/countReslut.txt")

  }
}

 

      3.关于RDD的Transformation与Action的特色咱们介绍下;

        1.接口定义方式不一样:

          Transformation: RDD[X]-->RDD[y]

          Action:RDD[x]-->Z (Z不是一个RDD,多是一个基本类型,数组等)

        2.惰性执行:

          Transformation:只会记录RDD转化关系,并不会触发计算

          Action:是触发程序执行(分布式)的算子。

          

        程序的执行流程:

      

 

Spark运行模式:

1.Local(本地模式):

  1.单机运行,一般用于测试;

    1.local:只启动一个executor

    2.local[k]:启动k个executor

    3.local[*]:启动跟cpu数目相同的executor

2.standalone(独立模式)

  1.独立运行在一个集群中

  

3.Yarn/mesos

  1.运行在资源管理系统上,好比Yarn或mesos

  2.Spark On Yarn存在两种模式

    1.yarn-client

    

    2.yanr-cluster

    

    3.比较两种方式区别:

    

 

Spark在企业中的应用场景

1.基于日志数据的快速查询系统业务;

  1.构建于Spark之上的SparkSQL ,利用其快速以及内存表等优点,承担了日志数据的即席查询工做。

2.典型算法的Spark实现

  1.预测用户的广告点击几率;

  2.计算两个好友间的共同好友数;

  3.用于ETL的SparkSQL和DAG任务;

相关文章
相关标签/搜索