1、概述html
Apache Spark 是一个快速的, 多用途的集群计算系统。 它提供了 Java, Scala, Python 和 R 的高级 API,以及一个支持通用的执行图计算的优化过的引擎. 它还支持一组丰富的高级工具, 包括使用 SQL 处理结构化数据处理的 Spark SQL, 用于机器学习的 MLlib, 用于图计算的 GraphX, 以及 Spark Streaming。java
请注意, 在 Spark 2.0 以前, Spark 的主要编程接口是弹性分布式数据集(RDD)。 在 Spark 2.0 以后, RDD 被 Dataset 替换, 它是像RDD 同样的 strongly-typed(强类型), 可是在引擎盖下更加优化。 RDD 接口仍然受支持,可是, 咱们强烈建议您切换到使用 Dataset(数据集), 其性能要更优于 RDD。node
每个 Spark 应用程序由一个在集群上运行着用户的 main
函数和执行各类并行操做的 driver program(驱动程序)组成。Spark 提供的主要抽象是一个弹性分布式数据集(RDD),它是能够执行并行操做且跨集群节点的元素的集合。RDD 能够从一个 Hadoop 文件系统(或者任何其它 Hadoop 支持的文件系统),或者一个在 driver program(驱动程序)中已存在的 Scala 集合,以及经过 transforming(转换)来建立一个 RDD。用户为了让它在整个并行操做中更高效的重用,也许会让 Spark persist(持久化)一个 RDD 到内存中。最后,RDD 会自动的从节点故障中恢复。程序员
在 Spark 中的第二个抽象是可以用于并行操做的 shared variables(共享变量),默认状况下,当 Spark 的一个函数做为一组不一样节点上的任务运行时,它将每个变量的副本应用到每个任务的函数中去。有时候,一个变量须要在整个任务中,或者在任务和 driver program(驱动程序)之间来共享。Spark 支持两种类型的共享变量 : broadcast variables(广播变量),它能够用于在全部节点上缓存一个值,和 accumulators(累加器),他是一个只能被 “added(增长)” 的变量,例如 counters 和 sums。web
2、Spark依赖算法
Spark 2.x 默认使用 Scala 2.11 来构建和发布直到运行。(固然,Spark 也能够与其它的 Scala 版本一块儿运行)。为了使用 Scala 编写应用程序,您须要使用可兼容的 Scala 版本(例如,2.11.X)。shell
要编写一个 Spark 的应用程序,您须要在 Spark 上添加一个 Maven 依赖。Spark 能够经过 Maven 中央仓库获取:数据库
groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.2.0
此外,若是您想访问一个 HDFS 集群,则须要针对您的 HDFS 版本添加一个 hadoop-client
(hadoop 客户端)依赖。apache
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
最后,您须要导入一些 Spark classes(类)到您的程序中去。添加下面几行:编程
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf;
3、初始化Spark
Spark 程序必须作的第一件事情是建立一个 SparkContext 对象,它会告诉 Spark 如何访问集群。要建立一个 SparkContext
,首先须要构建一个包含应用程序的信息的 SparkConf 对象。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf);
这个 appName
参数是一个在集群 UI 上展现应用程序的名称。 master
是一个 Spark, Mesos 或 YARN 的 cluster URL,或者指定为在 local mode(本地模式)中运行的 “local” 字符串。在实际工做中,当在集群上运行时,您不但愿在程序中将 master 给硬编码,而是用 使用 spark-submit
启动应用而且接收它。然而,对于本地测试和单元测试,您能够经过 “local” 来运行 Spark 进程。
4、弹性分布式数据集 (RDDs)
Spark 主要以一个 弹性分布式数据集(RDD)的概念为中心,它是一个容错且能够执行并行操做的元素的集合。有两种方法能够建立 RDD : 在你的 driver program(驱动程序)中 parallelizing 一个已存在的集合,或者在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。
1、A list of partiotions 一组分区(partition),partiotion是一个具体概念,指在一个节点中的连续的空间。一个partiotione确定使在一个节点上,可是一个节点上能够有多个partiotione。用户能够在建立RDD时指定RDD的分区个数。 二、A function for computing each split 对RDD作计算,至关于对RDD的每一个split或partition作计算 3、A list of dependencies on other RDDs RDD之间有依赖关系,可溯源。 依赖还具体分为宽依赖和窄依赖,但并非全部的RDD都有依赖。 RDD的每次转换都会生成一个新的RDD,因此RDD之间就会造成相似于流水线同样的先后依赖关系。在部分分区数据丢失时,Spark能够经过这个依赖关系从新计算丢失的分区数据,而不是对RDD的全部分区进行从新计算。 四、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 能够按key的hash值分区 五、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 数据本地性。计算每一个split时,在split所在机器的本地上运行task是最好的,避免了数据的移动;split有多个副本,因此preferred location不止一个
4.1建立RDD
能够在您的 driver program (a Scala Seq
) 中已存在的集合上经过调用 SparkContext
的 parallelize
方法来建立并行集合。该集合的元素从一个能够并行操做的 distributed dataset(分布式数据集)中复制到另外一个 dataset(数据集)中去。例如,这里是一个如何去建立一个保存数字 1 ~ 5 的并行集合。
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
在建立后,该 distributed dataset(分布式数据集)(distData
)能够并行的执行操做。例如,咱们能够调用 distData.reduce((a, b) => a + b
) 来合计数组中的元素。后面咱们将介绍 distributed dataset(分布式数据集)上的操做。
并行集合中一个很重要参数是 partitions(分区)的数量,它可用来切割 dataset(数据集)。Spark 将在集群中的每个分区上运行一个任务。一般您但愿群集中的每个 CPU 计算 2-4 个分区。通常状况下,Spark 会尝试根据您的群集状况来自动的设置的分区的数量。固然,您也能够将分区数做为第二个参数传递到 parallelize
(例如sc.parallelize(data, 10)
) 方法中来手动的设置它。
Spark 能够从 Hadoop 所支持的任何存储源中建立 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。 Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。
可使用 SparkContext
的 textFile
方法来建立文本文件的 RDD。此方法须要一个文件的 URI(计算机上的本地路径 ,hdfs://
,s3n://
等等的 URI),而且读取它们做为一个 lines(行)的集合。下面是一个调用示例:
JavaRDD<String> distFile = sc.textFile("data.txt");
使用 Spark 读取文件时须要注意:
全部 Spark 基于文件的 input 方法, 包括 textFile
, 支持在目录上运行, 压缩文件, 和通配符. 例如, 您可使用 textFile("/my/directory")
, textFile("/my/directory/*.txt")
, and textFile("/my/directory/*.gz")
.
textFile
方法也能够经过第二个可选的参数来控制该文件的分区数量. 默认状况下, Spark 为文件的每个 block(块)建立的一 个 partition 分区(HDFS 中块大小默认是 128MB),固然你也能够经过传递一个较大的值来要求一个较高的分区数量。请注意,分区的数量不可以小于块的数量。
除了文本文件以外,Spark 也支持一些其它的数据格式:
textFile
相比, 它的每个文件中的每一行将返回一个记录. 分区由数据量来肯定, 某些状况下, 可能致使分区太少. 针对这些状况, wholeTextFiles
在第二个位置提供了一个可选的参数用户控制分区的最小数量.sequenceFile[K, V]
方法,其中 K
和 V
指的是文件中 key 和 values 的类型. 这些应该是 Hadoop 的 Writable 接口的子类, 像 IntWritable and Text. 此外, Spark 可让您为一些常见的 Writables 指定原生类型; 例如, sequenceFile[Int, String]
会自动读取 IntWritables 和 Texts.SparkContext.hadoopRDD
方法, 它接受一个任意的 JobConf
和 input format class, key class 和 value class. 经过相同的方法你能够设置你的 input source(输入源). 你还能够针对 InputFormats 使用基于 “new” MapReduce API (org.apache.hadoop.mapreduce
) 的 SparkContext.newAPIHadoopRDD
.RDD.saveAsObjectFile
和 SparkContext.objectFile
支持使用简单的序列化的 Java objects 来保存 RDD. 虽然这不像 Avro 这种专用的格式同样高效,但其提供了一种更简单的方式来保存任何的 RDD。4.2 RDD操做
RDDs support 两种类型的操做: transformations(转换), 它会在一个已存在的 dataset 上建立一个新的 dataset, 和 actions(动做), 将在 dataset 上运行的计算后返回到 driver 程序. 例如, map
是一个经过让每一个数据集元素都执行一个函数,并返回的新 RDD 结果的 transformation。reduce是 经过执行一些函数,聚合 RDD 中全部元素,并将最终结果给返回驱动程序的action.
Spark 中全部的 transformations 都是 lazy(懒加载的), 所以它不会马上计算出结果. 他们只应用于一些基本数据集的转换 (例如. 文件). 只有当须要返回结果给驱动程序时(action操做时),transformations 才开始计算. 这种设计使 Spark 的运行更高效.
默认状况下,每次你在 RDD 运行一个 action 的时, 每一个 transformed RDD 都会被从新计算。可是,您也可用 persist
(或 cache
) 方法将 RDD persist(持久化)到内存中;在这种状况下,Spark 为了下次查询时能够更快地访问,会把数据保存在集群上。此外,还支持持续持久化 RDDs 到磁盘,或复制到多个结点。
JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() { public Integer call(String s) { return s.length(); } }); int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() { public Integer call(Integer a, Integer b) { return a + b; } });
第一行从外部文件中定义了一个基本的 RDD,但这个数据集并未加载到内存中或即将被行动: line
仅仅是一个相似指针的东西,指向该文件. 第二行定义了 lineLengths
做为 map
的结果。请注意,因为 laziness
(延迟加载)lineLengths
不会被当即计算. 最后,咱们运行 reduce
,这是一个 action。此时,Spark 分发计算任务到不一样的机器上运行,每台机器都运行 map 的一部分并本地运行 reduce,仅仅返回它聚合后的结果给驱动程序.
若是咱们也但愿之后再次使用 lineLengths
,咱们能够在 reduce
以前添加如下代码,这样它就会被保存在 memory 中。
lineLengths.persist(StorageLevel.MEMORY_ONLY());
4.2.1 理解闭包
在集群中执行代码时,一个关于 Spark 更难的事情是理解变量和方法的范围和生命周期。
int counter = 0; JavaRDD<Integer> rdd = sc.parallelize(data); // Wrong: Don't do this!! rdd.foreach(x -> counter += x); println("Counter value: " + counter);
Local(本地)vs. cluster(集群)模式
foreach
函数引用的时候,它已经再也不是 driver node 的 counter 了。虽然在 driver node 仍然有一个 counter 在内存中,可是对 executors 已经不可见。executor 看到的只是序列化的闭包一个副本。因此 counter 最终的值仍是 0,由于对 counter
全部的操做均引用序列化的 closure 内的值。local
本地模式,在某些状况下的 foreach
功能其实是同一 JVM 上的驱动程序中执行,并会引用同一个原始的 counter 计数器,实际上可能更新值。rdd.foreach(println)
或 rdd.map(println)
。在一台机器上,这将产生预期的输出和打印 RDD 的全部元素。cluster
模式下,stdout
输出正在被执行写操做 executors 的 stdout
代替,而不是在一个驱动程序上,所以 stdout
的 driver
程序不会显示这些!driver
程序的全部元素,可使用的 collect()
方法首先把 RDD 放到 driver 程序节点上: rdd.collect().foreach(println)
。这可能会致使 driver 程序耗尽内存,虽然说,由于 collect()
获取整个 RDD 到一台机器; 若是你只须要打印 RDD 的几个元素,一个更安全的方法是使用 take()
: rdd.take(100).foreach(println)
。4.2.2 Key-Value Pairs
虽然大多数 Spark 操做工做在包含任何类型对象的 RDDs 上,只有少数特殊的操做可用于 Key-Value 对的 RDDs. 最多见的是分布式 “shuffle” 操做,如经过元素的 key 来进行 grouping 或 aggregating(聚合) 操做。
在java中, key-value pairs 是使用Scala标准库中的 scala.Tuple2 类来表明,你可使用new Tuple2(a, b)
来建立一个Tuple,访问它的元素使用tuple._1()
和 tuple._2()
可使用 mapToPair
和 flatMapToPair将JavaRDDs转换为
JavaPairRDDs.
JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
咱们也可使用 counts.sortByKey()
,例如,在对按字母顺序排序,最后 counts.collect()
把他们做为一个数据对象返回给 driver 程序。
当在 key-value pair 操做中使用自定义的 objects 做为 key 时, 您必须确保有一个自定义的 equals()
方法和一个 hashCode()
方法。
4.3 算子
下表列出了一些 Spark 经常使用的 transformations(转换). 详情请参考 RDD API 文档 (Scala, Java, Python, R) 和 pair RDD 函数文档 (Scala, Java).
下表列出了一些 Spark 经常使用的 actions 操做。详细请参考 RDD API 文档 (Scala, Java, Python, R)和 pair RDD 函数文档 (Scala, Java).
4.2.4 shuffle操做
Spark 里的某些操做会触发 shuffle。shuffle 是spark 从新分配数据的一种机制,使得这些数据能够跨不一样的区域进行分组。这一般涉及在 executors和机器之间拷贝数据,这使得 shuffle 成为一个复杂的、代价高的操做。
为了明白 reduceByKey
操做的过程,咱们以 reduceByKey
为例。reduceBykey 操做产生一个新的 RDD,其中 key 全部相同的的值组合成为一个 tuple - key 以及与 key 相关联的全部值在 reduce 函数上的执行结果。面临的挑战是,一个 key 的全部值不必定都在一个同一个 paritition 分区里,甚至是不必定在同一台机器里,可是它们必须共同被计算。
在 spark 里,特定的操做须要数据不跨分区分布。在计算期间,一个任务在一个分区上执行,为了全部数据都在单个 reduceByKey
的 reduce 任务上运行,咱们须要执行一个 all-to-all 操做。它必须从全部分区读取全部的 key 和 key对应的全部的值,而且跨分区汇集去计算每一个 key 的结果 - 这个过程就叫作 shuffle.。
尽管每一个分区新 shuffle 的数据集将是肯定的,分区自己的顺序也是这样,可是这些数据的顺序是不肯定的。若是但愿 shuffle 后的数据是有序的,可使用:
mapPartitions
对每一个 partition 分区进行排序,例如, .sorted
repartitionAndSortWithinPartitions
在分区的同时对分区进行高效的排序.sortBy
对 RDD 进行全局的排序触发的 shuffle 操做包括 repartition 操做,如 repartition
和 coalesce
, ‘ByKey 操做像 groupByKey
和 reduceByKey
, 和 join操做, 像 cogroup
和 join
.
4.2.5 RDD Persistence(持久化)
Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操做间均可以访问这些持久化的数据。当持久化一个 RDD 时,每一个节点的其它分区均可以使用 RDD 在内存中进行计算,在该数据上的其余 action 操做将直接使用内存中的数据。这样会让之后的 action 操做计算速度加快(一般运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
RDD 可使用 persist()
方法或 cache()
方法进行持久化。数据将会在第一次 action 操做时进行计算,并缓存在节点的内存中。Spark 的缓存具备容错机制,若是一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动从新计算并进行缓存。
另外,每一个持久化的 RDD 可使用不一样的 storage level 存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(能够节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别经过传递一个 StorageLevel
对象给 persist()
方法进行设置。cache()
方法是使用默认存储级别的快捷设置方法,默认的存储级别是 StorageLevel.MEMORY_ONLY
(将反序列化的对象存储到内存中)。详细的存储级别介绍以下:
Storage Level(存储级别) | Meaning(含义) |
---|---|
MEMORY_ONLY | 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 若是内存空间不够,部分数据分区将再也不缓存,在每次须要用到这些数据时从新进行计算. 这是默认的级别. |
MEMORY_AND_DISK | 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。若是内存空间不够,将未缓存的数据分区存储到磁盘,在须要使用这些分区时从磁盘读取. |
MEMORY_ONLY_SER (Java and Scala) |
将 RDD 以序列化的 Java 对象的形式进行存储(每一个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省不少空间,尤为是在使用 fast serializer 时会节省更多的空间,可是在读取时会增长 CPU 的计算负担. |
MEMORY_AND_DISK_SER (Java and Scala) |
相似于 MEMORY_ONLY_SER ,可是溢出的分区会存储到磁盘,而不是在用到它们时从新计算. |
DISK_ONLY | 只在磁盘上缓存 RDD. |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上面的级别功能相同,只不过每一个分区在集群中两个节点上创建副本. |
OFF_HEAP (experimental 实验性) | 相似于 MEMORY_ONLY_SER, 可是将数据存储在 off-heap memory 中. 这须要启用 off-heap 内存. |
Spark 会自动监视每一个节点上的缓存使用状况,并使用 least-recently-used(LRU)的方式来丢弃旧数据分区。 若是您想手动删除 RDD 而不是等待它掉出缓存,使用 RDD.unpersist()
方法。
4.2.6 共享变量
v
上调用 SparkContext.broadcast(v)
方法来进行建立。广播变量是 v
的一个 wrapper(包装器),能够经过调用 value
方法来访问它的值。代码示例以下:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3]
在建立广播变量以后,在集群上执行的全部的函数中,应该使用该广播变量代替原来的 v
值,因此节点上的 v
最多分发一次。另外,对象 v
在广播后不该该再被修改,以保证分发到全部的节点上的广播变量具备一样的值。
Accumulators(累加器)
Accumulators(累加器)是一个仅能够执行 “added”(添加)的变量来经过一个关联和交换操做,所以能够高效地执行支持并行。累加器能够用于实现 counter( 计数,相似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,而且程序员能够添加新的支持类型。
做为一个用户,您能够建立 accumulators(累加器)而且重命名. 以下图所示, 一个命名的 accumulator 累加器(在这个例子中是 counter
)将显示在 web UI 中,用于修改该累加器的阶段。 Spark 在 “Tasks” 任务表中显示由任务修改的每一个累加器的值。
SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
方法建立数值类型的 accumulator
(累加器)以分别累加 Long 或 Double 类型的值。集群上正在运行的任务就可使用 add
方法来累计数值。然而,它们不可以读取它的值。只有 driver program(驱动程序)才可使用 value
方法读取累加器的值。
LongAccumulator accum = jsc.sc().longAccumulator(); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10
虽然此代码使用 Long 类型的累加器的内置支持, 可是开发者经过 AccumulatorV2 它的子类来建立本身的类型. AccumulatorV2 抽象类有几个须要 override(重写)的方法: reset
方法可将累加器重置为 0, add
方法可将其它值添加到累加器中, merge
方法可将其余一样类型的累加器合并为一个. 其余须要重写的方法可参考 API documentation. 例如, 假设咱们有一个表示数学上 vectors(向量)的 MyVector
类,咱们能够写成:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> { private MyVector myVector = MyVector.createZeroVector(); public void reset() { myVector.reset(); } public void add(MyVector v) { myVector.add(v); } ... } // Then, create an Accumulator of this type: VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2(); // Then, register it into spark context: jsc.sc().register(myVectorAcc, "MyVectorAcc1");
注意,在开发者定义本身的 AccumulatorV2 类型时, resulting type(返回值类型)可能与添加的元素的类型不一致。
5、调度流程
5.1 宽窄依赖
窄依赖:
父RDD和子RDD partition之间的关系是一对一的。或者父RDD一个partition只对应一个子RDD的partition状况下的父RDD和子RDD partition关系是多对一的。不会有shuffle的产生。父RDD的一个分区去到子RDD的一个分区。
宽依赖:
父RDD与子RDD partition之间的关系是一对多。会有shuffle的产生。父RDD的一个分区的数据去到子RDD的不一样分区里面。
(其实区分宽窄依赖主要就是看父RDD的一个Partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖。)
5.2 stage切分
Spark任务会根据RDD之间的依赖关系,造成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每一个stage包含一个或多个task任务。而后将这些task以taskSet的形式提交给TaskScheduler运行。stage是由一组并行的task组成。
切割规则:从后往前,遇到宽依赖就切割stage。
一个stage内的窄依赖是pipeline管道计算模式,pipeline只是一种计算思想,模式。
5.3 执行流程
Driver运行在客户端:
(若是Driver运行在Worker上,客户端提交做业给Master,Master让一个Worker启动Driver,即SchedulerBackend。)
Spark on Yarn:
5.4 任务调度