Spark RDD(Resilient Distributed Datasets)论文
- 概要
- 1: 介绍
- 2: Resilient Distributed Datasets(RDDs)
- 3 Spark 编程接口
- 4 表达 RDDs
- 5 实现
- 7 讨论
- 8 相关工做
- 9 结尾
- 鸣谢
- 引用资料
- 原文连接
- 贡献者
概要
为了能解决程序员能在大规模的集群中以一种容错的方式进行内存计算这个问题, 咱们提出了 RDDs 的概念. 当前的不少框架对迭代式算法场景与交互性数据挖掘场景的处理性能很是差, 这个是 RDDs 的提出的动机. 若是能将数据保存在内存中, 将会使的上面两种场景的性能提升一个数量级. 为了能达到高效的容错, RDDs 提供了一种受限制的共享内存的方式, 这种方式是基于粗粒度的转换共享状态而非细粒度的更新共享状态. 然而, 咱们分析代表 RDDs 能够表达出不少种类的计算, 包括目前专门从事迭代任务的编程计算模型, 好比 Pregel, 固然也能够表达出目前模型表达不出的计算. 咱们经过 Spark 系统来实现了 RDDs, 而且经过各类各样的用户应用和测试来评估了这个系统.html
1: 介绍
像 MapReduce 和 Dryad 等分布式计算框架已经普遍应用于大数据集的分析. 这些系统可让用户不用担忧分布式工做以及容错, 而是使用一系列的高层次的操做 api 来达到并行计算的目的.java
虽然当前的框架提供了大量的对访问利用计算资源的抽象, 可是它们缺乏了对利用分布式内存的抽象.样使的它们在处理须要在多个计算之间复用中间结果的应用的时候会很是的不高效. 数据的复用在迭代机器学习和图计算领域(好比 PageRank, K-means 以及线性回归等算法)是很常见的. 在交互式数据挖中, 一个用户会常常对一个相同的数据子集进行屡次不一样的特定查询, 因此数据复用在交互式数据挖掘也是很常见的. 然而, 目前的大部分的框架对计算之间的数据复用的处理方式就是将中间数据写到一个靠稳定的系统中(好比分布式文件系统), 这样会因为数据的复制备份, 磁盘的 I/O 以及数据的序列化而致应用任务执行很费时间.git
认识到这个问题后, 研究者们已经为一些须要中间数据复用的应用开发出了一些特殊的框架.好比Pregel 在作迭代式图计算的时候会将中间结果放在内存中. HaLoop 也提供了迭代式 MapReduce 接口.然而, 这些框架仅仅支持一些特殊的计算模式(好比循环一系列的 MapReduce 步骤), 而且它们是隐式的为些计算模式提供数据共享. 它们没有提供更加广泛数据复用的抽象, 好比可让用户加载几个数据集到存中而后对这些内存中的数据集进行专门的查询.程序员
在这篇论文中, 咱们提出了一个全新的抽象, 叫作 RDDs, 它能够高效的处理普遍的应用中涉及到的数据用的场景. RDDs 是一个能够容错且并行的数据结构, 它可让用户显式的将中间结果数据集保存在内中、控制数据集的分区来达到数据存放处理最优以及可使用丰富的操做 api 来操做数据集在设计 RDDs 的时候, 最大的挑战是定义一个能够高效容错的编程接口. 已经存在的分布式内存抽象系统好比 distributed shared memory、key-value stores、databases 以及 Poccolo, 都是提供了基于粒度的更新可变状态(好比 table 中的 cells)的接口, 基于这种接口下, 保证容错的方式无非是将数据复备份到多台机器或者在多台机器上记录更新的日志, 这两种方式在数据密集性的工做任务中都是很是的时的, 由于须要经过网络传输在机器节点间复制大量的数据, 宽带传输数据的速度远远比 RAM 内存慢, 而这两种方式会占用大量的存储空间.github
与这些系统相反, RDDs 提供了基于粗粒度转换(好比 map, filter 以及 join)的接口, 这些接口能够对多的数据条目应用相同的操做.这样就能够经过记录来生成某个数据集的一系列转换 (就是这个数据集 lineage)而不是记录真实的数据来达到提供高效的容错机制. 这个 RDD 就有足够的信息知道它是从哪 RDDs 转换计算来的, 若是一个 RDD 的分区数据丢失掉了, 那么从新计算这个 RDD 所依赖的那个 RDD 对应的区就好了. 所以能够很快且不用经过复制备份方式来恢复丢失的数据.web
虽然基于粗粒度的转换一开始看起来受限制, 可是 RDDs 很是适合不少并行计算的应用, 由于这些应用基都是在大量的数据元素上应用相同的操做方法. 事实上, 咱们分析代表 RDDs 不只能够高效的表达出目前括 MapReduce, DryadLINQ, SQL, Pregel 以及 HaLoop 等系统提出的分布式编程模型, 并且还能表达它们表达不了的新的应用的计算模型, 好比交互型数据挖掘. 咱们相信, RDDs 解决那些新的框架提出来计算需求的能力将会成为是 RDD 抽象强大的最有力证据.算法
咱们在 Spark 系统中实现了 RDDs, 这个系统已经在 UC Berkeley 以及好些个公司中应用于研究和生产应中.Spark 和 DryadLINQ 相似使用scala语言提供了很方便语言集成编程接口.另外, Spark能够利用 scala 的解释器来对大数据集进行交互式的查询.咱们相信 spark 是首个容许使用多种编程语言来进行分布式内存中交互式数据挖掘的系统.shell
咱们经过为基准测试以及用户应用的测试两个方面来评估了 RDDs 和 spark. 咱们分析显示, Spark 在迭代应用中能够比 hadoop 快上 20 倍以上、使的现实中的数据分析报表的速度提高了 40 倍以及使的交互式的扫1TB数据集的延迟在 5-7 秒. 更重要的是, 为了彰显 RDDs 的广泛性, 咱们基于spark 用相对较小的程序(每一个包只有 200 行代码)实现了 Pregel 和 HaLoop 的编程模型, 包括它们使用的数据分布优化. 本篇论文以 RDDs(第二节)和 Spark(第三节)的概述开始. 而后在第四节中讨论 了RDD s内部的表达、在第节中讨论了咱们的实现以及在第六节中讨论了实验结果. 最后, 咱们讨论了 RDDs 是怎么样来表达如今已存在的几个系统的编程模型(第七节)、调查相关工做(第八节)以及总结.数据库
2: Resilient Distributed Datasets(RDDs)
这节主要讲述 RDDs 的概要, 首先定义 RDDs(2.1)以及介绍 RDDs 在 spark 中的编程接口(2.2), 而后对 RDDs 和细粒度共享内存抽象进行的对比(2.3).最后咱们讨论了 RDD 模型的限制性.apache
2.1 RDD 抽象
一个 RDD 是一个只读, 被分区的数据集.咱们能够经过两种对稳定的存储系统和其余的 RDDs 进行操做而建立一个新的 RDDs.为了区别开 RDDs 的其余操做, 咱们称这些操做为 transformations, 好比 map, filter 以及 join 等都是 transformations 操做.
RDDs 并不要始终被具体化, 一个 RDD 有足够的信息知道本身是从哪一个数据集计算而来的(就是所谓的依赖血统), 这是一个很是强大的属性:其实, 一个程序你能引用一个不能从失败中从新构建的 RDD.
最后, 用户能够控制 RDDs 的两个方面:数据存储和分区.对于须要复用的 RDD, 用户能够明确的选择一个数据存储策略(好比内存缓存). 他们也能够基于一个元素的 key 来为 RDD 全部的元素在机器节点间进行数据分区, 这样很是利于数据分布优化, 好比给两个数据集进行相同的 hash 分区, 而后进行 join, 能够提升 join 的性能.
2.2 Spark 编程接口
Spark 和 DryadLINQ 和 FlumeJava 同样经过集成编程语言 api 来暴露 RDDs, 这样的话, 每个数据集就表明一个对象, 咱们能够调用这个对象中的方法来操做这个对象.
编程者能够经过对稳定存储的数据进行转换操做(即 transformations, 好比 map 和 filter 等)来获得一个或者多个 RDDs. 而后能够对这些 RDDs 进行 actions 操做, 这些操做能够是获得应用的结果值, 也能够是将结果数据写入到存储系统中, actions 包括: count(表示返回这个数据集的元素的个数)、collect(表示返回数据集的全部元素)以及 save(表示将输出结果写入到存储系统中). 和 DryadLINQ 同样, spark 在定义 RDDs 的时候并不会真正的计算, 而是要等到对这个 RDDs 触发了 actions 操做才会真正的触发计算, 这个称之为 RDDs 的 lazy 特性, 因此咱们能够先对 transformations 进行组装一系列的 pipelines, 而后再计算.
另外, 编程者能够经过调用 RDDs 的 persist 方法来缓存后续须要复用的 RDDs. Spark 默认是将缓存数据放在内存中, 可是若是内存不足的话则会写入到磁盘中. 用户能够经过 persist 的参数来调整缓存策略, 好比只将数据存储在磁盘中或者复制备份数据到多台机器. 最后, 用户能够为每个 RDDs 的缓存设置优先级, 以达到哪一个在内存中的 RDDs 应该首先写道磁盘中
2.2.1 例子 – 监控日志数据挖掘
假设一个 web 服务正发生了大量的错误, 而后运维人员想从存储在 hdfs 中的几 TB 的日志中找出错误的缘由. 运维人员能够经过 spark 将日志中的错误信息加载到分布式的内存中, 而后对这些内存中的数据进行查询. 她首先须要写下面的 scala 代码:
line = spark.textFile("hdfs://..") errors = lines.filter(_.startsWith("ERROR")) errors.persist()
第一行表示从一个 HDFS 文件(许多行的文件数据集)上定义了一个 RDD, 第二行表示基于前面定义的 RDD 进行过滤数据.第三行将过滤后的 RDD 结果存储在内存中, 以达到多个对这个共享 RDD 的查询. 须要注意的事, filter 的参数是 scala 语法中的闭包.
到目前为止, 集群上尚未真正的触发计算.然而, 用户能够对RDD进行action操做, 好比对错误信息的计数:
errors.count()
用户也能够继续对 RDD 进行 transformations 操做, 而后计算其结果, 好比:
//对错误中含有 ”MySQL” 单词的数据进行计数 errors.filters(_.contains("MySQL")).count() //返回错误信息中含有 "HDFS" 字样的信息中的时间字段的值(假设每行数据的字段是以 tab 来切分的, 时间字段是第 3 个字段) errors.filter(_.contains("HDFS")) .map(_.split("\t")(3)) .collect()
在对 errors 第一次作 action 操做的后, spark 会将 errors 的全部分区的数据存储在内存中, 这样后面对 errors 的计算速度会有很大的提高.须要注意的是, 像 lines 这种基础数据的 RDD 是不会存储在内存中的.由于包含错误信息的数据可能只是整个日志数据的一小部分, 因此将包含错误数据的日志放在内存中是比较合理的.
最后, 为了说明咱们的模型是如何达到容错的, 咱们在图一种展现了第三个查询的血缘关系图(lineage graph).在这个查询种, 咱们以对 lines 进行过滤后的 errors 开始, 而后在对 errors 进行了 filter 和 map 操做, 最后作了 action 操做即 collect. Spark 会最后两个 transformations 组成一个 pipeline, 而后将这个 pipeline 分解成一系列的 task, 最后将这些 task 调度到含有 errors 缓存数据的机器上进行执行. 此外, 若是 errors 的一个分区的数据丢失了, spark 会对 lines 的相对应的分区应用 filter 函数来从新建立 errors 这个分区的数据
图一: 咱们例子中第三个查询的血缘关系图, 其中方框表示 RDDs, 箭头表示转换
2.3 RDD 模型的优点
为了理解做为分布式内存抽象的 RDDs 的好处, 咱们在表一种用 RDDs 和分布式共享内存系统(Distributed shared memory 即 DSM)进行了对比. 在全部的 DSM 系统中, 应用从一个全局的地址空间中的任意位置中读写数据. 须要注意的是, 依据这个定义, 咱们所说的 DSM 系统不只包含了传统的共享内存系统, 还包含了对共享状态的细粒度写操做的其余系统(好比 Piccolo), 以及分布式数据库. DSM 是一个很广泛的抽象, 可是这个广泛性使得它在商用集群中实现高效且容错的系统比较困难.
Aspect(概念) | RDDs | Distribute shared memory(分布式共享内存) |
---|---|---|
Reads | 粗粒度或者细粒度 | 细粒度 |
Writes | 粗粒度 | 细粒度 |
数据一致性 | 不重要的(由于RDD是不可变的) | 取决于app 或者 runtime |
容错 | 利用lineage达到细粒度且低延迟的容错 | 须要应用checkpoints(就是须要写磁盘) 而且须要程序回滚 |
计算慢的任务 | 能够利用备份的任务来解决 | 很难作到 |
计算数据的位置 | 自动的机遇数据本地性 | 取决于app (runtime是以透明为目标的) |
内存不足时的行为 | 和已经存在的数据流处理系统同样, 写磁盘 | 很是糟糕的性能(须要内存的交换?) |
表一: RDDs 和 Distributed shared memory 对比
RDDs 只能经过粗粒度的转换被建立(或者被写) , 然而 DSM 容许对每个内存位置进行读写, 这个是 RDDs 和 DSM 最主要的区别. 这样使都 RDDs在 应用中大量写数据受到了限制, 可是可使的容错变的更加高效. 特别是, RDDs 不须要发生很是耗时的 checkpoint 操做, 由于它能够根据 lineage 进行恢复数据 . 并且, 只有丢掉了数据的分区才会须要从新计算, 并不须要回滚整个程序, 而且这些从新计算的任务是在多台机器上并行运算的.
RDDs 的第二个好处是:它不变的特性使的它能够和 MapReduce 同样来运行执行很慢任务的备份任务来达到缓解计算很慢的节点的问题. 在 DSM 中, 备份任务是很难实现的, 由于原始任务和备份任务或同时更新访问同一个内存地址和接口.
最后, RDDs 比 DSM 多提供了两个好处. 第一, 在对 RDDs 进行大量写操做的过程当中, 咱们能够根据数据的本地性来调度 task 以提升性能. 第二, 若是在 scan-base 的操做中, 且这个时候内存不足以存储这个 RDDs, 那么 RDDs 能够慢慢的从内存中清理掉. 在内存中存储不下的分区数据会被写到磁盘中, 且提供了和现有并行数据处理系统相同的性能保证.
2.4 不适合用 RDDs 的应用
通过上面的讨论介绍, 咱们知道 RDDs 很是适合将相同操做应用在整个数据集的全部的元素上的批处理应用. 在这些场景下, RDDs 能够利用血缘关系图来高效的记住每个 transformations 的步骤, 而且不须要记录大量的数据就能够恢复丢失的分区数据. RDDs 不太适合用于须要异步且细粒度的更新共享状态的应用, 好比一个 web 应用或者数据递增的 web 爬虫应用的存储系统. 对于这些应用, 使用传统的纪录更新日志以及对数据进行 checkpoint 会更加高效. 好比使用数据库、RAMCloud、Percolator 以及 Piccolo. 咱们的目标是给批量分析提供一个高效的编程模型, 对于这些异步的应用须要其余的特殊系统来实现.
3 Spark 编程接口
Spark 使用 scala 语言实现了抽象的 RDD, scala 是创建在 java VM 上的静态类型函数式编程语言. 咱们选择 scala 是由于它结合了简洁(很方便进行交互式使用)与高效(因为它的静态类型). 然而, 并非说 RDD 的抽象须要函数式语言来实现.
开发员须要写链接集群中的 workers 的 driver 程序来使用 spark, 就好比图 2 展现的. Driver 端程序定义了一系列的 RDDs 而且调用了 RDD 的 action 操做. Driver 的程序同时也会跟踪 RDDs 之间的的血缘关系. workers 是能够将 RDD 分区数据存储在内存中的长期存活的进程.
图二: 这个是 Spark 运行时的图, 用户写的 driver 端程序启动多个 workers, 这些 workers 能够从分布书的存储系统中读取数据块而且能够将计算出来的 RDD 分区数据存放在内存中.
在 2.2.1 小节中的日志挖掘例子中, 咱们提到, 用户提供给 RDD 操做好比 map 以参数做为这个操做的闭包(说白了就是函数). Scala 将这些函数看做一个 java 对象, 这些对象是能够序列化的, 而且能够经过网络传输传输到其余的机器节点上的. Scala 将函数中的变量看做一个对象中的变量. 好比, 咱们能够写一段这样的代码: var x = 5; rdd.map(_ + 5)来达到给这个 RDD 每个元素加上 5 的目的.
RDDs 是被一元素类型参数化的静态类型对象, 好比, RDD[Int] 表示一个类型为整数的 RDD. 然而, 咱们不少例子中的 RDD 都会省去这个类型, 这个是由于 scala 支持类型推断.
虽然咱们用 scala 实现 RDD 的方法很简单, 可是咱们须要处理用反射实现的闭包对象相关的工做, 咱们还须要作不少的工做使的 spark 能够用 scala 的解释器, 这个咱们在 5.2 小节中会讨论到. 尽管如此, 咱们是不须要修改 scala 的编译器的.
3.1 Spark 中 RDD 的操做
表 2 中列举了 Spark 中 RDD 经常使用的 transformations 和 actions 操做, 且描述了每个方法的签名以及类型.咱们须要记住 transformations 是用来定义一个新的 RDD 的 lazy 操做, 而actions 是真正触发一个能返回结果或者将结果写到文件系统中的计算.
表二: Spark 中 RDD 经常使用的 transformations 和 actions 操做.Seq[T] 表示元素类型为 T 的一个列表.
须要注意的是, 一些操做好比 join 只适合用于 key-value 类型的 RDDs. 咱们取的函数的名称和 scala 或者其余函数式编程语言的函数名是一致的. 好比, map 是一个 one-to-one 的映射操做, 而 flatMap 的每个输入值会对应一个或者更多的输出值(有点像 MapReduce 中的 map)
除了这些操做, 用户能够经过 persist 操做来请求缓存 RDD. 另外, 用户能够拿到被 Partitioner 分区后的分区数以及根据 Partitioner 对另外一个 dataset 进行分区. 像 groupByKey、reduceByKey 以及 sort 等操做都是通过了hash 或者 rang 分区后的 RDD.
3.2 举例应用
咱们用两个迭代式的应用:线性回归和 PageRank 来补充 2.2.1 提到的数据挖掘的例子. 稍后也会展现下如何控制 RDD 的分区以达到提高性能的目的.
3.2.1 线性回归
不少的机器学习算法通常都是迭代式的计算, 由于它们须要跑迭代的优化程序(好比梯度降低)来达到最大化功能. 他们将数据存放在内存中以达到很快的速度.
做为一个例子, 下面的程序实现了线性回归, 一个能找到最佳区分两种点集(垃圾邮件以及非垃圾邮件)的超平面 w 的经常使用的分类算法. 这个算法用了梯度降低的方法:一个随机的值做为 w 的初始值, 每次迭代都会将含有 w 的方法应用到每个数据点而后累加获得梯度值, 而后将 w 往改善结果的方向移动.
一开始咱们定义一个叫 points 的 RDD, 这个 RDD 从一个文本文件中通过 map 将每一行转换为 Point 对象获得. 而后咱们重复对 points 进行 map 和 reduce 操做计算出每一步的梯度值. 在迭代之间咱们将 points 存放在内存中可使的性能提升 20 倍, 咱们将会在 6.1 节中讨论.
3.2.2 PageRank
在 PageRank 中数据共享更加复杂. 若是一个文档引用另外一个文档, 那被引用的文档的排名值(rank)须要加上引用的文档发送过来的贡献值, 固然这个过程是个迭代的过程. 在每一次迭代中, 每个文档都会发送 r/n 的贡献值给它的邻居, 其中 r 表示这个文档的排名值, n 表示这个文档的邻居数量. 而后更新文档的排名值为, 这个表达式值表示这个文档收到的贡献值, N 表示全部的文档的数量, 咱们能够用以下的 spark 代码来表达 PageRank:
其中 links 表示( URL , outlinks )键值对. 这个程序的 RDD 的血缘关系图如图三. 在每一次迭代中咱们都是根据上一次迭代的 contribs 和 ranks 以及原始不变的 links 数据集来建立一个新的 ranks 数据集. 随着迭代次数的变多这张图会变的越长, 这个是这个图比较有意思的特色. 若是这个 job 的迭代次数不少的话, 那么备份一些版本的 ranks 来达到减小从错误中恢复出来的时间是颇有必要的, 用户能够调用标记为 RELIABLE 的 persist 函数来达到这个目的. 须要注意的是, links 是不须要备份的, 由于它的分区数据能够快速的从从新计算输入文件中对应的数据块而获得, 这个数据集通常会比 ranks 数据集大上不少倍, 由于每个文档会有不少的链接但只会有一个排名值, 因此利用 RDD 的血缘关系来恢复数据确定比 checkpoint 内存中的数据快不少(由于数据量太大).
最后, 咱们能够控制 RDDs 的分区方式来优化 PageRank 中的节点通信. 若是咱们事先为 links 指定一个分区方式(好比, 根据 link 的 url 来 hash 分区, 就是将相同的 url 发送到同一个节点中), 而后咱们对 ranks 进行相同的分区方式, 这样就能够保证 links 和 ranks 之间的 join 不须要机器节点之间的通信(由于相同的 url 都在同一个机器节点了, 那么相对应的 rank 和 link 确定也是在同一个机器节点了). 咱们也能够自定义分区器来实现将一组页面 url 放到一块儿(好比按照 url 的 domain 进行分区). 以上两种优化方式均可以经过在定义 links 的时候调用 partitionBy 来实现:
在调用了 partitionBy 后, links 和 ranks 之间的 join 操做会自动的在 link 所在的机器进行每个 URL 的贡献值的聚合计算, 而后在相同的机器计算新的排名值, 而后计算出来的新的 ranks 在相同的机器和 links 进行 join. 这种在迭代之间进行数据一致分区是像 Pregel 这种框架中的主要的优化计算方式. RDDs 使的用户能够直接本身来实现这种优化机制.
4 表达 RDDs
在抽象 RDDs 的过程当中, 怎么表达出 RDDs 能跟踪不少的 transformations 操做之间血缘关系是一个比较大的挑战. 理想的状况下, 一个实现 RDDs 系统应该是尽量多的提供 transformations 操做(好比表二中的操做), 而且可让用户以任意的方式来组合这些 transformations 操做. 咱们提出了基于图的 RDDs 展示方式来达到以上的目的. 咱们在 spark 中利用这种展示方式达到了在不须要给调度系统为每个 transformation 操做增长任何的特殊逻辑就能够支持大量的 transformations 操做, 这样极大的简化了咱们的系统设计.
归纳的说, 如下五个信息能够表达 RDDs: 一个分区列表, 每个分区就是数据集的原子块. 一个父亲 RDDs 的依赖列表. 一个计算父亲的数据集的函数. 分区模式的元数据信息以及数据存储信息. 好比, 基于一个 HDFS 文件建立出来的的 RDD 中文件的每个数据块就是一个分区, 而且这个 RDD 知道每个数据块存储在哪些机器上, 同时, 在这个 RDD 上进行 map 操做后的结果有相同的分区数, 当计算元素的时候, 将 map 函数应用到父亲 RDD 数据中的. 咱们在表三总结了这些接口:
操做接口 | 含义 |
---|---|
partitions() | 返回一个分区对象的列表 |
preferredLocations(p) | 分区p数据存储在哪些机器节点中 |
dependencies() | 返回一个依赖列表 |
iterator(p, parentIters) | 根据父亲分区的数据输入计算分区p的全部数据 |
partitioner() | 返回这个RDD是hash仍是range分区的元数据信息 |
表三: Spark 中表达 RDDs 的接口
在设计如何表达 RDDs 之间依赖的接口是一个很是有意思的问题. 咱们发现将依赖定义成两种类型就足够了: 窄依赖, 表示父亲 RDDs 的一个分区最多被子 RDDs 一个分区所依赖. 宽依赖, 表示父亲 RDDs 的一个分区能够被子 RDDs 的多个子分区所依赖. 好比, map 操做是一个窄依赖, join 操做是一个宽依赖操做(除非父亲 RDDs 已经被 hash 分区过), 图四显示了其余的例子:
图四:窄依赖和宽依赖的例子.每个方框表示一个 RDD , 带有颜色的矩形表示分区
如下两个缘由使的这种区别颇有用, 第一, 窄依赖可使得在集群中一个机器节点的执行流计算全部父亲的分区数据, 好比, 咱们能够将每个元素应用了 map 操做后紧接着应用 filter 操做, 与此相反, 宽依赖须要父亲 RDDs 的全部分区数据准备好而且利用相似于 MapReduce 的操做将数据在不一样的节点之间进行从新洗牌和网络传输. 第二, 窄依赖从一个失败节点中恢复是很是高效的, 由于只须要从新计算相对应的父亲的分区数据就能够, 并且这个从新计算是在不一样的节点进行并行重计算的, 与此相反, 在一个含有宽依赖的血缘关系 RDDs 图中, 一个节点的失败可能致使一些分区数据的丢失, 可是咱们须要从新计算父 RDD 的全部分区的数据.
Spark 中的这些 RDDs 的通用接口使的实现不少 transformations 操做的时候只花了少于 20 行的代码. 实际上, 新的 spark 用户能够在不了解调度系统的细节之上来实现新的 transformations 操做(好比, 采样和各类 join 操做). 下面简要的归纳了一些 RDD 的实现:
- HDFS files: 抽样的输入 RDDs 是 HDFS 中的文件.对于这些 RDDs, partitions 返回文件中每个数据块对应的一个分区信息(数据块的位置信息存储在 Partition 对象中), preferredLocations 返回每个数据块所在的机器节点信息, 最后 iterator 负责数据块的读取操做.
- map: 对任意的 RDDs 调用 map 操做将会返回一个 MappedRDD 对象.这个对象含有和其父亲 RDDs 相同的分区信息和数据存储节点信息, 可是在 iterator 中对父亲的全部输出数据记录应用传给 map 的函数.
- union: 对两个 RDDs 调用 union 操做将会返回一个新的 RDD , 这个 RDD 的分区数是他全部父亲 RDDs 的全部分区数的总数.每个子分区经过相对应的窄依赖的父亲分区计算获得.
- sample: sampling 和 mapping 相似, 除了 sample RDD 中为每个分区存储了一个随机数, 做为从父亲分区数据中抽样的种子.
- join: 对两个 RDDs 进行 join 操做, 可能致使两个窄依赖(若是两个 RDDs 都是事先通过相同的 hash/range 分区器进行分区), 或者致使两个宽依赖, 或者一个窄依赖一个宽依赖(一个父亲 RDD 通过分区而另外一个没有分区).在上面全部的恶场景中, join 以后的输出 RDD 会有一个 partitioner (从父亲 RDD 中继承过来的或者是一个默认的 hash partitioner).
5 实现
咱们用了 14000 行 scala 代码实现了 spark. Spark 系统跑在集群管理者 mesos 上, 这样可使的它和其余的应用好比 hadoop 、 MPI 等共享资源, 每个 spark 程序都是由它的 driver 和 workers 组成, 这些 driver 和 workers 都是以一个 mesos 应用运行在 mesos 上的, mesos 能够管理这些应用之间的资源共享问题.
Spark 能够利用已经存在的 hadoop 的 api 组件读取任何的 hadoop 的输入数据源(好比: HDFS 和 Hbase 等), 这个程序 api 是运行在没有更改的 scala 版本上.
咱们会简要的归纳下几个比较有意思的技术点:咱们的 job 调度器( 5.1 节), 能够用于交互的 spark 解释器( 5.2 节), 内存管理( 5.3 节)以及对 checkpointing 的支持( 5.4 节).
5.1 job 调度器
spark 的调度器依赖咱们在第 4 章中讨论的 RDDs 的表达.
从整体上看, 咱们的调度系统有点和 Dryad 类似, 可是它还考虑了被存储的 RDDs 的哪些分区还在内存中.当一个用户对某个 RDD 调用了 action 操做(好比 count 或者 save )的时候调度器会检查这个 RDD 的血缘关系图, 而后根据这个血缘关系图构建一个含有 stages 的有向无环图( DAG ), 最后按照步骤执行这个 DAG 中的 stages , 如图 5 的说明.每个 stage 包含了尽量多的带有窄依赖的 transformations 操做. 这个 stage 的划分是根据须要 shuffle 操做的宽依赖或者任何能够切断对父亲 RDD 计算的某个操做(由于这些父亲 RDD 的分区已经计算过了). 而后调度器能够调度启动 tasks 来执行没有父亲 stage 的 stage (或者父亲 stage 已经计算好了的 stage ),一直到计算完咱们的最后的目标 RDD .
图五: 怎么计算 spark job stage 的例子.实现的方框表示 RDDs ,带有颜色的方形表示分区, 黑色的是表示这个分区的数据存储在内存中, 对 RDD G 调用 action 操做, 咱们根据宽依赖生成不少 stages , 且将窄依赖的 transformations 操做放在 stage 中.在这个场景中, stage 1 的输出结果已经在内存中, 因此咱们开始运行 stage 2 , 而后是 stage 3.
咱们调度器在分配 tasks 的时候是采用延迟调度来达到数据本地性的目的(说白了, 就是数据在哪里, 计算就在哪里). 若是某个分区的数据在某个节点上的内存中, 那么将这个分区的计算发送到这个机器节点中. 若是某个 RDD 为它的某个分区提供了这个数据存储的位置节点, 则将这个分区的计算发送到这个节点上.
对于宽依赖(好比 shuffle 依赖), 咱们将中间数据写入到节点的磁盘中以利于从错误中恢复, 这个和 MapReduce 将 map 后的结果写入到磁盘中是很类似的.
只要一个任务所在的 stage 的父亲 stage 仍是有效的话, 那么当这个 task 失败的时候, 咱们就能够在其余的机器节点中从新跑这个任务. 若是一些 stages 变的无效的话(好比由于一个 shuffle 过程当中 map 端的一个输出结果丢失了), 咱们须要从新并行提交没有父亲 stage 的 stage (或者父亲 stage 已经计算好了的 stage )的计算任务. 虽然备份 RDD 的血缘关系图示比较容易的, 可是咱们还不能容忍调度器调度失败的场景.
虽然目前 spark 中全部的计算都是响应 driver 程序中调用的 action 操做, 可是咱们也是须要尝试在集群中调用 lookup 操做, 这种操做是根据 key 来随机访问已经 hash 分区过的 RDD 全部元素以获取相应的 value. 在这种场景中, 若是一个分区没有计算的话, 那么 task 须要将这个信息告诉调度器.
5.2 集成解释器
scala 和 Ruby 以及 Python 同样包含了一个交互型的 shell 脚本工具. 考虑到利用内存数据能够得到低延迟的特性, 咱们想让用户经过解释器来交互性的运行 spark , 从而达到查询大数据集的目的.
Scala 解释器一般是将用户输入的每一行代码编译成一个类, 而后将这个类加载到 JVM 中, 而后调用这个类的方法. 这个类中包含了一个单例对象, 这个单例对象包含了用户输入一行代码中的变量或者函数, 还包含了一个运行用户输入那行代码的初始化方法. 好比, 用户输入 var x = 5 , 而后再输入 println(x), scala 解释器定义个包含了 x 的叫作 Line 1 的类, 而后将第二行代码编译成 println(Line 1.getInstance(). x ).
咱们对 spark 中的解释器作了以下两个改变:
- Class shipping: 为了让 worker 节点能拿到用户输入的每一行代码编译成的 class 的二进制代码, 咱们使的解释器为这些 classes 的二进制代码提供 HTTP 服务.
- 修改了代码生成:正常状况下, 咱们经过访问对应的类的静态方法来达到访问将用户输入每一行代码编译成的单例对象.这个觉得着, 当咱们将一个含有在前面行中定义的变量(好比上面例子中的 Line 1.x )的闭包序列化发送到 worker 节点的时候, java 是不会经过对象图来跟踪含有 x 的实力 Line 1 的, 这样的话 worker 节点将收不到变量 x.咱们修改了代码生成逻辑来达到能直接引用每一行代码生成的实例.
图六显示了通过咱们的改变后, 解释器是如何将用户输入的一系列的代码转换成 java 对象.
图六:显示 spark 解释器是如何将用户输入的代码转换成 java 对象的例子
咱们发现 spark 解释器在处理咱们研究中的大量已经获取到的痕迹数据以及探索存储在 hdfs 中的数据集时是很是有用的.咱们正在打算用这个来实现更高层面的交互查询语言, 好比 SQL.
5.3 内存管理
Spark 在持久化 RDDs 的时候提供了 3 种存储选:存在内存中的非序列化的 java 对象、存在内存中的序列化的数据以及存储在磁盘中. 第一种选择的性能是最好的, 由于 java VM 能够很快的访问 RDD 的每个元素. 第二种选择是在内存有限的状况下, 使的用户能够以很低的性能代价而选择的比 java 对象图更加高效的内存存储的方式. 若是内存彻底不够存储的下很大的 RDDs , 并且计算这个 RDD 又很费时的, 那么选择第三种方式.
为了管理有限的内存资源, 咱们在 RDDs 的层面上采用 LRU (最近最少使用)回收策略. 当一个新的 RDD 分区被计算可是没有足够的内存空间来存储这个分区的数据的时候, 咱们回收掉最近不多使用的 RDD 的分区数据的占用内存, 若是这个 RDD 和这个新的计算分区的 RDD 时同一个 RDD 的时候, 咱们则不对这个分区数据占用的内存作回收. 在这种状况下, 咱们将相同的 RDD 的老分区的数据保存在内存中是为了避免让总是从新计算这些分区的数据, 这点事很是重要的, 由于不少操做都是对整个 RDD 的全部的 tasks 进行计算的, 因此很是有必要将后续要用到的数据保存在内存中.到目前为止, 咱们发现这种默认的机制在全部的应用中工做的很好, 可是咱们仍是将持久每个 RDD 数据的策略的控制权交给用户.
最后, 在一个集群中的每个 spark 实例的内存空间都是分开的, 咱们之后打算经过统一内存管理达到在 spark 实例之间共享 RDDs.
5.4 对 Checkpointing 的支持
虽然咱们老是可使用 RDDs 的血缘关系来恢复失败的 RDDs 的计算, 可是若是这个血缘关系链很长的话, 则恢复是须要耗费很多时间的.所以, 将一些 RDDs 的数据持久化到稳定存储系统中是有必要的
通常来讲, checkpointing 对具备很长的血缘关系链且包含了宽依赖的 RDDs 是很是有用的, 好比咱们在 3.2.2 小节中提到的 PageRank 的例子. 在这些场景下, 集群中的某个节点的失败会致使每个父亲 RDD 的一些数据的丢失, 进而须要从新全部的计算. 与此相反的, 对于存储在稳定存储系统中且是窄依赖的 RDDs (好比 3.2.1 小节中线性回归例子中的 points 和 PageRank 中的 link 列表数据), checkpointing 可能一点用都没有. 若是一个节点失败了, 咱们能够在其余的节点中并行的从新计算出丢失了数据的分区, 这个成本只是备份整个 RDD 的成本的一点点而已.
spark 目前提供了一个 checkpointing 的 api ( persist 中的标识为 REPLICATE , 还有 checkpoint ()), 可是须要将哪些数据须要 checkpointing 的决定权留给了用户. 然而, 咱们也在调查怎么样自动的 checkpoing , 由于咱们的调度系统知道数据集的大小以及第一次计算这个数据集花的时间, 因此有必要选择一些最佳的 RDDs 来进行 checkpointing , 来达到最小化恢复时间
最后, 须要知道的事 RDDs 天生的只读的特性使的他们比通常的共享内存系统作 checkpointing 更简单了. 由于不用考虑数据的一致性, 咱们能够不终止程序或者 take 快照, 而后在后台将 RDDs 的数据写入到存储系统中.
6 评估
咱们经过在亚马逊 EC 2 伤进行一系列的实验以及用用户的应用作基准测试来评估 spark , 总的来讲, 下面是咱们的结论:
- 在迭代式机器学习和图计算中, spark 以 20 倍的速度超过了 hadoop .提速的点主要是在避免了 I / O 操做以及将数据以 java 对象的形式存在内存中从而下降了反序列化的成本.
- 用户写的应用程序运行平稳以及很好扩展.特别的, 咱们利用 spark 为一个分析报表提速了 40 倍, 相对于 hadoop 来讲.
- 当节点失败的时候, spark 能够经过从新计算失去的 rdd 分区数据达到快速的恢复.
- spark 在查询 1 TB 的数据的时候的延迟能够控制在 5 到 7 秒.
咱们经过和 hadoop 对比, 展现迭代式机器学习( 6.1 节)和 PageRank ( 6.2 节)的基准测试.而后咱们评估了 spark 的错误恢复机制( 6.3 节)以及当内存不足以存储一个数据集的行为( 6.4 节), 最后咱们讨论了用户应用( 6.5 节)和交互式数据挖掘( 6.6 节)的结果 除非另外声明, 咱们都是用类型为 m 1.xlarge 的 EC 2 节点, 4 核以及 15 GB 内存. 咱们是有数据块大小为 256 M 的 HDFS 存储系统. 在每一次测试以前, 咱们都会清理 OS 的缓存, 以达到准确的测量 IO 成本的目的
6.1 迭代式机器学习应用
咱们实现了两种迭代式机器学习应用, 线性回归核 K - means , 来和下面的系统进行性能的对比:
- Hadoop:版本号为 0.20.0 的稳定版.
- HadoopBinMem:这个系统在迭代的一开始会将输入数据转换成底开销的二进制形式, 这样能够为接下来的迭代消除解析文本数据的开销, 而且将数据存储在 hdfs 实例的内存中.
- Spark:咱们的 RDDs 的实现.
咱们在 25-100 台机器上存储 100 G 数据, 两种算法都是对这 100 G 数据跑 10 次迭代. 两个应用之间的关键不一样点是他们对相同数据的计算量不同. K-means 的迭代时间都是花在计算上, 然而线性回归是一个计算量不大, 时间都是花在反序列化和 I/O 上. 因为典型的机器学习算法都是须要上千次的迭代来达到收敛, 因此咱们将第一次迭代花的时间和接下来的迭代时间分开显示. 咱们发现经过 RDDs 的共享数据极大的提升了后续的迭代速度
图七:在 100 台机器的集群上分别用 hadoop 、 hadoopBinMem 以及 spark 对 100 GB 的数据进行,线性回归和 k - means 的首次迭代和随后迭代花的时间
首次迭代:三个系统在首次迭代中都是读取 HDFS 中的数据, 从图七的条形图中咱们能够看出, 在实验中, spark 稳定的比 hadoop 要快. 这个是因为 hadoop 主从节点之间的心跳信息的信号开销致使的. HadoopBinMen 是最慢的, 这个是由于它启动了一个额外的 MapReduce 任务来将数据转换为二进制, 它还须要经过网络传输数据以达到备分内存中的数据的目的. 随后的迭代:图七也显示了随后迭代花的平均时间. 图八则是显示了集群大小不断扩展时候的花的时间. 对于线性回归, 在 100 台机器上, spark 分别比 hadoop 和 hadoopBinMem 快上 25.3 倍和 20.7 倍. 对于计算型的 k - means 应用, spark 仍然分别提升了 1.9 倍和 3.2 倍.
图八: hadoop 、 hadoopBinMem 以及 spark 在随后的迭代花的时间, 都是处理 100 G 的数据
理解为何提速了: 咱们惊奇的发现 spark 甚至比基于内存存储二进制数据的 hadoopBinMem 还要快 20 倍. 在 hadoopBinMem 中, 咱们使用的是 hadoop 标准的二进制文件格式( sequenceFile )和 256 m 这么大的数据块大小, 以及咱们强制将 hadoop 的数据目录放在一个内存的文件系统中. 然而, Hadoop 仍然由于下面几点而比 spark 慢:
- Hadoop 软件栈的最低开销.
- HDFS 提供数据服务的开销.
- 将二进制数据转换成有效的内存中的 java 对象的反序列化的成本开销.
咱们依次来调查上面的每个因素.为了测量第一个因素, 咱们跑了一些空的 hadoop 任务, 咱们发现单单完成 job 的设置、任务的启动以及任务的清理等工做就花掉了至少 25 秒钟. 对于第二个元素, 咱们发现 HDFS 须要执行多分内存数据的拷贝以及为每个数据块作 checksum 计算.
最后, 为了测试第 3 个因素, 咱们在单机上作了一个微型的基准测试, 就是针对不一样文件类型的 256 M 数据来跑线性回归计算. 咱们特别的对比了分别从 HDFS 文件( HDFS 技术栈的耗时将会很明显)和本地内存文件(内核能够很高效的将数据传输给应用程序)中处理文本和二进制类型数据所话的时间、
图九中是咱们咱们测试结果的展现. 从 In - memory HDFS (数据是在本地机器中的内存中)中读数据比从本地内存文件中读数据要多花费 2 秒中.解析文本文件要比解析二进制文件多花费 7 秒钟. 最后, 即便从本地内存文件中读数据, 可是将预先解析了二进制数据转换成 java 对象也须要 3 秒钟, 这个对于线性回归来讲也是一个很是耗时的操做. Spark 将 RDDs 全部元素以 java 对象的形式存储在内存中, 进而避免了上述说的全部的耗时
6.2 PageRank
咱们分别用 spark 和 hadoop 对 54 GB 的维基百科的转储数据进行了 PageRank 机器学习, 并比对了它们的性能. 咱们用 PageRank 的算法处理了大约 4 百万相互链接的文章, 并进行了 10 次迭代. 图十展现了在 30 个节点上, 只用内存存储使的 spark 拥有了比 hadoop 2.4 倍的性能提高. 另外, 就和 3.2.2 小节讨论的, 若是控制 RDD 的分区使的迭代之间数据的平衡更可使的性能速度提高到 7.2 倍. 将节点数量扩展到 60 个, spark 的性能速度提高也是上面的结果
图十:分别基于 Hadoop 和 spark 的 PageRank 的性能对比
咱们也评估了在第 7.1 节中提到的用咱们基于 spark 而实现的 Pregel 重写的 PageRank .迭代次数和图十是同样的, 可是慢了 4 秒钟, 这个是由于每一次迭代 Pregel 都要跑额外的操做来让顶点进行投票决定是否须要结束任务.
6.3 容错
咱们评估了当在 k - means 应用中一个节点失败了而利用 RDD 的血缘关系链来重建 RDD 的分区须要的成本.图十一对比的是在 75 个节点中运行 10 次迭代的 k - means 正常状况和在第 6 次迭代一个节点失败的状况. 若是没有任何失败的话, 每一次迭代都是在 100 GB 的数据上跑 400 个 tasks.
图十一:出现了失败的 k - means 每次迭代时间.在第 6 次迭代中一台机器被杀掉了, 致使须要利用血缘关系链重建 RDD 的部分分区
第五次迭代的时间是 58 秒. 在第 6 次迭代, 一台机器被杀死了, 致使丢失了运行在这台机器上的 tasks 以及存储在这台机器上的 RDD 分区数据. Spark 在其余机器节点上从新读取相应的输入数据以及经过血缘关系来重建 RDD , 而后并行的重跑丢失的 tasks , 使的此次迭代的时间增长到 80s. 一旦丢失的 RDD 分区数据重建好了, 迭代的时间又回到了 58s.
须要注意的是, 若是是基于 checkpoint 的容错机制的话, 那么须要经过重跑好几个迭代才能恢复, 须要重跑几个取决于 checkpoints 的频率. 此外, 系统须要经过网络传输来备份应用须要的 100GB 数据(被转化为二进制的文本数据), 以及为了达到在内存中备份数据而消耗掉 2 倍的内存, 或者等待将 100GB 数据写入到磁盘中. 与此相反的是, 在咱们的例子中每个 RDDs 的血缘关系图的大小都是小于 10KB 的.
6.4 内存不足的行为
在目前为止, 咱们都是假设集群中的每一台机器都是有足够的内存来存储迭代之间的 RDDs 的数据的. 当没有足够的内存来存储任务的数据的时候 spark 是怎么运行的呢? 在这个实验中, 咱们给 spark 每个节点配置不多的内存, 这些内存不足以存储的下 RDDs. 咱们在图十二中, 咱们展现了不一样存储空间下的运行线性回归应用须要的时间. 能够看出, 随着空间的减小, 性能速度慢慢的降低:
图十二: 每次都是使用不一样的内存, 而后在 25 台机器中对 100 GB 的数据运行线性回归的性能对比图
6.5 用 spark 构建的用户应用
内存中分析: Conviva Inc 是一个视频提供商, 他们用 spark 来加速以前在 hadoop 上运行的几个数据报表分析. 好比, 其中一个报表是运行一系列的 Hive 查询来计算一个用户的各类统计信息. 这些查询都是基于相同的数据子集(基于自定义的过滤器过滤出来的数据)可是须要不少 MapReduce 任务来为分组字段进行聚合运算(平均值、百分位数值以及 count distinct). 将这些数据子集建立成一个能够共享的 spark 中的 RDD 来实现这些查询使的这个报表的速度提高了 40 倍. 对 200 GB 已经压缩的数据在 hadoop 集群上跑这个报表花了 20 个小时, 可是利用 2 台机器的 spark 只用了 30 分钟而已. 此外, spark 程序只花了 96 G 的内存, 由于只须要将报表关心的列数据存储在内存中进行共享就行, 而不是全部的解压后的数据.
交通模型:伯克利分校的 Mobile Millennium 项目组的研究员在收集到的零星的汽车的 GPS 信息上并行运行一个机器学习算法试图推断出道路交通是否拥挤. 在都市区域道路网络中的 10000 条道路和 600000 个装有 GPS 设备的汽车点对点的旅行时间(每一条路线的旅行时间可能包含了多条道路)样本是数据源. 利用交通模型能够估算出经过每一条独立的道路须要多长时间. 研究人员利用 EM 算法来训练模型, 这个算法在迭代的过程当中重复执行 map 和 reduceByKey 步骤. 这个应用近似线性的将机器规模从 20 台扩展到 80 台, 每台机器 4 个 cores , 如图 13 ( a )所示.
图十三:两个用 spark 实现的用户应用的每次迭代的时间, 偏差线表示标准偏差
推特垃圾邮件分类:伯克利分校的 Monarch 项目利用 spark 来标记推特消息中的垃圾连接. 它们实现的线性回归分类器和第 6.1 节中很类似, 可是他们利用了分布式的 reduceByKey 来并行的累加梯度向量值. 图 13(b) 中显示了对 50 GB 的数据子集进行分类训练须要的时间(随着机器扩展), 这个数据子集中包含了 25000 URLs 以及每个 url 对应的页面的网络和内容属性相关的 10000000 个特征/纬度. 图 13(b) 中的时间不是线性的降低是由于每一次迭代花费了很高的且固定的通信成本.
6.6 交互性的数据挖掘
为了演示 spark 在交互查询大数据集的能力, 咱们来分析 1 TB 的维基页面访问日志数据( 2 年的数据). 在这个实验中, 咱们使用 100 个 m 2.4 xlarge EC 2 实例, 每个实例 8 个 cores 以及 68 G 内存. 咱们查询出( 1 )全部页面的浏览量, ( 2 )页面标题等于某个单词的页面的浏览量以及( 3 )页面标题部分的等于某个单词的页面的浏览量. 每个查询都是扫描整个输入数据集.
图十四展现的分别是查询整个数据集、一半数据集一集十分之一的数据集的响应时间. 即便是 1 TB 的数据, 用 spark 来查询仅仅花了 5-7 秒而已.这个比查询磁盘数据的速度快了一个数量级, 好比, 查询磁盘文件中的 1 TB 数据须要 170 秒.这个能够说明 RDDs 使的 spark 是一个很是强大的交互型数据挖掘的工具.
7 讨论
虽然因为 RDDs 的自然不可变性以及粗粒度的转换致使它们彷佛提供了有限制的编程接口, 可是咱们发现它们适合不少类型的应用. 特别的, RDDs 能够表达出如今各类各样的框架提出的编程模型, 并且还能够将这些模型组合在同一个程序中(好比跑一个 MapReduce 任务来建立一个图, 而后基于这个图来运行 Pregel )以及能够在这些模型中共享数据. 在这一章中, 咱们在第 7.1 节中讨论 RDDs 能够表达哪些模型以及为何适合表达这些编程模型. 另外, 咱们在第 7.2 节中讨论咱们推崇的 RDD 的血缘信息的好处, 利用这些信息能够帮助咱们 debug 模型.
7.1 已经存在的编程模型的表达
对于到目前为止不少独立提出的编程模型, RDDs 均可以高效的表达出来. 这里所说的 “高效”, 不只仅是指使用 RDDs 的输出结果和独立提出的编程模型狂简的输出结果是一致的, 并且 RDDs 在优化性能方面比这些框架还要强大, 好比将特定的数据保存在内存中、对数据分区以减小网络传输以及高效的从错误中恢复. 能够用 RDDs 表达的模型以下:
- MapReduce: 能够利用 spark 中的 flatMap 和 groupByKey 操做来表达这个模型, 或者若是须要聚合的话可使用 reduceByKey .
- DryadLINQ: DryadLINQ 系统比 MapReduce 更多的操做, 可是这些操做都是直接和 RDD 的转换操做( map , groupByKey , join 等)对应的批量操做.
- SQL: 和 DryadLINQ 同样, SQL 查询都是对一个数据集进行并行的操做计算.
- Pregel: Google 的 Pregel 是一个专门解决迭代图计算应用的模型, 它一开始看起来和面向数据集的编程模型的其余系统彻底不一样.在 Pregel 中, 一个程序运行一些列的相互协调的“ supersteps ”.在每个 superstep 上, 对图上的每个顶点运行用户自定义的函数来更新这个顶点的相关的状态、改变图的拓扑结构以及向其余顶点发送下一个 superstep 须要的消息.这种模型能够表达很是多的图计算算法, 包括最短路径、二部图匹配以及 PageRank.
Pregel 在每一次迭代中都是对全部顶点应用相同的用户定义的函数, 这个是使的咱们用 RDDs 来实现这个模型的关键点. 所以, 每次迭代后, 咱们均可以将顶点的状态保存在 RDD 中, 而后执行一个批量转换操做( apply )来应用这个函数以及生成一个消息的 RDD . 而后咱们能够用这个 RDD 通顶点的状态进行 join 来完成消息的交换. 和 Pregel 同样, RDDs 容许将点的状态保存在内存中、控制它们的分区以减小网络通信以及指出从失败中恢复. 咱们在 spark 上用了 200 行代码的包实现了 Pregel , 读者能够参考第 33 个文献来了解更多的细节
- 迭代 MapReduce: 最近提出的几个系统, 包括 HaLoop 和 Twister , 它们提供了可让用户循环跑一系列的 MapReduce 任务的迭代式 MapReduce 模型.这些系统在迭代之间保持数据分区一致, Twister 也能够将数据保存在内存中. RDDs 能够很简单的表达以上两个优化, 并且咱们基于 spark 花了 200 行代码实现了 HaLoop.
- 批量流处理: 研究人员最近提出了一些增量处理系统, 这些系统是为按期接受新数据而后根据数据更新结果的应用服务的.好比, 一个应用须要实时接收新数据, 而后每 15 分钟就将接收到的数据和前面 15 分钟的时间窗口的数据进行 join 聚合, 将聚合的结果更新到统计数据中.这些系统执行和 Dryad 相似的批处理, 可是它们将应用的状态数据存储在分布式系统中.将中间结果放在 RDDs 中能够提升处理速度.
- 阐释 RDDs 的表达力为何这么丰富:为何 RDDs 能够表达多种多样编程模型?缘由就是 RDDs 的限制性对不少并行计算的应用的影响是很小的.特别指出的是, 虽然 RDDs 只能经过批量转换而获得, 可是不少的并行计算的程序都是将相同的操做应用到大量的数据条目中, 这样使的 RDDs 的表达力变的丰富.相似的, RDDs 的不变性并非障碍, 由于咱们能够建立多个 RDDs 来表达不一样版本的相同数据集.事实上, 如今不少的 MapReduce 的应用都是运行在不能对文件修改数据的文件系统中, 好比 HDFS.
最后一个问题是为何以前的框架没有提供这中通用型的表达能力呢? 咱们相信这个是由于这些框架解决的是 MapReduce 和 Dryad 不能解决的特殊性的问题, 好比迭代, 它们没有洞察到这些问题的共同缘由是由于缺乏了数据共享的抽象.
7.2 利用 RDDs 来 debug
当咱们一开始设计 RDDs 经过从新计算来达到容错的时候, 这种特性同时也促使了 debugging 的产生. 特别的, 在一个任务中经过记录 RDDs 的建立的血缘, 咱们能够:
- 后面能够从新构建这些 RDDs 以及可让用户交互性的查询它们.
- 经过从新计算其依赖的 RDD 分区来达到在一个进程 debugger 中重跑任何的任务.
和传统的通用分布式系统的重跑 debugger 不同, 传统的须要捕获和引用多个节点之间的事件发生的顺序, RDDs 这种 debugger 方式不须要依赖任何的数据, 而只是须要记录 RDD 的血缘关系图.咱们目前正在基于这些想法来开发一个 spark debbger.
8 相关工做
集群编程模型:集群编程模型的相关工做分为如下几类:
-
第一, 像 MapReduce , Dryad 以及 Ciel 同样支持一系列处理数据的操做, 而且须要经过稳定的存储系统来共享数据, RDDs 表达了一种比稳定存储系统更高效的数据共享抽象, 由于它避免了数据备份、 I / O 以及序列化的成本.
- 第二, 几个数据流系统的高层面上的编程接口, 包括 DryadLINQ 和 FlumeJava ,它们提供了语言集成 api , 使的用户能够经过像 map 和 join 等操做来操做并行的集合.然而, 在这些系统中, 并行的集合是指在磁盘上的文件或者一个查询计划表达的临时数据集.虽然这些系统在相同的查询中的操做之间组装数据的 pipeline (好比, 一个 map 操做后跟另一个 map ),可是它们不能在查询之间进行高效的数据共享.咱们在并行集合模式上创建 spark api , 是因为它的便利性以及在集成语言接口上不要求新颖性, 可是咱们基于在这些接口背后以 RDDs 做为存储抽象, 就可使的 spark 支持大量类型的应用了.
- 第三种系统为许多专门的须要数据共享的应用提供了高层的接口.好比, pregel 支持迭代式的图计算应用、 Twister 和 HaLoop 支持迭代式的 MapReduce .然而, 这些框架只是为他们支持的计算模型隐式的共享数据, 并无提供可让用户根据本身的需求显式的共享数据的通用抽象.好比, 一个用户不能用 Pregel 或者 Twister 将数据加载到内存中而后决定在数据集上面跑什么样的查询. RDDs 提供了一个显式的分布式存储抽象, 所以能够支持那些特殊系统不能支持的应用, 好比交互式数据挖掘.
最后, 一些系统暴露共享可变状态以使的用户能够执行内存计算. 好比, Piccolo 使的用户经过分布式的函数来读取和更新分布式 hash 表中的单元格数据. DSM 和像 RAMCloud 这样的 key - value 存储系统提供了相似的模型. RDDs 和这些系统有两个方面的不一样, 第一, RDDs 基于像 map , sot 以及 join 等操做提供了高层的编程接口, 然而, 在 Piccolo 和 DSM 中的接口只是读取和更新表的单元格数据. 第二, Piccolo 和 DSM 经过 checkpoint 和回滚机制实现容错, 在许多应用中这种机制比机遇血缘机制的 RDDs 的容错的成本更大. 最后, 如 2.3 小节讨论的, 相对于 DSM , RDDs 还提供了其余的优点功能, 好比执行慢的 task 的处理机制
缓存系统: Nectar 能够经过标识通用的程序分析的子表达式来达到在 DryadLINQ 任务之间对中间数据结果的复用. 这种能力确定会加入到基于 RDD 的系统中. 然而, Nectar 即没有提供基于内存的缓存(他是将数据放到分布式文件系统中)也不能让用户能够显式的对数据集进行缓存控制和分区控制. Ciel 和 FlumeJava 一样能够缓存任务结果, 可是也不提供基于内存的缓存或者显式的控制哪些数据能够缓存
Ananthanarayanan et al 已经提出在分布式文件系统上加一层基于内存的缓存来利用数据访问的暂时性和本地性. 这种解决方案倒是加快了已经存在于文件系统中的数据访问速度, 可是它在共享同一个应用中的中间结果方面并无 RDD 高效, 由于它在每个 stage 之间仍然须要将数据写入到文件系统中
血缘:在科学计算以及数据库中, 捕获数据的血缘或者来源信息是一个很长时间被研究的话题了, 对于像解释结果的应用, 须要让他们能够从其余的应用从新产生, 且当在工做流中存在一个 bug 或者数据丢失的时候能够从新对数据进行计算. 对于这边面的容错的工做, 咱们推荐读者看第 [5] 和 [9]的资料. RDDs 提供了一种并行编程模型, 记录跟踪细粒度的血缘的成本很低, 咱们能够根据血缘来达到容错的目的.
咱们基于血缘的容错机制和 MapReduce 以及 Dryad 一个任务中的容错机制是相似的, 在 DAG 中跟踪任务的依赖. 然而, 在这些系统中, 一个任务结束后血缘信息也就丢失了, 用户须要依靠数据备份式的存储系统来共享任务之间的数据. 与此相反, RDDs 能够在任务之间经过将数据存储在内存中达到高效的数据共享, 并不须要数据的备份和磁盘的 I/O 关系型数据库: RDDs 在概念上和数据库中的视图比较相似, 存储的 RDDs 则像具体的视图. 然而, 像 DSM 系统, 数据库通常容许细粒度的对全部的数据进行读写访问, 这种方式须要对操做和数据进行记录日志, 用于容错, 以及须要额外的开销来保持数据的一致性, 对于粗粒度转换模型的 RDDs 来讲, 这些额外的开销式不须要的.
9 结尾
咱们已经展现了在集群应用中一个高效的, 通用的以及容错的对共享数据的抽象的 RDDs . RDDs 能够表达大量的并行应用, 包括特殊的编程模型提出的迭代式计算以及这些模型表达不了的新的应用. 和已经存在的对集群存储抽象不一样的是, RDDs 提供了基于粗粒度转换的 api , 可使的用户经过血缘达到高效的容错. 咱们在 spark 系统中实现了 RDDs, 在迭代式的应用中, 性能是 hadoop 的 20 倍, 而且能够用于交互式的查询数百 GB 的数据.
咱们已经在 spark-project.org 中开源了 Spark, 做为一个稳定的数据分析和系统研究的手段.
鸣谢
We thank the first Spark users, including Tim Hunter, Lester Mackey, Dilip Joseph, and Jibin Zhan, for trying out our system in their real applications, providing many good suggestions, and identifying a few research challenges along the way. We also thank our shepherd, Ed Nightingale, and our reviewers for their feedback. This research was supported in part by Berkeley AMP Lab sponsors Google, SAP, Amazon Web Services, Cloudera, Huawei, IBM, Intel, Microsoft, NEC, NetApp and VMWare, by DARPA (contract #FA8650-11-C-7136), by a Google PhD Fellowship, and by the Natural Sciences and Engineering Research Council of Canada.
引用资料
[1] ApacheHive.http://hadoop.apache.org/hive.
[2] Scala.http://www.scala-lang.org.
[3] G.Ananthanarayanan,A.Ghodsi,S.Shenker,andI.Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.
[4] P.Bhatotia,A.Wieder,R.Rodrigues,U.A.Acar,and R. Pasquin. Incoop: MapReduce for incremental computations. In ACM SOCC ’11, 2011.
[5] R.BoseandJ.Frew.Lineageretrievalforscientificdata processing: a survey. ACM Computing Surveys, 37:1–28, 2005.
[6] S.BrinandL.Page.Theanatomyofalarge-scalehypertextual web search engine. In WWW, 1998.
[7] Y.Bu,B.Howe,M.Balazinska,andM.D.Ernst.HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 3:285–296, September 2010.
[8] C.Chambers,A.Raniwala,F.Perry,S.Adams,R.R.Henry, R. Bradshaw, and N. Weizenbaum. FlumeJava: easy, efficient data-parallel pipelines. In PLDI ’10. ACM, 2010.
[9] J.Cheney,L.Chiticariu,andW.-C.Tan.Provenancein databases: Why, how, and where. Foundations and Trends in Databases, 1(4):379–474, 2009.
[10] J.DeanandS.Ghemawat.MapReduce:Simplifieddata processing on large clusters. In OSDI, 2004.
[11] J. Ekanayake, H. Li, B. Zhang, T. Gunarathne, S.-H. Bae, J. Qiu, and G. Fox. Twister: a runtime for iterative mapreduce. In HPDC ’10, 2010.
[12] P.K.Gunda,L.Ravindranath,C.A.Thekkath,Y.Yu,and L. Zhuang. Nectar: automatic management of data and computation in datacenters. In OSDI ’10, 2010.
[13] Z.Guo,X.Wang,J.Tang,X.Liu,Z.Xu,M.Wu,M.F. Kaashoek, and Z. Zhang. R2: an application-level kernel for record and replay. OSDI’08, 2008.
[14] T.Hastie,R.Tibshirani,andJ.Friedman.TheElementsof Statistical Learning: Data Mining, Inference, and Prediction. Springer Publishing Company, New York, NY, 2009.
[15] B.He,M.Yang,Z.Guo,R.Chen,B.Su,W.Lin,andL.Zhou. Comet: batched stream processing for data intensive distributed computing. In SoCC ’10.
[16] A.Heydon,R.Levin,andY.Yu.Cachingfunctioncallsusing precise dependencies. In ACM SIGPLAN Notices, pages 311–320, 2000.
[17] B.Hindman,A.Konwinski,M.Zaharia,A.Ghodsi,A.D. Joseph, R. H. Katz, S. Shenker, and I. Stoica. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI ’11.
[18] T.Hunter,T.Moldovan,M.Zaharia,S.Merzgui,J.Ma,M.J. Franklin, P. Abbeel, and A. M. Bayen. Scaling the Mobile Millennium system in the cloud. In SOCC ’11, 2011.
[19] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In EuroSys ’07, 2007. [20] S.Y.Ko,I.Hoque,B.Cho,andI.Gupta.Onavailabilityof intermediate data in cloud computations. In HotOS ’09, 2009.
[21] D. Logothetis, C. Olston, B. Reed, K. C. Webb, and K. Yocum. Stateful bulk processing for incremental analytics. SoCC ’10.
[22] G.Malewicz,M.H.Austern,A.J.Bik,J.C.Dehnert,I.Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In SIGMOD, 2010.
[23] D.G.Murray,M.Schwarzkopf,C.Smowton,S.Smith, A. Madhavapeddy, and S. Hand. Ciel: a universal execution engine for distributed data-flow computing. In NSDI, 2011.
[24] B.NitzbergandV.Lo.Distributedsharedmemory:asurveyof issues and algorithms. Computer, 24(8):52 –60, Aug 1991.
[25] J.Ousterhout,P.Agrawal,D.Erickson,C.Kozyrakis, J. Leverich, D. Mazie`res, S. Mitra, A. Narayanan, G. Parulkar, M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for RAMClouds: scalable high-performance storage entirely in DRAM. SIGOPS Op. Sys. Rev., 43:92–105, Jan 2010. [26] D.PengandF.Dabek.Large-scaleincrementalprocessingusing distributed transactions and notifications. In OSDI 2010.
[27] R.PowerandJ.Li.Piccolo:Buildingfast,distributedprograms with partitioned tables. In Proc. OSDI 2010, 2010.
[28] R.RamakrishnanandJ.Gehrke.DatabaseManagement Systems. McGraw-Hill, Inc., 3 edition, 2003.
[29] K.Thomas,C.Grier,J.Ma,V.Paxson,andD.Song.Designand evaluation of a real-time URL spam filtering service. In IEEE Symposium on Security and Privacy, 2011. [30] J.W.Young.Afirstorderapproximationtotheoptimum checkpoint interval. Commun. ACM, 17:530–531, Sept 1974.
[31] Y.Yu,M.Isard,D.Fetterly,M.Budiu,U ́.Erlingsson,P.K. Gunda, and J. Currey. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In OSDI ’08, 2008.
[32] M.Zaharia,D.Borthakur,J.SenSarma,K.Elmeleegy,S. Shenker, and I. Stoica. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys ’10, 2010.
[33] M.Zaharia,M.Chowdhury,T.Das,A.Dave,J.Ma,M. McCauley, M. Franklin, S. Shenker, and I. Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. Technical Report UCB/EECS-2011-82, EECS Department, UC Berkeley, 2011.
原文连接
http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf
贡献者
- @老汤(老汤)
- @wangyangting(那伊抹微笑)