走近RDD

  RDD(Resilient Distributed Datasets)弹性分布式数据集。RDD能够当作是一个简单的"数组",对其进行操做也只须要调用有限的"数组"中的方法便可,但它与通常数组的区别在于:RDD是分布式存储,能够跟好的利用现有的云数据平台,并在内存中进行。此处的弹性指的是数据的存储方式,及数据在节点中进行存储的时候,既可使用内存也可使用磁盘。此外,RDD还具备很强的容错性,在spark运行计算的过程当中,不会由于某个节点错误而使得整个任务失败;不通节点中并发运行的数据,若是在某个节点发生错误时,RDD会自动将其在不一样的节点中重试。apache

  RDD一大特性是延迟计算,即一个完整的RDD运行任务被分红2部分:Transformation和Action。数组

  Transformation用于对RDD的建立。在spark中,RDD只能使用Transformation来建立,同时Transformation还提供了大量的操做方法。RDD还能够利用Transformation来生成新的RDD,这样能够在有限的内存空间中生成竟可能多的数据对象。不管发生了多少次Transformation,此时,在RDD中真正数据计算运行的操做Action都没真正的开始运行。并发

 

 

  Action是数据的执行部分,其也提供了大量的方法去执行数据的计算操做部分。分布式

 

   RDD能够将其当作一个分布在不一样节点中的分布式数据集,并将数据以数据块(Block)的形式存储在各个节点的计算机中。每一个BlockMaster管理着若干个BlockSlave,而每一个BlockSlave又管理着若干个BlockNode。当BlockSlave得到了每一个Node节点的地址,又会反向向BlockMaster注册每一个Node的基本信息,这样就造成了分层管理。ide

 

  RDD依赖spa

    窄依赖 (narrowdependencies) 和宽依赖 (widedependencies) 。窄依赖是指 父 RDD 的每一个分区都只被子 RDD 的一个分区所使用,例如map、filter。相应的,那么宽依赖就是指父 RDD 的分区被多个子 RDD 的分区所依赖,例如groupByKey、reduceByKey等操做。若是父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,不然的话就是宽依赖。
  这种划分有两个用处。首先,窄依赖支持在一个结点上管道化执行。例如基于一对一的关系,能够在 filter 以后执行 map 。其次,窄依赖支持更高效的故障还原。由于对于窄依赖,只有丢失的父 RDD 的分区须要从新计算。而对于宽依赖,一个结点的故障可能致使来自全部父 RDD 的分区丢失,所以就须要彻底从新执行。所以对于宽依赖,Spark 会在持有各个父分区的结点上,将中间数据持久化来简化故障还原,就像 MapReduce 会持久化 map 的输出同样。对于join操做有两种状况,若是join操做的使用每一个partition仅仅和已知的Partition进行join,此时的join操做就是窄依赖;其余状况的join操做就是宽依赖;由于是肯定的Partition数量的依赖关系,因此就是窄依赖,得出一个推论,窄依赖不只包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)
                
 
   下面就是RDD API
  一、parallelize
   def parallelize[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ }) //第一个参数是数据,同时还有一个带有默认数值的参数,改参数为1,该参数表示的是将数据分布在多少个数据节点中存放。
  二、aggregate
   def aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]) //seqOp 是给定的计算方法,combOp 是合并方法,将第一个计算方法得出的结果与源码中的zeroValue进行合并。实例:
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr=sc.parallelize(Array(1,2,3,4,5,6,7,8))//parallelize将内存数据读入Spark系统中,做为总体数据集
    val result=arr.aggregate(0)(math.max(_,_),_+_)//_+_ 对传递的第一个方法的结果集进行进一步处理
    println(result)
  }
}

结果为8scala

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr=sc.parallelize(Array("abd","hello world","hello sb"))//parallelize将内存数据读入Spark系统中,做为总体数据集
    val result=arr.aggregate("")((value,word)=>value+word,_+_)//_+_ 对传递的第一个方法的结果集进行进一步处理
    println(result)
  }
}

结果为abdhello worldhello sbcode

  三、cache是将数据内容计算并保存在计算节点的内存中orm

  四、cartesion是用于对不一样的数组进行笛卡尔操做,要求是数组的长度必须相同对象

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array(1,2,3,4))//parallelize将内存数据读入Spark系统中,做为总体数据集
    val arr2=sc.parallelize(Array(4,3,2,1))
    val res=arr1.cartesian(arr2)
    res.foreach(print)
  }
}

结果:(1,4)(1,3)(1,2)(1,1)(2,4)(2,3)(2,2)(2,1)(3,4)(3,3)(3,2)(3,1)(4,4)(4,3)(4,2)(4,1)

  五、Coalesce是将已经存储的数据从新分片后再进行存储(repartition与Coalesce相似)

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array(1,2,3,4,5,6))//parallelize将内存数据读入Spark系统中,做为总体数据集
    val arr2=arr1.coalesce(2,true)
    val res1=arr1.aggregate(0)(math.max(_,_),_+_)
    println(res1)
    val res2=arr2.aggregate(0)(math.max(_,_),_+_)
    println(res2)
  }
}

结果为6    11

  六、countByValue是计算数据集中某个数据出现的个数,并将其以map的形式返回

  七、countByKey是计算数据集中元数据键值对key出现的个数

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array((1,"a"),(2,'b'),(1,'c'),(1,'d'),(2,'a')))//parallelize将内存数据读入Spark系统中,做为总体数据集
    val res1=arr1.countByValue()
    res1.foreach(println)
    val res2=arr1.countByKey()
    res2.foreach(println)
  }
}
//结果:((1,c),1)
((2,a),1)
((1,a),1)
((1,d),1)
((2,b),1)
(1,3)
(2,2)
View Code

  八、filter是对数据集进行过滤

  九、flatMap是对RDD中的数据进行总体操做的一个特殊方法,其在定义时就是针对数据集进行操做

  十、map能够对RDD中的数据集进行逐个操做,其与flatmap不一样得是,flatmap是将数据集中的数据做为一个总体去处理,以后再对其中的数据作计算,而map则直接对数据集中的数据作单独的处理

  十一、groupBy是将传入的数据进行分组

  十二、keyBy是为数据集中的每一个个体数据添加一个key,从而造成键值对

  1三、reduce同时对2个数据进行处理,主要是对传入的数据进行合并处理

  1四、sortBy是对已有的RDD进行从新排序

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array((1,"a"),(2,"c"),(3,"b"),(4,"x"),(5,"f")))//parallelize将内存数据读入Spark系统中,做为总体数据集
    val res1=arr1.sortBy(word=>word._1,true)
    val res2=arr1.sortBy(word=>word._2,true)
    res1.foreach(println)
    res2.foreach(println)
  }
}

  1五、zip能够将若干个RDD压缩成一个新的RDD

相关文章
相关标签/搜索