Spark 是一种与 Hadoop 类似的开源集群计算环境,可是二者之间还存在一些不一样之处,这些有用的不一样之处使 Spark 在某些工做负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了可以提供交互式查询外,它还能够优化迭代工做负载。html
Spark 是在 Scala 语言中实现的,它将 Scala 用做其应用程序框架。与 Hadoop 不一样,Spark 和 Scala 可以紧密集成,其中的 Scala 能够像操做本地集合对象同样轻松地操做分布式数据集。git
尽管建立 Spark 是为了支持分布式数据集上的迭代做业,可是实际上它是对 Hadoop 的补充,能够在 Hadoo 文件系统中并行运行。经过名为 Mesos 的第三方集群框架能够支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。github
虽然 Spark 与 Hadoop 有类似之处,但它提供了具备有用差别的一个新的集群计算框架。首先,Spark 是为集群计算中的特定类型的工做负载而设计,即那些在并行操做之间重用工做数据集(好比机器学习算法)的工做负载。为了优化这些类型的工做负载,Spark 引进了内存集群计算的概念,可在内存集群计算中将数据集缓存在内存中,以缩短访问延迟。shell
Spark 还引进了名为 弹性分布式数据集 (RDD) 的抽象。RDD 是分布在一组节点中的只读对象集合。这些集合是弹性的,若是数据集一部分丢失,则能够对它们进行重建。重建部分数据集的过程依赖于容错机制,该机制能够维护 “血统”(即充许基于数据衍生过程重建部分数据集的信息)。RDD 被表示为一个 Scala 对象,而且能够从文件中建立它;一个并行化的切片(遍及于节点之间);另外一个 RDD 的转换形式;而且最终会完全改变现有 RDD 的持久性,好比请求缓存在内存中。express
Spark 中的应用程序称为驱动程序,这些驱动程序可实如今单一节点上执行的操做或在一组节点上并行执行的操做。与 Hadoop 相似,Spark 支持单节点集群或多节点集群。对于多节点操做,Spark 依赖于 Mesos 集群管理器。Mesos 为分布式应用程序的资源共享和隔离提供了一个有效平台(参见 图 1)。该设置充许 Spark 与 Hadoop 共存于节点的一个共享池中。编程
驱动程序能够在数据集上执行两种类型的操做:动做和转换。动做 会在数据集上执行一个计算,并向驱动程序返回一个值;而转换会从现有数据集中建立一个新的数据集。动做的示例包括执行一个 Reduce 操做(使用函数)以及在数据集上进行迭代(在每一个元素上运行一个函数,相似于 Map 操做)。转换示例包括 Map 操做和 Cache 操做(它请求新的数据集存储在内存中)。安全
咱们随后就会看看这两个操做的示例,可是,让咱们先来了解一下 Scala 语言。bash
Scala 多是 Internet 上鲜为人知的秘密之一。您能够在一些最繁忙的 Internet 网站(如 Twitter、LinkedIn 和 Foursquare,Foursquare 使用了名为 Lift 的 Web 应用程序框架)的制做过程当中看到 Scala 的身影。还有证据代表,许多金融机构已开始关注 Scala 的性能(好比 EDF Trading 公司将 Scala 用于衍生产品订价)。
Scala 是一种多范式语言,它以一种流畅的、让人感到舒服的方法支持与命令式、函数式和面向对象的语言相关的语言特性。从面向对象的角度来看,Scala 中的每一个值都是一个对象。一样,从函数观点来看,每一个函数都是一个值。Scala 也是属于静态类型,它有一个既有表现力又很安全的类型系统。
此外,Scala 是一种虚拟机 (VM) 语言,而且能够经过 Scala 编译器生成的字节码,直接运行在使用 Java Runtime Environment V2 的 Java™ Virtual Machine (JVM) 上。该设置充许 Scala 运行在运行 JVM 的任何地方(要求一个额外的 Scala 运行时库)。它还充许 Scala 利用大量现存的 Java 库以及现有的 Java 代码。
最后,Scala 具备可扩展性。该语言(它实际上表明了可扩展语言)被定义为可直接集成到语言中的简单扩展。
让咱们来看一些实际的 Scala 语言示例。Scala 提供自身的解释器,充许您以交互方式试用该语言。Scala 的有用处理已超出本文所涉及的范围,可是您能够在 参考资料 中找到更多相关信息的连接。
清单 1 经过 Scala 自身提供的解释器开始了快速了解 Scala 语言之旅。启用 Scala 后,系统会给出提示,经过该提示,您能够以交互方式评估表达式和程序。咱们首先建立了两个变量,一个是不可变变量(即vals
,称做单赋值),另外一个变量是可变变量 (vars
)。注意,当您试图更改 b
(您的 var
)时,您能够成功地执行此操做,可是,当您试图更改 val
时,则会返回一个错误。
$ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20). Type in expressions to have them evaluated. Type :help for more information. scala> val a = 1 a: Int = 1 scala> var b = 2 b: Int = 2 scala> b = b + a b: Int = 3 scala> a = 2 <console>6: error: reassignment to val a = 2 ^ |
接下来,建立一个简单的方法来计算和返回 Int
的平方值。在 Scala 中定义一个方法得先从 def
开始,后跟方法名称和参数列表,而后,要将它设置为语句的数量(在本示例中为 1)。无需指定任何返回值,由于能够从方法自己推断出该值。注意,这相似于为变量赋值。在一个名为 3
的对象和一个名为 res0
的结果变量(Scala 解释器会自动为您建立该变量)上,我演示了这个过程。这些都显示在 清单 2 中。
scala> def square(x: Int) = x*x square: (x: Int)Int scala> square(3) res0: Int = 9 scala> square(res0) res1: Int = 81 |
接下来,让咱们看一下 Scala 中的一个简单类的构建过程(参见 清单 3)。定义一个简单的 Dog
类来接收一个 String
参数(您的名称构造函数)。注意,这里的类直接采用了该参数(无需在类的正文中定义类参数)。还有一个定义该参数的方法,可在调用参数时发送一个字符串。您要建立一个新的类实例,而后调用您的方法。注意,解释器会插入一些竖线:它们不属于代码。
scala> class Dog( name: String ) { | def bark() = println(name + " barked") | } defined class Dog scala> val stubby = new Dog("Stubby") stubby: Dog = Dog@1dd5a3d scala> stubby.bark Stubby barked scala> |
完成上述操做后,只需输入 :quit
便可退出 Scala 解释器。
第一步是下载和配置 Scala。清单 4 中显示的命令阐述了 Scala 安装的下载和准备工做。使用 Scala v2.8,由于这是通过证明的 Spark 所需的版本。
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz $ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/ |
要使 Scala 可视化,请将下列行添加至您的 .bashrc 中(若是您正使用 Bash 做为 shell):
export SCALA_HOME=/opt/scala-2.8.1.final export PATH=$SCALA_HOME/bin:$PATH |
接着能够对您的安装进行测试,如 清单 5 所示。这组命令会将更改加载至 bashrc 文件中,接着快速测试 Scala 解释器 shell。
$ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20). Type in expressions to have them evaluated. Type :help for more information. scala> println("Scala is installed!") Scala is installed! scala> :quit $ |
如清单中所示,如今应该看到一个 Scala 提示。您能够经过输入 :quit
执行退出。注意,Scala 要在 JVM 的上下文中执行操做,因此您会须要 JVM。我使用的是 Ubuntu,它在默认状况下会提供 OpenJDK。
接下来,请获取最新的 Spark 框架副本。为此,请使用 清单 6 中的脚本。
wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/ mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz $ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz |
接下来,使用下列行将 spark 配置设置在 Scala 的根目录 ./conf/spar-env.sh 中:
export SCALA_HOME=/opt/scala-2.8.1.final |
设置的最后一步是使用简单的构建工具 (sbt
) 更新您的分布。sbt
是一款针对 Scala 的构建工具,用于 Spark 分布中。您能够在 mesos-spark-c86af80 子目录中执行更新和变异步骤,以下所示:
$ sbt/sbt update compile |
注意,在执行此步骤时,须要链接至 Internet。当完成此操做后,请执行 Spark 快速检测,如 清单 7 所示。在该测试中,须要运行 SparkPi 示例,它会计算 pi 的估值(经过单位平方中的任意点采样)。所显示的格式须要样例程序 (spark.examples.SparkPi) 和主机参数,该参数定义了 Mesos 主机(在此例中,是您的本地主机,由于它是一个单节点集群)和要使用的线程数量。注意,在 清单 7 中,执行了两个任务,并且这两个任务被序列化(任务 0 开始和结束以后,任务 1 再开始)。
$ ./run spark.examples.SparkPi local[1] 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501 11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501 11/08/26 19:52:33 INFO spark.SparkContext: Starting job... 11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations 11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0 11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List() 11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List() 11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ... 11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0 11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes 11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0 11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1 11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0) 11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes 11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1 11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1) 11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s Pi is roughly 3.14952 $ |
经过增长线程数量,您不只能够增长线程执行的并行化,还能够用更少的时间执行做业(如 清单 8 所示)。
$ ./run spark.examples.SparkPi local[2] 11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501 11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501 11/08/26 20:04:30 INFO spark.SparkContext: Starting job... 11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache 11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions 11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations 11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0 11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List() 11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List() 11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ... 11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0 11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1 11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes 11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes 11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0 11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1 11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1) 11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0) 11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s Pi is roughly 3.14052 $ |
要构建 Spark 应用程序,您须要单一 Java 归档 (JAR) 文件形式的 Spark 及其依赖关系。使用 sbt
在 Spark 的顶级目录中建立该 JAR 文件,以下所示:
$ sbt/sbt assembly |
结果产生一个文件 ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar"。将该文件添加至您的 CLASSPATH 中,以即可以访问它。在本示例中,不会用到此 JAR 文件,由于您将会使用 Scala 解释器运行它,而不是对其进行编译。
在本示例中,使用了标准的 MapReduce 转换(如 清单 9 所示)。该示例从执行必要的 Spark 类导入开始。接着,须要定义您的类 (SparkTest
) 及其主方法,用它解析稍后使用的参数。这些参数定义了执行 Spark 的环境(在本例中,该环境是一个单节点集群)。接下来,要建立 SparkContext
对象,它会告知 Spark 如何对您的集群进行访问。该对象须要两个参数:Mesos 主机名称(已传入)以及您分配给做业的名称 (SparkTest
)。解析命令行中的切片数量,它会告知 Spark 用于做业的线程数量。要设置的最后一项是指定用于 MapReduce 操做的文本文件。
最后,您将了解 Spark 示例的实质,它是由一组转换组成。使用您的文件时,可调用 flatMap
方法返回一个 RDD(经过指定的函数将文本行分解为标记)。而后经过 map
方法(该方法建立了键值对)传递此 RDD ,最终经过 ReduceByKey
方法合并键值对。合并操做是经过将键值对传递给 _ + _
匿名函数来完成的。该函数只采用两个参数(密钥和值),并返回将二者合并所产生的结果(一个String
和一个 Int
)。接着以文本文件的形式发送该值(到输出目录)。
import spark.SparkContext import SparkContext._ object SparkTest { def main( args: Array[String]) { if (args.length == 0) { System.err.println("Usage: SparkTest <host> [<slices>]") System.exit(1) } val spark = new SparkContext(args(0), "SparkTest") val slices = if (args.length > 1) args(1).toInt else 2 val myFile = spark.textFile("test.txt") val counts = myFile.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile("out.txt") } } SparkTest.main(args) |
要执行您的脚本,只须要执行如下命令:
$ scala SparkTest.scala local[1] |
您能够在输出目录中找到 MapReduce 测试文件(如 output/part-00000)。
自从开发了 Hadoop 后,市场上推出了许多值得关注的其余大数据分析平台。这些平台范围广阔,从简单的基于脚本的产品到与 Hadoop 相似的生产环境。
名为 bashreduce
的平台是这些平台中最简单的平台之一,顾名思义,它充许您在 Bash 环境中的多个机器上执行 MapReduce 类型的操做。bashreduce
依赖于您计划使用的机器集群的 Secure Shell(无密码),并以脚本的形式存在,经过它,您可使用 UNIX®-style 工具(sort
、awk
、netcat
等)请求做业。
GraphLab 是另外一个受人关注的 MapReduce 抽象实现,它侧重于机器学习算法的并行实现。在 GraphLab 中,Map 阶段会定义一些可单独(在独立主机上)执行的计算指令,而 Reduce 阶段会对结果进行合并。
最后,大数据场景的一个新成员是来自 Twitter 的 Storm(经过收购 BackType 得到)。Storm 被定义为 “实时处理的 Hadoop”,它主要侧重于流处理和持续计算(流处理能够得出计算的结果)。Storm 是用 Clojure 语言(Lisp 语言的一种方言)编写的,但它支持用任何语言(好比 Ruby 和 Python)编写的应用程序。Twitter 于 2011 年 9 月以开源形式发布 Storm。
请参阅 参考资料 得到有关的更多信息。
Spark 是不断壮大的大数据分析解决方案家族中备受关注的新增成员。它不只为分布数据集的处理提供一个有效框架,并且以高效的方式(经过简洁的 Scala 脚本)处理分布数据集。Spark 和 Scala 都处在积极发展阶段。不过,因为关键 Internet 属性中采用了它们,二者彷佛都已从受人关注的开源软件过渡成为基础 Web 技术。