Spark提升篇——RDD/DataSet/DataFrame(一)

该部分分为两篇,分别介绍RDD与Dataset/DataFrame:html

1、RDDjava

2、DataSet/DataFramenode

 

先来看下官网对RDD、DataSet、DataFrame的解释:sql

1.RDD数据库

Resilient distributed dataset(RDD),which is a fault-tolerant collection of elements that can be operated on in parallelapache

RDD——弹性分布式数据集,分布在集群的各个结点上具备容错性的元素集,能够被并行处理。api

参考连接:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds缓存

2. DataSet & DataFrameapp

A Dataset is a distributed collection of data.分布式

DataSet——分布式数据集。

A DataFrame is a Dataset organized into named columns. 

DataFrame——按列命名的分布式数据集。
 
API文档中能够看到,DataFrame其实就是指定了元素类型为Row的DataSet。而 Row类型即具备肯定元素个数的行结构。能够看出DataFrame实际上是相似于数据库的表结构,后面能够看到对它的操做也和表的操做很相似。

type DataFrame = DataSet[Row]

 
官网更鼓励使用DataSet/DataFrame,见下文:

Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs are replaced by Dataset, which is strongly-typed like an RDD, but with richer optimizations under the hood. The RDD interface is still supported, and you can get a more complete reference at the RDD programming guide. However, we highly recommend you to switch to use Dataset, which has better performance than RDD. See the SQL programming guide to get more information about Dataset.

能够看到Spark2.0之后,DataSet取代了RDD,并具备更高的性能(其中一点即是DataSet支持sql(如select、join、union、groupBy等)操做,能够像操做数据库表/视图似的来进行数据处理)。

参考连接:http://spark.apache.org/docs/latest/quick-start.html

固然,有的场景RDD比DataSet/DataFrame更方便数据处理,好比有个数据集,每行包含不少字段,可是咱们只须要获取其中的某几个字段,若是用DataSet/DataFrame,必须定义全部字段的结构,可是,若是使用RDD进行处理,直接获取每行的指定字段便可,不须要关心其余字段,后续对特定字段的操做再转换为DataSet/DataFrame处理便可,可见,RDD和DataSet结合使用有时候更方便数据数据。

 

下面分别对RDD、DataSet、DataFrame的使用方法进行介绍。

一.RDD

1.1 RDD操做类型

        RDD操做主要分为两类:Transformations与Actions。官方将Transformations操做定义为从一个数据集中生成另外一个数据集;将Actions操做定义为对数据集进行一系列计算之后返回给驱动程序一个值。能够看出数据转换(map)、合并(union)、过滤(filter)等操做均为Transformations类型,由于他们的结果仍然是一个数据集,而数据聚合(reduce)、统计(count)、保存(saveAsXXX)等操做均为Actions类型,缘由是他们的最终都要将结果返回给驱动程序(即对结果进行汇总,而Transformations操做只须要在各个node/slave上执行)。

        之因此要区分操做类型,是由于Transformations操做是滞后的,不会立刻执行,只有当程序要返回结果给驱动程序时才会执行,因此定义了Transformations操做后立马执行println来输出某个值是得不到结果的,只有执行过Actions操做才能获得结算结果,且Actions操做会被当即执行。

        官方列出的经常使用Transformations操做包括:map、filter、flatMap、mapPartitions、mapPartitionsWithIndex、sample、union、intersection、distinct、groupByKey、reduceBykey、aggregateByKey、sortByKey、join、cogroup、cartesian、pipe、coalsce、repartition、repartitionAndSortWithinPartitions;Actions操做包括:reduce、collect、count、first、take、takeSample、takeOrdered、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile、countByKey、foreach。具体用法能够参考官方API

1.2 生成RDD

咱们能够经过SparkContext来生成RDD,下面是两种获取SparkContext实例的方法。

//1.SparkContext
val sc = new SparkContext(new SparkConf().setAppName("Spark Context"))
val rdd1 = sc.textFile("data.txt")

//2.SparkSession
val spark = SparkSession.builder().appName("Spark Session").getOrCreate()
val rdd2 = spark.sparkContext.textFile("data.txt")

上例中经过textFile读取本地文件来生成RDD,textFile参数能够是HDFS路径、本地文件(非单机模式下须要每一个node上都有)或者任何hadoop文件系统支持的URI;除了textFile还可使用hadoopFile、hadoopRDD、parallelize、makeRDD来生成RDD。这里提一下,textFile支持通配符形式的path,好比hdfs://xx.xx.xx.xx:9000/path1/ds=*/*.gz,特别适用于按分区存储的数据处理。

1.3 RDD处理

下面经过一个例子演示一下RDD经常使用操做的用法:

下面的代码对保存在HDFS上的日志文件进行解析、过滤、统计Title字段的字节数并计算Title的最大长度。

