Apache Spark是一个开源分布式运算框架,最初是由加州大学柏克莱分校AMPLab所开发。html
Hadoop MapReduce的每一步完成必须将数据序列化写到分布式文件系统致使效率大幅下降。Spark尽量地在内存上存储中间结果, 极大地提升了计算速度。java
MapReduce是一路计算的优秀解决方案, 但对于多路计算的问题必须将全部做业都转换为MapReduce模式并串行执行。python
Spark扩展了MapReduce模型,容许开发者使用有向无环图(DAG)开发复杂的多步数据管道。而且支持跨有向无环图的内存数据共享,以便不一样的做业能够共同处理同一个数据程序员
Spark不是Hadoop的替代方案而是其计算框架Hadoop MapReduce的替代方案。Hadoop更多地做为集群管理系统为Spark提供底层支持。算法
Spark可使用本地Spark, Hadoop YARN或Apache Mesos做为集群管理系统。Spark支持HDFS,Cassandra, OpenStack Swift做为分布式存储解决方案。sql
Spark采用Scala语言开发运行于JVM上,并提供了Scala,Python, Java和R语言API,可使用其中的Scala和Python进行交互式操做。shell
本文测试环境为Spark 2.1.0, Python API.apache
弹性分布式数据集(Resilient Distributed Dataset, RDD)是Saprk的基本数据结构, 表明能够跨机器进行分割的只读对象集合。api
RDD能够由Hadoop InputFormats建立(好比HDFS上的文件)或者由其它RDD转换而来, RDD一旦建立便不可改变。RDD操做分为变换和行动两种:缓存
变换(Transformation): 接受一个RDD做为参数,返回一个新的RDD, 原RDD不变。
包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe以及coalesce
行动(Action): 接受一个RDD做为参数, 进行查询并返回一个值。
包括: reduce,collect,count,first,take,countByKey以及foreach
Spark的核心组件包括:
Spark Core: 核心功能, 提供RDD及其API和操做。
Spark SQL: 提供经过Apache Hive的SQL变体HiveQL与Spark进行交互的API。每一个数据表被当作一个RDD,Spark SQL查询被转换为Spark操做。
Spark Streaming:容许对实时数据流进行处理和控制,park Streaming容许程序可以像普通RDD同样处理实时数据。
MLlib:一个经常使用机器学习算法库,算法被实现为对RDD的Spark操做。这个库包含可扩展的学习算法,好比分类、回归等须要对大量数据集进行迭代的操做
GraphX: 图计算框架, GraphX扩展了RDD API,包含控制图、建立子图、访问路径上全部顶点的操做。
对于Linux和Mac用户只须要在本地安装java运行环境并在官网中下载Pre-built版本的压缩包, 解压缩以后便可以单机模式使用Spark。
进入解压后的spark目录, 其中包含一些脚本和二进制程序:
sbin
: 管理员命令目录
spark-config.sh
将spark运行配置写入环境变量spark-daemon.sh
在本地启动守护进程spark-daemons.sh
在全部slave主机上启动守护进程start-master.sh
启动master进程start-slave.sh
在本地上启动slave进程start-slaves.sh
根据conf/slaves配置文件在slave主机上启动slave进程start-all.sh
启动全部守护进程,启动本地master进程, 根据conf/slaves启动slave进程stop-all.sh
中止全部守护进程及其下的master/slave进程stop-master.sh
中止master进程stop-slave.sh
中止本地的slave进程stop-slaves.sh
中止全部slave进程bin
普通用户工具目录
pyspark
: python交互环境spark-shell
scala交互环境sparkR
R交互环境spark-submit
将Spark应用提交到集群上运行spark-sql
spark-sql交互环境run-example
运行示例使用sbin/start-all.sh
启动spark而后调用bin/pyspark
进入Python交互界面:
SparkSession和SparkContext初始化成功后, 能够确认交互界面已正确启动。
>>> txt = sc.textFile("README.md") >>> txt.count() 104
上述代码中,sc是SparkContext的别名, 咱们根据"README.md"的内容建立了一个RDD并用count()
方法取出RDD中项目的数量。
bin/spark-submit
能够将使用python编写的Spark应用提交到集群上运行。
咱们将上文中的示例写成脚本, 与交互模式不一样的是脚本须要手动进行一些配置:
from pyspark import SparkConf, SparkContext APP_NAME = "My Spark Application" MASTER_URL = "local[*]" conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster(MASTER_URL) sc = SparkContext(conf=conf) def main(sc): txt = sc.textFile("README.md") print(txt.count()) if __name__ == '__main__': main(sc)
保存为my_test.py
, 使用spark-submit提交做业:
$ bin/spark-submit my_test.py 104
如今对上述代码作一些说明。
APP_NAME
是应用的名称由程序员自定义,MASTER_URL
用于指定集群Master的位置:
URL | 含义 |
---|---|
local | 用一个worker线程本地运行Spark |
local[K] | 用k个worker线程本地运行Spark(一般设置为机器核心数) |
local[*] | 用尽量多的worker线程本地运行Spark |
spark://HOST:PORT | 链接到给定的Spark独立部署集群master, 默认端口7077 |
mesos://HOST:PORT | 链接到给定的mesos集群 |
yarn-client | 以client模式链接到Yarn集群。集群位置将基于经过HADOOP_CONF_DIR变量找到 |
yarn-cluster | 以cluster模式链接到Yarn集群。群集位置将基于经过HADOOP_CONF_DIR变量找到 |
并行集合(Parallelized collections)基于python可迭代对象(iterable)建立:
>>> data = [1,2,3,4] >>> para_data = sc.parallelize(data) >>> para_data ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 >>> para_data.reduce(lambda x, y: x+y) 10
RDD一旦建立便可以并行模式运算.
除了使用内部的iterable对象建立RDD外, 也可使用外部数据源建立RDD.
Spark 能够从任何一个 Hadoop 支持的存储源建立分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase等.
>>> src_uri = "README.md" >>> txt = sc.textFile(src_uri) >>> txt.count() 104
Spark支持textFile, SequenceFile和其它Hadoop InputFormat做为外部数据源。
src_uri支持的协议包括hdfs://
, s3n://
和file://
等。直接填写路径则默认采用file://
即本地文件系统路径.
若是src_uri使用本地文件系统路径,文件必须能在 worker 节点上用相同的路径访问到。要么复制文件到全部的 workers,要么使用网络共享文件系统.
Spark采用惰性求值的机制进行运算, 咱们用一个简单的例子说明Spark的运算过程:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行从外部数据集建立了一个名为lines的RDD, lines只是一个指针文件内容没有真的被读入内存。
第二行执行了map操做, 一样的lineLength并无被当即求值。
第三行执行了reduce操做, Spark 把计算分红多个任务(task),而且让它们运行在多个机器上。每台机器都运行本身的 map 部分和本地 reduce 部分, 并把结果返回给Master。
前文已经说明transformation是根据RDD建立新的RDD的操做,这里将说明一些经常使用的操做,更多内容请参见官方文档.
rdd.map(func): 将数据源的每一个元素传递给func函数, 获得func的返回值组成新RDD
在示例lines.map(lambda s: len(s))
中lines的元素类型为str, map函数将其映射为长度元素长度的集合。
>>> r = sc.parallelize([1,2,3,4]).flatMap(lambda x: [x, x+1]) >>> r.collect() # show all elements >>> [1, 2, 2, 3, 3, 4, 4, 5]
rdd.filter(func): 将数据源的每一个元素传递给func函数, 使func返回True的元素加入到结果RDD中
rdd1.union(rdd2): 求rdd1与rdd2的并集
rdd1.intersection(rdd2): 求rdd1和rdd2的交集
rdd.distinct(): 返回去除重复元素后的rdd
action是对RDD进行查询并返回单个元素的操做, 这里将说明一些经常使用的操做,更多内容请参见官方文档。
rdd.reduce(func): func是接受两个参数并返回一个值的函数, reduce使用func对集合进行聚合。
这个过程能够理解为从集合中任取两个元素传给func, 而后将返回值加入集合中并删除两个参数, 反复迭代直至集合只有一个元素, 该元素即为最后的返回值。
示例lineLengths.reduce(lambda a, b: a + b)
中, reduce函数对RDD内全部函数进行了求和。
rdd.collect(): 以python list的形式返回集合中全部元素
rdd.first(): 返回集合中第一个元素, 对集合不产生影响
rdd.take(n): 返回集合中前n个元素组成的list, 下标从1开始
rdd.count(): 返回集合中元素的数目
rdd.foreach(func): func是接受一个参数的函数, 对集合中每一个元素调用func函数, foreach返回None
上文中的RDD对元素的类型是基本没有限制的, 相似于python内置的list(其实更相似于ORM的查询集)。RDD在使用二元组做为元素时, Spark会将二元组做为一个键值对处理, 二元组的第一个元素被认为是键, 第二个元素认为是值。
元素为二元组的RDD仍然可使用普通RDD的操做,Spark也为这类RDD定义了一些基于键值对的操做:
groupByKey():将key相同的键值对合并为一个键值对: (key,[val, val, ...])
reduceByKey(func): 对key相同的键值对应用func进行聚合: (key,RDD<val>.reduce(func))
rdd.sortByKey(ascending=True): 按key对键值对进行排序,默认为升序
rdd1.join(rdd2): 对两个键值对形式的rdd进行合并,(k, v)和(k,w)将被合并为(k, (v,w))
countByKey(): 返回每一个键对应键值对的个数(key, count), 返回值为dict而非RDD.
RDD持久化是Spark的一个重要功能, 上文已经说起Spark提供了持久化到内存的功能极大的提升了运算速度, 也是Spark比Hadoop MapReduce更先进的缘由之一。
rdd.persist([level])能够根据指定等级执行持久化:
>>> from pyspark import StorageLevel >>> r.persist(StorageLevel.MEMORY_ONLY) PythonRDD[16]
Spark支持的持久化级别包括:
MEMORY_ONLY: 将RDD做为java对象存储在JVM中,若RDD的某部分没法做为java对象存储,则不对该部分进行缓存。默认缓存级别。
MEMORY_AND_DISK: 将RDD做为java对象存储在JVM中,若RDD的某部分没法做为java对象存储,则将该部分用pickle序列化后缓存到磁盘上。
MEMORY_ONLY_SER: 将RDD序列化后做为java byte[]存储在内存中,不合适的分区不缓存。 比较节省内存可是消耗时间
MEMORY_AND_DISK_SER: 将RDD序列化后做为java byte[]存储在内存中,不合适的分区序列化后存储到磁盘上
DISK_ONLY: 序列化后仅存储在磁盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2等: 与上述存储级别相似, 不过是存储到两个节点上
OFF_HEAP: 将RDD序列化后缓存到分布式内存存储Tachyon上
Spark官方文档给出了一些关于选择存储级别的建议:
若是你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。由于这是cpu利用率最高的选项,会使RDD上的操做尽量的快。
若是不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提升对象的空间使用率,可是仍可以至关快的访问。
除非函数计算RDD的花费较大或者它们须要过滤大量的数据,不要将RDD存储到磁盘上,不然,重复计算一个分区就会和重磁盘上读取数据同样慢。
若是你但愿更快的错误恢复,能够利用重复(replicated)存储级别。全部的存储级别均可以经过重复计算丢失的数据来支持完整的容错,可是重复的数据可以使你在RDD上继续运行任务,而不须要重复计算丢失的数据。
Spark提供了rdd.cache()
方法, 它与rdd.persist(StorageLevel.MEMORY_ONLY)
功能相同。
Spark自动的监控每一个节点缓存的使用状况,利用最近最少使用原则删除老旧的数据。rdd.unpersist()
能够手动删除缓存。
一个传递给Spark操做(例如map和reduce)的函数在远程节点上面运行时,Spark操做实际上操做的是这个函数所用变量的一个独立副本。
这些变量被复制到每台机器上,而且这些变量在远程机器上 的全部更新都不会传递回驱动程序,一般这种跨任务的读写变量是低效的。
Spark提供了两个共享变量: 广播变量(broadcast variable)和累加器(accumulator)进行跨任务共享。
广播变量在每台机器上面缓存一个只读变量,而不是每一个任务保存一个副本。Spark也尝试着利用有效的广播算法去分配广播变量,以减小通讯的成本。
>>> broadcastVar = sc.broadcast([1, 2, 3]) >>> broadcastVar.value [1, 2, 3]
广播变量建立后咱们可使用它代替原变量,其操做与RDD基本相同。
累加器特性与广播变量相似, 另外定义了add方法用于累加。
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) >>> accum.value 10
累加器默认使用python内置int类型计数, 咱们能够自定义计数类型。一般自定义类型为多维向量,用来进行复杂计数。