Spark2.x详解

1、概述html

  Apache Spark 是一个快速的, 多用途的集群计算系统。 它提供了 Java, Scala, Python 和 R 的高级 API,以及一个支持通用的执行图计算的优化过的引擎. 它还支持一组丰富的高级工具, 包括使用 SQL 处理结构化数据处理的 Spark SQL, 用于机器学习的 MLlib, 用于图计算的 GraphX, 以及 Spark Streaming。java

  请注意, 在 Spark 2.0 以前, Spark 的主要编程接口是弹性分布式数据集(RDD)。 在 Spark 2.0 以后, RDD 被 Dataset 替换, 它是像RDD 同样的 strongly-typed(强类型), 可是在引擎盖下更加优化。 RDD 接口仍然受支持,可是, 咱们强烈建议您切换到使用 Dataset(数据集), 其性能要更优于 RDD。node

  每个 Spark 应用程序由一个在集群上运行着用户的 main 函数和执行各类并行操做的 driver program(驱动程序)组成。Spark 提供的主要抽象是一个弹性分布式数据集(RDD),它是能够执行并行操做且跨集群节点的元素的集合。RDD 能够从一个 Hadoop 文件系统(或者任何其它 Hadoop 支持的文件系统),或者一个在 driver program(驱动程序)中已存在的 Scala 集合,以及经过 transforming(转换)来建立一个 RDD。用户为了让它在整个并行操做中更高效的重用,也许会让 Spark persist(持久化)一个 RDD 到内存中。最后,RDD 会自动的从节点故障中恢复。程序员

  在 Spark 中的第二个抽象是可以用于并行操做的 shared variables(共享变量),默认状况下,当 Spark 的一个函数做为一组不一样节点上的任务运行时,它将每个变量的副本应用到每个任务的函数中去。有时候,一个变量须要在整个任务中,或者在任务和 driver program(驱动程序)之间来共享。Spark 支持两种类型的共享变量 : broadcast variables(广播变量),它能够用于在全部节点上缓存一个值,和 accumulators(累加器),他是一个只能被 “added(增长)” 的变量,例如 counters 和 sums。web

2、Spark依赖算法

  Spark 2.x 默认使用 Scala 2.11 来构建和发布直到运行。(固然,Spark 也能够与其它的 Scala 版本一块儿运行)。为了使用 Scala 编写应用程序,您须要使用可兼容的 Scala 版本(例如,2.11.X)。shell

  要编写一个 Spark 的应用程序,您须要在 Spark 上添加一个 Maven 依赖。Spark 能够经过 Maven 中央仓库获取:数据库

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0

  此外,若是您想访问一个 HDFS 集群,则须要针对您的 HDFS 版本添加一个 hadoop-client(hadoop 客户端)依赖。apache

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

  最后,您须要导入一些 Spark classes(类)到您的程序中去。添加下面几行:编程

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

3、初始化Spark

  Spark 程序必须作的第一件事情是建立一个 SparkContext 对象,它会告诉 Spark 如何访问集群。要建立一个 SparkContext,首先须要构建一个包含应用程序的信息的 SparkConf 对象。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

  这个 appName 参数是一个在集群 UI 上展现应用程序的名称。 master 是一个 Spark, Mesos 或 YARN 的 cluster URL,或者指定为在 local mode(本地模式)中运行的 “local” 字符串。在实际工做中,当在集群上运行时,您不但愿在程序中将 master 给硬编码,而是用 使用 spark-submit 启动应用而且接收它。然而,对于本地测试和单元测试,您能够经过 “local” 来运行 Spark 进程。

4、弹性分布式数据集 (RDDs)

  Spark 主要以一个 弹性分布式数据集(RDD)的概念为中心,它是一个容错且能够执行并行操做的元素的集合。有两种方法能够建立 RDD : 在你的 driver program(驱动程序)中 parallelizing 一个已存在的集合,或者在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。

