spark总结

 

RDD及其特色

一、RDD是Spark的核心数据模型,可是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。java

二、RDD在抽象上来讲是一种元素集合,包含了数据。它是被分区的,分为多个分区,每一个分区分布在集群中的不一样节点上,从而让RDD中的数据能够被并行操做。(分布式数据集)算法

三、RDD一般经过Hadoop上的文件,即HDFS文件或者Hive表,来进行建立;有时也能够经过应用程序中的集合来建立。sql

四、RDD最重要的特性就是,提供了容错性,能够自动从节点失败中恢复过来。即若是某个节点上的RDDpartition,由于节点故障,致使数据丢了,那么RDD会自动经过本身的数据来源从新计算该partition。这一切对使用者是透明的。apache

五、RDD的数据默认状况下存放在内存中的,可是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)编程

建立RDD

进行Spark核心编程的第一步就是建立一个初始的RDD。该RDD,一般就表明和包含了Spark应用程序的输入源数据。而后经过Spark Core提供的transformation算子,对该RDD进行转换,来获取其余的RDD。数组

Spark Core提供了三种建立RDD的方式:缓存

1.使用程序中的集合建立RDD(主要用于测试)网络

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件建立RDD(主要用于临时性处理有大量数据的文件)app

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();

3.使用HDFS文件建立RDD(生产环境的经常使用方式)分布式

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件建立RDD对比使用本地文件建立RDD,须要修改的,只有两个地方:
第一,将SparkSession对象的master("local")方法去掉
第二,咱们针对的不是本地文件了,修改成hadoop hdfs上的真正的存储大数据的文件

操做RDD

Spark支持两种RDD操做:transformation和action。

transformation操做

transformation操做会针对已有的RDD建立一个新的RDD。transformation具备lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所作的操做,不会自发的执行。只有执行了一个action,以前的全部transformation才会执行。

经常使用的transformation介绍:

map :将RDD中的每一个元素传人自定义函数,获取一个新的元素,而后用新的元素组成新的RDD。

filter:对RDD中每一个元素进行判断,若是返回true则保留,返回false则剔除。

flatMap:与map相似,可是对每一个元素均可以返回一个或多个元素。

groupByKey:根据key进行分组,每一个key对应一个Iterable<value>。

reduceByKey:对每一个key对应的value进行reduce操做。

sortByKey:对每一个key对应的value进行排序操做。

join:对两个包含<key,value>对的RDD进行join操做,每一个keyjoin上的pair,都会传入自定义函数进行处理。

cogroup:同join,可是每一个key对应的Iterable<value>都会传入自定义函数进行处理。

sparkRDD算子:
map与flatmap的区别:扁平化
map函数:会对每一条输入进行指定func操做,而后为每一条输入返回一个对象
flatmap函数:先进行map映射,而后在flatten(进行扁平化操做)

reducebykey算子:
首先会触发shuffle,会进行两次聚合操做
1,按照key将数据放到一块儿(本地聚合--Shuffle Write)
2,将相同key的数据聚合(全局聚合--Shuffle Reader)

action操做

action操做主要对RDD进行最后的操做,好比遍历,reduce,保存到文件等,并能够返回结果给Driver程序。action操做执行,会触发一个spark job的运行,从而触发这个action以前全部的transformation的执行,这是action的特性。

经常使用的action介绍:

reduce:将RDD中的全部元素进行聚合操做。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。

collect:将RDD中全部元素获取到本地客户端(通常不建议使用)。

count:获取RDD元素总数。

take(n):获取RDD中前n个元素。

 

saveAsTextFile:将RDD元素保存到文件中,对每一个元素调用toString方法。

countByKey:对每一个key对应的值进行count计数。

foreach:遍历RDD中的每一个元素。

RDD持久化

要持久化一个RDD,只要调用其cache()或者persist()方法便可。在该RDD第一次被计算出来时,就会直接缓存在每一个节点中。可是cache()或者persist()的使用是有规则的,必须在transformation或者textFile等建立了一个RDD以后,直接连续调用cache()或persist()才能够。

若是你先建立一个RDD,而后单独另起一行执行cache()或persist()方法,是没有用的,并且会报错,大量的文件会丢失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。

通用的持久化级别的选择建议:

一、优先使用MEMORY_ONLY,若是能够缓存全部数据的话,那么就使用这种策略。由于纯内存速度最快,并且没有序列化,不须要消耗CPU进行反序列化操做。

二、若是MEMORY_ONLY策略,没法存储全部数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操做仍是很是快,只是要消耗CPU进行反序列化。

三、若是须要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不须要从新计算了。

四、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如从新计算一次。

共享变量

Spark提供了两种共享变量:Broadcast Variable(广播变量)和Accumulator(累加变量)。

BroadcastVariable会将使用到的变量,仅仅为每一个节点拷贝一份,更大的用处是优化性能,减小网络传输以及内存消耗。广播变量是只读的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value)  //广播变量读值broadcastVars.value

Accumulator则可让多个task共同操做一份变量,主要能够进行累加操做。task只能对Accumulator进行累加操做,不能读取它的值。只有Driver程序能够读取Accumulator的值。

 
val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

 

小案例

案例需求:

一、对文本文件内的每一个单词都统计出其出现的次数。
二、按照每一个单词出现次数的数量,降序排序。

步骤:

  • 1.建立RDD
  • 2.将文本进行拆分 (flatMap)
  • 3.将拆分后的单词进行统计 (mapToPair,reduceByKey)
  • 4.反转键值对 (mapToPair)
  • 5.按键升序排序 (sortedByKey)
  • 6.再次反转键值对 (mapToPair)
  • 7.打印输出(foreach)

scala编写:

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WordCount").master("local").getOrCreate()
    val lines = spark.sparkContext.textFile("D:\\spark.txt")
    val words = lines.flatMap{line => line.split(" ")}
    val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
    val countWord = wordCounts.map{word =>(word._2,word._1)}
    val sortedCountWord = countWord.sortByKey(false)
    val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
    sortedWordCount.foreach(s=>
    {
      println("word \""+s._1+ "\" appears "+s._2+" times.")
    })
    spark.stop()
  }
}

  

小案例实战2

需求:

一、按照文件中的第一列排序。
二、若是第一列相同,则按照第二列排序。

实现步骤:

    • 一、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现本身对多个列的排序算法
    • 二、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD(map)
    • 三、使用sortByKey算子按照自定义的key进行排序(sortByKey)
    • 四、再次映射,剔除自定义的key,只保留文本行(map)
    • 五、打印输出(foreach)

scala编写:

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
  override def compare(that: SecondSortKey): Int = {
    if(this.first - that.first !=0){
      this.first-that.first
    }else{
      this.second-that.second
    }
  }
}
object SecondSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
    val lines = spark.sparkContext.textFile("D:\\sort.txt")
    val pairs = lines.map{line => (
      new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
    )}
    val sortedParis = pairs.sortByKey()
    val sortedLines = sortedParis.map(pairs => pairs._2)
    sortedLines.foreach(s => println(s))
    spark.stop()
  }
}

  

小案例实战3

需求:

对每一个班级内的学生成绩,取出前3名。(分组取topn)

实现步骤:

1.建立初始RDD

2.对初始RDD的文本行按空格分割,映射为key-value键值对

3.对键值对按键分组

4.获取分组后每组前3的成绩:

  • 4.1 遍历每组,获取每组的成绩
  • 4.2 将一组成绩转换成一个数组缓冲
  • 4.3 将数组缓冲按从大到小排序
  • 4.4 对排序后的数组缓冲取其前三

5.打印输出

如下是使用scala实现:

object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
    //建立初始RDD
    val lines = spark.sparkContext.textFile("D:\\score.txt")
    //对初始RDD的文本行按空格分割,映射为key-value键值对
    val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
    //对pairs键值对按键分组
    val groupedPairs = pairs.groupByKey()
    //获取分组后每组前3的成绩
    val top3Score = groupedPairs.map(classScores => {
      var className = classScores._1
      //获取每组的成绩,将其转换成一个数组缓冲,并按从大到小排序,取其前三
      var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
      Tuple2(className,top3)
    })
    top3Score.foreach(m => {
      println(m._1)
      for(s <- m._2) println(s)
      println("------------------")
    })
  }
}

  以上三个小案例都用Scala实现了,用到了Scala中的集合的操做、高阶函数、链式调用、隐式转换等知识,本身动手实现,对Scala有个比较好的理解和掌握。

相关文章
相关标签/搜索