package com.personal.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object RDDTest {
  def main(args: Array[String]): Unit = {
    val MinFieldsLength = 53
    val VTitleIndex = 11

    val inputPath = "hdfs://192.168.1.44:9000/user/name/input/attempt_1530774346064"
    val outputPath = "hdfs://192.68.1.44:9000/user/name/output/"

    val sparkConf = new SparkConf().setAppName("RDD Test")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.textFile(inputPath)

    val lineCounter = sc.longAccumulator("LineCounter")

    val resultRdd = rdd.map(_.split("\t"))
      .filter(
        fields =>{
          lineCounter.add(1)
          if(fields.length < MinFieldsLength) false else true
        }
      )
      .map(fields => fields(VTitleIndex).length)
      .persist(StorageLevel.MEMORY_ONLY)

    resultRdd.saveAsTextFile(outputPath)
    val maxTitleLength = resultRdd.reduce((a, b) => if (a>b) a else b)

    println(s"Line count: ${lineCounter.value}")
    println(s"Max title length: ${maxTitleLength}")

    sc.stop()
  }
}

        例中先初始化一个SparkContext对象,而后经过textFile读取hdfs中的文件,生成一个RDD,接着调用map逐行分割字符串,再调用filter对字段数不合法的行进行过滤,接着再计算每行的Title字段长度并写入hdfs,同时使用reduce计算Title的最大长度,最后输出统计信息。根据RDD操做类型定义,文中调用map->filter->map的过程是不会立刻被执行的,直到调用saveAsTextFile和reduce时才会被执行。

        上例中用到了一个特殊的变量——累加器(Accumulator),经过SparkContext.longAccumulator(name: String)定义,顾名思义,只能进行加法操做,用于计数器或求总和。这类变量在Spark中称为共享变量(Shared Variables),即在集群的各个node中的值是相同的),与共享变量相反,程序中定义的其余变量在集群的各个node之间是互相独立的

        除了计数器,Spark还支持另外一种共享变量——广播变量(Broadcast Variables),它是只读的,被cache到每台机器中,经常使用于各个node之间的大规模数据分发。Spark任务是分阶段执行的,各个阶段须要的数据即是经过broadcast方式分发的,cache时进行序列化,任务执行时再反序列化。所以,只有在各个阶段须要同一份数据或须要cache反序列化后的值时才须要显式定义broadcast变量,经过调用SparkContext.broadcast(value: T)来定义。

        org.apache.spark.rdd.RDD.map原型为:

def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
      Return a new RDD by applying a function to all elements of this RDD.

        能够看到,map是一个高阶函数,即参数也是一个函数;第二个为隐式参数,不须要显示赋值(须要初始化spark后"import spark.implicits._"),程序会根据上下文自动赋值。map经常使用于对数据逐行处理,返回值是个新的RDD,处理后的结果数不变。如上例中:

val resultRdd = rdd.map(_.split("\t"))

         org.apache.spark.rdd.RDD.filter原型为:

def filter(f: (T) ⇒ Boolean): RDD[T]
      Return a new RDD containing only the elements that satisfy a predicate.

        同map同样,filter也是一个高阶函数,函数返回值为true时保留该数据,为false时过滤掉该数据。如上例中:

val resultRdd = rdd.map(_.split("\t"))
      .filter(
        fields =>{
          lineCounter.add(1)
          if(fields.length < MinFieldsLength) false else true
        }
      )

         org.apache.spark.rdd.RDD.saveAsTextFile原型为:

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
      Save this RDD as a compressed text file, using string representations of elements.

def saveAsTextFile(path: String): Unit
      Save this RDD as a text file, using string representations of elements.

        saveAsTextFile为Actions类型的方法,用于将rdd结果以text格式持久化到指定path下,写入的时候会检查是否path已经存在,存在则抛出异常。第二个参数用于指定压缩类型,如org.apache.hadoop.io.compress.GzipCodec、com.hadoop.compression.lzo.LzoCodec,默认不压缩。如上例中:

resultRdd.saveAsTextFile(outputPath)

         org.apache.spark.rdd.RDD.reduce原型为:

def reduce(f: (T, T) ⇒ T): T
      Reduces the elements of this RDD using the specified commutative and associative binary operator.

        reduce的参数为同一类型的二元操做函数,即“T <operator> T”,可用于求最值,求和等聚合需求。如上例中:

val maxTitleLength = resultRdd.reduce((a, b) => if (a>b) a else b)

         上例中还用到了一个Spark中很重要的功能——持久化(Persistence),它将RDD持久化/缓存到各个node的内存中以加速后续的计算。能够经过调用persist() 或 cache()来使RDD持久化,cache的存储方式是反序列化后写入内存,persist的存储方式(StorageLevel)能够经过参数指定,不指定参数等同于cache,可选的存储方式包括:

类型 说明
MEMORY_ONLY 将RDD反序列化为java objects写入JVM。若没法彻底写入内存,则部分partiton内的数据将在须要的时候从新计算。
MEMORY_AND_DISK 将RDD反序列化为java objects写入JVM。若没法彻底写入内存,则没法写入内存的写入磁盘,须要的时候从磁盘读取。
MEMORY_ONLY_SER 将RDD序列化为java objects写入JVM。
MEMORY_AND_DISK_SER 将RDD序列化为java objects写入JVM。若没法彻底写入,则没法写入内存的部分写入磁盘。
DISK_ONLY 将RDD写入磁盘
MEMORY_ONLY_2
MEMORY_AND_DISK_2
与MEMORY_ONLY、MEMORY_AND_DISK相似,但每一个partition会备份到两个nodes中。
OFF_HEAP 与MEMORY_ONLY_SER相似,可是会写入堆外内存(off-heap memory),前提是启用了堆外内存。

        其余操做能够查阅API文档或其余资料,这里再也不举例。

相关文章
相关标签/搜索