1、A list of partiotions
一组分区(partition),partiotion是一个具体概念,指在一个节点中的连续的空间。一个partiotione确定使在一个节点上,可是一个节点上能够有多个partiotione。用户能够在建立RDD时指定RDD的分区个数。
二、A function for computing each split
对RDD作计算,至关于对RDD的每一个split或partition作计算
3、A list of dependencies on other RDDs
RDD之间有依赖关系,可溯源。
依赖还具体分为宽依赖和窄依赖,但并非全部的RDD都有依赖。 
RDD的每次转换都会生成一个新的RDD,因此RDD之间就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是对RDD的全部分区进行从新计算。
四、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
能够按key的hash值分区
五、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
数据本地性。计算每一个split时,在split所在机器的本地上运行task是最好的,避免了数据的移动;split有多个副本,因此preferred location不止一个
RDD五大特性

 4.1建立RDD

  • 并行集合

  能够在您的 driver program (a Scala Seq) 中已存在的集合上经过调用 SparkContext 的 parallelize 方法来建立并行集合。该集合的元素从一个能够并行操做的 distributed dataset(分布式数据集)中复制到另外一个 dataset(数据集)中去。例如,这里是一个如何去建立一个保存数字 1 ~ 5 的并行集合。

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

  在建立后,该 distributed dataset(分布式数据集)(distData)能够并行的执行操做。例如,咱们能够调用 distData.reduce((a, b) => a + b) 来合计数组中的元素。后面咱们将介绍 distributed dataset(分布式数据集)上的操做。

  并行集合中一个很重要参数是 partitions(分区)的数量,它可用来切割 dataset(数据集)。Spark 将在集群中的每个分区上运行一个任务。一般您但愿群集中的每个 CPU 计算 2-4 个分区。通常状况下,Spark 会尝试根据您的群集状况来自动的设置的分区的数量。固然,您也能够将分区数做为第二个参数传递到 parallelize (例如sc.parallelize(data, 10)) 方法中来手动的设置它。

  • 外部 Datasets(数据集)

  Spark 能够从 Hadoop 所支持的任何存储源中建立 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。 Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。

  可使用 SparkContext 的 textFile 方法来建立文本文件的 RDD。此方法须要一个文件的 URI(计算机上的本地路径 ,hdfs://s3n:// 等等的 URI),而且读取它们做为一个 lines(行)的集合。下面是一个调用示例:

JavaRDD<String> distFile = sc.textFile("data.txt");

  使用 Spark 读取文件时须要注意:

    • 全部 Spark 基于文件的 input 方法, 包括 textFile, 支持在目录上运行, 压缩文件, 和通配符. 例如, 您可使用 textFile("/my/directory")textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

    • textFile 方法也能够经过第二个可选的参数来控制该文件的分区数量. 默认状况下, Spark 为文件的每个 block(块)建立的一 个 partition 分区(HDFS 中块大小默认是 128MB),固然你也能够经过传递一个较大的值来要求一个较高的分区数量。请注意,分区的数量不可以小于块的数量。

  除了文本文件以外,Spark 也支持一些其它的数据格式:

    • JavaSparkContext.wholeTextFile 能够读取包含多个小文本文件的目录, 而且将它们做为一个 (filename, content) pairs 来返回. 这与 textFile 相比, 它的每个文件中的每一行将返回一个记录. 分区由数据量来肯定, 某些状况下, 可能致使分区太少. 针对这些状况, wholeTextFiles 在第二个位置提供了一个可选的参数用户控制分区的最小数量.
    • 针对 SequenceFiles, 使用 SparkContext 的 sequenceFile[K, V] 方法,其中 K 和 V 指的是文件中 key 和 values 的类型. 这些应该是 Hadoop 的 Writable 接口的子类, 像 IntWritable and Text. 此外, Spark 可让您为一些常见的 Writables 指定原生类型; 例如, sequenceFile[Int, String]会自动读取 IntWritables 和 Texts.
    • 针对其它的 Hadoop InputFormats, 您可使用 SparkContext.hadoopRDD 方法, 它接受一个任意的 JobConf 和 input format class, key class 和 value class. 经过相同的方法你能够设置你的 input source(输入源). 你还能够针对 InputFormats 使用基于 “new” MapReduce API (org.apache.hadoop.mapreduce) 的 SparkContext.newAPIHadoopRDD.
    • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持使用简单的序列化的 Java objects 来保存 RDD. 虽然这不像 Avro 这种专用的格式同样高效,但其提供了一种更简单的方式来保存任何的 RDD。

 4.2 RDD操做

  RDDs support 两种类型的操做: transformations(转换), 它会在一个已存在的 dataset 上建立一个新的 dataset, 和 actions(动做), 将在 dataset 上运行的计算后返回到 driver 程序. 例如, map 是一个经过让每一个数据集元素都执行一个函数,并返回的新 RDD 结果的 transformation。reduce是 经过执行一些函数,聚合 RDD 中全部元素,并将最终结果给返回驱动程序的action.

  Spark 中全部的 transformations 都是 lazy(懒加载的), 所以它不会马上计算出结果. 他们只应用于一些基本数据集的转换 (例如. 文件). 只有当须要返回结果给驱动程序时(action操做时),transformations 才开始计算. 这种设计使 Spark 的运行更高效.

  默认状况下,每次你在 RDD 运行一个 action 的时, 每一个 transformed RDD 都会被从新计算。可是,您也可用 persist (或 cache) 方法将 RDD persist(持久化)到内存中;在这种状况下,Spark 为了下次查询时能够更快地访问,会把数据保存在集群上。此外,还支持持续持久化 RDDs 到磁盘,或复制到多个结点。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

  第一行从外部文件中定义了一个基本的 RDD,但这个数据集并未加载到内存中或即将被行动: line 仅仅是一个相似指针的东西,指向该文件. 第二行定义了 lineLengths 做为 map 的结果。请注意,因为 laziness(延迟加载)lineLengths 不会被当即计算. 最后,咱们运行 reduce,这是一个 action。此时,Spark 分发计算任务到不一样的机器上运行,每台机器都运行 map 的一部分并本地运行 reduce,仅仅返回它聚合后的结果给驱动程序.

  若是咱们也但愿之后再次使用 lineLengths,咱们能够在 reduce 以前添加如下代码,这样它就会被保存在 memory 中。

lineLengths.persist(StorageLevel.MEMORY_ONLY());

  4.2.1 理解闭包

    在集群中执行代码时,一个关于 Spark 更难的事情是理解变量和方法的范围和生命周期。

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);   
  • Local(本地)vs. cluster(集群)模式

    • 上面的代码行为是不肯定的,而且可能没法按预期正常工做。执行做业时,Spark 会分解 RDD 操做到每一个 executor 中的 task 里。在执行以前,Spark 计算任务的 closure(闭包)。闭包是指 executor 要在RDD上进行计算时必须对执行节点可见的那些变量和方法(在这里是foreach())。闭包被序列化并被发送到每一个 executor。
    • 闭包的变量副本发给每一个 executor ,当 counter 被 foreach 函数引用的时候,它已经再也不是 driver node 的 counter 了。虽然在 driver node 仍然有一个 counter 在内存中,可是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。因此 counter 最终的值仍是 0,由于对 counter 全部的操做均引用序列化的 closure 内的值。
    • 在 local 本地模式,在某些状况下的 foreach 功能其实是同一 JVM 上的驱动程序中执行,并会引用同一个原始的 counter 计数器,实际上可能更新值。
    • 若是须要一些全局的聚合功能,应使用 Accumulator(累加器)。当一个执行的任务分配到集群中的各个 worker 结点时,Spark 的累加器是专门提供安全更新变量的机制。
  • 打印 RDD 的 elements
    • 另外一种常见的语法用于打印 RDD 的全部元素使用 rdd.foreach(println) 或 rdd.map(println)。在一台机器上,这将产生预期的输出和打印 RDD 的全部元素。
    • 然而,在集群 cluster 模式下,stdout 输出正在被执行写操做 executors 的 stdout 代替,而不是在一个驱动程序上,所以 stdout 的 driver 程序不会显示这些!
    • 要打印 driver 程序的全部元素,可使用的 collect() 方法首先把 RDD 放到 driver 程序节点上rdd.collect().foreach(println)。这可能会致使 driver 程序耗尽内存,虽然说,由于 collect() 获取整个 RDD 到一台机器; 若是你只须要打印 RDD 的几个元素,一个更安全的方法是使用 take()rdd.take(100).foreach(println)

  4.2.2 Key-Value Pairs 

    虽然大多数 Spark 操做工做在包含任何类型对象的 RDDs 上,只有少数特殊的操做可用于 Key-Value 对的 RDDs. 最多见的是分布式 “shuffle” 操做,如经过元素的 key 来进行 grouping 或 aggregating(聚合) 操做。

    在java中, key-value pairs 是使用Scala标准库中的 scala.Tuple2 类来表明,你可使用new Tuple2(a, b) 来建立一个Tuple,访问它的元素使用tuple._1() 和 tuple._2()

    可使用 mapToPair 和 flatMapToPair将JavaRDDs转换为JavaPairRDDs.

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

    咱们也可使用 counts.sortByKey() ,例如,在对按字母顺序排序,最后 counts.collect() 把他们做为一个数据对象返回给 driver 程序。

    当在 key-value pair 操做中使用自定义的 objects 做为 key 时, 您必须确保有一个自定义的 equals() 方法和一个 hashCode() 方法。

4.3 算子  

Transformations(转换)

下表列出了一些 Spark 经常使用的 transformations(转换). 详情请参考 RDD API 文档 (Scala, Java, Python, R) 和 pair RDD 函数文档 (Scala, Java).

Transformation(转换) Meaning(含义)
map(func) 返回一个新的 distributed dataset(分布式数据集),它由每一个 source(数据源)中的元素应用一个函数 func 来生成.
filter(func) 返回一个新的 distributed dataset(分布式数据集),它由每一个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成.
flatMap(func) 与 map 相似,可是每个输入的 item 能够被映射成 0 个或多个输出的 items(因此 func 应该返回一个 Seq 而不是一个单独的 item).
mapPartitions(func) 与 map 相似,可是单独的运行在在每一个 RDD 的 partition(分区,block)上,因此在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型.(一次处理一个partition的数据,当不会内存溢出时可代替map,减小如数据库链接等次数)
mapPartitionsWithIndex(func) 与 mapPartitions 相似,可是也须要提供一个表明 partition 的 index(索引)的 interger value(整型值)做为参数的 func,因此在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型.
sample(withReplacementfractionseed) 样本数据,设置是否放回(withReplacement), 采样的百分比(fraction)、使用指定的随机数生成器的种子(seed).
union(otherDataset) 反回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集.
intersection(otherDataset) 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集.
distinct([numTasks])) 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素.
groupByKey([numTasks]) 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) . 
Note: 若是分组是为了在每个 key 上执行聚合操做(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好(shuffle的map以后自带combiner,会执行逻辑运算,同一个parition中的相同key,只用传一次结果到reduce). 
Note: 默认状况下,并行度取决于父 RDD 的分区数。能够传递一个可选的 numTasks 参数来设置不一样的任务数.
reduceByKey(func, [numTasks]) 在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是针对每一个 key 使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 同样, reduce tasks 的数量是能够经过第二个可选的参数来配置的.
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 在 (K, V) pairs 的 dataset 上调用时, 返回 (K, U) pairs 的 dataset,其中的 values 是针对每一个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的. 容许聚合值的类型与输入值的类型不同, 同时避免没必要要的配置. 像 groupByKey 同样, reduce tasks 的数量是能够经过第二个可选的参数来配置的.(shuffle的map和reduce逻辑不同时候,不能用reducebykey,好比求average,不能先在map阶段就取平均,而是总体取平均)
sortByKey([ascending], [numTasks]) 在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset, 由 boolean 类型的 ascending 参数来指定.
join(otherDataset, [numTasks]) 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每一个 key 中全部的元素对。Outer joins 能够经过 leftOuterJoinrightOuterJoin 和 fullOuterJoin 来实现.
cogroup(otherDataset, [numTasks]) 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset. 这个操做也调用了 groupWith.
cartesian(otherDataset) 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(全部元素的 pairs,即笛卡尔积).
pipe(command[envVars]) 经过使用 shell 命令来将每一个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),而且 lines(行)输出到它的标准输出(stdout)被做为一个字符串型 RDD 的 string 返回.
coalesce(numPartitions) Decrease(下降)RDD 中 partitions(分区)的数量为 numPartitions。对于执行filter后一个大的 dataset 操做是更有效的.默认无shuffle
repartition(numPartitions) Reshuffle(从新洗牌)RDD 中的数据以建立或者更多的 partitions(分区)并将每一个分区中的数据尽可能保持均匀. 该操做老是经过网络来 shuffles 全部的数据.
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner(分区器)对 RDD 进行从新分区,并在每一个结果分区中,按照 key 值对记录排序。这比每个分区中先调用 repartition 而后再 sorting(排序)效率更高,由于它能够将排序过程推送到 shuffle 操做的机器上进行.

Actions(动做)

下表列出了一些 Spark 经常使用的 actions 操做。详细请参考 RDD API 文档 (Scala, Java, Python, R)和 pair RDD 函数文档 (Scala, Java).

Action(动做) Meaning(含义)
reduce(func) 使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它能够被并行地正确计算.
collect() 在 driver 程序中,以一个 array 数组的形式返回 dataset 的全部元素。这在过滤器(filter)或其余操做(other operation)以后返回足够小(sufficiently small)的数据子集一般是有用的.
count() 返回 dataset 中元素的个数.
first() 返回 dataset 中的第一个元素(相似于 take(1).
take(n) 将数据集中的前 n 个元素做为一个 array 数组返回.
takeSample(withReplacementnum, [seed]) 对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子.
takeOrdered(n[ordering]) 返回 RDD 按天然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素.
saveAsTextFile(path) 将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每一个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录.
saveAsSequenceFile(path
(Java and Scala)
将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操做能够在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还能够隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int, Double, String 等等).
saveAsObjectFile(path
(Java and Scala)
使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,而后使用 SparkContext.objectFile() 进行加载.
countByKey() 仅适用于(K,V)类型的 RDD 。返回具备每一个 key 的计数的 (K , Int)pairs 的 hashmap.
foreach(func) 对 dataset 中每一个元素运行函数 func 。这一般用于反作用(side effects),例如更新一个 Accumulator(累加器)或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()以外的累加器之外的变量(variables)可能会致使未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分.

4.2.4  shuffle操做

  Spark 里的某些操做会触发 shuffle。shuffle 是spark 从新分配数据的一种机制,使得这些数据能够跨不一样的区域进行分组。这一般涉及在 executors和机器之间拷贝数据,这使得 shuffle 成为一个复杂的、代价高的操做。

  为了明白 reduceByKey 操做的过程,咱们以 reduceByKey 为例。reduceBykey 操做产生一个新的 RDD,其中 key 全部相同的的值组合成为一个 tuple - key 以及与 key 相关联的全部值在 reduce 函数上的执行结果。面临的挑战是,一个 key 的全部值不必定都在一个同一个 paritition 分区里,甚至是不必定在同一台机器里,可是它们必须共同被计算。

  在 spark 里,特定的操做须要数据不跨分区分布。在计算期间,一个任务在一个分区上执行,为了全部数据都在单个 reduceByKey 的 reduce 任务上运行,咱们须要执行一个 all-to-all 操做。它必须从全部分区读取全部的 key 和 key对应的全部的值,而且跨分区汇集去计算每一个 key 的结果 - 这个过程就叫作 shuffle.。

  尽管每一个分区新 shuffle 的数据集将是肯定的,分区自己的顺序也是这样,可是这些数据的顺序是不肯定的。若是但愿 shuffle 后的数据是有序的,可使用:

  • mapPartitions 对每一个 partition 分区进行排序,例如, .sorted
  • repartitionAndSortWithinPartitions 在分区的同时对分区进行高效的排序.
  • sortBy 对 RDD 进行全局的排序

触发的 shuffle 操做包括 repartition 操做,如 repartition 和 coalesce‘ByKey 操做像 groupByKey 和 reduceByKey, 和 join操做, 像 cogroup 和 join.

4.2.5 RDD Persistence(持久化)

  Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操做间均可以访问这些持久化的数据。当持久化一个 RDD 时,每一个节点的其它分区均可以使用 RDD 在内存中进行计算,在该数据上的其余 action 操做将直接使用内存中的数据。这样会让之后的 action 操做计算速度加快(一般运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。

  RDD 可使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操做时进行计算,并缓存在节点的内存中。Spark 的缓存具备容错机制,若是一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动从新计算并进行缓存。

  另外,每一个持久化的 RDD 可使用不一样的 storage level 存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(能够节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别经过传递一个 StorageLevel 对象给 persist() 方法进行设置。cache() 方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍以下:

Storage Level(存储级别) Meaning(含义)
MEMORY_ONLY 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 若是内存空间不够,部分数据分区将再也不缓存,在每次须要用到这些数据时从新进行计算. 这是默认的级别.
MEMORY_AND_DISK 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。若是内存空间不够,将未缓存的数据分区存储到磁盘,在须要使用这些分区时从磁盘读取.
MEMORY_ONLY_SER 
(Java and Scala)
将 RDD 以序列化的 Java 对象的形式进行存储(每一个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省不少空间,尤为是在使用 fast serializer 时会节省更多的空间,可是在读取时会增长 CPU 的计算负担.
MEMORY_AND_DISK_SER 
(Java and Scala)
相似于 MEMORY_ONLY_SER ,可是溢出的分区会存储到磁盘,而不是在用到它们时从新计算.
DISK_ONLY 只在磁盘上缓存 RDD.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上面的级别功能相同,只不过每一个分区在集群中两个节点上创建副本.
OFF_HEAP (experimental 实验性) 相似于 MEMORY_ONLY_SER, 可是将数据存储在 off-heap memory 中. 这须要启用 off-heap 内存.

  Spark 会自动监视每一个节点上的缓存使用状况,并使用 least-recently-used(LRU)的方式来丢弃旧数据分区。 若是您想手动删除 RDD 而不是等待它掉出缓存,使用 RDD.unpersist() 方法。

4.2.6 共享变量

  • 广播变量
    • Broadcast variables(广播变量)容许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。它们是如何来使用呢,例如,广播变量能够用一种高效的方式给每一个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以下降通讯成本。
    • Spark 的 action(动做)操做是经过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是经过分布式的 “shuffle” 操做进行拆分的。Spark 会自动广播出每一个 stage(阶段)内任务所须要的公共数据。这种状况下广播的数据使用序列化的形式进行缓存,并在每一个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的状况下,使用广播变量会有比较好的效果。
    • 广播变量经过在一个变量 v 上调用 SparkContext.broadcast(v) 方法来进行建立。广播变量是 v 的一个 wrapper(包装器),能够经过调用 value方法来访问它的值。代码示例以下:
      Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
      broadcastVar.value();
      // returns [1, 2, 3]
    • 在建立广播变量以后,在集群上执行的全部的函数中,应该使用该广播变量代替原来的 v 值,因此节点上的 v 最多分发一次。另外,对象 v 在广播后不该该再被修改,以保证分发到全部的节点上的广播变量具备一样的值。

  • Accumulators(累加器)

    • Accumulators(累加器)是一个仅能够执行 “added”(添加)的变量来经过一个关联和交换操做,所以能够高效地执行支持并行。累加器能够用于实现 counter( 计数,相似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,而且程序员能够添加新的支持类型。

    • 做为一个用户,您能够建立 accumulators(累加器)而且重命名. 以下图所示, 一个命名的 accumulator 累加器(在这个例子中是 counter)将显示在 web UI 中,用于修改该累加器的阶段。 Spark 在 “Tasks” 任务表中显示由任务修改的每一个累加器的值。

    • 能够经过调用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 方法建立数值类型的 accumulator(累加器)以分别累加 Long 或 Double 类型的值。集群上正在运行的任务就可使用 add 方法来累计数值。然而,它们不可以读取它的值。只有 driver program(驱动程序)才可使用 value 方法读取累加器的值。
      LongAccumulator accum = jsc.sc().longAccumulator();
      
      sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
      // ...
      // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
      
      accum.value();
      // returns 10
    • 虽然此代码使用 Long 类型的累加器的内置支持, 可是开发者经过 AccumulatorV2 它的子类来建立本身的类型. AccumulatorV2 抽象类有几个须要 override(重写)的方法: reset 方法可将累加器重置为 0, add 方法可将其它值添加到累加器中, merge 方法可将其余一样类型的累加器合并为一个. 其余须要重写的方法可参考 API documentation. 例如, 假设咱们有一个表示数学上 vectors(向量)的 MyVector 类,咱们能够写成:

      class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
      
        private MyVector myVector = MyVector.createZeroVector();
      
        public void reset() {
          myVector.reset();
        }
      
        public void add(MyVector v) {
          myVector.add(v);
        }
        ...
      }
      
      // Then, create an Accumulator of this type:
      VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
      // Then, register it into spark context:
      jsc.sc().register(myVectorAcc, "MyVectorAcc1");

      注意,在开发者定义本身的 AccumulatorV2 类型时, resulting type(返回值类型)可能与添加的元素的类型不一致。

5、调度流程

5.1 宽窄依赖

  

窄依赖:

父RDD和子RDD partition之间的关系是一对一的。或者父RDD一个partition只对应一个子RDD的partition状况下的父RDD和子RDD partition关系是多对一的。不会有shuffle的产生。父RDD一个分区去到子RDD的一个分区

宽依赖:

父RDD与子RDD partition之间的关系是一对多会有shuffle的产生。父RDD的一个分区的数据去到子RDD的不一样分区里面。

(其实区分宽窄依赖主要就是看父RDD的一个Partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖。)

 5.2 stage切分

  

  Spark任务会根据RDD之间的依赖关系,造成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每一个stage包含一个或多个task任务。而后将这些task以taskSet的形式提交给TaskScheduler运行stage是由一组并行的task组成。

  切割规则:从后往前遇到宽依赖就切割stage。

  一个stage内的窄依赖是pipeline管道计算模式,pipeline只是一种计算思想,模式。

  • Spark的pipeLine的计算模式,至关于执行了一个高阶函数f3(f2(f1(textFile))) !+!+!=3 也就是来一条数据而后计算一条数据,把全部的逻辑走完,而后落地,准确的说一个task处理遗传分区的数据 由于跨过了不一样的逻辑的分区。而MapReduce是 1+1=2,2+1=3的模式,也就是计算完落地,而后在计算,而后再落地到磁盘或内存,最后数据是落在计算节点上,按reduce的hash分区落地。因此这也是比Mapreduce快的缘由,彻底基于内存计算。
  • 管道中的数据什么时候落地:shuffle write的时候,对RDD进行持久化的时候。
  • Stage的task并行度是由stage的最后一个RDD的分区数来决定的 。通常来讲,一个partiotion对应一个task,但最后reduce的时候能够手动改变reduce的个数,也就是分区数,即改变了并行度。例如reduceByKey(XXX,3),GroupByKey(4),union由的分区数由前面的相加。
  • 如何提升stage的并行度:reduceBykey(xxx,numpartiotion),join(xxx,numpartiotion)

5.3 执行流程

   

  Driver运行在客户端:

    • 客户端启动后直接运行用户程序,启动Driver相关的工做,初始化SparkContext时候最重要的就是构造一个DAGScheduler和TaskScheduler。
    • DAGScheduler首先建立一个finalStage,而后递归方式倒着切割Stage
    • 客户端的Driver向Master注册。
    • Master还会让Worker启动Exeuctor。Worker建立一个ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。
    • ExecutorBackend启动后会向Driver的SchedulerBackend注册。
    • 每一个Stage包含的Task经过TaskScheduler分配给Executor执行。
    • 全部stage都完成后做业结束。

    (若是Driver运行在Worker上,客户端提交做业给Master,Master让一个Worker启动Driver,即SchedulerBackend。)

  Spark on Yarn:

  

 5.4 任务调度

  

相关文章
相关标签/搜索